From 21ec9031bf1703fd8f82a343f31f2964b9567428 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 26 Nov 2015 22:53:32 -0800 Subject: Wait for I/O to finish before waiting for subprocesses to exit --- xdelta3/go/src/xdelta/rstream.go | 3 +++ xdelta3/go/src/xdelta/test.go | 25 +++++++++++-------------- xdelta3/go/src/xdelta/tgroup.go | 27 +++++++++++++++------------ 3 files changed, 29 insertions(+), 26 deletions(-) (limited to 'xdelta3/go/src/xdelta') diff --git a/xdelta3/go/src/xdelta/rstream.go b/xdelta3/go/src/xdelta/rstream.go index e31b0b0..67d23e6 100644 --- a/xdelta3/go/src/xdelta/rstream.go +++ b/xdelta3/go/src/xdelta/rstream.go @@ -22,6 +22,9 @@ func WriteRstreams(t *TestGroup, desc string, seed, offset, len int64, func writeOne(g Goroutine, seed, offset, len int64, stream io.WriteCloser, readall bool) { if !readall { + // Allow the source-read to fail or block until the process terminates. + // This behavior is reserved for the decoder, which is not required to + // read the entire source. g.OK() } if offset != 0 { diff --git a/xdelta3/go/src/xdelta/test.go b/xdelta3/go/src/xdelta/test.go index ab4137c..05de487 100644 --- a/xdelta3/go/src/xdelta/test.go +++ b/xdelta3/go/src/xdelta/test.go @@ -47,9 +47,8 @@ func (t *TestGroup) Drain(f io.ReadCloser, desc string) <-chan []byte { return c } -func (t *TestGroup) Empty(f io.ReadCloser, desc string) { - t.Go(desc, func (g Goroutine) { - g.OK() +func (t *TestGroup) Empty(f io.ReadCloser, desc string) Goroutine { + return t.Go("empty:"+desc, func (g Goroutine) { s := bufio.NewScanner(f) for s.Scan() { os.Stderr.Write([]byte(fmt.Sprint(desc, ": ", s.Text(), "\n"))) @@ -57,9 +56,9 @@ func (t *TestGroup) Empty(f io.ReadCloser, desc string) { err := s.Err() f.Close() if err != nil { - fmt.Println("Empty", desc, err) g.Panic(err) } + g.OK() }) } @@ -75,8 +74,8 @@ func TestWrite(what string, f io.WriteCloser, b []byte) error { return nil } -func (t *TestGroup) CopyStreams(r io.ReadCloser, w io.WriteCloser) { - t.Go("copy", func(g Goroutine) { +func (t *TestGroup) CopyStreams(r io.ReadCloser, w io.WriteCloser) Goroutine { + return t.Go("copy", func(g Goroutine) { _, err := io.Copy(w, r) if err != nil { fmt.Println("CopyS", err) @@ -96,8 +95,8 @@ func (t *TestGroup) CopyStreams(r io.ReadCloser, w io.WriteCloser) { }) } -func (t *TestGroup) CompareStreams(r1 io.ReadCloser, r2 io.ReadCloser, length int64) { - t.Go("compare", func(g Goroutine) { +func (t *TestGroup) CompareStreams(r1 io.ReadCloser, r2 io.ReadCloser, length int64) Goroutine { + return t.Go("compare", func(g Goroutine) { b1 := make([]byte, blocksize) b2 := make([]byte, blocksize) var idx int64 @@ -162,15 +161,13 @@ func (t *TestGroup) Exec(desc string, p *Program, srcfifo bool, flags []string) if serr := run.Cmd.Start(); serr != nil { return nil, serr } - t.Go("exec-wait:" + desc, func (g Goroutine) { - if err := run.Cmd.Wait(); err != nil { - g.Panic(err) - } - g.OK() - }) return run, nil } +func (r *Run) Wait() error { + return r.Cmd.Wait() +} + func writeFifo(srcfile string, read io.Reader) error { fifo, err := os.OpenFile(srcfile, os.O_WRONLY, 0600) if err != nil { diff --git a/xdelta3/go/src/xdelta/tgroup.go b/xdelta3/go/src/xdelta/tgroup.go index b64827c..c7337c6 100644 --- a/xdelta3/go/src/xdelta/tgroup.go +++ b/xdelta3/go/src/xdelta/tgroup.go @@ -40,7 +40,7 @@ func (g *Goroutine) OK() { } func (g *Goroutine) Panic(err error) { - fmt.Print("[", g.name, "] ", err, "\n") + fmt.Println(g, err) if g.errChan != nil { g.errChan <- err _ = <- g.errChan @@ -48,16 +48,17 @@ func (g *Goroutine) Panic(err error) { select {} } -func (t *TestGroup) Go(name string, f func(Goroutine)) { +func (t *TestGroup) Go(name string, f func(Goroutine)) Goroutine { g := Goroutine{name, make(chan error)} t.Lock() t.running = append(t.running, g) t.Unlock() go f(g) + return g } -func (t *TestGroup) Wait(g Goroutine) { - g.OK() +func (t *TestGroup) Wait(self Goroutine, procs... *Run) { + self.OK() t.Lock() wc := t.waitChan t.waitChan = nil @@ -66,17 +67,22 @@ func (t *TestGroup) Wait(g Goroutine) { t.Lock() errs := t.errors t.Unlock() + for _, p := range procs { + if err := p.Wait(); err != nil { + errs = append(errs, err) + } + } if len(errs) != 0 { - panic(fmt.Sprintln(len(errs), "errors in test")) + for _, err := range errs { + fmt.Println(err) + } + panic(fmt.Sprint(len(errs), " errors")) } } func waitAll(t *TestGroup, wc chan bool) { for { t.Lock() - // for _, x := range t.running { - // fmt.Println("RUNNING", x.name) - // } if len(t.running) == 0 { t.Unlock() break @@ -86,17 +92,14 @@ func waitAll(t *TestGroup, wc chan bool) { t.Unlock() timeout := time.After(time.Second) - // fmt.Println("Waiting on", runner) + select { case err := <- runner.errChan: runner.errChan <- err if err != nil { - // fmt.Println("[G]", runner, err) t.Lock() t.errors = append(t.errors, err) t.Unlock() - } else { - // fmt.Println("[G]", runner, "OK") } case <- timeout: t.Lock() -- cgit v1.2.3