diff --git a/internal/geonames/geonames.go b/internal/geonames/geonames.go index ae1c9c8..c7a7bc9 100644 --- a/internal/geonames/geonames.go +++ b/internal/geonames/geonames.go @@ -5,10 +5,13 @@ import ( "bufio" "bytes" "context" + "crypto/md5" "encoding/csv" "fmt" "io" "net/http" + "os" + "path/filepath" "strconv" "strings" "time" @@ -191,144 +194,6 @@ func FindGeonameRegions(ctx context.Context, dbConn *sqlx.DB, orderBy, where str return resp, nil } -// LoadGeonames enables streaming retrieval of GeoNames. The downloaded results -// will be written to the interface{} resultReceiver channel enabling processing the results while -// they're still being fetched. After all pages have been processed the channel is closed. -// Possible types sent to the channel are limited to: -// - error -// - GeoName -func LoadGeonames(ctx context.Context, rr chan<- interface{}, countries ...string) { - defer close(rr) - - if len(countries) == 0 { - countries = ValidGeonameCountries(ctx) - } - - for _, country := range countries { - loadGeonameCountry(ctx, rr, country) - } -} - -// loadGeonameCountry enables streaming retrieval of GeoNames. The downloaded results -// will be written to the interface{} resultReceiver channel enabling processing the results while -// they're still being fetched. -// Possible types sent to the channel are limited to: -// - error -// - GeoName -func loadGeonameCountry(ctx context.Context, rr chan<- interface{}, country string) { - u := fmt.Sprintf("http://download.geonames.org/export/zip/%s.zip", country) - resp, err := pester.Get(u) - if err != nil { - rr <- errors.WithMessagef(err, "Failed to read countries from '%s'", u) - return - } - defer resp.Body.Close() - - br := bufio.NewReader(resp.Body) - - buff := bytes.NewBuffer([]byte{}) - size, err := io.Copy(buff, br) - if err != nil { - rr <- errors.WithStack(err) - return - } - - b := bytes.NewReader(buff.Bytes()) - zr, err := zip.NewReader(b, size) - if err != nil { - rr <- errors.WithStack(err) - return - } - - for _, f := range zr.File { - if f.Name == "readme.txt" { - continue - } - - fh, err := f.Open() - if err != nil { - rr <- errors.WithStack(err) - return - } - - scanner := bufio.NewScanner(fh) - for scanner.Scan() { - line := scanner.Text() - - if strings.Contains(line, "\"") { - line = strings.Replace(line, "\"", "\\\"", -1) - } - - r := csv.NewReader(strings.NewReader(line)) - r.Comma = '\t' // Use tab-delimited instead of comma <---- here! - r.LazyQuotes = true - r.FieldsPerRecord = -1 - - lines, err := r.ReadAll() - if err != nil { - rr <- errors.WithStack(err) - continue - } - - for _, row := range lines { - - /* - fmt.Println("CountryCode: row[0]", row[0]) - fmt.Println("PostalCode: row[1]", row[1]) - fmt.Println("PlaceName: row[2]", row[2]) - fmt.Println("StateName: row[3]", row[3]) - fmt.Println("StateCode : row[4]", row[4]) - fmt.Println("CountyName: row[5]", row[5]) - fmt.Println("CountyCode : row[6]", row[6]) - fmt.Println("CommunityName: row[7]", row[7]) - fmt.Println("CommunityCode: row[8]", row[8]) - fmt.Println("Latitude: row[9]", row[9]) - fmt.Println("Longitude: row[10]", row[10]) - fmt.Println("Accuracy: row[11]", row[11]) - */ - - gn := Geoname{ - CountryCode: row[0], - PostalCode: row[1], - PlaceName: row[2], - StateName: row[3], - StateCode: row[4], - CountyName: row[5], - CountyCode: row[6], - CommunityName: row[7], - CommunityCode: row[8], - } - if row[9] != "" { - gn.Latitude, err = decimal.NewFromString(row[9]) - if err != nil { - rr <- errors.WithStack(err) - } - } - - if row[10] != "" { - gn.Longitude, err = decimal.NewFromString(row[10]) - if err != nil { - rr <- errors.WithStack(err) - } - } - - if row[11] != "" { - gn.Accuracy, err = strconv.Atoi(row[11]) - if err != nil { - rr <- errors.WithStack(err) - } - } - - rr <- gn - } - } - - if err := scanner.Err(); err != nil { - rr <- errors.WithStack(err) - } - } -} - // GetGeonameCountry downloads geoname data for the country. // Parses data and returns slice of Geoname func GetGeonameCountry(ctx context.Context, country string) ([]Geoname, error) { @@ -337,25 +202,51 @@ func GetGeonameCountry(ctx context.Context, country string) ([]Geoname, error) { var resp *http.Response u := fmt.Sprintf("http://download.geonames.org/export/zip/%s.zip", country) - resp, err = pester.Get(u) - if err != nil { - // Add re-try three times after failing first time - // This reduces the risk when network is lagy, we still have chance to re-try. - for i := 0; i < 3; i++ { - resp, err = pester.Get(u) - if err == nil { - break - } - time.Sleep(time.Second * 1) - } - if err != nil { - err = errors.WithMessagef(err, "Failed to read countries from '%s'", u) - return res, err - } - } - defer resp.Body.Close() - br := bufio.NewReader(resp.Body) + h := fmt.Sprintf("%x", md5.Sum([]byte(u))) + cp := filepath.Join(os.TempDir(), h+".zip") + + if _, err := os.Stat(cp); err != nil { + resp, err = pester.Get(u) + if err != nil { + // Add re-try three times after failing first time + // This reduces the risk when network is lagy, we still have chance to re-try. + for i := 0; i < 3; i++ { + resp, err = pester.Get(u) + if err == nil { + break + } + time.Sleep(time.Second * 1) + } + if err != nil { + err = errors.WithMessagef(err, "Failed to read countries from '%s'", u) + return res, err + } + } + defer resp.Body.Close() + + // Create the file + out, err := os.Create(cp) + if err != nil { + return nil, err + } + defer out.Close() + + // Write the body to file + _, err = io.Copy(out, resp.Body) + if err != nil { + return nil, err + } + + out.Close() + } + + f, err := os.Open(cp) + if err != nil { + return nil, err + } + defer f.Close() + br := bufio.NewReader(f) buff := bytes.NewBuffer([]byte{}) size, err := io.Copy(buff, br) @@ -403,21 +294,6 @@ func GetGeonameCountry(ctx context.Context, country string) ([]Geoname, error) { for _, row := range lines { - /* - fmt.Println("CountryCode: row[0]", row[0]) - fmt.Println("PostalCode: row[1]", row[1]) - fmt.Println("PlaceName: row[2]", row[2]) - fmt.Println("StateName: row[3]", row[3]) - fmt.Println("StateCode : row[4]", row[4]) - fmt.Println("CountyName: row[5]", row[5]) - fmt.Println("CountyCode : row[6]", row[6]) - fmt.Println("CommunityName: row[7]", row[7]) - fmt.Println("CommunityCode: row[8]", row[8]) - fmt.Println("Latitude: row[9]", row[9]) - fmt.Println("Longitude: row[10]", row[10]) - fmt.Println("Accuracy: row[11]", row[11]) - */ - gn := Geoname{ CountryCode: row[0], PostalCode: row[1], diff --git a/internal/schema/migrations.go b/internal/schema/migrations.go index 2ec3bec..3fa9e87 100644 --- a/internal/schema/migrations.go +++ b/internal/schema/migrations.go @@ -217,7 +217,7 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest }, // Load new geonames table. { - ID: "20190731-02h", + ID: "20190731-02l", Migrate: func(tx *sql.Tx) error { schemas := []string{ @@ -246,7 +246,7 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest countries := geonames.ValidGeonameCountries(ctx) if isUnittest { - + countries = []string{"US"} } ncol := 12 @@ -287,7 +287,6 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest } start := time.Now() for _, country := range countries { - //fmt.Println("LoadGeonames: start country: ", country) v, err := geonames.GetGeonameCountry(context.Background(), country) if err != nil { return errors.WithStack(err) @@ -316,7 +315,7 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest } } if len(v)%batch > 0 { - fmt.Println("Remain part: ", len(v)-n*batch) + log.Println("Remain part: ", len(v)-n*batch) vn := v[n*batch:] err := fn(vn) if err != nil { @@ -324,11 +323,8 @@ func migrationList(ctx context.Context, db *sqlx.DB, log *log.Logger, isUnittest } } } - - //fmt.Println("Insert Geoname took: ", time.Since(start)) - //fmt.Println("LoadGeonames: end country: ", country) } - fmt.Println("Total Geonames population took: ", time.Since(start)) + log.Println("Total Geonames population took: ", time.Since(start)) queries := []string{ `create index idx_geonames_country_code on geonames (country_code)`,