summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua MacDonald <josh.macdonald@gmail.com>2015-11-26 22:53:32 -0800
committerJoshua MacDonald <josh.macdonald@gmail.com>2015-11-26 22:53:32 -0800
commit21ec9031bf1703fd8f82a343f31f2964b9567428 (patch)
tree70bbf6f24dce0f35e32d69296b0852b66485c766
parentb24969720ac9c92f3d7327250b3ee8e86acc6def (diff)
Wait for I/O to finish before waiting for subprocesses to exit
-rw-r--r--xdelta3/go/src/regtest.go5
-rw-r--r--xdelta3/go/src/xdelta/rstream.go3
-rw-r--r--xdelta3/go/src/xdelta/test.go25
-rw-r--r--xdelta3/go/src/xdelta/tgroup.go27
4 files changed, 31 insertions, 29 deletions
diff --git a/xdelta3/go/src/regtest.go b/xdelta3/go/src/regtest.go
index 385c936..76d253c 100644
--- a/xdelta3/go/src/regtest.go
+++ b/xdelta3/go/src/regtest.go
@@ -52,7 +52,7 @@ func smokeTest(r *xdelta.Runner, p *xdelta.Program) {
52 if decoded != target { 52 if decoded != target {
53 g.Panic(errors.New("It's not working!!!")) 53 g.Panic(errors.New("It's not working!!!"))
54 } 54 }
55 t.Wait(g) 55 t.Wait(g, enc, dec)
56 fmt.Println("Smoketest pass") 56 fmt.Println("Smoketest pass")
57} 57}
58 58
@@ -80,8 +80,7 @@ func offsetTest(r *xdelta.Runner, p *xdelta.Program, bufsize, offset, length int
80 80
81 xdelta.WriteRstreams(t, "encode", seed, offset, length, enc.Srcin, enc.Stdin) 81 xdelta.WriteRstreams(t, "encode", seed, offset, length, enc.Srcin, enc.Stdin)
82 xdelta.WriteRstreams(t, "decode", seed, offset, length, dec.Srcin, write) 82 xdelta.WriteRstreams(t, "decode", seed, offset, length, dec.Srcin, write)
83 83 t.Wait(g, enc, dec)
84 t.Wait(g)
85} 84}
86 85
87func main() { 86func main() {
diff --git a/xdelta3/go/src/xdelta/rstream.go b/xdelta3/go/src/xdelta/rstream.go
index e31b0b0..67d23e6 100644
--- a/xdelta3/go/src/xdelta/rstream.go
+++ b/xdelta3/go/src/xdelta/rstream.go
@@ -22,6 +22,9 @@ func WriteRstreams(t *TestGroup, desc string, seed, offset, len int64,
22 22
23func writeOne(g Goroutine, seed, offset, len int64, stream io.WriteCloser, readall bool) { 23func writeOne(g Goroutine, seed, offset, len int64, stream io.WriteCloser, readall bool) {
24 if !readall { 24 if !readall {
25 // Allow the source-read to fail or block until the process terminates.
26 // This behavior is reserved for the decoder, which is not required to
27 // read the entire source.
25 g.OK() 28 g.OK()
26 } 29 }
27 if offset != 0 { 30 if offset != 0 {
diff --git a/xdelta3/go/src/xdelta/test.go b/xdelta3/go/src/xdelta/test.go
index ab4137c..05de487 100644
--- a/xdelta3/go/src/xdelta/test.go
+++ b/xdelta3/go/src/xdelta/test.go
@@ -47,9 +47,8 @@ func (t *TestGroup) Drain(f io.ReadCloser, desc string) <-chan []byte {
47 return c 47 return c
48} 48}
49 49
50func (t *TestGroup) Empty(f io.ReadCloser, desc string) { 50func (t *TestGroup) Empty(f io.ReadCloser, desc string) Goroutine {
51 t.Go(desc, func (g Goroutine) { 51 return t.Go("empty:"+desc, func (g Goroutine) {
52 g.OK()
53 s := bufio.NewScanner(f) 52 s := bufio.NewScanner(f)
54 for s.Scan() { 53 for s.Scan() {
55 os.Stderr.Write([]byte(fmt.Sprint(desc, ": ", s.Text(), "\n"))) 54 os.Stderr.Write([]byte(fmt.Sprint(desc, ": ", s.Text(), "\n")))
@@ -57,9 +56,9 @@ func (t *TestGroup) Empty(f io.ReadCloser, desc string) {
57 err := s.Err() 56 err := s.Err()
58 f.Close() 57 f.Close()
59 if err != nil { 58 if err != nil {
60 fmt.Println("Empty", desc, err)
61 g.Panic(err) 59 g.Panic(err)
62 } 60 }
61 g.OK()
63 }) 62 })
64} 63}
65 64
@@ -75,8 +74,8 @@ func TestWrite(what string, f io.WriteCloser, b []byte) error {
75 return nil 74 return nil
76} 75}
77 76
78func (t *TestGroup) CopyStreams(r io.ReadCloser, w io.WriteCloser) { 77func (t *TestGroup) CopyStreams(r io.ReadCloser, w io.WriteCloser) Goroutine {
79 t.Go("copy", func(g Goroutine) { 78 return t.Go("copy", func(g Goroutine) {
80 _, err := io.Copy(w, r) 79 _, err := io.Copy(w, r)
81 if err != nil { 80 if err != nil {
82 fmt.Println("CopyS", err) 81 fmt.Println("CopyS", err)
@@ -96,8 +95,8 @@ func (t *TestGroup) CopyStreams(r io.ReadCloser, w io.WriteCloser) {
96 }) 95 })
97} 96}
98 97
99func (t *TestGroup) CompareStreams(r1 io.ReadCloser, r2 io.ReadCloser, length int64) { 98func (t *TestGroup) CompareStreams(r1 io.ReadCloser, r2 io.ReadCloser, length int64) Goroutine {
100 t.Go("compare", func(g Goroutine) { 99 return t.Go("compare", func(g Goroutine) {
101 b1 := make([]byte, blocksize) 100 b1 := make([]byte, blocksize)
102 b2 := make([]byte, blocksize) 101 b2 := make([]byte, blocksize)
103 var idx int64 102 var idx int64
@@ -162,15 +161,13 @@ func (t *TestGroup) Exec(desc string, p *Program, srcfifo bool, flags []string)
162 if serr := run.Cmd.Start(); serr != nil { 161 if serr := run.Cmd.Start(); serr != nil {
163 return nil, serr 162 return nil, serr
164 } 163 }
165 t.Go("exec-wait:" + desc, func (g Goroutine) {
166 if err := run.Cmd.Wait(); err != nil {
167 g.Panic(err)
168 }
169 g.OK()
170 })
171 return run, nil 164 return run, nil
172} 165}
173 166
167func (r *Run) Wait() error {
168 return r.Cmd.Wait()
169}
170
174func writeFifo(srcfile string, read io.Reader) error { 171func writeFifo(srcfile string, read io.Reader) error {
175 fifo, err := os.OpenFile(srcfile, os.O_WRONLY, 0600) 172 fifo, err := os.OpenFile(srcfile, os.O_WRONLY, 0600)
176 if err != nil { 173 if err != nil {
diff --git a/xdelta3/go/src/xdelta/tgroup.go b/xdelta3/go/src/xdelta/tgroup.go
index b64827c..c7337c6 100644
--- a/xdelta3/go/src/xdelta/tgroup.go
+++ b/xdelta3/go/src/xdelta/tgroup.go
@@ -40,7 +40,7 @@ func (g *Goroutine) OK() {
40} 40}
41 41
42func (g *Goroutine) Panic(err error) { 42func (g *Goroutine) Panic(err error) {
43 fmt.Print("[", g.name, "] ", err, "\n") 43 fmt.Println(g, err)
44 if g.errChan != nil { 44 if g.errChan != nil {
45 g.errChan <- err 45 g.errChan <- err
46 _ = <- g.errChan 46 _ = <- g.errChan
@@ -48,16 +48,17 @@ func (g *Goroutine) Panic(err error) {
48 select {} 48 select {}
49} 49}
50 50
51func (t *TestGroup) Go(name string, f func(Goroutine)) { 51func (t *TestGroup) Go(name string, f func(Goroutine)) Goroutine {
52 g := Goroutine{name, make(chan error)} 52 g := Goroutine{name, make(chan error)}
53 t.Lock() 53 t.Lock()
54 t.running = append(t.running, g) 54 t.running = append(t.running, g)
55 t.Unlock() 55 t.Unlock()
56 go f(g) 56 go f(g)
57 return g
57} 58}
58 59
59func (t *TestGroup) Wait(g Goroutine) { 60func (t *TestGroup) Wait(self Goroutine, procs... *Run) {
60 g.OK() 61 self.OK()
61 t.Lock() 62 t.Lock()
62 wc := t.waitChan 63 wc := t.waitChan
63 t.waitChan = nil 64 t.waitChan = nil
@@ -66,17 +67,22 @@ func (t *TestGroup) Wait(g Goroutine) {
66 t.Lock() 67 t.Lock()
67 errs := t.errors 68 errs := t.errors
68 t.Unlock() 69 t.Unlock()
70 for _, p := range procs {
71 if err := p.Wait(); err != nil {
72 errs = append(errs, err)
73 }
74 }
69 if len(errs) != 0 { 75 if len(errs) != 0 {
70 panic(fmt.Sprintln(len(errs), "errors in test")) 76 for _, err := range errs {
77 fmt.Println(err)
78 }
79 panic(fmt.Sprint(len(errs), " errors"))
71 } 80 }
72} 81}
73 82
74func waitAll(t *TestGroup, wc chan bool) { 83func waitAll(t *TestGroup, wc chan bool) {
75 for { 84 for {
76 t.Lock() 85 t.Lock()
77 // for _, x := range t.running {
78 // fmt.Println("RUNNING", x.name)
79 // }
80 if len(t.running) == 0 { 86 if len(t.running) == 0 {
81 t.Unlock() 87 t.Unlock()
82 break 88 break
@@ -86,17 +92,14 @@ func waitAll(t *TestGroup, wc chan bool) {
86 t.Unlock() 92 t.Unlock()
87 93
88 timeout := time.After(time.Second) 94 timeout := time.After(time.Second)
89 // fmt.Println("Waiting on", runner) 95
90 select { 96 select {
91 case err := <- runner.errChan: 97 case err := <- runner.errChan:
92 runner.errChan <- err 98 runner.errChan <- err
93 if err != nil { 99 if err != nil {
94 // fmt.Println("[G]", runner, err)
95 t.Lock() 100 t.Lock()
96 t.errors = append(t.errors, err) 101 t.errors = append(t.errors, err)
97 t.Unlock() 102 t.Unlock()
98 } else {
99 // fmt.Println("[G]", runner, "OK")
100 } 103 }
101 case <- timeout: 104 case <- timeout:
102 t.Lock() 105 t.Lock()