1
0
mirror of https://github.com/raseels-repos/golang-saas-starter-kit.git synced 2025-06-04 23:37:49 +02:00

Merge branch 'master' of gitlab.com:geeks-accelerator/oss/saas-starter-kit

This commit is contained in:
Lee Brown 2019-08-17 10:58:59 -08:00
commit 81d79c4424
2 changed files with 216 additions and 18 deletions

View File

@ -8,10 +8,13 @@ import (
"encoding/csv"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"geeks-accelerator/oss/saas-starter-kit/internal/platform/web/webcontext"
"github.com/huandu/go-sqlbuilder"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
@ -325,3 +328,136 @@ func loadGeonameCountry(ctx context.Context, rr chan<- interface{}, country stri
}
}
}
// GetGeonameCountry downloads geoname data for the country.
// Parses data and returns slice of Geoname
func GetGeonameCountry(ctx context.Context, country string) ([]Geoname, error) {
res := make([]Geoname, 0)
var err error
var resp *http.Response
u := fmt.Sprintf("http://download.geonames.org/export/zip/%s.zip", country)
resp, err = pester.Get(u)
if err != nil {
// Add re-try three times after failing first time
// This reduces the risk when network is lagy, we still have chance to re-try.
for i := 0; i < 3; i++ {
resp, err = pester.Get(u)
if err == nil {
break
}
time.Sleep(time.Second * 1)
}
if err != nil {
err = errors.WithMessagef(err, "Failed to read countries from '%s'", u)
return res, err
}
}
defer resp.Body.Close()
br := bufio.NewReader(resp.Body)
buff := bytes.NewBuffer([]byte{})
size, err := io.Copy(buff, br)
if err != nil {
err = errors.WithStack(err)
return res, err
}
b := bytes.NewReader(buff.Bytes())
zr, err := zip.NewReader(b, size)
if err != nil {
err = errors.WithStack(err)
return res, err
}
for _, f := range zr.File {
if f.Name == "readme.txt" {
continue
}
fh, err := f.Open()
if err != nil {
err = errors.WithStack(err)
return res, err
}
scanner := bufio.NewScanner(fh)
for scanner.Scan() {
line := scanner.Text()
if strings.Contains(line, "\"") {
line = strings.Replace(line, "\"", "\\\"", -1)
}
r := csv.NewReader(strings.NewReader(line))
r.Comma = '\t' // Use tab-delimited instead of comma <---- here!
r.LazyQuotes = true
r.FieldsPerRecord = -1
lines, err := r.ReadAll()
if err != nil {
err = errors.WithStack(err)
continue
}
for _, row := range lines {
/*
fmt.Println("CountryCode: row[0]", row[0])
fmt.Println("PostalCode: row[1]", row[1])
fmt.Println("PlaceName: row[2]", row[2])
fmt.Println("StateName: row[3]", row[3])
fmt.Println("StateCode : row[4]", row[4])
fmt.Println("CountyName: row[5]", row[5])
fmt.Println("CountyCode : row[6]", row[6])
fmt.Println("CommunityName: row[7]", row[7])
fmt.Println("CommunityCode: row[8]", row[8])
fmt.Println("Latitude: row[9]", row[9])
fmt.Println("Longitude: row[10]", row[10])
fmt.Println("Accuracy: row[11]", row[11])
*/
gn := Geoname{
CountryCode: row[0],
PostalCode: row[1],
PlaceName: row[2],
StateName: row[3],
StateCode: row[4],
CountyName: row[5],
CountyCode: row[6],
CommunityName: row[7],
CommunityCode: row[8],
}
if row[9] != "" {
gn.Latitude, err = decimal.NewFromString(row[9])
if err != nil {
err = errors.WithStack(err)
}
}
if row[10] != "" {
gn.Longitude, err = decimal.NewFromString(row[10])
if err != nil {
err = errors.WithStack(err)
}
}
if row[11] != "" {
gn.Accuracy, err = strconv.Atoi(row[11])
if err != nil {
err = errors.WithStack(err)
}
}
res = append(res, gn)
}
}
if err := scanner.Err(); err != nil {
err = errors.WithStack(err)
}
}
return res, err
}

View File

@ -9,6 +9,10 @@ import (
"strings"
"geeks-accelerator/oss/saas-starter-kit/internal/geonames"
"fmt"
"time"
"github.com/geeks-accelerator/sqlxmigrate"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
@ -240,33 +244,91 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest
}
}
q := "insert into geonames " +
"(country_code,postal_code,place_name,state_name,state_code,county_name,county_code,community_name,community_code,latitude,longitude,accuracy) " +
"values(?,?,?,?,?,?,?,?,?,?,?,?)"
q = db.Rebind(q)
stmt, err := db.Prepare(q)
if err != nil {
return errors.WithMessagef(err, "Failed to prepare sql query '%s'", q)
}
countries := geonames.ValidGeonameCountries(ctx)
if isUnittest {
} else {
resChan := make(chan interface{})
go geonames.LoadGeonames(ctx, resChan)
}
for r := range resChan {
switch v := r.(type) {
case geonames.Geoname:
_, err = stmt.Exec(v.CountryCode, v.PostalCode, v.PlaceName, v.StateName, v.StateCode, v.CountyName, v.CountyCode, v.CommunityName, v.CommunityCode, v.Latitude, v.Longitude, v.Accuracy)
ncol := 12
fn := func(geoNames []geonames.Geoname) error {
valueStrings := make([]string, 0, len(geoNames))
valueArgs := make([]interface{}, 0, len(geoNames)*ncol)
for _, geoname := range geoNames {
valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
valueArgs = append(valueArgs, geoname.CountryCode)
valueArgs = append(valueArgs, geoname.PostalCode)
valueArgs = append(valueArgs, geoname.PlaceName)
valueArgs = append(valueArgs, geoname.StateName)
valueArgs = append(valueArgs, geoname.StateCode)
valueArgs = append(valueArgs, geoname.CountyName)
valueArgs = append(valueArgs, geoname.CountyCode)
valueArgs = append(valueArgs, geoname.CommunityName)
valueArgs = append(valueArgs, geoname.CommunityCode)
valueArgs = append(valueArgs, geoname.Latitude)
valueArgs = append(valueArgs, geoname.Longitude)
valueArgs = append(valueArgs, geoname.Accuracy)
}
insertStmt := fmt.Sprintf("insert into geonames "+
"(country_code,postal_code,place_name,state_name,state_code,county_name,county_code,community_name,community_code,latitude,longitude,accuracy) "+
"VALUES %s", strings.Join(valueStrings, ","))
insertStmt = db.Rebind(insertStmt)
stmt, err := db.Prepare(insertStmt)
if err != nil {
return errors.WithMessagef(err, "Failed to prepare sql query '%s'", insertStmt)
}
_, err = stmt.Exec(valueArgs...)
return err
}
start := time.Now()
for _, country := range countries {
//fmt.Println("LoadGeonames: start country: ", country)
v, err := geonames.GetGeonameCountry(context.Background(), country)
if err != nil {
return errors.WithStack(err)
}
//fmt.Println("Geoname records: ", len(v))
// Max argument values of Postgres is about 54460. So the batch size for bulk insert is selected 4500*12 (ncol)
batch := 4500
n := len(v) / batch
//fmt.Println("Number of batch: ", n)
if n == 0 {
err := fn(v)
if err != nil {
return errors.WithStack(err)
}
} else {
for i := 0; i < n; i++ {
vn := v[i*batch : (i+1)*batch]
err := fn(vn)
if err != nil {
return errors.WithStack(err)
}
if n > 0 && n%25 == 0 {
time.Sleep(200)
}
}
if len(v)%batch > 0 {
fmt.Println("Remain part: ", len(v)-n*batch)
vn := v[n*batch:]
err := fn(vn)
if err != nil {
return errors.WithStack(err)
}
case error:
return v
}
}
//fmt.Println("Insert Geoname took: ", time.Since(start))
//fmt.Println("LoadGeonames: end country: ", country)
}
fmt.Println("Total Geonames population took: ", time.Since(start))
queries := []string{
`create index idx_geonames_country_code on geonames (country_code)`,