From 2896a36e3aa9cfc8f3429f36253de24556baf61e Mon Sep 17 00:00:00 2001 From: Jeremy Jay Date: Sun, 14 Feb 2021 14:17:25 -0500 Subject: [PATCH] implement concurrent search and sync aggregation --- cmd/grate2tsv/main.go | 99 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 80 insertions(+), 19 deletions(-) diff --git a/cmd/grate2tsv/main.go b/cmd/grate2tsv/main.go index 762c640..9801f1d 100644 --- a/cmd/grate2tsv/main.go +++ b/cmd/grate2tsv/main.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "crypto/md5" "flag" "fmt" "io" @@ -13,6 +14,7 @@ import ( "runtime" "runtime/pprof" "strings" + "sync" "time" "github.com/pbnjay/grate" @@ -30,10 +32,23 @@ var ( skipBlanks = flag.Bool("b", true, "discard blank rows from the output") cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") memprofile = flag.String("memprofile", "", "write memory profile to file") + + timeFormat = "2006-01-02 15:04:05" + fstats *os.File + + procWG sync.WaitGroup + cleanup = make(chan *output, 100) + outpool = sync.Pool{New: func() interface{} { + return &output{} + }} ) +type output struct { + f *os.File + b *bufio.Writer +} + func main() { - timeFormat := "2006-01-02 15:04:05" flag.Parse() if *memprofile != "" { @@ -66,7 +81,18 @@ func main() { log.SetOutput(fo) } - fstats, err := os.OpenFile(*infoFile, os.O_CREATE|os.O_RDWR, 0644) + done := make(chan int) + go func() { + for x := range cleanup { + x.b.Flush() + x.f.Close() + outpool.Put(x) + } + done <- 1 + }() + + var err error + fstats, err = os.OpenFile(*infoFile, os.O_CREATE|os.O_RDWR, 0644) if err != nil { log.Fatal(err) } @@ -78,12 +104,36 @@ func main() { if pos == 0 { fmt.Fprintf(fstats, "time\tfilename\tsheet\trows\tcolumns\terrors\n") } + + filenameChan := make(chan string) + + // fan out to 1/2 of CPU cores + // (e.g. each file-processor can use 2 cpus) + outMu := &sync.Mutex{} + nparallel := runtime.NumCPU() / 2 + procWG.Add(nparallel) + for i := 0; i < nparallel; i++ { + go runProcessor(filenameChan, outMu) + } for _, fn := range flag.Args() { + filenameChan <- fn + } + + close(filenameChan) + procWG.Wait() + close(cleanup) + <-done +} + +func runProcessor(from chan string, mu *sync.Mutex) { + for fn := range from { nowFmt := time.Now().Format(timeFormat) results, err := processFile(fn) + mu.Lock() if err != nil { // returned errors are fatal fmt.Fprintf(fstats, "%s\t%s\t-\t-\t-\t%s\n", nowFmt, fn, err.Error()) + mu.Unlock() continue } @@ -95,7 +145,9 @@ func main() { fmt.Fprintf(fstats, "%s\t%s\t%s\t%d\t%d\t%s\n", nowFmt, res.Filename, res.SheetName, res.NumRows, res.NumCols, e) } + mu.Unlock() } + procWG.Done() } var ( @@ -105,27 +157,29 @@ var ( type stats struct { Filename string + Hash string SheetName string NumRows int NumCols int Err error } -type Flusher interface { - Flush() error -} - func processFile(fn string) ([]stats, error) { - log.Printf("Opening file '%s' ...", fn) + //log.Printf("Opening file '%s' ...", fn) wb, err := grate.Open(fn) if err != nil { return nil, err } + defer wb.Close() results := []stats{} ext := filepath.Ext(fn) fn2 := filepath.Base(strings.TrimSuffix(fn, ext)) + subparts := fmt.Sprintf("%x", md5.Sum([]byte(fn2))) + subdir := filepath.Join("results", subparts[:2], subparts[2:4]) + os.MkdirAll(subdir, 0755) + log.Printf(subparts[:8]+" Processing file '%s'", fn2) sheets, err := wb.List() if err != nil { @@ -134,9 +188,10 @@ func processFile(fn string) ([]stats, error) { for _, s := range sheets { ps := stats{ Filename: fn, + Hash: subparts[:8], SheetName: s, } - log.Printf(" Opening Sheet '%s'...", s) + log.Printf(subparts[:8]+" Opening Sheet '%s'...", s) sheet, err := wb.Get(s) if err != nil { ps.Err = err @@ -144,7 +199,7 @@ func processFile(fn string) ([]stats, error) { continue } if sheet.IsEmpty() { - log.Println(" Empty sheet. Skipping.") + log.Println(subparts[:8] + " Empty sheet. Skipping.") results = append(results, ps) continue } @@ -152,14 +207,17 @@ func processFile(fn string) ([]stats, error) { if s == fn { s2 = "main" } + var ox *output var w io.Writer = ioutil.Discard if !*pretend { - f, err := os.Create(fn2 + "." + s2 + ".tsv") + f, err := os.Create(subdir + "/" + fn2 + "." + s2 + ".tsv") if err != nil { return nil, err } - defer f.Close() - w = bufio.NewWriter(f) + ox = outpool.Get().(*output) + ox.f = f + ox.b = bufio.NewWriter(f) + w = ox.b } for sheet.Next() { @@ -181,17 +239,20 @@ func processFile(fn string) ([]stats, error) { } } if nonblank || !*skipBlanks { - fmt.Fprintln(w, strings.Join(row, "\t")) + for i, v := range row { + if i != 0 { + w.Write([]byte{'\t'}) + } + w.Write([]byte(v)) + } + w.Write([]byte{'\n'}) + //fmt.Fprintln(w, strings.Join(row, "\t")) ps.NumRows++ } } results = append(results, ps) - - if ff, ok := w.(Flusher); ok { - ff.Flush() - } - if c, ok := w.(io.Closer); ok { - c.Close() + if ox != nil { + cleanup <- ox } } return results, nil