summaryrefslogtreecommitdiff
path: root/xdelta3/go/src/xdelta
diff options
context:
space:
mode:
authorJoshua MacDonald <josh.macdonald@gmail.com>2015-11-26 22:53:32 -0800
committerJoshua MacDonald <josh.macdonald@gmail.com>2015-11-26 22:53:32 -0800
commit21ec9031bf1703fd8f82a343f31f2964b9567428 (patch)
tree70bbf6f24dce0f35e32d69296b0852b66485c766 /xdelta3/go/src/xdelta
parentb24969720ac9c92f3d7327250b3ee8e86acc6def (diff)
Wait for I/O to finish before waiting for subprocesses to exit
Diffstat (limited to 'xdelta3/go/src/xdelta')
-rw-r--r--xdelta3/go/src/xdelta/rstream.go3
-rw-r--r--xdelta3/go/src/xdelta/test.go25
-rw-r--r--xdelta3/go/src/xdelta/tgroup.go27
3 files changed, 29 insertions, 26 deletions
diff --git a/xdelta3/go/src/xdelta/rstream.go b/xdelta3/go/src/xdelta/rstream.go
index e31b0b0..67d23e6 100644
--- a/xdelta3/go/src/xdelta/rstream.go
+++ b/xdelta3/go/src/xdelta/rstream.go
@@ -22,6 +22,9 @@ func WriteRstreams(t *TestGroup, desc string, seed, offset, len int64,
22 22
23func writeOne(g Goroutine, seed, offset, len int64, stream io.WriteCloser, readall bool) { 23func writeOne(g Goroutine, seed, offset, len int64, stream io.WriteCloser, readall bool) {
24 if !readall { 24 if !readall {
25 // Allow the source-read to fail or block until the process terminates.
26 // This behavior is reserved for the decoder, which is not required to
27 // read the entire source.
25 g.OK() 28 g.OK()
26 } 29 }
27 if offset != 0 { 30 if offset != 0 {
diff --git a/xdelta3/go/src/xdelta/test.go b/xdelta3/go/src/xdelta/test.go
index ab4137c..05de487 100644
--- a/xdelta3/go/src/xdelta/test.go
+++ b/xdelta3/go/src/xdelta/test.go
@@ -47,9 +47,8 @@ func (t *TestGroup) Drain(f io.ReadCloser, desc string) <-chan []byte {
47 return c 47 return c
48} 48}
49 49
50func (t *TestGroup) Empty(f io.ReadCloser, desc string) { 50func (t *TestGroup) Empty(f io.ReadCloser, desc string) Goroutine {
51 t.Go(desc, func (g Goroutine) { 51 return t.Go("empty:"+desc, func (g Goroutine) {
52 g.OK()
53 s := bufio.NewScanner(f) 52 s := bufio.NewScanner(f)
54 for s.Scan() { 53 for s.Scan() {
55 os.Stderr.Write([]byte(fmt.Sprint(desc, ": ", s.Text(), "\n"))) 54 os.Stderr.Write([]byte(fmt.Sprint(desc, ": ", s.Text(), "\n")))
@@ -57,9 +56,9 @@ func (t *TestGroup) Empty(f io.ReadCloser, desc string) {
57 err := s.Err() 56 err := s.Err()
58 f.Close() 57 f.Close()
59 if err != nil { 58 if err != nil {
60 fmt.Println("Empty", desc, err)
61 g.Panic(err) 59 g.Panic(err)
62 } 60 }
61 g.OK()
63 }) 62 })
64} 63}
65 64
@@ -75,8 +74,8 @@ func TestWrite(what string, f io.WriteCloser, b []byte) error {
75 return nil 74 return nil
76} 75}
77 76
78func (t *TestGroup) CopyStreams(r io.ReadCloser, w io.WriteCloser) { 77func (t *TestGroup) CopyStreams(r io.ReadCloser, w io.WriteCloser) Goroutine {
79 t.Go("copy", func(g Goroutine) { 78 return t.Go("copy", func(g Goroutine) {
80 _, err := io.Copy(w, r) 79 _, err := io.Copy(w, r)
81 if err != nil { 80 if err != nil {
82 fmt.Println("CopyS", err) 81 fmt.Println("CopyS", err)
@@ -96,8 +95,8 @@ func (t *TestGroup) CopyStreams(r io.ReadCloser, w io.WriteCloser) {
96 }) 95 })
97} 96}
98 97
99func (t *TestGroup) CompareStreams(r1 io.ReadCloser, r2 io.ReadCloser, length int64) { 98func (t *TestGroup) CompareStreams(r1 io.ReadCloser, r2 io.ReadCloser, length int64) Goroutine {
100 t.Go("compare", func(g Goroutine) { 99 return t.Go("compare", func(g Goroutine) {
101 b1 := make([]byte, blocksize) 100 b1 := make([]byte, blocksize)
102 b2 := make([]byte, blocksize) 101 b2 := make([]byte, blocksize)
103 var idx int64 102 var idx int64
@@ -162,15 +161,13 @@ func (t *TestGroup) Exec(desc string, p *Program, srcfifo bool, flags []string)
162 if serr := run.Cmd.Start(); serr != nil { 161 if serr := run.Cmd.Start(); serr != nil {
163 return nil, serr 162 return nil, serr
164 } 163 }
165 t.Go("exec-wait:" + desc, func (g Goroutine) {
166 if err := run.Cmd.Wait(); err != nil {
167 g.Panic(err)
168 }
169 g.OK()
170 })
171 return run, nil 164 return run, nil
172} 165}
173 166
167func (r *Run) Wait() error {
168 return r.Cmd.Wait()
169}
170
174func writeFifo(srcfile string, read io.Reader) error { 171func writeFifo(srcfile string, read io.Reader) error {
175 fifo, err := os.OpenFile(srcfile, os.O_WRONLY, 0600) 172 fifo, err := os.OpenFile(srcfile, os.O_WRONLY, 0600)
176 if err != nil { 173 if err != nil {
diff --git a/xdelta3/go/src/xdelta/tgroup.go b/xdelta3/go/src/xdelta/tgroup.go
index b64827c..c7337c6 100644
--- a/xdelta3/go/src/xdelta/tgroup.go
+++ b/xdelta3/go/src/xdelta/tgroup.go
@@ -40,7 +40,7 @@ func (g *Goroutine) OK() {
40} 40}
41 41
42func (g *Goroutine) Panic(err error) { 42func (g *Goroutine) Panic(err error) {
43 fmt.Print("[", g.name, "] ", err, "\n") 43 fmt.Println(g, err)
44 if g.errChan != nil { 44 if g.errChan != nil {
45 g.errChan <- err 45 g.errChan <- err
46 _ = <- g.errChan 46 _ = <- g.errChan
@@ -48,16 +48,17 @@ func (g *Goroutine) Panic(err error) {
48 select {} 48 select {}
49} 49}
50 50
51func (t *TestGroup) Go(name string, f func(Goroutine)) { 51func (t *TestGroup) Go(name string, f func(Goroutine)) Goroutine {
52 g := Goroutine{name, make(chan error)} 52 g := Goroutine{name, make(chan error)}
53 t.Lock() 53 t.Lock()
54 t.running = append(t.running, g) 54 t.running = append(t.running, g)
55 t.Unlock() 55 t.Unlock()
56 go f(g) 56 go f(g)
57 return g
57} 58}
58 59
59func (t *TestGroup) Wait(g Goroutine) { 60func (t *TestGroup) Wait(self Goroutine, procs... *Run) {
60 g.OK() 61 self.OK()
61 t.Lock() 62 t.Lock()
62 wc := t.waitChan 63 wc := t.waitChan
63 t.waitChan = nil 64 t.waitChan = nil
@@ -66,17 +67,22 @@ func (t *TestGroup) Wait(g Goroutine) {
66 t.Lock() 67 t.Lock()
67 errs := t.errors 68 errs := t.errors
68 t.Unlock() 69 t.Unlock()
70 for _, p := range procs {
71 if err := p.Wait(); err != nil {
72 errs = append(errs, err)
73 }
74 }
69 if len(errs) != 0 { 75 if len(errs) != 0 {
70 panic(fmt.Sprintln(len(errs), "errors in test")) 76 for _, err := range errs {
77 fmt.Println(err)
78 }
79 panic(fmt.Sprint(len(errs), " errors"))
71 } 80 }
72} 81}
73 82
74func waitAll(t *TestGroup, wc chan bool) { 83func waitAll(t *TestGroup, wc chan bool) {
75 for { 84 for {
76 t.Lock() 85 t.Lock()
77 // for _, x := range t.running {
78 // fmt.Println("RUNNING", x.name)
79 // }
80 if len(t.running) == 0 { 86 if len(t.running) == 0 {
81 t.Unlock() 87 t.Unlock()
82 break 88 break
@@ -86,17 +92,14 @@ func waitAll(t *TestGroup, wc chan bool) {
86 t.Unlock() 92 t.Unlock()
87 93
88 timeout := time.After(time.Second) 94 timeout := time.After(time.Second)
89 // fmt.Println("Waiting on", runner) 95
90 select { 96 select {
91 case err := <- runner.errChan: 97 case err := <- runner.errChan:
92 runner.errChan <- err 98 runner.errChan <- err
93 if err != nil { 99 if err != nil {
94 // fmt.Println("[G]", runner, err)
95 t.Lock() 100 t.Lock()
96 t.errors = append(t.errors, err) 101 t.errors = append(t.errors, err)
97 t.Unlock() 102 t.Unlock()
98 } else {
99 // fmt.Println("[G]", runner, "OK")
100 } 103 }
101 case <- timeout: 104 case <- timeout:
102 t.Lock() 105 t.Lock()