From 442e05a4e129b7c669efde51080e5d03704ba717 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Sun, 12 Mar 2017 00:46:59 -0800 Subject: [PATCH] mask secrets, backport drone_ variables --- drone/agent/exp.go | 26 +++-- server/hook2.go | 40 ++++++- server/rpc.go | 17 ++- .../cncd/pipeline/pipeline/backend/types.go | 9 ++ .../pipeline/pipeline/frontend/metadata.go | 107 ++++++++++++++++-- .../frontend/yaml/compiler/convert.go | 7 -- .../pipeline/frontend/yaml/compiler/option.go | 11 ++ .../pipeline/pipeline/multipart/reader.go | 47 ++++---- .../cncd/pipeline/pipeline/rpc/client.go | 16 +-- .../cncd/pipeline/pipeline/rpc/line.go | 49 ++++++-- .../cncd/pipeline/pipeline/rpc/peer.go | 13 ++- .../cncd/pipeline/pipeline/rpc/server.go | 16 ++- vendor/vendor.json | 54 ++++----- 13 files changed, 297 insertions(+), 115 deletions(-) diff --git a/drone/agent/exp.go b/drone/agent/exp.go index 39f2fde4c..e3329dfee 100644 --- a/drone/agent/exp.go +++ b/drone/agent/exp.go @@ -26,6 +26,11 @@ func loop(c *cli.Context) error { if err != nil { return err } + filter := rpc.Filter{ + Labels: map[string]string{ + "platform": c.String("platform"), + }, + } client, err := rpc.NewClient( endpoint.String(), @@ -62,7 +67,7 @@ func loop(c *cli.Context) error { if sigterm.IsSet() { return } - if err := run(ctx, client); err != nil { + if err := run(ctx, client, filter); err != nil { log.Printf("build runner encountered error: exiting: %s", err) return } @@ -74,11 +79,16 @@ func loop(c *cli.Context) error { return nil } -func run(ctx context.Context, client rpc.Peer) error { +const ( + maxFileUpload = 5000000 + maxLogsUpload = 5000000 +) + +func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { log.Println("pipeline: request next execution") // get the next job from the queue - work, err := client.Next(ctx) + work, err := client.Next(ctx, filter) if err != nil { return err } @@ -103,9 +113,9 @@ func run(ctx context.Context, client rpc.Peer) error { cancelled := abool.New() go func() { - if err := client.Wait(ctx, work.ID); err != nil { + if werr := client.Wait(ctx, work.ID); err != nil { cancelled.SetTo(true) - log.Printf("pipeline: cancel signal received: %s: %s", work.ID, err) + log.Printf("pipeline: cancel signal received: %s: %s", work.ID, werr) cancel() } else { log.Printf("pipeline: cancel channel closed: %s", work.ID) @@ -140,7 +150,8 @@ func run(ctx context.Context, client rpc.Peer) error { } uploads.Add(1) writer := rpc.NewLineWriter(client, work.ID, proc.Alias) - io.Copy(writer, part) + rlimit := io.LimitReader(part, maxLogsUpload) + io.Copy(writer, rlimit) defer func() { log.Printf("pipeline: finish uploading logs: %s: step %s", work.ID, proc.Alias) @@ -151,8 +162,9 @@ func run(ctx context.Context, client rpc.Peer) error { if rerr != nil { return nil } + rlimit = io.LimitReader(part, maxFileUpload) mime := part.Header().Get("Content-Type") - if serr := client.Save(context.Background(), work.ID, mime, part); serr != nil { + if serr := client.Upload(context.Background(), work.ID, mime, rlimit); serr != nil { log.Printf("pipeline: cannot upload artifact: %s: %s: %s", work.ID, mime, serr) } return nil diff --git a/server/hook2.go b/server/hook2.go index 77dcd771c..5c435078f 100644 --- a/server/hook2.go +++ b/server/hook2.go @@ -249,7 +249,7 @@ func PostHook2(c *gin.Context) { for _, job := range jobs { - metadata := metadataFromStruct(repo, build, last, job, "linux/amd64") + metadata := metadataFromStruct(repo, build, last, job, httputil.GetURL(c.Request)) environ := metadata.Environ() secrets := map[string]string{} @@ -296,6 +296,7 @@ func PostHook2(c *gin.Context) { ir := compiler.New( compiler.WithEnviron(environ), + // TODO ability to customize the escalated plugins compiler.WithEscalated("plugins/docker", "plugins/gcr", "plugins/ecr"), compiler.WithLocal(false), compiler.WithNetrc(netrc.Login, netrc.Password, netrc.Machine), @@ -306,17 +307,46 @@ func PostHook2(c *gin.Context) { time.Now().Unix(), ), ), + compiler.WithEnviron(job.Environment), compiler.WithProxy(), - compiler.WithVolumes(), // todo set global volumes + // TODO ability to set global volumes for things like certs + compiler.WithVolumes(), compiler.WithWorkspaceFromURL("/drone", repo.Link), ).Compile(parsed) + // TODO there is a chicken and egg problem here because + // the compiled yaml has a platform environment variable + // that is not correctly set, because we are just about + // to set it .... + // TODO maybe we remove platform from metadata and let + // the compiler set the value from the yaml itself. + if parsed.Platform == "" { + parsed.Platform = "linux/amd64" + } + + for _, sec := range secs { + if !sec.MatchEvent(build.Event) { + continue + } + if build.Verified || sec.SkipVerify { + ir.Secrets = append(ir.Secrets, &backend.Secret{ + Mask: sec.Conceal, + Name: sec.Name, + Value: sec.Value, + }) + } + } + task := new(queue.Task) task.ID = fmt.Sprint(job.ID) task.Labels = map[string]string{} - task.Labels["platform"] = "linux/amd64" - // TODO set proper platform - // TODO set proper labels + task.Labels["platform"] = parsed.Platform + if parsed.Labels != nil { + for k, v := range parsed.Labels { + task.Labels[k] = v + } + } + task.Data, _ = json.Marshal(rpc.Pipeline{ ID: fmt.Sprint(job.ID), Config: ir, diff --git a/server/rpc.go b/server/rpc.go index 26876bc66..30adaa1b7 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -78,9 +78,16 @@ type RPC struct { } // Next implements the rpc.Next function -func (s *RPC) Next(c context.Context) (*rpc.Pipeline, error) { - filter := func(*queue.Task) bool { return true } - task, err := s.queue.Poll(c, filter) +func (s *RPC) Next(c context.Context, filter rpc.Filter) (*rpc.Pipeline, error) { + fn := func(task *queue.Task) bool { + for k, v := range filter.Labels { + if task.Labels[k] != v { + return false + } + } + return true + } + task, err := s.queue.Poll(c, fn) if err != nil { return nil, err } else if task == nil { @@ -207,8 +214,8 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error { return nil } -// Save implements the rpc.Save function -func (s *RPC) Save(c context.Context, id, mime string, file io.Reader) error { return nil } +// Upload implements the rpc.Upload function +func (s *RPC) Upload(c context.Context, id, mime string, file io.Reader) error { return nil } // Done implements the rpc.Done function func (s *RPC) Done(c context.Context, id string) error { return nil } diff --git a/vendor/github.com/cncd/pipeline/pipeline/backend/types.go b/vendor/github.com/cncd/pipeline/pipeline/backend/types.go index df0f79745..a37cceb60 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/backend/types.go +++ b/vendor/github.com/cncd/pipeline/pipeline/backend/types.go @@ -6,6 +6,7 @@ type ( Stages []*Stage `json:"pipeline"` // pipeline stages Networks []*Network `json:"networks"` // network definitions Volumes []*Volume `json:"volumes"` // volume definitions + Secrets []*Secret `json:"secrets"` // secret definitions } // Stage denotes a collection of one or more steps. @@ -72,6 +73,14 @@ type ( DriverOpts map[string]string `json:"driver_opts,omitempty"` } + // Secret defines a runtime secret + Secret struct { + Name string `json:"name,omitempty"` + Value string `json:"value,omitempty"` + Mount string `json:"mount,omitempty"` + Mask bool `json:"mask,omitempty"` + } + // State defines a container state. State struct { // Container exit code diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/metadata.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/metadata.go index ec35206c2..380bb6a53 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/frontend/metadata.go +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/metadata.go @@ -1,6 +1,19 @@ package frontend -import "strconv" +import ( + "fmt" + "regexp" + "strconv" + "strings" +) + +// Event types corresponding to scm hooks. +const ( + EventPush = "push" + EventPull = "pull_request" + EventTag = "tag" + EventDeploy = "deployment" +) type ( // Metadata defines runtime m. @@ -15,10 +28,11 @@ type ( // Repo defines runtime metadata for a repository. Repo struct { - Name string `json:"name,omitempty"` - Link string `json:"link,omitempty"` - Remote string `json:"remote,omitempty"` - Private bool `json:"private,omitempty"` + Name string `json:"name,omitempty"` + Link string `json:"link,omitempty"` + Remote string `json:"remote,omitempty"` + Private bool `json:"private,omitempty"` + Secrets []Secret `json:"secrets,omitempty"` } // Build defines runtime metadata for a build. @@ -59,18 +73,27 @@ type ( Matrix map[string]string `json:"matrix,omitempty"` } + // Secret defines a runtime secret + Secret struct { + Name string `json:"name,omitempty"` + Value string `json:"value,omitempty"` + Mount string `json:"mount,omitempty"` + Mask bool `json:"mask,omitempty"` + } + // System defines runtime metadata for a ci/cd system. System struct { - Name string `json:"name,omitempty"` - Host string `json:"host,omitempty"` - Link string `json:"link,omitempty"` - Arch string `json:"arch,omitempty"` + Name string `json:"name,omitempty"` + Host string `json:"host,omitempty"` + Link string `json:"link,omitempty"` + Arch string `json:"arch,omitempty"` + Version string `json:"version,omitempty"` } ) // Environ returns the metadata as a map of environment variables. func (m *Metadata) Environ() map[string]string { - return map[string]string{ + params := map[string]string{ "CI_REPO": m.Repo.Name, "CI_REPO_NAME": m.Repo.Name, "CI_REPO_LINK": m.Repo.Link, @@ -116,6 +139,70 @@ func (m *Metadata) Environ() map[string]string { "CI_SYSTEM_LINK": m.Sys.Link, "CI_SYSTEM_HOST": m.Sys.Host, "CI_SYSTEM_ARCH": m.Sys.Arch, + "CI_SYSTEM_VERSION": m.Sys.Version, "CI": m.Sys.Name, } + if m.Curr.Event == EventTag { + params["CI_TAG"] = strings.TrimPrefix(m.Curr.Commit.Ref, "refs/tags/") + } + if m.Curr.Event == EventPull { + params["CI_PULL_REQUEST"] = pullRegexp.FindString(m.Curr.Commit.Ref) + } + return params } + +// EnvironDrone returns metadata as a map of DRONE_ environment variables. +// This is here for backward compatibility and will eventually be removed. +func (m *Metadata) EnvironDrone() map[string]string { + // MISSING PARAMETERS + // * DRONE_REPO_TRUSTED + // * DRONE_YAML_VERIFIED + // * DRONE_YAML_VERIFIED + params := map[string]string{ + "CI": "drone", + "DRONE": "true", + "DRONE_ARCH": "linux/amd64", + "DRONE_REPO": m.Repo.Name, + "DRONE_REPO_SCM": "git", + "DRONE_REPO_OWNER": strings.Split(m.Repo.Name, "/")[0], + "DRONE_REPO_NAME": strings.Split(m.Repo.Name, "/")[0], + "DRONE_REPO_LINK": m.Repo.Link, + "DRONE_REPO_BRANCH": m.Curr.Commit.Branch, + "DRONE_REPO_PRIVATE": fmt.Sprintf("%v", m.Repo.Private), + "DRONE_REPO_TRUSTED": "false", // TODO should this be added? + "DRONE_REMOTE_URL": m.Repo.Remote, + "DRONE_COMMIT_SHA": m.Curr.Commit.Sha, + "DRONE_COMMIT_REF": m.Curr.Commit.Ref, + "DRONE_COMMIT_REFSPEC": m.Curr.Commit.Refspec, + "DRONE_COMMIT_BRANCH": m.Curr.Commit.Branch, + "DRONE_COMMIT_LINK": m.Curr.Link, + "DRONE_COMMIT_MESSAGE": m.Curr.Commit.Message, + "DRONE_COMMIT_AUTHOR": m.Curr.Commit.Author.Name, + "DRONE_COMMIT_AUTHOR_EMAIL": m.Curr.Commit.Author.Email, + "DRONE_COMMIT_AUTHOR_AVATAR": m.Curr.Commit.Author.Avatar, + "DRONE_BUILD_NUMBER": fmt.Sprintf("%d", m.Curr.Number), + "DRONE_BUILD_EVENT": m.Curr.Event, + "DRONE_BUILD_LINK": fmt.Sprintf("%s/%s/%d", m.Sys.Link, m.Repo.Name, m.Curr.Number), + "DRONE_BUILD_CREATED": fmt.Sprintf("%d", m.Curr.Created), + "DRONE_BUILD_STARTED": fmt.Sprintf("%d", m.Curr.Started), + "DRONE_BUILD_FINISHED": fmt.Sprintf("%d", m.Curr.Finished), + "DRONE_JOB_NUMBER": fmt.Sprintf("%d", m.Job.Number), + "DRONE_JOB_STARTED": fmt.Sprintf("%d", m.Curr.Started), // ISSUE: no job started + "DRONE_BRANCH": m.Curr.Commit.Branch, + "DRONE_COMMIT": m.Curr.Commit.Sha, + "DRONE_VERSION": m.Sys.Version, + "DRONE_DEPLOY_TO": m.Curr.Target, + "DRONE_PREV_BUILD_STATUS": m.Prev.Status, + "DRONE_PREV_BUILD_NUMBER": fmt.Sprintf("%v", m.Prev.Number), + "DRONE_PREV_COMMIT_SHA": m.Prev.Commit.Sha, + } + if m.Curr.Event == EventTag { + params["DRONE_TAG"] = strings.TrimPrefix(m.Curr.Commit.Ref, "refs/tags/") + } + if m.Curr.Event == EventPull { + params["DRONE_PULL_REQUEST"] = pullRegexp.FindString(m.Curr.Commit.Ref) + } + return params +} + +var pullRegexp = regexp.MustCompile("\\d+") diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/convert.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/convert.go index d61de71e6..8f7071060 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/convert.go +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/convert.go @@ -3,7 +3,6 @@ package compiler import ( "fmt" "path" - "strings" "github.com/cncd/pipeline/pipeline/backend" "github.com/cncd/pipeline/pipeline/frontend/yaml" @@ -55,12 +54,6 @@ func (c *Compiler) createProcess(name string, container *yaml.Container) *backen continue default: environment[k] = v - - // legacy code for drone plugins - if strings.HasPrefix(k, "CI_") { - p := strings.Replace(k, "CI_", "DRONE_", 1) - environment[p] = v - } } } diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/option.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/option.go index b42cc6f8f..1ee66a002 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/option.go +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/option.go @@ -31,6 +31,11 @@ func WithMetadata(metadata frontend.Metadata) Option { for k, v := range metadata.Environ() { compiler.env[k] = v } + // TODO this is present for backward compatibility and should + // be removed in a future version. + for k, v := range metadata.EnvironDrone() { + compiler.env[k] = v + } } } @@ -42,6 +47,12 @@ func WithNetrc(username, password, machine string) Option { "CI_NETRC_USERNAME": username, "CI_NETRC_PASSWORD": password, "CI_NETRC_MACHINE": machine, + + // TODO this is present for backward compatibility and should + // be removed in a future version. + "DRONE_NETRC_USERNAME": username, + "DRONE_NETRC_PASSWORD": password, + "DRONE_NETRC_MACHINE": machine, }, ) } diff --git a/vendor/github.com/cncd/pipeline/pipeline/multipart/reader.go b/vendor/github.com/cncd/pipeline/pipeline/multipart/reader.go index 47505f73d..a35f8e9c0 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/multipart/reader.go +++ b/vendor/github.com/cncd/pipeline/pipeline/multipart/reader.go @@ -8,30 +8,32 @@ import ( "net/textproto" ) -// Reader is an iterator over parts in a multipart log stream. -type Reader interface { - // NextPart returns the next part in the multipart or - // an error. When there are no more parts, the error - // io.EOF is returned. - NextPart() (Part, error) -} +type ( + // Reader is an iterator over parts in a multipart log stream. + Reader interface { + // NextPart returns the next part in the multipart or + // an error. When there are no more parts, the error + // io.EOF is returned. + NextPart() (Part, error) + } -// A Part represents a single part in a multipart body. -type Part interface { - io.Reader + // A Part represents a single part in a multipart body. + Part interface { + io.Reader - // Header returns the headers of the body with the - // keys canonicalized. - Header() textproto.MIMEHeader + // Header returns the headers of the body with the + // keys canonicalized. + Header() textproto.MIMEHeader - // FileName returns the filename parameter of the - // Content-Disposition header. - FileName() string + // FileName returns the filename parameter of the + // Content-Disposition header. + FileName() string - // FormName returns the name parameter if p has a - // Content-Disposition of type form-data. - FormName() string -} + // FormName returns the name parameter if p has a + // Content-Disposition of type form-data. + FormName() string + } +) // New returns a new multipart Reader. func New(r io.Reader) Reader { @@ -49,7 +51,7 @@ func New(r io.Reader) Reader { } // -// +// wraps the stdlib multi-part reader // type multipartReader struct { @@ -70,7 +72,7 @@ func (r *multipartReader) NextPart() (Part, error) { } // -// +// wraps a simple io.Reader to satisfy the multi-part interface // type textReader struct { @@ -85,7 +87,6 @@ func (r *textReader) NextPart() (Part, error) { r.done = true p := new(part) p.Reader = r.reader - p.filename = "terminal.log" return p, nil } diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go index b6f9b5403..b21957559 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go @@ -21,12 +21,12 @@ const ( methodDone = "done" methodExtend = "extend" methodUpdate = "update" - methodSave = "save" + methodUpload = "upload" methodLog = "log" ) type ( - saveReq struct { + uploadReq struct { ID string `json:"id"` Mime string `json:"mime"` Data []byte `json:"data"` @@ -75,9 +75,9 @@ func NewClient(endpoint string, opts ...Option) (*Client, error) { } // Next returns the next pipeline in the queue. -func (t *Client) Next(c context.Context) (*Pipeline, error) { +func (t *Client) Next(c context.Context, f Filter) (*Pipeline, error) { res := new(Pipeline) - err := t.call(c, methodNext, nil, res) + err := t.call(c, methodNext, f, res) return res, err } @@ -112,14 +112,14 @@ func (t *Client) Log(c context.Context, id string, line *Line) error { return t.call(c, methodLog, ¶ms, nil) } -// Save saves the pipeline artifact. -func (t *Client) Save(c context.Context, id, mime string, file io.Reader) error { +// Upload uploads the pipeline artifact. +func (t *Client) Upload(c context.Context, id, mime string, file io.Reader) error { data, err := ioutil.ReadAll(file) if err != nil { return err } - params := saveReq{id, mime, data} - return t.call(c, methodSave, params, nil) + params := uploadReq{id, mime, data} + return t.call(c, methodUpload, params, nil) } // Close closes the client connection. diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go index d3470e89d..d85ae7bbb 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go @@ -1,9 +1,9 @@ package rpc import ( - "bytes" "context" "fmt" + "strings" "time" ) @@ -41,30 +41,55 @@ type LineWriter struct { name string num int now time.Time + rep *strings.Replacer } // NewLineWriter returns a new line reader. -func NewLineWriter(peer Peer, id, name string) *LineWriter { +func NewLineWriter(peer Peer, id, name string, secret ...string) *LineWriter { w := new(LineWriter) w.peer = peer w.id = id w.name = name w.num = 0 w.now = time.Now().UTC() + + var oldnew []string + for _, old := range secret { + oldnew = append(oldnew, old) + oldnew = append(oldnew, "********") + } + if len(oldnew) != 0 { + w.rep = strings.NewReplacer(oldnew...) + } return w } func (w *LineWriter) Write(p []byte) (n int, err error) { - for _, part := range bytes.Split(p, []byte{'\n'}) { - line := &Line{ - Out: string(part), - Proc: w.name, - Pos: w.num, - Time: int64(time.Since(w.now).Seconds()), - Type: LineStdout, - } - w.peer.Log(context.Background(), w.id, line) - w.num++ + out := string(p) + if w.rep != nil { + out = w.rep.Replace(out) } + + line := &Line{ + Out: out, + Proc: w.name, + Pos: w.num, + Time: int64(time.Since(w.now).Seconds()), + Type: LineStdout, + } + w.peer.Log(context.Background(), w.id, line) + w.num++ + + // for _, part := range bytes.Split(p, []byte{'\n'}) { + // line := &Line{ + // Out: string(part), + // Proc: w.name, + // Pos: w.num, + // Time: int64(time.Since(w.now).Seconds()), + // Type: LineStdout, + // } + // w.peer.Log(context.Background(), w.id, line) + // w.num++ + // } return len(p), nil } diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go index 4636f1d98..22c96db0b 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go @@ -13,7 +13,8 @@ import ( type ( // Filter defines filters for fetching items from the queue. Filter struct { - Platform string `json:"platform"` + Labels map[string]string `json:"labels"` + Expr string `json:"expr"` } // State defines the pipeline state. @@ -34,10 +35,13 @@ type ( } ) +// NoFilter is an empty filter. +var NoFilter = Filter{} + // Peer defines a peer-to-peer connection. type Peer interface { // Next returns the next pipeline in the queue. - Next(c context.Context) (*Pipeline, error) + Next(c context.Context, f Filter) (*Pipeline, error) // Wait blocks untilthe pipeline is complete. Wait(c context.Context, id string) error @@ -51,9 +55,8 @@ type Peer interface { // Update updates the pipeline state. Update(c context.Context, id string, state State) error - // Save saves the pipeline artifact. - // TODO rename to Upload - Save(c context.Context, id, mime string, file io.Reader) error + // Upload uploads the pipeline artifact. + Upload(c context.Context, id, mime string, file io.Reader) error // Log writes the pipeline log entry. Log(c context.Context, id string, line *Line) error diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go index 21e4507cb..decb9ed2b 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go @@ -62,8 +62,8 @@ func (s *Server) router(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2. return s.update(req) case methodLog: return s.log(req) - case methodSave: - return s.save(req) + case methodUpload: + return s.upload(req) default: return nil, errNoSuchMethod } @@ -72,7 +72,11 @@ func (s *Server) router(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2. // next unmarshals the rpc request parameters and invokes the peer.Next // procedure. The results are retuned and written to the rpc response. func (s *Server) next(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { - return s.peer.Next(ctx) + in := Filter{} + if err := json.Unmarshal([]byte(*req.Params), &in); err != nil { + return nil, err + } + return s.peer.Next(ctx, in) } // wait unmarshals the rpc request parameters and invokes the peer.Wait @@ -128,10 +132,10 @@ func (s *Server) log(req *jsonrpc2.Request) (interface{}, error) { return nil, s.peer.Log(noContext, in.ID, in.Line) } -func (s *Server) save(req *jsonrpc2.Request) (interface{}, error) { - in := new(saveReq) +func (s *Server) upload(req *jsonrpc2.Request) (interface{}, error) { + in := new(uploadReq) if err := json.Unmarshal([]byte(*req.Params), in); err != nil { return nil, err } - return nil, s.peer.Save(noContext, in.ID, in.Mime, bytes.NewBuffer(in.Data)) + return nil, s.peer.Upload(noContext, in.ID, in.Mime, bytes.NewBuffer(in.Data)) } diff --git a/vendor/vendor.json b/vendor/vendor.json index 7a5bc4060..686f89cb4 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -33,68 +33,68 @@ { "checksumSHA1": "W3AuK8ocqHwlUajGmQLFvnRhTZE=", "path": "github.com/cncd/pipeline/pipeline", - "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", - "revisionTime": "2017-03-05T09:53:47Z" + "revision": "687ea03140263b4774505c44f212dd4999faa534", + "revisionTime": "2017-03-12T08:45:42Z" }, { - "checksumSHA1": "PSzh0ix/rlMrS/Cl3aH6GHGrJuo=", + "checksumSHA1": "Qu2FreqaMr8Yx2bW9O0cxAGgjr0=", "path": "github.com/cncd/pipeline/pipeline/backend", - "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", - "revisionTime": "2017-03-05T09:53:47Z" + "revision": "687ea03140263b4774505c44f212dd4999faa534", + "revisionTime": "2017-03-12T08:45:42Z" }, { "checksumSHA1": "0CGXRaYwZhJxGIrGhn8WGpkFqPo=", "path": "github.com/cncd/pipeline/pipeline/backend/docker", - "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", - "revisionTime": "2017-03-05T09:53:47Z" + "revision": "687ea03140263b4774505c44f212dd4999faa534", + "revisionTime": "2017-03-12T08:45:42Z" }, { - "checksumSHA1": "uUagpzha5ah/a3RO6IImvzHYFlY=", + "checksumSHA1": "/8wE+cVb7T4PQZgpLNu0DHzKGuE=", "path": "github.com/cncd/pipeline/pipeline/frontend", - "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", - "revisionTime": "2017-03-05T09:53:47Z" + "revision": "687ea03140263b4774505c44f212dd4999faa534", + "revisionTime": "2017-03-12T08:45:42Z" }, { "checksumSHA1": "O0sulBQAHJeNLg3lO38Cq5uf/eg=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml", - "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", - "revisionTime": "2017-03-05T09:53:47Z" + "revision": "687ea03140263b4774505c44f212dd4999faa534", + "revisionTime": "2017-03-12T08:45:42Z" }, { - "checksumSHA1": "+4c/I/PEDCgzog8m4ohw1parhgE=", + "checksumSHA1": "Iu+QmUqkN9ZsBdmVlCclVKthJbM=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/compiler", - "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", - "revisionTime": "2017-03-05T09:53:47Z" + "revision": "687ea03140263b4774505c44f212dd4999faa534", + "revisionTime": "2017-03-12T08:45:42Z" }, { "checksumSHA1": "Q0GkNUFamVYIA1Fd8r0A5M6Gx54=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/linter", - "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", - "revisionTime": "2017-03-05T09:53:47Z" + "revision": "687ea03140263b4774505c44f212dd4999faa534", + "revisionTime": "2017-03-12T08:45:42Z" }, { "checksumSHA1": "kx2sPUIMozPC/g6E4w48h3FfH3k=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/matrix", - "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", - "revisionTime": "2017-03-05T09:53:47Z" + "revision": "687ea03140263b4774505c44f212dd4999faa534", + "revisionTime": "2017-03-12T08:45:42Z" }, { "checksumSHA1": "2/3f3oNmxXy5kcrRLCFa24Oc9O4=", "path": "github.com/cncd/pipeline/pipeline/interrupt", - "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", - "revisionTime": "2017-03-05T09:53:47Z" + "revision": "687ea03140263b4774505c44f212dd4999faa534", + "revisionTime": "2017-03-12T08:45:42Z" }, { - "checksumSHA1": "8eTwXZPM/Kp9uE/mnhpWDTiX7nY=", + "checksumSHA1": "/nHBigDoEi2F6zJzvCWOvJ3um2c=", "path": "github.com/cncd/pipeline/pipeline/multipart", - "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", - "revisionTime": "2017-03-05T09:53:47Z" + "revision": "687ea03140263b4774505c44f212dd4999faa534", + "revisionTime": "2017-03-12T08:45:42Z" }, { - "checksumSHA1": "UUmeGDBdpk+UXtexFnNmbWIHgG8=", + "checksumSHA1": "MratmNKJ78/IhWvDsZphN01CtmE=", "path": "github.com/cncd/pipeline/pipeline/rpc", - "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", - "revisionTime": "2017-03-05T09:53:47Z" + "revision": "687ea03140263b4774505c44f212dd4999faa534", + "revisionTime": "2017-03-12T08:45:42Z" }, { "checksumSHA1": "7Qj1DK0ceAXkYztW0l3+L6sn+V8=",