summaryrefslogtreecommitdiff
path: root/go/src/lib/pipeline/pipeline.go
diff options
context:
space:
mode:
Diffstat (limited to 'go/src/lib/pipeline/pipeline.go')
-rw-r--r--go/src/lib/pipeline/pipeline.go80
1 files changed, 80 insertions, 0 deletions
diff --git a/go/src/lib/pipeline/pipeline.go b/go/src/lib/pipeline/pipeline.go
new file mode 100644
index 0000000..a74e626
--- /dev/null
+++ b/go/src/lib/pipeline/pipeline.go
@@ -0,0 +1,80 @@
+package pipeline
+
+import (
+ "os/exec"
+ "io"
+)
+
+type Pipe struct {
+ Cmds []exec.Cmd
+}
+
+func (pl *Pipe) preStart() error {
+ for _, cmd := range pl.Cmds[:len(pl.Cmds)-2] {
+ if err := cmd.Start(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (pl *Pipe) Start() error {
+ for _, cmd := range pl.Cmds {
+ if err := cmd.Start(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (pl *Pipe) Wait() error {
+ for _, cmd := range pl.Cmds {
+ if err := cmd.Wait(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (pl *Pipe) Run() error {
+ if err := pl.Start(); err != nil {
+ return err
+ }
+ return pl.Wait()
+}
+
+func (pl *Pipe) CombinedOutput() ([]byte, error) {
+ if err := pl.preStart(); err != nil {
+ return nil, err
+ }
+ return pl.Cmds[len(pl.Cmds)-1].CombinedOutput()
+}
+
+func (pl *Pipe) Output() ([]byte, error) {
+ if err := pl.preStart(); err != nil {
+ return nil, err
+ }
+ return pl.Cmds[len(pl.Cmds)-1].Output()
+}
+
+func (pl *Pipe) StdinPipe() (io.WriteCloser, error) {
+ return pl.Cmds[0].StdinPipe()
+}
+
+func (pl *Pipe) StdoutPipe() (io.ReadCloser, error) {
+ return pl.Cmds[len(pl.Cmds)-1].StdoutPipe()
+}
+
+var _ = &Pipe{}
+
+func Pipeline(cmds ...exec.Cmd) (*Pipe, error) {
+ pl := &Pipe{cmds}
+ for i := range pl.Cmds[:len(pl.Cmds)-2] {
+ out, err := pl.Cmds[i].StdoutPipe()
+ if err != nil {
+ return nil, err
+ }
+ pl.Cmds[i+1].Stdin = out
+ }
+ return pl, nil
+}