From ede482d8f3398262adcfb23095b6caad5872d8ba Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Sat, 21 Nov 2015 23:36:51 -0800 Subject: New TestGroup.Go method, improved error handling --- xdelta3/go/src/xdelta/rstream.go | 23 ++++--- xdelta3/go/src/xdelta/run.go | 22 +++++++ xdelta3/go/src/xdelta/test.go | 132 +++++++++++++++------------------------ xdelta3/go/src/xdelta/tgroup.go | 93 +++++++++++++++++++++++++++ 4 files changed, 181 insertions(+), 89 deletions(-) create mode 100644 xdelta3/go/src/xdelta/run.go create mode 100644 xdelta3/go/src/xdelta/tgroup.go (limited to 'xdelta3/go/src/xdelta') diff --git a/xdelta3/go/src/xdelta/rstream.go b/xdelta3/go/src/xdelta/rstream.go index 3e7265f..3f520d7 100644 --- a/xdelta3/go/src/xdelta/rstream.go +++ b/xdelta3/go/src/xdelta/rstream.go @@ -11,25 +11,30 @@ const ( ) func WriteRstreams(t *TestGroup, seed, offset, len int64, - first, second io.WriteCloser) { - go writeOne(t, seed, 0, len, first) - go writeOne(t, seed, offset, len, second) + src, tgt io.WriteCloser) { + t.Go("src", func (g Goroutine) { + go writeOne(g, seed, 0, len, src) + }) + t.Go("tgt", func (g Goroutine) { + go writeOne(g, seed, offset, len, tgt) + }) } -func writeOne(t *TestGroup, seed, offset, len int64, stream io.WriteCloser) error { - t.WaitGroup.Add(1) +func writeOne(g Goroutine, seed, offset, len int64, stream io.WriteCloser) { if offset != 0 { // Fill with other random data until the offset if err := writeRand(rand.New(rand.NewSource(^seed)), offset, stream); err != nil { - return err + g.Panic(err) } } if err := writeRand(rand.New(rand.NewSource(seed)), len - offset, stream); err != nil { - return err + g.Panic(err) + } + if err := stream.Close(); err != nil { + g.Panic(err) } - t.WaitGroup.Done() - return stream.Close() + g.OK() } func writeRand(r *rand.Rand, len int64, s io.Writer) error { diff --git a/xdelta3/go/src/xdelta/run.go b/xdelta3/go/src/xdelta/run.go new file mode 100644 index 0000000..f9b4185 --- /dev/null +++ b/xdelta3/go/src/xdelta/run.go @@ -0,0 +1,22 @@ +package xdelta + +import ( + "io/ioutil" + "os" +) + +type Runner struct { + Testdir string +} + +func NewRunner() (*Runner, error) { + if dir, err := ioutil.TempDir(tmpDir, "xrt"); err != nil { + return nil, err + } else { + return &Runner{dir}, nil + } +} + +func (r *Runner) Cleanup() { + os.RemoveAll(r.Testdir) +} diff --git a/xdelta3/go/src/xdelta/test.go b/xdelta3/go/src/xdelta/test.go index e853554..d11ec29 100644 --- a/xdelta3/go/src/xdelta/test.go +++ b/xdelta3/go/src/xdelta/test.go @@ -11,7 +11,6 @@ import ( "os/exec" "path" "sync/atomic" - "sync" "golang.org/x/sys/unix" ) @@ -25,16 +24,6 @@ type Program struct { Path string } -type Runner struct { - Testdir string -} - -type TestGroup struct { - sync.WaitGroup - sync.Mutex - errs []error -} - type Run struct { Cmd exec.Cmd Srcfile string @@ -44,86 +33,71 @@ type Run struct { Stderr io.ReadCloser } - -func (t *TestGroup) Panic(err error) { - t.Lock() - t.errs = append(t.errs, err) - t.Unlock() - t.Done() // For the caller - t.Wait() - for _, e := range t.errs { - fmt.Println(e) - } - panic(fmt.Sprintf("%d errors", len(t.errs))) -} - -func NewTestGroup() *TestGroup { - return &TestGroup{} -} - func (t *TestGroup) Drain(f io.ReadCloser, desc string) <-chan []byte { c := make(chan []byte) - go func() { - t.Add(1) - //fmt.Println("Draining", desc) + t.Go(desc, func(g Goroutine) { if b, err := ioutil.ReadAll(f); err != nil { - t.Panic(err) + fmt.Println("Drain", err) + g.Panic(err) } else { - //fmt.Println("Draining", desc, "--got it") c <- b } - t.Done() - }() + g.OK() + }) return c } func (t *TestGroup) Empty(f io.ReadCloser, desc string) { - go func() { - t.Add(1) + t.Go(desc, func (g Goroutine) { s := bufio.NewScanner(f) for s.Scan() { os.Stderr.Write([]byte(fmt.Sprint(desc, ": ", s.Text(), "\n"))) } - if err := s.Err(); err != nil { - t.Panic(err) + err := s.Err() + f.Close() + if err != nil { + fmt.Println("Empty", desc, err) + g.Panic(err) } - t.Done() - }() + g.OK() + }) } -func (t *TestGroup) Write(what string, f io.WriteCloser, b []byte) { - //fmt.Println("Write (", what, ") ", len(b), "bytes") +func TestWrite(what string, f io.WriteCloser, b []byte) error { if _, err := f.Write(b); err != nil { - t.Panic(errors.New(fmt.Sprint(what, ":", err))) + fmt.Println("Write", err) + return errors.New(fmt.Sprint(what, ":", err)) } - //fmt.Println("Write (", what, ") closing") if err := f.Close(); err != nil { - t.Panic(errors.New(fmt.Sprint(what, ":", err))) + fmt.Println("Close", err) + return errors.New(fmt.Sprint(what, ":", err)) } + return nil } func (t *TestGroup) CopyStreams(r io.ReadCloser, w io.WriteCloser) { - go func() { - t.Add(1) + t.Go("copy", func(g Goroutine) { _, err := io.Copy(w, r) if err != nil { - t.Panic(err) + fmt.Println("CopyS", err) + g.Panic(err) } err = r.Close() if err != nil { - t.Panic(err) + fmt.Println("CloseS1", err) + g.Panic(err) } err = w.Close() if err != nil { - t.Panic(err) + fmt.Println("CloseS2", err) + g.Panic(err) } - t.Done() - }() + g.OK() + }) } func (t *TestGroup) CompareStreams(r1 io.ReadCloser, r2 io.ReadCloser, length int64) { - go func() { - t.Add(1) + t.Go("compare", func(g Goroutine) { b1 := make([]byte, blocksize) b2 := make([]byte, blocksize) var idx int64 @@ -133,42 +107,32 @@ func (t *TestGroup) CompareStreams(r1 io.ReadCloser, r2 io.ReadCloser, length in c = int(length) } if _, err := io.ReadFull(r1, b1[0:c]); err != nil { - t.Panic(err) + fmt.Println("ReadFull1", err) + g.Panic(err) } if _, err := io.ReadFull(r2, b2[0:c]); err != nil { - t.Panic(err) + fmt.Println("ReadFull2", err) + g.Panic(err) } if bytes.Compare(b1[0:c], b2[0:c]) != 0 { fmt.Println("B1 is", string(b1[0:c])) fmt.Println("B2 is", string(b2[0:c])) - t.Panic(errors.New(fmt.Sprint("Bytes do not compare at ", idx))) + g.Panic(errors.New(fmt.Sprint("Bytes do not compare at ", idx))) } length -= int64(c) idx += int64(c) } - t.Done() - }() + g.OK() + }) } -func NewRunner() (*Runner, error) { - if dir, err := ioutil.TempDir(tmpDir, "xrt"); err != nil { - return nil, err - } else { - return &Runner{dir}, nil - } -} - -func (r *Runner) Cleanup() { - os.RemoveAll(r.Testdir) -} - -func (r *Runner) Exec(p *Program, srcfifo bool, flags []string) (*Run, error) { +func (t *TestGroup) Exec(p *Program, srcfifo bool, flags []string) (*Run, error) { var err error run := &Run{} args := []string{p.Path} if srcfifo { num := atomic.AddInt64(&srcSeq, 1) - run.Srcfile = path.Join(r.Testdir, fmt.Sprint("source", num)) + run.Srcfile = path.Join(t.Runner.Testdir, fmt.Sprint("source", num)) if err = unix.Mkfifo(run.Srcfile, 0600); err != nil { return nil, err } @@ -193,21 +157,29 @@ func (r *Runner) Exec(p *Program, srcfifo bool, flags []string) (*Run, error) { run.Cmd.Path = p.Path run.Cmd.Args = append(args, flags...) - run.Cmd.Dir = r.Testdir + run.Cmd.Dir = t.Runner.Testdir - return run, run.Cmd.Start() + if serr := run.Cmd.Start(); serr != nil { + return nil, serr + } + t.Go("exec-wait", func (g Goroutine) { + if err := run.Cmd.Wait(); err != nil { + g.Panic(err) + } + g.OK() + }) + return run, nil } func writeFifo(srcfile string, read io.Reader) error { fifo, err := os.OpenFile(srcfile, os.O_WRONLY, 0600) if err != nil { + fifo.Close() return err } if _, err := io.Copy(fifo, read); err != nil { + fifo.Close() return err } - if err := fifo.Close(); err != nil { - return err - } - return nil + return fifo.Close() } diff --git a/xdelta3/go/src/xdelta/tgroup.go b/xdelta3/go/src/xdelta/tgroup.go new file mode 100644 index 0000000..bb34258 --- /dev/null +++ b/xdelta3/go/src/xdelta/tgroup.go @@ -0,0 +1,93 @@ +package xdelta + +import ( + "fmt" + "sync" + "time" +) + +type TestGroup struct { + *Runner + sync.Mutex + running []Goroutine + waitChan <-chan bool +} + +type Goroutine struct { + name string + errChan chan error +} + +func NewTestGroup(r *Runner) (*TestGroup, Goroutine) { + g := Goroutine{"main", make(chan error)} + wc := make(chan bool) + tg := &TestGroup{Runner: r, running: []Goroutine{g}, waitChan: wc} + go waitAll(tg, wc) + return tg, g +} + +func (g *Goroutine) String() string { + return fmt.Sprint("[", g.name, "]") +} + +func (g *Goroutine) OK() { + g.errChan <- nil + _ = <- g.errChan +} + +func (g *Goroutine) Panic(err error) { + g.errChan <- err + _ = <- g.errChan + select {} +} + +func (t *TestGroup) Go(name string, f func(Goroutine)) { + g := Goroutine{name, make(chan error)} + t.Lock() + t.running = append(t.running, g) + t.Unlock() + go f(g) +} + +func (t *TestGroup) Wait(g Goroutine) { + g.OK() + t.Lock() + t.waitChan = nil + wc := t.waitChan + t.Unlock() + _ = <- wc +} + +func waitAll(t *TestGroup, wc chan bool) { + for { + t.Lock() + if len(t.running) == 0 { + t.Unlock() + break + } + runner := t.running[0] + t.running = t.running[1:] + t.Unlock() + + timeout := make(chan bool, 1) + go func() { + time.Sleep(1 * time.Second) + timeout <- true + }() + fmt.Println("Waiting on", runner) + select { + case err := <- runner.errChan: + runner.errChan <- err + if err != nil { + fmt.Println("[G]", runner, err) + } else { + fmt.Println("[G]", runner, "OK") + } + case <- timeout: + t.Lock() + t.running = append(t.running, runner) + t.Unlock() + } + } + wc <- true +} -- cgit v1.2.3