From 8c28261fee80292826f0b87d7b9a74692c76b5b7 Mon Sep 17 00:00:00 2001 From: huyng Date: Thu, 15 Aug 2019 14:27:05 +0700 Subject: [PATCH 1/3] Update GetGeoNames and Migration functions. --- internal/geonames/geonames.go | 134 ++++++++++++++++++++++++++++++++++ internal/schema/migrations.go | 98 ++++++++++++++++++++----- 2 files changed, 214 insertions(+), 18 deletions(-) diff --git a/internal/geonames/geonames.go b/internal/geonames/geonames.go index 47a4e48..f184425 100644 --- a/internal/geonames/geonames.go +++ b/internal/geonames/geonames.go @@ -8,10 +8,13 @@ import ( "encoding/csv" "fmt" "io" + "net/http" "strconv" "strings" + "time" "geeks-accelerator/oss/saas-starter-kit/internal/platform/web/webcontext" + "github.com/huandu/go-sqlbuilder" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -325,3 +328,134 @@ func loadGeonameCountry(ctx context.Context, rr chan<- interface{}, country stri } } } + +// GetGeonameCountry downloads geoname data for the country. +// Parses data and returns slice of Geoname +func GetGeonameCountry(ctx context.Context, country string) ([]Geoname, error) { + res := make([]Geoname, 0) + var err error + var resp *http.Response + + u := fmt.Sprintf("http://download.geonames.org/export/zip/%s.zip", country) + resp, err = pester.Get(u) + if err != nil { + for i := 0; i < 3; i++ { + resp, err = pester.Get(u) + if err == nil { + break + } + time.Sleep(time.Second * 1) + } + if err != nil { + err = errors.WithMessagef(err, "Failed to read countries from '%s'", u) + return res, err + } + } + defer resp.Body.Close() + + br := bufio.NewReader(resp.Body) + + buff := bytes.NewBuffer([]byte{}) + size, err := io.Copy(buff, br) + if err != nil { + err = errors.WithStack(err) + return res, err + } + + b := bytes.NewReader(buff.Bytes()) + zr, err := zip.NewReader(b, size) + if err != nil { + err = errors.WithStack(err) + return res, err + } + + for _, f := range zr.File { + if f.Name == "readme.txt" { + continue + } + + fh, err := f.Open() + if err != nil { + err = errors.WithStack(err) + return res, err + } + + scanner := bufio.NewScanner(fh) + for scanner.Scan() { + line := scanner.Text() + + if strings.Contains(line, "\"") { + line = strings.Replace(line, "\"", "\\\"", -1) + } + + r := csv.NewReader(strings.NewReader(line)) + r.Comma = '\t' // Use tab-delimited instead of comma <---- here! + r.LazyQuotes = true + r.FieldsPerRecord = -1 + + lines, err := r.ReadAll() + if err != nil { + err = errors.WithStack(err) + continue + } + + for _, row := range lines { + + /* + fmt.Println("CountryCode: row[0]", row[0]) + fmt.Println("PostalCode: row[1]", row[1]) + fmt.Println("PlaceName: row[2]", row[2]) + fmt.Println("StateName: row[3]", row[3]) + fmt.Println("StateCode : row[4]", row[4]) + fmt.Println("CountyName: row[5]", row[5]) + fmt.Println("CountyCode : row[6]", row[6]) + fmt.Println("CommunityName: row[7]", row[7]) + fmt.Println("CommunityCode: row[8]", row[8]) + fmt.Println("Latitude: row[9]", row[9]) + fmt.Println("Longitude: row[10]", row[10]) + fmt.Println("Accuracy: row[11]", row[11]) + */ + + gn := Geoname{ + CountryCode: row[0], + PostalCode: row[1], + PlaceName: row[2], + StateName: row[3], + StateCode: row[4], + CountyName: row[5], + CountyCode: row[6], + CommunityName: row[7], + CommunityCode: row[8], + } + if row[9] != "" { + gn.Latitude, err = decimal.NewFromString(row[9]) + if err != nil { + err = errors.WithStack(err) + } + } + + if row[10] != "" { + gn.Longitude, err = decimal.NewFromString(row[10]) + if err != nil { + err = errors.WithStack(err) + } + } + + if row[11] != "" { + gn.Accuracy, err = strconv.Atoi(row[11]) + if err != nil { + err = errors.WithStack(err) + } + } + + res = append(res, gn) + } + } + + if err := scanner.Err(); err != nil { + err = errors.WithStack(err) + } + } + + return res, err +} diff --git a/internal/schema/migrations.go b/internal/schema/migrations.go index fe6a3c9..4523de1 100644 --- a/internal/schema/migrations.go +++ b/internal/schema/migrations.go @@ -9,6 +9,10 @@ import ( "strings" "geeks-accelerator/oss/saas-starter-kit/internal/geonames" + + "fmt" + "time" + "github.com/geeks-accelerator/sqlxmigrate" "github.com/jmoiron/sqlx" _ "github.com/lib/pq" @@ -240,33 +244,91 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest } } - q := "insert into geonames " + - "(country_code,postal_code,place_name,state_name,state_code,county_name,county_code,community_name,community_code,latitude,longitude,accuracy) " + - "values(?,?,?,?,?,?,?,?,?,?,?,?)" - q = db.Rebind(q) - stmt, err := db.Prepare(q) - if err != nil { - return errors.WithMessagef(err, "Failed to prepare sql query '%s'", q) - } - + countries := geonames.ValidGeonameCountries(context.Background()) if isUnittest { - } else { - resChan := make(chan interface{}) - go geonames.LoadGeonames(ctx, resChan) + } - for r := range resChan { - switch v := r.(type) { - case geonames.Geoname: - _, err = stmt.Exec(v.CountryCode, v.PostalCode, v.PlaceName, v.StateName, v.StateCode, v.CountyName, v.CountyCode, v.CommunityName, v.CommunityCode, v.Latitude, v.Longitude, v.Accuracy) + ncol := 12 + fn := func(geoNames []geonames.Geoname) error { + valueStrings := make([]string, 0, len(geoNames)) + valueArgs := make([]interface{}, 0, len(geoNames)*ncol) + for _, geoname := range geoNames { + valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + + valueArgs = append(valueArgs, geoname.CountryCode) + valueArgs = append(valueArgs, geoname.PostalCode) + valueArgs = append(valueArgs, geoname.PlaceName) + + valueArgs = append(valueArgs, geoname.StateName) + valueArgs = append(valueArgs, geoname.StateCode) + valueArgs = append(valueArgs, geoname.CountyName) + + valueArgs = append(valueArgs, geoname.CountyCode) + valueArgs = append(valueArgs, geoname.CommunityName) + valueArgs = append(valueArgs, geoname.CommunityCode) + + valueArgs = append(valueArgs, geoname.Latitude) + valueArgs = append(valueArgs, geoname.Longitude) + valueArgs = append(valueArgs, geoname.Accuracy) + } + insertStmt := fmt.Sprintf("insert into geonames "+ + "(country_code,postal_code,place_name,state_name,state_code,county_name,county_code,community_name,community_code,latitude,longitude,accuracy) "+ + "VALUES %s", strings.Join(valueStrings, ",")) + insertStmt = db.Rebind(insertStmt) + + stmt, err := db.Prepare(insertStmt) + if err != nil { + return errors.WithMessagef(err, "Failed to prepare sql query '%s'", insertStmt) + } + + _, err = stmt.Exec(valueArgs...) + return err + } + start := time.Now() + for _, country := range countries { + //fmt.Println("LoadGeonames: start country: ", country) + v, err := geonames.GetGeonameCountry(context.Background(), country) + if err != nil { + return errors.WithStack(err) + } + //fmt.Println("Geoname records: ", len(v)) + + batch := 4500 + n := len(v) / batch + + //fmt.Println("Number of batch: ", n) + + if n == 0 { + err := fn(v) + if err != nil { + return errors.WithStack(err) + } + } else { + for i := 0; i < n; i++ { + vn := v[i*batch : (i+1)*batch] + err := fn(vn) + if err != nil { + return errors.WithStack(err) + } + if n > 0 && n%25 == 0 { + time.Sleep(200) + } + } + if len(v)%batch > 0 { + fmt.Println("Remain part: ", len(v)-n*batch) + vn := v[n*batch:] + err := fn(vn) if err != nil { return errors.WithStack(err) } - case error: - return v } } + + //fmt.Println("Insert Geoname took: ", time.Since(start)) + //fmt.Println("LoadGeonames: end country: ", country) } + fmt.Println("Total Geonames population took: ", time.Since(start)) queries := []string{ `create index idx_geonames_country_code on geonames (country_code)`, From 71713280729d79ee4207a3771cc96d574daa30de Mon Sep 17 00:00:00 2001 From: huyng Date: Thu, 15 Aug 2019 14:40:22 +0700 Subject: [PATCH 2/3] Use ctx param from outer function --- internal/schema/migrations.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/schema/migrations.go b/internal/schema/migrations.go index 4523de1..bab4c17 100644 --- a/internal/schema/migrations.go +++ b/internal/schema/migrations.go @@ -244,7 +244,7 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest } } - countries := geonames.ValidGeonameCountries(context.Background()) + countries := geonames.ValidGeonameCountries(ctx) if isUnittest { } From c61a934279b03fb77c87a2afdbff9fe7f7e46d79 Mon Sep 17 00:00:00 2001 From: huyng Date: Thu, 15 Aug 2019 14:46:44 +0700 Subject: [PATCH 3/3] Add more comment --- internal/geonames/geonames.go | 2 ++ internal/schema/migrations.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/geonames/geonames.go b/internal/geonames/geonames.go index f184425..ae1c9c8 100644 --- a/internal/geonames/geonames.go +++ b/internal/geonames/geonames.go @@ -339,6 +339,8 @@ func GetGeonameCountry(ctx context.Context, country string) ([]Geoname, error) { u := fmt.Sprintf("http://download.geonames.org/export/zip/%s.zip", country) resp, err = pester.Get(u) if err != nil { + // Add re-try three times after failing first time + // This reduces the risk when network is lagy, we still have chance to re-try. for i := 0; i < 3; i++ { resp, err = pester.Get(u) if err == nil { diff --git a/internal/schema/migrations.go b/internal/schema/migrations.go index bab4c17..2ec3bec 100644 --- a/internal/schema/migrations.go +++ b/internal/schema/migrations.go @@ -293,7 +293,7 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest return errors.WithStack(err) } //fmt.Println("Geoname records: ", len(v)) - + // Max argument values of Postgres is about 54460. So the batch size for bulk insert is selected 4500*12 (ncol) batch := 4500 n := len(v) / batch