From 6c11444de053e8752f9804709cfe32f3206ed16f Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Sat, 1 Apr 2017 20:17:04 +0900 Subject: [PATCH] agent update build steps --- drone/agent/agent.go | 94 +++++- model/build.go | 61 ++-- model/proc.go | 2 + server/hook.go | 1 + server/rpc.go | 270 +++++++++++++----- store/datastore/files_test.go | 15 +- store/datastore/procs.go | 7 + store/datastore/procs_test.go | 25 +- store/datastore/sql/postgres/files/procs.sql | 20 ++ store/datastore/sql/postgres/sql_gen.go | 21 ++ store/datastore/sql/sqlite/files/procs.sql | 20 ++ store/datastore/sql/sqlite/sql_gen.go | 21 ++ store/store.go | 1 + .../cncd/pipeline/pipeline/rpc/client.go | 24 +- .../cncd/pipeline/pipeline/rpc/line.go | 24 +- .../cncd/pipeline/pipeline/rpc/peer.go | 20 +- .../cncd/pipeline/pipeline/rpc/server.go | 22 +- vendor/vendor.json | 46 +-- 18 files changed, 516 insertions(+), 178 deletions(-) diff --git a/drone/agent/agent.go b/drone/agent/agent.go index dba7ca407..df3c1233e 100644 --- a/drone/agent/agent.go +++ b/drone/agent/agent.go @@ -2,10 +2,13 @@ package agent import ( "context" + "encoding/json" "io" + "io/ioutil" "log" "math" "net/url" + "strconv" "sync" "time" @@ -189,9 +192,9 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { state := rpc.State{} state.Started = time.Now().Unix() - err = client.Update(context.Background(), work.ID, state) + err = client.Init(context.Background(), work.ID, state) if err != nil { - log.Printf("pipeline: error updating pipeline status: %s: %s", work.ID, err) + log.Printf("pipeline: error signaling pipeline init: %s: %s", work.ID, err) } var uploads sync.WaitGroup @@ -201,9 +204,31 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { return rerr } uploads.Add(1) - writer := rpc.NewLineWriter(client, work.ID, proc.Alias) - rlimit := io.LimitReader(part, maxLogsUpload) - io.Copy(writer, rlimit) + + var secrets []string + for _, secret := range work.Config.Secrets { + if secret.Mask { + secrets = append(secrets, secret.Value) + } + } + + limitedPart := io.LimitReader(part, maxLogsUpload) + logstream := rpc.NewLineWriter(client, work.ID, proc.Alias, secrets...) + io.Copy(logstream, limitedPart) + + file := &rpc.File{} + file.Mime = "application/json+logs" + file.Proc = proc.Alias + file.Name = "logs.json" + file.Data, _ = json.Marshal(logstream.Lines()) + file.Size = len(file.Data) + file.Time = time.Now().Unix() + + if serr := client.Upload(context.Background(), work.ID, file); serr != nil { + log.Printf("pipeline: cannot upload logs: %s: %s: %s", work.ID, file.Mime, serr) + } else { + log.Printf("pipeline: finish uploading logs: %s: step %s: %s", file.Mime, work.ID, proc.Alias) + } defer func() { log.Printf("pipeline: finish uploading logs: %s: step %s", work.ID, proc.Alias) @@ -214,10 +239,54 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { if rerr != nil { return nil } - rlimit = io.LimitReader(part, maxFileUpload) - mime := part.Header().Get("Content-Type") - if serr := client.Upload(context.Background(), work.ID, mime, rlimit); serr != nil { - log.Printf("pipeline: cannot upload artifact: %s: %s: %s", work.ID, mime, serr) + // TODO should be configurable + limitedPart = io.LimitReader(part, maxFileUpload) + file = &rpc.File{} + file.Mime = part.Header().Get("Content-Type") + file.Proc = proc.Alias + file.Name = part.FileName() + file.Data, _ = ioutil.ReadAll(limitedPart) + file.Size = len(file.Data) + file.Time = time.Now().Unix() + + if serr := client.Upload(context.Background(), work.ID, file); serr != nil { + log.Printf("pipeline: cannot upload artifact: %s: %s: %s", work.ID, file.Mime, serr) + } else { + log.Printf("pipeline: finish uploading artifact: %s: step %s: %s", file.Mime, work.ID, proc.Alias) + } + return nil + }) + + defaultTracer := pipeline.TraceFunc(func(state *pipeline.State) error { + procState := rpc.State{ + Proc: state.Pipeline.Step.Alias, + Exited: state.Process.Exited, + ExitCode: state.Process.ExitCode, + Started: time.Now().Unix(), // TODO do not do this + Finished: time.Now().Unix(), + } + defer func() { + if uerr := client.Update(context.Background(), work.ID, procState); uerr != nil { + log.Printf("Pipeine: error updating pipeline step status: %s: %s: %s", work.ID, procState.Proc, uerr) + } + }() + if state.Process.Exited { + return nil + } + if state.Pipeline.Step.Environment == nil { + state.Pipeline.Step.Environment = map[string]string{} + } + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + if state.Pipeline.Error != nil { + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure" + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure" } return nil }) @@ -225,7 +294,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { err = pipeline.New(work.Config, pipeline.WithContext(ctx), pipeline.WithLogger(defaultLogger), - pipeline.WithTracer(pipeline.DefaultTracer), + pipeline.WithTracer(defaultTracer), pipeline.WithEngine(engine), ).Run() @@ -247,9 +316,10 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { log.Printf("pipeline: execution complete: %s", work.ID) uploads.Wait() - err = client.Update(context.Background(), work.ID, state) + + err = client.Done(context.Background(), work.ID, state) if err != nil { - log.Printf("Pipeine: error updating pipeline status: %s: %s", work.ID, err) + log.Printf("Pipeine: error signaling pipeline done: %s: %s", work.ID, err) } return nil diff --git a/model/build.go b/model/build.go index 56c2f69af..cfca0744a 100644 --- a/model/build.go +++ b/model/build.go @@ -2,36 +2,37 @@ package model // swagger:model build type Build struct { - ID int64 `json:"id" meddler:"build_id,pk"` - RepoID int64 `json:"-" meddler:"build_repo_id"` - Number int `json:"number" meddler:"build_number"` - Parent int `json:"parent" meddler:"build_parent"` - Event string `json:"event" meddler:"build_event"` - Status string `json:"status" meddler:"build_status"` - Error string `json:"error" meddler:"build_error"` - Enqueued int64 `json:"enqueued_at" meddler:"build_enqueued"` - Created int64 `json:"created_at" meddler:"build_created"` - Started int64 `json:"started_at" meddler:"build_started"` - Finished int64 `json:"finished_at" meddler:"build_finished"` - Deploy string `json:"deploy_to" meddler:"build_deploy"` - Commit string `json:"commit" meddler:"build_commit"` - Branch string `json:"branch" meddler:"build_branch"` - Ref string `json:"ref" meddler:"build_ref"` - Refspec string `json:"refspec" meddler:"build_refspec"` - Remote string `json:"remote" meddler:"build_remote"` - Title string `json:"title" meddler:"build_title"` - Message string `json:"message" meddler:"build_message"` - Timestamp int64 `json:"timestamp" meddler:"build_timestamp"` - Sender string `json:"sender" meddler:"build_sender"` - Author string `json:"author" meddler:"build_author"` - Avatar string `json:"author_avatar" meddler:"build_avatar"` - Email string `json:"author_email" meddler:"build_email"` - Link string `json:"link_url" meddler:"build_link"` - Signed bool `json:"signed" meddler:"build_signed"` // deprecate - Verified bool `json:"verified" meddler:"build_verified"` // deprecate - Reviewer string `json:"reviewed_by" meddler:"build_reviewer"` - Reviewed int64 `json:"reviewed_at" meddler:"build_reviewed"` - Jobs []*Job `json:"jobs,omitempty" meddler:"-"` + ID int64 `json:"id" meddler:"build_id,pk"` + RepoID int64 `json:"-" meddler:"build_repo_id"` + Number int `json:"number" meddler:"build_number"` + Parent int `json:"parent" meddler:"build_parent"` + Event string `json:"event" meddler:"build_event"` + Status string `json:"status" meddler:"build_status"` + Error string `json:"error" meddler:"build_error"` + Enqueued int64 `json:"enqueued_at" meddler:"build_enqueued"` + Created int64 `json:"created_at" meddler:"build_created"` + Started int64 `json:"started_at" meddler:"build_started"` + Finished int64 `json:"finished_at" meddler:"build_finished"` + Deploy string `json:"deploy_to" meddler:"build_deploy"` + Commit string `json:"commit" meddler:"build_commit"` + Branch string `json:"branch" meddler:"build_branch"` + Ref string `json:"ref" meddler:"build_ref"` + Refspec string `json:"refspec" meddler:"build_refspec"` + Remote string `json:"remote" meddler:"build_remote"` + Title string `json:"title" meddler:"build_title"` + Message string `json:"message" meddler:"build_message"` + Timestamp int64 `json:"timestamp" meddler:"build_timestamp"` + Sender string `json:"sender" meddler:"build_sender"` + Author string `json:"author" meddler:"build_author"` + Avatar string `json:"author_avatar" meddler:"build_avatar"` + Email string `json:"author_email" meddler:"build_email"` + Link string `json:"link_url" meddler:"build_link"` + Signed bool `json:"signed" meddler:"build_signed"` // deprecate + Verified bool `json:"verified" meddler:"build_verified"` // deprecate + Reviewer string `json:"reviewed_by" meddler:"build_reviewer"` + Reviewed int64 `json:"reviewed_at" meddler:"build_reviewed"` + Jobs []*Job `json:"jobs,omitempty" meddler:"-"` + Procs []*Proc `json:"procs,omitempty" meddler:"-"` } type BuildGroup struct { diff --git a/model/proc.go b/model/proc.go index 1f6659325..54aa7bc68 100644 --- a/model/proc.go +++ b/model/proc.go @@ -2,6 +2,7 @@ package model // ProcStore persists process information to storage. type ProcStore interface { + ProcLoad(int64) (*Proc, error) ProcFind(*Build, int) (*Proc, error) ProcChild(*Build, int, string) (*Proc, error) ProcList(*Build) ([]*Proc, error) @@ -10,6 +11,7 @@ type ProcStore interface { } // Proc represents a process in the build pipeline. +// swagger:model proc type Proc struct { ID int64 `json:"id" meddler:"proc_id,pk"` BuildID int64 `json:"build_id" meddler:"proc_build_id"` diff --git a/server/hook.go b/server/hook.go index 15a407a8a..2f4972885 100644 --- a/server/hook.go +++ b/server/hook.go @@ -490,6 +490,7 @@ func (b *builder) Build() ([]*buildItem, error) { // TODO ability to set global volumes for things like certs compiler.WithVolumes(), compiler.WithWorkspaceFromURL("/drone", b.Curr.Link), + compiler.WithMetadata(metadata), ).Compile(parsed) for _, sec := range b.Secs { diff --git a/server/rpc.go b/server/rpc.go index 520f67d25..9d5271920 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -4,13 +4,10 @@ import ( "bytes" "context" "encoding/json" - "fmt" - "io" "log" "os" "strconv" - "github.com/Sirupsen/logrus" "github.com/cncd/logging" "github.com/cncd/pipeline/pipeline/rpc" "github.com/cncd/pubsub" @@ -109,20 +106,20 @@ func (s *RPC) Extend(c context.Context, id string) error { // Update implements the rpc.Update function func (s *RPC) Update(c context.Context, id string, state rpc.State) error { - jobID, err := strconv.ParseInt(id, 10, 64) + procID, err := strconv.ParseInt(id, 10, 64) if err != nil { return err } - job, err := s.store.GetJob(jobID) + proc, err := s.store.ProcLoad(procID) if err != nil { - log.Printf("error: cannot find job with id %d: %s", jobID, err) + log.Printf("error: rpc.update: cannot find proc with id %d: %s", procID, err) return err } - build, err := s.store.GetBuild(job.BuildID) + build, err := s.store.GetBuild(proc.BuildID) if err != nil { - log.Printf("error: cannot find build with id %d: %s", job.BuildID, err) + log.Printf("error: cannot find build with id %d: %s", proc.BuildID, err) return err } @@ -132,92 +129,209 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error { return err } - if build.Status != model.StatusRunning { - - } - - job.Started = state.Started - job.Finished = state.Finished - job.ExitCode = state.ExitCode - job.Status = model.StatusRunning - job.Error = state.Error - - if build.Status == model.StatusPending { - build.Started = job.Started - build.Status = model.StatusRunning - s.store.UpdateBuild(build) - } - - log.Printf("pipeline: update %s: exited=%v, exit_code=%d", id, state.Exited, state.ExitCode) - if state.Exited { - - job.Status = model.StatusSuccess - if job.ExitCode != 0 || job.Error != "" { - job.Status = model.StatusFailure - } - - // save the logs - var buf bytes.Buffer - if serr := s.logger.Snapshot(context.Background(), id, &buf); serr != nil { - log.Printf("error: snapshotting logs: %s", serr) - } - if werr := s.store.WriteLog(job, &buf); werr != nil { - log.Printf("error: persisting logs: %s", werr) - } - - // close the logger - s.logger.Close(c, id) - s.queue.Done(c, id) + proc.Stopped = state.Finished + proc.ExitCode = state.ExitCode + proc.Error = state.Error + } else { + proc.Started = state.Started + proc.State = model.StatusRunning } - // hackity hack - cc := context.WithValue(c, "store", s.store) - ok, uerr := store.UpdateBuildJob(cc, build, job) - if uerr != nil { - log.Printf("error: updating job: %s", uerr) - } - if ok { - // get the user because we transfer the user form the server to agent - // and back we lose the token which does not get serialized to json. - user, uerr := s.store.GetUser(repo.UserID) - if uerr != nil { - logrus.Errorf("Unable to find user. %s", err) - } else { - s.remote.Status(user, repo, build, - fmt.Sprintf("%s/%s/%d", s.host, repo.FullName, build.Number)) - } + if err := s.store.ProcUpdate(proc); err != nil { + log.Printf("error: rpc.update: cannot update proc: %s", err) } - message := pubsub.Message{} + build.Procs, _ = s.store.ProcList(build) + message := pubsub.Message{ + Labels: map[string]string{ + "repo": repo.FullName, + "private": strconv.FormatBool(repo.IsPrivate), + }, + } message.Data, _ = json.Marshal(model.Event{ - Type: func() model.EventType { - // HACK we don't even really care about the event type. - // so we should just simplify how events are triggered. - // WTF was this being used for????????????????????????? - if job.Status == model.StatusRunning { - return model.Started - } - return model.Finished - }(), Repo: *repo, Build: *build, - Job: *job, }) - message.Labels = map[string]string{ - "repo": repo.FullName, - "private": strconv.FormatBool(repo.IsPrivate), - } s.pubsub.Publish(c, "topic/events", message) - log.Println("finish rpc.update") + return nil } // Upload implements the rpc.Upload function -func (s *RPC) Upload(c context.Context, id, mime string, file io.Reader) error { return nil } +func (s *RPC) Upload(c context.Context, id string, file *rpc.File) error { + procID, err := strconv.ParseInt(id, 10, 64) + if err != nil { + return err + } + + proc, err := s.store.ProcLoad(procID) + if err != nil { + log.Printf("error: cannot find proc with id %d: %s", procID, err) + return err + } + + return s.store.FileCreate(&model.File{ + BuildID: proc.BuildID, + ProcID: proc.ID, + Mime: file.Mime, + Name: file.Name, + Size: file.Size, + Time: file.Time, + }, + bytes.NewBuffer(file.Data), + ) +} + +// Init implements the rpc.Init function +func (s *RPC) Init(c context.Context, id string, state rpc.State) error { + procID, err := strconv.ParseInt(id, 10, 64) + if err != nil { + return err + } + + proc, err := s.store.ProcLoad(procID) + if err != nil { + log.Printf("error: cannot find proc with id %d: %s", procID, err) + return err + } + + build, err := s.store.GetBuild(proc.BuildID) + if err != nil { + log.Printf("error: cannot find build with id %d: %s", proc.BuildID, err) + return err + } + + repo, err := s.store.GetRepo(build.RepoID) + if err != nil { + log.Printf("error: cannot find repo with id %d: %s", build.RepoID, err) + return err + } + + if build.Status == model.StatusPending { + build.Status = model.StatusRunning + build.Started = state.Started + if err := s.store.UpdateBuild(build); err != nil { + log.Printf("error: init: cannot update build_id %d state: %s", build.ID, err) + } + } + + defer func() { + build.Procs, _ = s.store.ProcList(build) + message := pubsub.Message{ + Labels: map[string]string{ + "repo": repo.FullName, + "private": strconv.FormatBool(repo.IsPrivate), + }, + } + message.Data, _ = json.Marshal(model.Event{ + Repo: *repo, + Build: *build, + }) + s.pubsub.Publish(c, "topic/events", message) + }() + + proc.Started = state.Started + proc.State = model.StatusRunning + return s.store.ProcUpdate(proc) +} // Done implements the rpc.Done function -func (s *RPC) Done(c context.Context, id string) error { return nil } +func (s *RPC) Done(c context.Context, id string, state rpc.State) error { + procID, err := strconv.ParseInt(id, 10, 64) + if err != nil { + return err + } + + proc, err := s.store.ProcLoad(procID) + if err != nil { + log.Printf("error: cannot find proc with id %d: %s", procID, err) + return err + } + + build, err := s.store.GetBuild(proc.BuildID) + if err != nil { + log.Printf("error: cannot find build with id %d: %s", proc.BuildID, err) + return err + } + + repo, err := s.store.GetRepo(build.RepoID) + if err != nil { + log.Printf("error: cannot find repo with id %d: %s", build.RepoID, err) + return err + } + + if build.Status == model.StatusPending { + build.Status = model.StatusRunning + build.Started = state.Started + if err := s.store.UpdateBuild(build); err != nil { + log.Printf("error: done: cannot update build_id %d state: %s", build.ID, err) + } + } + + proc.Started = state.Started + proc.State = model.StatusRunning + proc.Stopped = state.Finished + proc.Error = state.Error + proc.ExitCode = state.ExitCode + if err := s.store.ProcUpdate(proc); err != nil { + log.Printf("error: done: cannot update proc_id %d state: %s", procID, err) + } + + if err := s.queue.Done(c, id); err != nil { + log.Printf("error: done: cannot ack proc_id %d: %s", procID, err) + } + + done := false + status := model.StatusSuccess + // TODO handle this error + procs, _ := s.store.ProcList(build) + for _, p := range procs { + if !proc.Running() && p.PPID == proc.PID { + p.State = model.StatusSkipped + if p.Started != 0 { + p.State = model.StatusKilled + p.Stopped = proc.Stopped + } + if err := s.store.ProcUpdate(p); err != nil { + log.Printf("error: done: cannot update proc_id %d child state: %s", p.ID, err) + } + } + if !proc.Running() && p.PPID == 0 { + done = true + if p.Failing() { + status = model.StatusFailure + } + continue + } + } + if done { + build.Status = status + build.Finished = proc.Stopped + if err := s.store.UpdateBuild(build); err != nil { + log.Printf("error: done: cannot update build_id %d final state: %s", build.ID, err) + } + } + + if err := s.logger.Close(c, id); err != nil { + log.Printf("error: done: cannot close build_id %d logger: %s", proc.ID, err) + } + + build.Procs = procs + message := pubsub.Message{ + Labels: map[string]string{ + "repo": repo.FullName, + "private": strconv.FormatBool(repo.IsPrivate), + }, + } + message.Data, _ = json.Marshal(model.Event{ + Repo: *repo, + Build: *build, + }) + s.pubsub.Publish(c, "topic/events", message) + + return nil +} // Log implements the rpc.Log function func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error { diff --git a/store/datastore/files_test.go b/store/datastore/files_test.go index 35752035a..051e78a1d 100644 --- a/store/datastore/files_test.go +++ b/store/datastore/files_test.go @@ -10,7 +10,10 @@ import ( func TestFileFind(t *testing.T) { s := newTest() - defer s.Close() + defer func() { + s.Exec("delete from files") + s.Close() + }() if err := s.FileCreate( &model.File{ @@ -63,7 +66,10 @@ func TestFileFind(t *testing.T) { func TestFileList(t *testing.T) { s := newTest() - defer s.Close() + defer func() { + s.Exec("delete from files") + s.Close() + }() s.FileCreate( &model.File{ @@ -99,7 +105,10 @@ func TestFileList(t *testing.T) { func TestFileIndexes(t *testing.T) { s := newTest() - defer s.Close() + defer func() { + s.Exec("delete from files") + s.Close() + }() if err := s.FileCreate( &model.File{ diff --git a/store/datastore/procs.go b/store/datastore/procs.go index 61d70461d..189c6bbb8 100644 --- a/store/datastore/procs.go +++ b/store/datastore/procs.go @@ -6,6 +6,13 @@ import ( "github.com/russross/meddler" ) +func (db *datastore) ProcLoad(id int64) (*model.Proc, error) { + stmt := sql.Lookup(db.driver, "procs-find-id") + proc := new(model.Proc) + err := meddler.QueryRow(db, proc, stmt, id) + return proc, err +} + func (db *datastore) ProcFind(build *model.Build, pid int) (*model.Proc, error) { stmt := sql.Lookup(db.driver, "procs-find-build-pid") proc := new(model.Proc) diff --git a/store/datastore/procs_test.go b/store/datastore/procs_test.go index 16c69ec99..7357ff843 100644 --- a/store/datastore/procs_test.go +++ b/store/datastore/procs_test.go @@ -8,7 +8,10 @@ import ( func TestProcFind(t *testing.T) { s := newTest() - defer s.Close() + defer func() { + s.Exec("delete from procs") + s.Close() + }() err := s.ProcCreate([]*model.Proc{ { @@ -57,7 +60,10 @@ func TestProcFind(t *testing.T) { func TestProcChild(t *testing.T) { s := newTest() - defer s.Close() + defer func() { + s.Exec("delete from procs") + s.Close() + }() err := s.ProcCreate([]*model.Proc{ { @@ -96,7 +102,10 @@ func TestProcChild(t *testing.T) { func TestProcList(t *testing.T) { s := newTest() - defer s.Close() + defer func() { + s.Exec("delete from procs") + s.Close() + }() err := s.ProcCreate([]*model.Proc{ { @@ -138,7 +147,10 @@ func TestProcList(t *testing.T) { func TestProcUpdate(t *testing.T) { s := newTest() - defer s.Close() + defer func() { + s.Exec("delete from procs") + s.Close() + }() proc := &model.Proc{ BuildID: 1, @@ -174,7 +186,10 @@ func TestProcUpdate(t *testing.T) { func TestProcIndexes(t *testing.T) { s := newTest() - defer s.Close() + defer func() { + s.Exec("delete from procs") + s.Close() + }() if err := s.ProcCreate([]*model.Proc{ { diff --git a/store/datastore/sql/postgres/files/procs.sql b/store/datastore/sql/postgres/files/procs.sql index 34bf76559..56f917a17 100644 --- a/store/datastore/sql/postgres/files/procs.sql +++ b/store/datastore/sql/postgres/files/procs.sql @@ -1,3 +1,23 @@ +-- name: procs-find-id + +SELECT + proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_id = $1 + -- name: procs-find-build SELECT diff --git a/store/datastore/sql/postgres/sql_gen.go b/store/datastore/sql/postgres/sql_gen.go index b2e1024cc..6787faae5 100644 --- a/store/datastore/sql/postgres/sql_gen.go +++ b/store/datastore/sql/postgres/sql_gen.go @@ -9,6 +9,7 @@ var index = map[string]string{ "files-find-build": filesFindBuild, "files-find-proc-name": filesFindProcName, "files-find-proc-name-data": filesFindProcNameData, + "procs-find-id": procsFindId, "procs-find-build": procsFindBuild, "procs-find-build-pid": procsFindBuildPid, "procs-find-build-ppid": procsFindBuildPpid, @@ -56,6 +57,26 @@ WHERE file_proc_id = $1 AND file_name = $2 ` +var procsFindId = ` +SELECT + proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_id = $1 +` + var procsFindBuild = ` SELECT proc_id diff --git a/store/datastore/sql/sqlite/files/procs.sql b/store/datastore/sql/sqlite/files/procs.sql index a63ee7dde..a8643b3e7 100644 --- a/store/datastore/sql/sqlite/files/procs.sql +++ b/store/datastore/sql/sqlite/files/procs.sql @@ -1,3 +1,23 @@ +-- name: procs-find-id + +SELECT + proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_id = ? + -- name: procs-find-build SELECT diff --git a/store/datastore/sql/sqlite/sql_gen.go b/store/datastore/sql/sqlite/sql_gen.go index 251796761..edbc4b2a9 100644 --- a/store/datastore/sql/sqlite/sql_gen.go +++ b/store/datastore/sql/sqlite/sql_gen.go @@ -9,6 +9,7 @@ var index = map[string]string{ "files-find-build": filesFindBuild, "files-find-proc-name": filesFindProcName, "files-find-proc-name-data": filesFindProcNameData, + "procs-find-id": procsFindId, "procs-find-build": procsFindBuild, "procs-find-build-pid": procsFindBuildPid, "procs-find-build-ppid": procsFindBuildPpid, @@ -56,6 +57,26 @@ WHERE file_proc_id = ? AND file_name = ? ` +var procsFindId = ` +SELECT + proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_id = ? +` + var procsFindBuild = ` SELECT proc_id diff --git a/store/store.go b/store/store.go index 30708d9e3..56d793719 100644 --- a/store/store.go +++ b/store/store.go @@ -145,6 +145,7 @@ type Store interface { DeleteAgent(*model.Agent) error + ProcLoad(int64) (*model.Proc, error) ProcFind(*model.Build, int) (*model.Proc, error) ProcChild(*model.Build, int, string) (*model.Proc, error) ProcList(*model.Build) ([]*model.Proc, error) diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go index b21957559..07016825d 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go @@ -3,7 +3,6 @@ package rpc import ( "context" "io" - "io/ioutil" "log" "math" "net/http" @@ -18,6 +17,7 @@ import ( const ( methodNext = "next" methodWait = "wait" + methodInit = "init" methodDone = "done" methodExtend = "extend" methodUpdate = "update" @@ -28,8 +28,7 @@ const ( type ( uploadReq struct { ID string `json:"id"` - Mime string `json:"mime"` - Data []byte `json:"data"` + File *File `json:"file"` } updateReq struct { @@ -90,9 +89,16 @@ func (t *Client) Wait(c context.Context, id string) error { return t.call(c, methodWait, id, nil) } +// Init signals the pipeline is initialized. +func (t *Client) Init(c context.Context, id string, state State) error { + params := updateReq{id, state} + return t.call(c, methodInit, ¶ms, nil) +} + // Done signals the pipeline is complete. -func (t *Client) Done(c context.Context, id string) error { - return t.call(c, methodDone, id, nil) +func (t *Client) Done(c context.Context, id string, state State) error { + params := updateReq{id, state} + return t.call(c, methodDone, ¶ms, nil) } // Extend extends the pipeline deadline. @@ -113,12 +119,8 @@ func (t *Client) Log(c context.Context, id string, line *Line) error { } // Upload uploads the pipeline artifact. -func (t *Client) Upload(c context.Context, id, mime string, file io.Reader) error { - data, err := ioutil.ReadAll(file) - if err != nil { - return err - } - params := uploadReq{id, mime, data} +func (t *Client) Upload(c context.Context, id string, file *File) error { + params := uploadReq{id, file} return t.call(c, methodUpload, params, nil) } diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go index d85ae7bbb..1cbe4cd1e 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go @@ -36,12 +36,13 @@ func (l *Line) String() string { // LineWriter sends logs to the client. type LineWriter struct { - peer Peer - id string - name string - num int - now time.Time - rep *strings.Replacer + peer Peer + id string + name string + num int + now time.Time + rep *strings.Replacer + lines []*Line } // NewLineWriter returns a new line reader. @@ -91,5 +92,16 @@ func (w *LineWriter) Write(p []byte) (n int, err error) { // w.peer.Log(context.Background(), w.id, line) // w.num++ // } + w.lines = append(w.lines, line) return len(p), nil } + +// Lines returns the line history +func (w *LineWriter) Lines() []*Line { + return w.lines +} + +// Clear clears the line history +func (w *LineWriter) Clear() { + w.lines = w.lines[:0] +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go index 22c96db0b..8c7e21e7e 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go @@ -2,7 +2,6 @@ package rpc import ( "context" - "io" "github.com/cncd/pipeline/pipeline/backend" ) @@ -33,6 +32,16 @@ type ( Config *backend.Config `json:"config"` Timeout int64 `json:"timeout"` } + + // File defines a pipeline artifact. + File struct { + Name string `json:"name"` + Proc string `json:"proc"` + Mime string `json:"mime"` + Time int64 `json:"time"` + Size int `json:"size"` + Data []byte `json:"data"` + } ) // NoFilter is an empty filter. @@ -43,11 +52,14 @@ type Peer interface { // Next returns the next pipeline in the queue. Next(c context.Context, f Filter) (*Pipeline, error) - // Wait blocks untilthe pipeline is complete. + // Wait blocks until the pipeline is complete. Wait(c context.Context, id string) error + // Init signals the pipeline is initialized. + Init(c context.Context, id string, state State) error + // Done signals the pipeline is complete. - Done(c context.Context, id string) error + Done(c context.Context, id string, state State) error // Extend extends the pipeline deadline Extend(c context.Context, id string) error @@ -56,7 +68,7 @@ type Peer interface { Update(c context.Context, id string, state State) error // Upload uploads the pipeline artifact. - Upload(c context.Context, id, mime string, file io.Reader) error + Upload(c context.Context, id string, file *File) error // Log writes the pipeline log entry. Log(c context.Context, id string, line *Line) error diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go index decb9ed2b..65eb0f438 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go @@ -1,7 +1,6 @@ package rpc import ( - "bytes" "context" "encoding/json" "errors" @@ -54,6 +53,8 @@ func (s *Server) router(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2. return s.next(ctx, req) case methodWait: return s.wait(ctx, req) + case methodInit: + return s.init(ctx, req) case methodDone: return s.done(ctx, req) case methodExtend: @@ -90,15 +91,24 @@ func (s *Server) wait(ctx context.Context, req *jsonrpc2.Request) (interface{}, return nil, s.peer.Wait(ctx, id) } +// init unmarshals the rpc request parameters and invokes the peer.Init +// procedure. The results are retuned and written to the rpc response. +func (s *Server) init(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { + in := new(updateReq) + if err := json.Unmarshal([]byte(*req.Params), in); err != nil { + return nil, err + } + return nil, s.peer.Init(ctx, in.ID, in.State) +} + // done unmarshals the rpc request parameters and invokes the peer.Done // procedure. The results are retuned and written to the rpc response. func (s *Server) done(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { - var id string - err := json.Unmarshal([]byte(*req.Params), &id) - if err != nil { + in := new(updateReq) + if err := json.Unmarshal([]byte(*req.Params), in); err != nil { return nil, err } - return nil, s.peer.Done(ctx, id) + return nil, s.peer.Done(ctx, in.ID, in.State) } // extend unmarshals the rpc request parameters and invokes the peer.Extend @@ -137,5 +147,5 @@ func (s *Server) upload(req *jsonrpc2.Request) (interface{}, error) { if err := json.Unmarshal([]byte(*req.Params), in); err != nil { return nil, err } - return nil, s.peer.Upload(noContext, in.ID, in.Mime, bytes.NewBuffer(in.Data)) + return nil, s.peer.Upload(noContext, in.ID, in.File) } diff --git a/vendor/vendor.json b/vendor/vendor.json index 7c33e2d14..9d792a276 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -28,68 +28,68 @@ { "checksumSHA1": "W3AuK8ocqHwlUajGmQLFvnRhTZE=", "path": "github.com/cncd/pipeline/pipeline", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "Qu2FreqaMr8Yx2bW9O0cxAGgjr0=", "path": "github.com/cncd/pipeline/pipeline/backend", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "0CGXRaYwZhJxGIrGhn8WGpkFqPo=", "path": "github.com/cncd/pipeline/pipeline/backend/docker", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "/8wE+cVb7T4PQZgpLNu0DHzKGuE=", "path": "github.com/cncd/pipeline/pipeline/frontend", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "O0sulBQAHJeNLg3lO38Cq5uf/eg=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "ftyr9EJQl9D5OvzOcqGBS6stt0g=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/compiler", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "Q0GkNUFamVYIA1Fd8r0A5M6Gx54=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/linter", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "kx2sPUIMozPC/g6E4w48h3FfH3k=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/matrix", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "2/3f3oNmxXy5kcrRLCFa24Oc9O4=", "path": "github.com/cncd/pipeline/pipeline/interrupt", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "uOjTfke7Qxosrivgz/nVTHeIP5g=", "path": "github.com/cncd/pipeline/pipeline/multipart", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { - "checksumSHA1": "MratmNKJ78/IhWvDsZphN01CtmE=", + "checksumSHA1": "TP5lK1T8cOKv5QjZ2nqdlYczSTo=", "path": "github.com/cncd/pipeline/pipeline/rpc", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "7Qj1DK0ceAXkYztW0l3+L6sn+V8=",