1
0
mirror of https://github.com/raseels-repos/golang-saas-starter-kit.git synced 2025-08-08 22:36:41 +02:00

Cache geonames download

This commit is contained in:
Lee Brown
2019-08-17 11:15:45 -08:00
parent 81d79c4424
commit 295e46a885
2 changed files with 51 additions and 179 deletions

View File

@ -5,10 +5,13 @@ import (
"bufio" "bufio"
"bytes" "bytes"
"context" "context"
"crypto/md5"
"encoding/csv" "encoding/csv"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"os"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -191,144 +194,6 @@ func FindGeonameRegions(ctx context.Context, dbConn *sqlx.DB, orderBy, where str
return resp, nil return resp, nil
} }
// LoadGeonames enables streaming retrieval of GeoNames. The downloaded results
// will be written to the interface{} resultReceiver channel enabling processing the results while
// they're still being fetched. After all pages have been processed the channel is closed.
// Possible types sent to the channel are limited to:
// - error
// - GeoName
func LoadGeonames(ctx context.Context, rr chan<- interface{}, countries ...string) {
defer close(rr)
if len(countries) == 0 {
countries = ValidGeonameCountries(ctx)
}
for _, country := range countries {
loadGeonameCountry(ctx, rr, country)
}
}
// loadGeonameCountry enables streaming retrieval of GeoNames. The downloaded results
// will be written to the interface{} resultReceiver channel enabling processing the results while
// they're still being fetched.
// Possible types sent to the channel are limited to:
// - error
// - GeoName
func loadGeonameCountry(ctx context.Context, rr chan<- interface{}, country string) {
u := fmt.Sprintf("http://download.geonames.org/export/zip/%s.zip", country)
resp, err := pester.Get(u)
if err != nil {
rr <- errors.WithMessagef(err, "Failed to read countries from '%s'", u)
return
}
defer resp.Body.Close()
br := bufio.NewReader(resp.Body)
buff := bytes.NewBuffer([]byte{})
size, err := io.Copy(buff, br)
if err != nil {
rr <- errors.WithStack(err)
return
}
b := bytes.NewReader(buff.Bytes())
zr, err := zip.NewReader(b, size)
if err != nil {
rr <- errors.WithStack(err)
return
}
for _, f := range zr.File {
if f.Name == "readme.txt" {
continue
}
fh, err := f.Open()
if err != nil {
rr <- errors.WithStack(err)
return
}
scanner := bufio.NewScanner(fh)
for scanner.Scan() {
line := scanner.Text()
if strings.Contains(line, "\"") {
line = strings.Replace(line, "\"", "\\\"", -1)
}
r := csv.NewReader(strings.NewReader(line))
r.Comma = '\t' // Use tab-delimited instead of comma <---- here!
r.LazyQuotes = true
r.FieldsPerRecord = -1
lines, err := r.ReadAll()
if err != nil {
rr <- errors.WithStack(err)
continue
}
for _, row := range lines {
/*
fmt.Println("CountryCode: row[0]", row[0])
fmt.Println("PostalCode: row[1]", row[1])
fmt.Println("PlaceName: row[2]", row[2])
fmt.Println("StateName: row[3]", row[3])
fmt.Println("StateCode : row[4]", row[4])
fmt.Println("CountyName: row[5]", row[5])
fmt.Println("CountyCode : row[6]", row[6])
fmt.Println("CommunityName: row[7]", row[7])
fmt.Println("CommunityCode: row[8]", row[8])
fmt.Println("Latitude: row[9]", row[9])
fmt.Println("Longitude: row[10]", row[10])
fmt.Println("Accuracy: row[11]", row[11])
*/
gn := Geoname{
CountryCode: row[0],
PostalCode: row[1],
PlaceName: row[2],
StateName: row[3],
StateCode: row[4],
CountyName: row[5],
CountyCode: row[6],
CommunityName: row[7],
CommunityCode: row[8],
}
if row[9] != "" {
gn.Latitude, err = decimal.NewFromString(row[9])
if err != nil {
rr <- errors.WithStack(err)
}
}
if row[10] != "" {
gn.Longitude, err = decimal.NewFromString(row[10])
if err != nil {
rr <- errors.WithStack(err)
}
}
if row[11] != "" {
gn.Accuracy, err = strconv.Atoi(row[11])
if err != nil {
rr <- errors.WithStack(err)
}
}
rr <- gn
}
}
if err := scanner.Err(); err != nil {
rr <- errors.WithStack(err)
}
}
}
// GetGeonameCountry downloads geoname data for the country. // GetGeonameCountry downloads geoname data for the country.
// Parses data and returns slice of Geoname // Parses data and returns slice of Geoname
func GetGeonameCountry(ctx context.Context, country string) ([]Geoname, error) { func GetGeonameCountry(ctx context.Context, country string) ([]Geoname, error) {
@ -337,25 +202,51 @@ func GetGeonameCountry(ctx context.Context, country string) ([]Geoname, error) {
var resp *http.Response var resp *http.Response
u := fmt.Sprintf("http://download.geonames.org/export/zip/%s.zip", country) u := fmt.Sprintf("http://download.geonames.org/export/zip/%s.zip", country)
resp, err = pester.Get(u)
if err != nil {
// Add re-try three times after failing first time
// This reduces the risk when network is lagy, we still have chance to re-try.
for i := 0; i < 3; i++ {
resp, err = pester.Get(u)
if err == nil {
break
}
time.Sleep(time.Second * 1)
}
if err != nil {
err = errors.WithMessagef(err, "Failed to read countries from '%s'", u)
return res, err
}
}
defer resp.Body.Close()
br := bufio.NewReader(resp.Body) h := fmt.Sprintf("%x", md5.Sum([]byte(u)))
cp := filepath.Join(os.TempDir(), h+".zip")
if _, err := os.Stat(cp); err != nil {
resp, err = pester.Get(u)
if err != nil {
// Add re-try three times after failing first time
// This reduces the risk when network is lagy, we still have chance to re-try.
for i := 0; i < 3; i++ {
resp, err = pester.Get(u)
if err == nil {
break
}
time.Sleep(time.Second * 1)
}
if err != nil {
err = errors.WithMessagef(err, "Failed to read countries from '%s'", u)
return res, err
}
}
defer resp.Body.Close()
// Create the file
out, err := os.Create(cp)
if err != nil {
return nil, err
}
defer out.Close()
// Write the body to file
_, err = io.Copy(out, resp.Body)
if err != nil {
return nil, err
}
out.Close()
}
f, err := os.Open(cp)
if err != nil {
return nil, err
}
defer f.Close()
br := bufio.NewReader(f)
buff := bytes.NewBuffer([]byte{}) buff := bytes.NewBuffer([]byte{})
size, err := io.Copy(buff, br) size, err := io.Copy(buff, br)
@ -403,21 +294,6 @@ func GetGeonameCountry(ctx context.Context, country string) ([]Geoname, error) {
for _, row := range lines { for _, row := range lines {
/*
fmt.Println("CountryCode: row[0]", row[0])
fmt.Println("PostalCode: row[1]", row[1])
fmt.Println("PlaceName: row[2]", row[2])
fmt.Println("StateName: row[3]", row[3])
fmt.Println("StateCode : row[4]", row[4])
fmt.Println("CountyName: row[5]", row[5])
fmt.Println("CountyCode : row[6]", row[6])
fmt.Println("CommunityName: row[7]", row[7])
fmt.Println("CommunityCode: row[8]", row[8])
fmt.Println("Latitude: row[9]", row[9])
fmt.Println("Longitude: row[10]", row[10])
fmt.Println("Accuracy: row[11]", row[11])
*/
gn := Geoname{ gn := Geoname{
CountryCode: row[0], CountryCode: row[0],
PostalCode: row[1], PostalCode: row[1],

View File

@ -217,7 +217,7 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest
}, },
// Load new geonames table. // Load new geonames table.
{ {
ID: "20190731-02h", ID: "20190731-02l",
Migrate: func(tx *sql.Tx) error { Migrate: func(tx *sql.Tx) error {
schemas := []string{ schemas := []string{
@ -246,7 +246,7 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest
countries := geonames.ValidGeonameCountries(ctx) countries := geonames.ValidGeonameCountries(ctx)
if isUnittest { if isUnittest {
countries = []string{"US"}
} }
ncol := 12 ncol := 12
@ -287,7 +287,6 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest
} }
start := time.Now() start := time.Now()
for _, country := range countries { for _, country := range countries {
//fmt.Println("LoadGeonames: start country: ", country)
v, err := geonames.GetGeonameCountry(context.Background(), country) v, err := geonames.GetGeonameCountry(context.Background(), country)
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)
@ -316,7 +315,7 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest
} }
} }
if len(v)%batch > 0 { if len(v)%batch > 0 {
fmt.Println("Remain part: ", len(v)-n*batch) log.Println("Remain part: ", len(v)-n*batch)
vn := v[n*batch:] vn := v[n*batch:]
err := fn(vn) err := fn(vn)
if err != nil { if err != nil {
@ -324,11 +323,8 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest
} }
} }
} }
//fmt.Println("Insert Geoname took: ", time.Since(start))
//fmt.Println("LoadGeonames: end country: ", country)
} }
fmt.Println("Total Geonames population took: ", time.Since(start)) log.Println("Total Geonames population took: ", time.Since(start))
queries := []string{ queries := []string{
`create index idx_geonames_country_code on geonames (country_code)`, `create index idx_geonames_country_code on geonames (country_code)`,