mirror of
https://github.com/go-micro/go-micro.git
synced 2024-12-24 10:07:04 +02:00
Remove cloudflare store
This commit is contained in:
parent
b9a5e9d610
commit
d4b2c948dd
@ -1,236 +0,0 @@
|
||||
package certmagic
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-acme/lego/v3/providers/dns/cloudflare"
|
||||
"github.com/mholt/certmagic"
|
||||
"github.com/micro/go-micro/v2/api/server/acme"
|
||||
cfstore "github.com/micro/go-micro/v2/store/cloudflare"
|
||||
"github.com/micro/go-micro/v2/sync/lock/memory"
|
||||
)
|
||||
|
||||
func TestCertMagic(t *testing.T) {
|
||||
if len(os.Getenv("IN_TRAVIS_CI")) != 0 {
|
||||
t.Skip()
|
||||
}
|
||||
l, err := NewProvider().Listen()
|
||||
if err != nil {
|
||||
if _, ok := err.(*net.OpError); ok {
|
||||
t.Skip("Run under non privileged user")
|
||||
}
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
l.Close()
|
||||
|
||||
c := cloudflare.NewDefaultConfig()
|
||||
c.AuthEmail = ""
|
||||
c.AuthKey = ""
|
||||
c.AuthToken = "test"
|
||||
c.ZoneToken = "test"
|
||||
|
||||
p, err := cloudflare.NewDNSProviderConfig(c)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
l, err = NewProvider(acme.AcceptToS(true),
|
||||
acme.CA(acme.LetsEncryptStagingCA),
|
||||
acme.ChallengeProvider(p),
|
||||
).Listen()
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
l.Close()
|
||||
}
|
||||
|
||||
func TestStorageImplementation(t *testing.T) {
|
||||
if len(os.Getenv("IN_TRAVIS_CI")) != 0 {
|
||||
t.Skip()
|
||||
}
|
||||
|
||||
apiToken, accountID := os.Getenv("CF_API_TOKEN"), os.Getenv("CF_ACCOUNT_ID")
|
||||
kvID := os.Getenv("KV_NAMESPACE_ID")
|
||||
if len(apiToken) == 0 || len(accountID) == 0 || len(kvID) == 0 {
|
||||
t.Skip("No Cloudflare API keys available, skipping test")
|
||||
}
|
||||
|
||||
var s certmagic.Storage
|
||||
st := cfstore.NewStore(
|
||||
cfstore.Token(apiToken),
|
||||
cfstore.Account(accountID),
|
||||
cfstore.Namespace(kvID),
|
||||
)
|
||||
s = &storage{
|
||||
lock: memory.NewLock(),
|
||||
store: st,
|
||||
}
|
||||
|
||||
// Test Lock
|
||||
if err := s.Lock("test"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Test Unlock
|
||||
if err := s.Unlock("test"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Test data
|
||||
testdata := []struct {
|
||||
key string
|
||||
value []byte
|
||||
}{
|
||||
{key: "/foo/a", value: []byte("lorem")},
|
||||
{key: "/foo/b", value: []byte("ipsum")},
|
||||
{key: "/foo/c", value: []byte("dolor")},
|
||||
{key: "/foo/d", value: []byte("sit")},
|
||||
{key: "/bar/a", value: []byte("amet")},
|
||||
{key: "/bar/b", value: []byte("consectetur")},
|
||||
{key: "/bar/c", value: []byte("adipiscing")},
|
||||
{key: "/bar/d", value: []byte("elit")},
|
||||
{key: "/foo/bar/a", value: []byte("sed")},
|
||||
{key: "/foo/bar/b", value: []byte("do")},
|
||||
{key: "/foo/bar/c", value: []byte("eiusmod")},
|
||||
{key: "/foo/bar/d", value: []byte("tempor")},
|
||||
{key: "/foo/bar/baz/a", value: []byte("incididunt")},
|
||||
{key: "/foo/bar/baz/b", value: []byte("ut")},
|
||||
{key: "/foo/bar/baz/c", value: []byte("labore")},
|
||||
{key: "/foo/bar/baz/d", value: []byte("et")},
|
||||
// a duplicate just in case there's any edge cases
|
||||
{key: "/foo/a", value: []byte("lorem")},
|
||||
}
|
||||
|
||||
// Test Store
|
||||
for _, d := range testdata {
|
||||
if err := s.Store(d.key, d.value); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Test Load
|
||||
for _, d := range testdata {
|
||||
if value, err := s.Load(d.key); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
} else {
|
||||
if !reflect.DeepEqual(value, d.value) {
|
||||
t.Fatalf("Load %s: expected %v, got %v", d.key, d.value, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test Exists
|
||||
for _, d := range testdata {
|
||||
if !s.Exists(d.key) {
|
||||
t.Fatalf("%s should exist, but doesn't\n", d.key)
|
||||
}
|
||||
}
|
||||
|
||||
// Test List
|
||||
if list, err := s.List("/", true); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
} else {
|
||||
var expected []string
|
||||
for i, d := range testdata {
|
||||
if i != len(testdata)-1 {
|
||||
// Don't store the intentionally duplicated key
|
||||
expected = append(expected, d.key)
|
||||
}
|
||||
}
|
||||
sort.Strings(expected)
|
||||
sort.Strings(list)
|
||||
if !reflect.DeepEqual(expected, list) {
|
||||
t.Fatalf("List: Expected %v, got %v\n", expected, list)
|
||||
}
|
||||
}
|
||||
if list, err := s.List("/foo", false); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
} else {
|
||||
sort.Strings(list)
|
||||
expected := []string{"/foo/a", "/foo/b", "/foo/bar", "/foo/c", "/foo/d"}
|
||||
if !reflect.DeepEqual(expected, list) {
|
||||
t.Fatalf("List: expected %s, got %s\n", expected, list)
|
||||
}
|
||||
}
|
||||
|
||||
// Test Stat
|
||||
for _, d := range testdata {
|
||||
info, err := s.Stat(d.key)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
} else {
|
||||
if info.Key != d.key {
|
||||
t.Fatalf("Stat().Key: expected %s, got %s\n", d.key, info.Key)
|
||||
}
|
||||
if info.Size != int64(len(d.value)) {
|
||||
t.Fatalf("Stat().Size: expected %d, got %d\n", len(d.value), info.Size)
|
||||
}
|
||||
if time.Since(info.Modified) > time.Minute {
|
||||
t.Fatalf("Stat().Modified: expected time since last modified to be < 1 minute, got %v\n", time.Since(info.Modified))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Test Delete
|
||||
for _, d := range testdata {
|
||||
if err := s.Delete(d.key); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// New interface doesn't return an error, so call it in case any log.Fatal
|
||||
// happens
|
||||
NewProvider(acme.Cache(s))
|
||||
}
|
||||
|
||||
// Full test with a real zone, with against LE staging
|
||||
func TestE2e(t *testing.T) {
|
||||
if len(os.Getenv("IN_TRAVIS_CI")) != 0 {
|
||||
t.Skip()
|
||||
}
|
||||
|
||||
apiToken, accountID := os.Getenv("CF_API_TOKEN"), os.Getenv("CF_ACCOUNT_ID")
|
||||
kvID := os.Getenv("KV_NAMESPACE_ID")
|
||||
if len(apiToken) == 0 || len(accountID) == 0 || len(kvID) == 0 {
|
||||
t.Skip("No Cloudflare API keys available, skipping test")
|
||||
}
|
||||
|
||||
testLock := memory.NewLock()
|
||||
testStore := cfstore.NewStore(
|
||||
cfstore.Token(apiToken),
|
||||
cfstore.Account(accountID),
|
||||
cfstore.Namespace(kvID),
|
||||
)
|
||||
testStorage := NewStorage(testLock, testStore)
|
||||
|
||||
conf := cloudflare.NewDefaultConfig()
|
||||
conf.AuthToken = apiToken
|
||||
conf.ZoneToken = apiToken
|
||||
testChallengeProvider, err := cloudflare.NewDNSProviderConfig(conf)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
testProvider := NewProvider(
|
||||
acme.AcceptToS(true),
|
||||
acme.Cache(testStorage),
|
||||
acme.CA(acme.LetsEncryptStagingCA),
|
||||
acme.ChallengeProvider(testChallengeProvider),
|
||||
acme.OnDemand(false),
|
||||
)
|
||||
|
||||
listener, err := testProvider.Listen("*.micro.mu", "micro.mu")
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
go http.Serve(listener, http.NotFoundHandler())
|
||||
time.Sleep(10 * time.Minute)
|
||||
}
|
@ -1,411 +0,0 @@
|
||||
// Package cloudflare is a store implementation backed by cloudflare workers kv
|
||||
// Note that the cloudflare workers KV API is eventually consistent.
|
||||
package cloudflare
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/v2/store"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/patrickmn/go-cache"
|
||||
)
|
||||
|
||||
const (
|
||||
apiBaseURL = "https://api.cloudflare.com/client/v4/"
|
||||
)
|
||||
|
||||
type workersKV struct {
|
||||
options store.Options
|
||||
// cf account id
|
||||
account string
|
||||
// cf api token
|
||||
token string
|
||||
// cf kv namespace
|
||||
namespace string
|
||||
// http client to use
|
||||
httpClient *http.Client
|
||||
// cache
|
||||
cache *cache.Cache
|
||||
}
|
||||
|
||||
// apiResponse is a cloudflare v4 api response
|
||||
type apiResponse struct {
|
||||
Result []struct {
|
||||
ID string `json:"id"`
|
||||
Type string `json:"type"`
|
||||
Name string `json:"name"`
|
||||
Expiration int64 `json:"expiration"`
|
||||
Content string `json:"content"`
|
||||
Proxiable bool `json:"proxiable"`
|
||||
Proxied bool `json:"proxied"`
|
||||
TTL int64 `json:"ttl"`
|
||||
Priority int64 `json:"priority"`
|
||||
Locked bool `json:"locked"`
|
||||
ZoneID string `json:"zone_id"`
|
||||
ZoneName string `json:"zone_name"`
|
||||
ModifiedOn time.Time `json:"modified_on"`
|
||||
CreatedOn time.Time `json:"created_on"`
|
||||
} `json:"result"`
|
||||
Success bool `json:"success"`
|
||||
Errors []apiMessage `json:"errors"`
|
||||
// not sure Messages is ever populated?
|
||||
Messages []apiMessage `json:"messages"`
|
||||
ResultInfo struct {
|
||||
Page int `json:"page"`
|
||||
PerPage int `json:"per_page"`
|
||||
Count int `json:"count"`
|
||||
TotalCount int `json:"total_count"`
|
||||
} `json:"result_info"`
|
||||
}
|
||||
|
||||
// apiMessage is a Cloudflare v4 API Error
|
||||
type apiMessage struct {
|
||||
Code int `json:"code"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
// getOptions returns account id, token and namespace
|
||||
func getOptions() (string, string, string) {
|
||||
accountID := strings.TrimSpace(os.Getenv("CF_ACCOUNT_ID"))
|
||||
apiToken := strings.TrimSpace(os.Getenv("CF_API_TOKEN"))
|
||||
namespace := strings.TrimSpace(os.Getenv("KV_NAMESPACE_ID"))
|
||||
|
||||
return accountID, apiToken, namespace
|
||||
}
|
||||
|
||||
func validateOptions(account, token, namespace string) {
|
||||
if len(account) == 0 {
|
||||
log.Fatal("Store: CF_ACCOUNT_ID is blank")
|
||||
}
|
||||
|
||||
if len(token) == 0 {
|
||||
log.Fatal("Store: CF_API_TOKEN is blank")
|
||||
}
|
||||
|
||||
if len(namespace) == 0 {
|
||||
log.Fatal("Store: KV_NAMESPACE_ID is blank")
|
||||
}
|
||||
}
|
||||
|
||||
func (w *workersKV) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *workersKV) Init(opts ...store.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&w.options)
|
||||
}
|
||||
if len(w.options.Database) > 0 {
|
||||
w.namespace = w.options.Database
|
||||
}
|
||||
if w.options.Context == nil {
|
||||
w.options.Context = context.TODO()
|
||||
}
|
||||
ttl := w.options.Context.Value("STORE_CACHE_TTL")
|
||||
if ttl != nil {
|
||||
ttlduration, ok := ttl.(time.Duration)
|
||||
if !ok {
|
||||
log.Fatal("STORE_CACHE_TTL from context must be type int64")
|
||||
}
|
||||
w.cache = cache.New(ttlduration, 3*ttlduration)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *workersKV) list(prefix string) ([]string, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/keys", w.account, w.namespace)
|
||||
|
||||
body := make(map[string]string)
|
||||
|
||||
if len(prefix) > 0 {
|
||||
body["prefix"] = prefix
|
||||
}
|
||||
|
||||
response, _, _, err := w.request(ctx, http.MethodGet, path, body, make(http.Header))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
a := &apiResponse{}
|
||||
if err := json.Unmarshal(response, a); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !a.Success {
|
||||
messages := ""
|
||||
for _, m := range a.Errors {
|
||||
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
|
||||
}
|
||||
return nil, errors.New(messages)
|
||||
}
|
||||
|
||||
keys := make([]string, 0, len(a.Result))
|
||||
|
||||
for _, r := range a.Result {
|
||||
keys = append(keys, r.Name)
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
// In the cloudflare workers KV implemention, List() doesn't guarantee
|
||||
// anything as the workers API is eventually consistent.
|
||||
func (w *workersKV) List(opts ...store.ListOption) ([]string, error) {
|
||||
keys, err := w.list("")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
func (w *workersKV) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var options store.ReadOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
keys := []string{key}
|
||||
|
||||
if options.Prefix {
|
||||
k, err := w.list(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keys = k
|
||||
}
|
||||
|
||||
//nolint:prealloc
|
||||
var records []*store.Record
|
||||
|
||||
for _, k := range keys {
|
||||
if w.cache != nil {
|
||||
if resp, hit := w.cache.Get(k); hit {
|
||||
if record, ok := resp.(*store.Record); ok {
|
||||
records = append(records, record)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(k))
|
||||
response, headers, status, err := w.request(ctx, http.MethodGet, path, nil, make(http.Header))
|
||||
if err != nil {
|
||||
return records, err
|
||||
}
|
||||
if status < 200 || status >= 300 {
|
||||
if status == 404 {
|
||||
return nil, store.ErrNotFound
|
||||
}
|
||||
|
||||
return records, errors.New("Received unexpected Status " + strconv.Itoa(status) + string(response))
|
||||
}
|
||||
record := &store.Record{
|
||||
Key: k,
|
||||
Value: response,
|
||||
}
|
||||
if expiry := headers.Get("Expiration"); len(expiry) != 0 {
|
||||
expiryUnix, err := strconv.ParseInt(expiry, 10, 64)
|
||||
if err != nil {
|
||||
return records, err
|
||||
}
|
||||
record.Expiry = time.Until(time.Unix(expiryUnix, 0))
|
||||
}
|
||||
if w.cache != nil {
|
||||
w.cache.Set(record.Key, record, cache.DefaultExpiration)
|
||||
}
|
||||
records = append(records, record)
|
||||
}
|
||||
|
||||
return records, nil
|
||||
}
|
||||
|
||||
func (w *workersKV) Write(r *store.Record, opts ...store.WriteOption) error {
|
||||
// Set it in local cache, with the global TTL from options
|
||||
if w.cache != nil {
|
||||
w.cache.Set(r.Key, r, cache.DefaultExpiration)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(r.Key))
|
||||
if r.Expiry != 0 {
|
||||
// Minimum cloudflare TTL is 60 Seconds
|
||||
exp := int(math.Max(60, math.Round(r.Expiry.Seconds())))
|
||||
path = path + "?expiration_ttl=" + strconv.Itoa(exp)
|
||||
}
|
||||
|
||||
headers := make(http.Header)
|
||||
|
||||
resp, _, _, err := w.request(ctx, http.MethodPut, path, r.Value, headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
a := &apiResponse{}
|
||||
if err := json.Unmarshal(resp, a); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !a.Success {
|
||||
messages := ""
|
||||
for _, m := range a.Errors {
|
||||
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
|
||||
}
|
||||
return errors.New(messages)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *workersKV) Delete(key string, opts ...store.DeleteOption) error {
|
||||
if w.cache != nil {
|
||||
w.cache.Delete(key)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(key))
|
||||
resp, _, _, err := w.request(ctx, http.MethodDelete, path, nil, make(http.Header))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
a := &apiResponse{}
|
||||
if err := json.Unmarshal(resp, a); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !a.Success {
|
||||
messages := ""
|
||||
for _, m := range a.Errors {
|
||||
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
|
||||
}
|
||||
return errors.New(messages)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *workersKV) request(ctx context.Context, method, path string, body interface{}, headers http.Header) ([]byte, http.Header, int, error) {
|
||||
var jsonBody []byte
|
||||
var err error
|
||||
|
||||
if body != nil {
|
||||
if paramBytes, ok := body.([]byte); ok {
|
||||
jsonBody = paramBytes
|
||||
} else {
|
||||
jsonBody, err = json.Marshal(body)
|
||||
if err != nil {
|
||||
return nil, nil, 0, errors.Wrap(err, "error marshalling params to JSON")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
jsonBody = nil
|
||||
}
|
||||
|
||||
var reqBody io.Reader
|
||||
|
||||
if jsonBody != nil {
|
||||
reqBody = bytes.NewReader(jsonBody)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, method, apiBaseURL+path, reqBody)
|
||||
if err != nil {
|
||||
return nil, nil, 0, errors.Wrap(err, "error creating new request")
|
||||
}
|
||||
|
||||
for key, value := range headers {
|
||||
req.Header[key] = value
|
||||
}
|
||||
|
||||
// set token if it exists
|
||||
if len(w.token) > 0 {
|
||||
req.Header.Set("Authorization", "Bearer "+w.token)
|
||||
}
|
||||
|
||||
// set the user agent to micro
|
||||
req.Header.Set("User-Agent", "micro/1.0 (https://micro.mu)")
|
||||
|
||||
// Official cloudflare client does exponential backoff here
|
||||
// TODO: retry and use util/backoff
|
||||
resp, err := w.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, nil, 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBody, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return respBody, resp.Header, resp.StatusCode, err
|
||||
}
|
||||
|
||||
return respBody, resp.Header, resp.StatusCode, nil
|
||||
}
|
||||
|
||||
func (w *workersKV) String() string {
|
||||
return "cloudflare"
|
||||
}
|
||||
|
||||
func (w *workersKV) Options() store.Options {
|
||||
return w.options
|
||||
}
|
||||
|
||||
// NewStore returns a cloudflare Store implementation.
|
||||
// Account ID, Token and Namespace must either be passed as options or
|
||||
// environment variables. If set as env vars we expect the following;
|
||||
// CF_API_TOKEN to a cloudflare API token scoped to Workers KV.
|
||||
// CF_ACCOUNT_ID to contain a string with your cloudflare account ID.
|
||||
// KV_NAMESPACE_ID to contain the namespace UUID for your KV storage.
|
||||
func NewStore(opts ...store.Option) store.Store {
|
||||
var options store.Options
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// get options from environment
|
||||
account, token, namespace := getOptions()
|
||||
|
||||
if len(account) == 0 {
|
||||
account = getAccount(options.Context)
|
||||
}
|
||||
|
||||
if len(token) == 0 {
|
||||
token = getToken(options.Context)
|
||||
}
|
||||
|
||||
if len(namespace) == 0 {
|
||||
namespace = options.Database
|
||||
}
|
||||
|
||||
// validate options are not blank or log.Fatal
|
||||
validateOptions(account, token, namespace)
|
||||
|
||||
return &workersKV{
|
||||
account: account,
|
||||
namespace: namespace,
|
||||
token: token,
|
||||
options: options,
|
||||
httpClient: &http.Client{},
|
||||
}
|
||||
}
|
@ -1,95 +0,0 @@
|
||||
package cloudflare
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/v2/store"
|
||||
)
|
||||
|
||||
func TestCloudflare(t *testing.T) {
|
||||
if len(os.Getenv("IN_TRAVIS_CI")) != 0 {
|
||||
t.Skip()
|
||||
}
|
||||
|
||||
apiToken, accountID := os.Getenv("CF_API_TOKEN"), os.Getenv("CF_ACCOUNT_ID")
|
||||
kvID := os.Getenv("KV_NAMESPACE_ID")
|
||||
if len(apiToken) == 0 || len(accountID) == 0 || len(kvID) == 0 {
|
||||
t.Skip("No Cloudflare API keys available, skipping test")
|
||||
}
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
randomK := strconv.Itoa(rand.Int())
|
||||
randomV := strconv.Itoa(rand.Int())
|
||||
|
||||
wkv := NewStore(
|
||||
Token(apiToken),
|
||||
Account(accountID),
|
||||
Namespace(kvID),
|
||||
CacheTTL(60000000000),
|
||||
)
|
||||
|
||||
records, err := wkv.List()
|
||||
if err != nil {
|
||||
t.Fatalf("List: %s\n", err.Error())
|
||||
} else {
|
||||
if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
|
||||
t.Log("Listed " + strconv.Itoa(len(records)) + " records")
|
||||
}
|
||||
}
|
||||
|
||||
err = wkv.Write(&store.Record{
|
||||
Key: randomK,
|
||||
Value: []byte(randomV),
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("Write: %s", err.Error())
|
||||
}
|
||||
err = wkv.Write(&store.Record{
|
||||
Key: "expirationtest",
|
||||
Value: []byte("This message will self destruct"),
|
||||
Expiry: 75 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("Write: %s", err.Error())
|
||||
}
|
||||
|
||||
// This might be needed for cloudflare eventual consistency
|
||||
time.Sleep(1 * time.Minute)
|
||||
|
||||
r, err := wkv.Read(randomK)
|
||||
if err != nil {
|
||||
t.Errorf("Read: %s\n", err.Error())
|
||||
}
|
||||
if len(r) != 1 {
|
||||
t.Errorf("Expected to read 1 key, got %d keys\n", len(r))
|
||||
}
|
||||
if string(r[0].Value) != randomV {
|
||||
t.Errorf("Read: expected %s, got %s\n", randomK, string(r[0].Value))
|
||||
}
|
||||
|
||||
r, err = wkv.Read("expirationtest")
|
||||
if err != nil {
|
||||
t.Errorf("Read: expirationtest should still exist")
|
||||
}
|
||||
if r[0].Expiry == 0 {
|
||||
t.Error("Expected r to have an expiry")
|
||||
} else {
|
||||
t.Log(r[0].Expiry)
|
||||
}
|
||||
|
||||
time.Sleep(20 * time.Second)
|
||||
r, err = wkv.Read("expirationtest")
|
||||
if err == nil && len(r) != 0 {
|
||||
t.Error("Read: Managed to read expirationtest, but it should have expired")
|
||||
t.Log(err, r[0].Key, string(r[0].Value), r[0].Expiry, len(r))
|
||||
}
|
||||
|
||||
err = wkv.Delete(randomK)
|
||||
if err != nil {
|
||||
t.Errorf("Delete: %s\n", err.Error())
|
||||
}
|
||||
|
||||
}
|
@ -1,64 +0,0 @@
|
||||
package cloudflare
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/v2/store"
|
||||
)
|
||||
|
||||
func getOption(ctx context.Context, key string) string {
|
||||
if ctx == nil {
|
||||
return ""
|
||||
}
|
||||
val, ok := ctx.Value(key).(string)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
return val
|
||||
}
|
||||
|
||||
func getToken(ctx context.Context) string {
|
||||
return getOption(ctx, "CF_API_TOKEN")
|
||||
}
|
||||
|
||||
func getAccount(ctx context.Context) string {
|
||||
return getOption(ctx, "CF_ACCOUNT_ID")
|
||||
}
|
||||
|
||||
// Token sets the cloudflare api token
|
||||
func Token(t string) store.Option {
|
||||
return func(o *store.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, "CF_API_TOKEN", t)
|
||||
}
|
||||
}
|
||||
|
||||
// Account sets the cloudflare account id
|
||||
func Account(id string) store.Option {
|
||||
return func(o *store.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, "CF_ACCOUNT_ID", id)
|
||||
}
|
||||
}
|
||||
|
||||
// Namespace sets the KV namespace
|
||||
func Namespace(ns string) store.Option {
|
||||
return func(o *store.Options) {
|
||||
o.Database = ns
|
||||
}
|
||||
}
|
||||
|
||||
// CacheTTL sets the timeout in nanoseconds of the read/write cache
|
||||
func CacheTTL(ttl time.Duration) store.Option {
|
||||
return func(o *store.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, "STORE_CACHE_TTL", ttl)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user