From 69de8face1624accc50dd821a8e04c0f2bb682ac Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Wed, 19 Jun 2019 08:36:13 +0200 Subject: [PATCH] Handling canceled, skipped builds --- cncd/queue/fifo.go | 16 +++++++++++ cncd/queue/fifo_test.go | 35 ++++++++++++++++++++++++ remote/github/convert.go | 2 +- server/build.go | 42 ++++++++++++++++++----------- server/rpc.go | 57 ++++------------------------------------ 5 files changed, 84 insertions(+), 68 deletions(-) diff --git a/cncd/queue/fifo.go b/cncd/queue/fifo.go index 3f21111e2..43d0c5097 100644 --- a/cncd/queue/fifo.go +++ b/cncd/queue/fifo.go @@ -103,6 +103,8 @@ func (q *fifo) Error(c context.Context, id string, err error) error { taskEntry.error = err close(taskEntry.done) delete(q.running, id) + } else { + q.removeFromPending(id) } q.Unlock() return nil @@ -268,3 +270,17 @@ func (q *fifo) updateDepStatusInQueue(taskID string, success bool) { } } } + +func (q *fifo) removeFromPending(taskID string) { + logrus.Debugf("queue: trying to remove %s", taskID) + var next *list.Element + for e := q.pending.Front(); e != nil; e = next { + next = e.Next() + task := e.Value.(*Task) + if task.ID == taskID { + logrus.Debugf("queue: %s is removed from pending", taskID) + q.pending.Remove(e) + return + } + } +} diff --git a/cncd/queue/fifo_test.go b/cncd/queue/fifo_test.go index 5aa8327ff..a5642ba38 100644 --- a/cncd/queue/fifo_test.go +++ b/cncd/queue/fifo_test.go @@ -203,6 +203,41 @@ func TestFifoErrors(t *testing.T) { } } +func TestFifoCancel(t *testing.T) { + task1 := &Task{ + ID: "1", + } + + task2 := &Task{ + ID: "2", + Dependencies: []string{"1"}, + DepStatus: make(map[string]bool), + } + + task3 := &Task{ + ID: "3", + Dependencies: []string{"1"}, + DepStatus: make(map[string]bool), + RunOn: []string{"success", "failure"}, + } + + q := New().(*fifo) + q.Push(noContext, task2) + q.Push(noContext, task3) + q.Push(noContext, task1) + + _, _ = q.Poll(noContext, func(*Task) bool { return true }) + q.Error(noContext, task1.ID, fmt.Errorf("cancelled")) + q.Error(noContext, task2.ID, fmt.Errorf("cancelled")) + q.Error(noContext, task3.ID, fmt.Errorf("cancelled")) + + info := q.Info(noContext) + if len(info.Pending) != 0 { + t.Errorf("All pipelines should be cancelled") + return + } +} + func TestShouldRun(t *testing.T) { task := &Task{ ID: "2", diff --git a/remote/github/convert.go b/remote/github/convert.go index b8ce64f65..4fde67e4c 100644 --- a/remote/github/convert.go +++ b/remote/github/convert.go @@ -51,7 +51,7 @@ const ( // GitHub commit status. func convertStatus(status string) string { switch status { - case model.StatusPending, model.StatusRunning, model.StatusBlocked: + case model.StatusPending, model.StatusRunning, model.StatusBlocked, model.StatusSkipped: return statusPending case model.StatusFailure, model.StatusDeclined: return statusFailure diff --git a/server/build.go b/server/build.go index df2efc093..b94a7e119 100644 --- a/server/build.go +++ b/server/build.go @@ -156,13 +156,12 @@ func GetProcLogs(c *gin.Context) { io.Copy(c.Writer, rc) } +// DeleteBuild cancels a build func DeleteBuild(c *gin.Context) { repo := session.Repo(c) - // parse the build number and job sequence number from - // the repquest parameter. + // parse the build number from the request parameter. num, _ := strconv.Atoi(c.Params.ByName("number")) - seq, _ := strconv.Atoi(c.Params.ByName("job")) build, err := store.GetBuildNumber(c, repo, num) if err != nil { @@ -170,27 +169,40 @@ func DeleteBuild(c *gin.Context) { return } - proc, err := store.FromContext(c).ProcFind(build, seq) + procs, err := store.FromContext(c).ProcList(build) if err != nil { c.AbortWithError(404, err) return } - if proc.State != model.StatusRunning { + cancelled := false + for _, proc := range procs { + if proc.PPID != 0 { + continue + } + + if proc.State != model.StatusRunning && proc.State != model.StatusPending { + continue + } + + proc.State = model.StatusKilled + proc.Stopped = time.Now().Unix() + if proc.Started == 0 { + proc.Started = proc.Stopped + } + proc.ExitCode = 137 + // TODO cancel child procs + store.FromContext(c).ProcUpdate(proc) + + Config.Services.Queue.Error(context.Background(), fmt.Sprint(proc.ID), queue.ErrCancel) + cancelled = true + } + + if !cancelled { c.String(400, "Cannot cancel a non-running build") return } - proc.State = model.StatusKilled - proc.Stopped = time.Now().Unix() - if proc.Started == 0 { - proc.Started = proc.Stopped - } - proc.ExitCode = 137 - // TODO cancel child procs - store.FromContext(c).ProcUpdate(proc) - - Config.Services.Queue.Error(context.Background(), fmt.Sprint(proc.ID), queue.ErrCancel) c.String(204, "") } diff --git a/server/rpc.go b/server/rpc.go index c552ea195..0410ece42 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -441,7 +441,11 @@ func (s *RPC) updateProcState(proc *model.Proc, state rpc.State) { proc.Stopped = state.Finished proc.Error = state.Error proc.ExitCode = state.ExitCode - proc.State = model.StatusSuccess + if state.Started == 0 { + proc.State = model.StatusSkipped + } else { + proc.State = model.StatusSuccess + } if proc.ExitCode != 0 || proc.Error != "" { proc.State = model.StatusFailure } @@ -522,21 +526,6 @@ func (s *RPC) notify(c context.Context, repo *model.Repo, build *model.Build, pr s.pubsub.Publish(c, "topic/events", message) } -func (s *RPC) checkCancelled(pipeline *rpc.Pipeline) (bool, error) { - pid, err := strconv.ParseInt(pipeline.ID, 10, 64) - if err != nil { - return false, err - } - proc, err := s.store.ProcLoad(pid) - if err != nil { - return false, err - } - if proc.State == model.StatusKilled { - return true, nil - } - return false, err -} - func createFilterFunc(filter rpc.Filter) (queue.Filter, error) { var st *expr.Selector var err error @@ -606,42 +595,6 @@ func (s *DroneServer) Next(c oldcontext.Context, req *proto.NextRequest) (*proto res.Pipeline.Payload, _ = json.Marshal(pipeline.Config) return res, err - - // fn := func(task *queue.Task) bool { - // for k, v := range req.GetFilter().Labels { - // if task.Labels[k] != v { - // return false - // } - // } - // return true - // } - // task, err := s.Queue.Poll(c, fn) - // if err != nil { - // return nil, err - // } else if task == nil { - // return nil, nil - // } - // - // pipeline := new(rpc.Pipeline) - // json.Unmarshal(task.Data, pipeline) - // - // res := new(proto.NextReply) - // res.Pipeline = new(proto.Pipeline) - // res.Pipeline.Id = pipeline.ID - // res.Pipeline.Timeout = pipeline.Timeout - // res.Pipeline.Payload, _ = json.Marshal(pipeline.Config) - // - // // check if the process was previously cancelled - // // cancelled, _ := s.checkCancelled(pipeline) - // // if cancelled { - // // logrus.Debugf("ignore pid %v: cancelled by user", pipeline.ID) - // // if derr := s.queue.Done(c, pipeline.ID); derr != nil { - // // logrus.Errorf("error: done: cannot ack proc_id %v: %s", pipeline.ID, err) - // // } - // // return nil, nil - // // } - // - // return res, nil } func (s *DroneServer) Init(c oldcontext.Context, req *proto.InitRequest) (*proto.Empty, error) {