1
0
mirror of https://github.com/pbnjay/grate.git synced 2025-03-04 16:16:03 +02:00

implement concurrent search and sync aggregation

This commit is contained in:
Jeremy Jay 2021-02-14 14:17:25 -05:00
parent d9b725da93
commit 2896a36e3a

View File

@ -2,6 +2,7 @@ package main
import ( import (
"bufio" "bufio"
"crypto/md5"
"flag" "flag"
"fmt" "fmt"
"io" "io"
@ -13,6 +14,7 @@ import (
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"strings" "strings"
"sync"
"time" "time"
"github.com/pbnjay/grate" "github.com/pbnjay/grate"
@ -30,10 +32,23 @@ var (
skipBlanks = flag.Bool("b", true, "discard blank rows from the output") skipBlanks = flag.Bool("b", true, "discard blank rows from the output")
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
memprofile = flag.String("memprofile", "", "write memory profile to file") memprofile = flag.String("memprofile", "", "write memory profile to file")
timeFormat = "2006-01-02 15:04:05"
fstats *os.File
procWG sync.WaitGroup
cleanup = make(chan *output, 100)
outpool = sync.Pool{New: func() interface{} {
return &output{}
}}
) )
// output pairs a destination file with the buffered writer layered on top of it.
// processFile obtains one from outpool for each output file it creates, fills in
// both fields, and sends it on the cleanup channel when writing is finished; the
// goroutine started in main then flushes b, closes f, and puts the value back
// into outpool.
type output struct {
// f is the file returned by os.Create for the output file.
f *os.File
// b is the bufio.Writer created over f; it must be flushed before f is closed.
b *bufio.Writer
}
func main() { func main() {
timeFormat := "2006-01-02 15:04:05"
flag.Parse() flag.Parse()
if *memprofile != "" { if *memprofile != "" {
@ -66,7 +81,18 @@ func main() {
log.SetOutput(fo) log.SetOutput(fo)
} }
fstats, err := os.OpenFile(*infoFile, os.O_CREATE|os.O_RDWR, 0644) done := make(chan int)
go func() {
for x := range cleanup {
x.b.Flush()
x.f.Close()
outpool.Put(x)
}
done <- 1
}()
var err error
fstats, err = os.OpenFile(*infoFile, os.O_CREATE|os.O_RDWR, 0644)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -78,12 +104,36 @@ func main() {
if pos == 0 { if pos == 0 {
fmt.Fprintf(fstats, "time\tfilename\tsheet\trows\tcolumns\terrors\n") fmt.Fprintf(fstats, "time\tfilename\tsheet\trows\tcolumns\terrors\n")
} }
filenameChan := make(chan string)
// fan out to 1/2 of CPU cores
// (e.g. each file-processor can use 2 cpus)
outMu := &sync.Mutex{}
nparallel := runtime.NumCPU() / 2
procWG.Add(nparallel)
for i := 0; i < nparallel; i++ {
go runProcessor(filenameChan, outMu)
}
for _, fn := range flag.Args() { for _, fn := range flag.Args() {
filenameChan <- fn
}
close(filenameChan)
procWG.Wait()
close(cleanup)
<-done
}
func runProcessor(from chan string, mu *sync.Mutex) {
for fn := range from {
nowFmt := time.Now().Format(timeFormat) nowFmt := time.Now().Format(timeFormat)
results, err := processFile(fn) results, err := processFile(fn)
mu.Lock()
if err != nil { if err != nil {
// returned errors are fatal // returned errors are fatal
fmt.Fprintf(fstats, "%s\t%s\t-\t-\t-\t%s\n", nowFmt, fn, err.Error()) fmt.Fprintf(fstats, "%s\t%s\t-\t-\t-\t%s\n", nowFmt, fn, err.Error())
mu.Unlock()
continue continue
} }
@ -95,7 +145,9 @@ func main() {
fmt.Fprintf(fstats, "%s\t%s\t%s\t%d\t%d\t%s\n", nowFmt, res.Filename, res.SheetName, fmt.Fprintf(fstats, "%s\t%s\t%s\t%d\t%d\t%s\n", nowFmt, res.Filename, res.SheetName,
res.NumRows, res.NumCols, e) res.NumRows, res.NumCols, e)
} }
mu.Unlock()
} }
procWG.Done()
} }
var ( var (
@ -105,27 +157,29 @@ var (
type stats struct { type stats struct {
Filename string Filename string
Hash string
SheetName string SheetName string
NumRows int NumRows int
NumCols int NumCols int
Err error Err error
} }
// Flusher is satisfied by any value with a Flush() error method.
// processFile type-asserts its io.Writer against this interface so that
// buffered output can be flushed once writing is finished.
type Flusher interface {
// Flush reports any error through its return value.
Flush() error
}
func processFile(fn string) ([]stats, error) { func processFile(fn string) ([]stats, error) {
log.Printf("Opening file '%s' ...", fn) //log.Printf("Opening file '%s' ...", fn)
wb, err := grate.Open(fn) wb, err := grate.Open(fn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer wb.Close()
results := []stats{} results := []stats{}
ext := filepath.Ext(fn) ext := filepath.Ext(fn)
fn2 := filepath.Base(strings.TrimSuffix(fn, ext)) fn2 := filepath.Base(strings.TrimSuffix(fn, ext))
subparts := fmt.Sprintf("%x", md5.Sum([]byte(fn2)))
subdir := filepath.Join("results", subparts[:2], subparts[2:4])
os.MkdirAll(subdir, 0755)
log.Printf(subparts[:8]+" Processing file '%s'", fn2)
sheets, err := wb.List() sheets, err := wb.List()
if err != nil { if err != nil {
@ -134,9 +188,10 @@ func processFile(fn string) ([]stats, error) {
for _, s := range sheets { for _, s := range sheets {
ps := stats{ ps := stats{
Filename: fn, Filename: fn,
Hash: subparts[:8],
SheetName: s, SheetName: s,
} }
log.Printf(" Opening Sheet '%s'...", s) log.Printf(subparts[:8]+" Opening Sheet '%s'...", s)
sheet, err := wb.Get(s) sheet, err := wb.Get(s)
if err != nil { if err != nil {
ps.Err = err ps.Err = err
@ -144,7 +199,7 @@ func processFile(fn string) ([]stats, error) {
continue continue
} }
if sheet.IsEmpty() { if sheet.IsEmpty() {
log.Println(" Empty sheet. Skipping.") log.Println(subparts[:8] + " Empty sheet. Skipping.")
results = append(results, ps) results = append(results, ps)
continue continue
} }
@ -152,14 +207,17 @@ func processFile(fn string) ([]stats, error) {
if s == fn { if s == fn {
s2 = "main" s2 = "main"
} }
var ox *output
var w io.Writer = ioutil.Discard var w io.Writer = ioutil.Discard
if !*pretend { if !*pretend {
f, err := os.Create(fn2 + "." + s2 + ".tsv") f, err := os.Create(subdir + "/" + fn2 + "." + s2 + ".tsv")
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer f.Close() ox = outpool.Get().(*output)
w = bufio.NewWriter(f) ox.f = f
ox.b = bufio.NewWriter(f)
w = ox.b
} }
for sheet.Next() { for sheet.Next() {
@ -181,17 +239,20 @@ func processFile(fn string) ([]stats, error) {
} }
} }
if nonblank || !*skipBlanks { if nonblank || !*skipBlanks {
fmt.Fprintln(w, strings.Join(row, "\t")) for i, v := range row {
if i != 0 {
w.Write([]byte{'\t'})
}
w.Write([]byte(v))
}
w.Write([]byte{'\n'})
//fmt.Fprintln(w, strings.Join(row, "\t"))
ps.NumRows++ ps.NumRows++
} }
} }
results = append(results, ps) results = append(results, ps)
if ox != nil {
if ff, ok := w.(Flusher); ok { cleanup <- ox
ff.Flush()
}
if c, ok := w.(io.Closer); ok {
c.Close()
} }
} }
return results, nil return results, nil