mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2024-12-30 10:11:23 +02:00
Merge pull request #70 from laszlocph/cancel-build
Iterating on cancellation logic
This commit is contained in:
commit
a74ee24e96
@ -99,41 +99,57 @@ func (q *fifo) Poll(c context.Context, f Filter) (*Task, error) {
|
||||
|
||||
// Done signals that the item is done executing.
|
||||
func (q *fifo) Done(c context.Context, id string, exitStatus string) error {
|
||||
return q.finished(id, exitStatus, nil)
|
||||
return q.finished([]string{id}, exitStatus, nil)
|
||||
}
|
||||
|
||||
// Error signals that the item is done executing with error.
|
||||
func (q *fifo) Error(c context.Context, id string, err error) error {
|
||||
return q.finished([]string{id}, StatusFailure, err)
|
||||
}
|
||||
|
||||
// Error signals that the item is done executing with error.
|
||||
func (q *fifo) ErrorAtOnce(c context.Context, id []string, err error) error {
|
||||
return q.finished(id, StatusFailure, err)
|
||||
}
|
||||
|
||||
func (q *fifo) finished(id string, exitStatus string, err error) error {
|
||||
func (q *fifo) finished(ids []string, exitStatus string, err error) error {
|
||||
q.Lock()
|
||||
taskEntry, ok := q.running[id]
|
||||
if ok {
|
||||
taskEntry.error = err
|
||||
close(taskEntry.done)
|
||||
delete(q.running, id)
|
||||
} else {
|
||||
q.removeFromPending(id)
|
||||
|
||||
for _, id := range ids {
|
||||
taskEntry, ok := q.running[id]
|
||||
if ok {
|
||||
taskEntry.error = err
|
||||
close(taskEntry.done)
|
||||
delete(q.running, id)
|
||||
} else {
|
||||
q.removeFromPending(id)
|
||||
}
|
||||
q.updateDepStatusInQueue(id, exitStatus)
|
||||
}
|
||||
q.updateDepStatusInQueue(id, exitStatus)
|
||||
|
||||
q.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Evict removes a pending task from the queue.
|
||||
func (q *fifo) Evict(c context.Context, id string) error {
|
||||
return q.EvictAtOnce(c, []string{id})
|
||||
}
|
||||
|
||||
// Evict removes a pending task from the queue.
|
||||
func (q *fifo) EvictAtOnce(c context.Context, ids []string) error {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
var next *list.Element
|
||||
for e := q.pending.Front(); e != nil; e = next {
|
||||
next = e.Next()
|
||||
task, ok := e.Value.(*Task)
|
||||
if ok && task.ID == id {
|
||||
q.pending.Remove(e)
|
||||
return nil
|
||||
for _, id := range ids {
|
||||
var next *list.Element
|
||||
for e := q.pending.Front(); e != nil; e = next {
|
||||
next = e.Next()
|
||||
task, ok := e.Value.(*Task)
|
||||
if ok && task.ID == id {
|
||||
q.pending.Remove(e)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return ErrNotFound
|
||||
|
@ -123,9 +123,15 @@ type Queue interface {
|
||||
// Error signals the task is complete with errors.
|
||||
Error(c context.Context, id string, err error) error
|
||||
|
||||
// Error signals the task is complete with errors.
|
||||
ErrorAtOnce(c context.Context, id []string, err error) error
|
||||
|
||||
// Evict removes a pending task from the queue.
|
||||
Evict(c context.Context, id string) error
|
||||
|
||||
// Evict removes a pending task from the queue.
|
||||
EvictAtOnce(c context.Context, id []string) error
|
||||
|
||||
// Wait waits until the task is complete.
|
||||
Wait(c context.Context, id string) error
|
||||
|
||||
|
@ -119,3 +119,14 @@ func (q *persistentQueue) Evict(c context.Context, id string) error {
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Evict removes a pending task from the queue.
|
||||
func (q *persistentQueue) EvictAtOnce(c context.Context, ids []string) error {
|
||||
err := q.Queue.EvictAtOnce(c, ids)
|
||||
if err == nil {
|
||||
for _, id := range ids {
|
||||
q.store.TaskDelete(id)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -113,7 +113,7 @@ func Load(mux *httptreemux.ContextMux, middleware ...gin.HandlerFunc) http.Handl
|
||||
repo.POST("/move", session.MustRepoAdmin(), server.MoveRepo)
|
||||
|
||||
repo.POST("/builds/:number", session.MustPush, server.PostBuild)
|
||||
repo.DELETE("/builds/:number", session.MustRepoAdmin(), server.ZombieKill)
|
||||
repo.DELETE("/builds/:number", session.MustPush, server.DeleteBuild)
|
||||
repo.POST("/builds/:number/approve", session.MustPush, server.PostApproval)
|
||||
repo.POST("/builds/:number/decline", session.MustPush, server.PostDecline)
|
||||
repo.DELETE("/builds/:number/:job", session.MustPush, server.DeleteBuild)
|
||||
|
@ -160,8 +160,6 @@ func GetProcLogs(c *gin.Context) {
|
||||
// DeleteBuild cancels a build
|
||||
func DeleteBuild(c *gin.Context) {
|
||||
repo := session.Repo(c)
|
||||
|
||||
// parse the build number from the request parameter.
|
||||
num, _ := strconv.Atoi(c.Params.ByName("number"))
|
||||
|
||||
build, err := store.GetBuildNumber(c, repo, num)
|
||||
@ -176,76 +174,63 @@ func DeleteBuild(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
cancelled := false
|
||||
if build.Status != model.StatusRunning && build.Status != model.StatusPending {
|
||||
c.String(400, "Cannot cancel a non-running or non-pending build")
|
||||
return
|
||||
}
|
||||
|
||||
// First cancel/evict procs in the queue in one go
|
||||
procToCancel := []string{}
|
||||
procToEvict := []string{}
|
||||
for _, proc := range procs {
|
||||
if proc.PPID != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if proc.State != model.StatusRunning && proc.State != model.StatusPending {
|
||||
continue
|
||||
if proc.State == model.StatusRunning {
|
||||
procToCancel = append(procToCancel, fmt.Sprint(proc.ID))
|
||||
}
|
||||
|
||||
// TODO cancel child procs
|
||||
if _, err = UpdateProcToStatusKilled(store.FromContext(c), *proc); err != nil {
|
||||
log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err)
|
||||
if proc.State == model.StatusPending {
|
||||
procToEvict = append(procToEvict, fmt.Sprint(proc.ID))
|
||||
}
|
||||
|
||||
Config.Services.Queue.Error(context.Background(), fmt.Sprint(proc.ID), queue.ErrCancel)
|
||||
cancelled = true
|
||||
}
|
||||
|
||||
if !cancelled {
|
||||
c.String(400, "Cannot cancel a non-running build")
|
||||
return
|
||||
}
|
||||
|
||||
c.String(204, "")
|
||||
}
|
||||
|
||||
// ZombieKill kills zombie processes stuck in an infinite pending
|
||||
// or running state. This can only be invoked by administrators and
|
||||
// may have negative effects.
|
||||
func ZombieKill(c *gin.Context) {
|
||||
repo := session.Repo(c)
|
||||
|
||||
// parse the build number and job sequence number from
|
||||
// the repquest parameter.
|
||||
num, _ := strconv.Atoi(c.Params.ByName("number"))
|
||||
|
||||
build, err := store.GetBuildNumber(c, repo, num)
|
||||
if err != nil {
|
||||
c.AbortWithError(404, err)
|
||||
return
|
||||
}
|
||||
|
||||
procs, err := store.FromContext(c).ProcList(build)
|
||||
if err != nil {
|
||||
c.AbortWithError(404, err)
|
||||
return
|
||||
}
|
||||
|
||||
if build.Status != model.StatusRunning {
|
||||
c.String(400, "Cannot force cancel a non-running build")
|
||||
return
|
||||
}
|
||||
Config.Services.Queue.EvictAtOnce(context.Background(), procToEvict)
|
||||
Config.Services.Queue.ErrorAtOnce(context.Background(), procToEvict, queue.ErrCancel)
|
||||
Config.Services.Queue.ErrorAtOnce(context.Background(), procToCancel, queue.ErrCancel)
|
||||
|
||||
// Then update the DB status for pending builds
|
||||
// Running ones will be set when the agents stop on the cancel signal
|
||||
for _, proc := range procs {
|
||||
if proc.Running() {
|
||||
if _, err := UpdateProcToStatusKilled(store.FromContext(c), *proc); err != nil {
|
||||
log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err)
|
||||
if proc.State == model.StatusPending {
|
||||
if proc.PPID != 0 {
|
||||
if _, err = UpdateProcToStatusSkipped(store.FromContext(c), *proc, 0); err != nil {
|
||||
log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err)
|
||||
}
|
||||
} else {
|
||||
if _, err = UpdateProcToStatusKilled(store.FromContext(c), *proc); err != nil {
|
||||
log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
store.FromContext(c).ProcUpdate(proc)
|
||||
}
|
||||
Config.Services.Queue.Error(context.Background(), fmt.Sprint(proc.ID), queue.ErrCancel)
|
||||
}
|
||||
|
||||
if _, err := UpdateToStatusKilled(store.FromContext(c), *build); err != nil {
|
||||
killedBuild, err := UpdateToStatusKilled(store.FromContext(c), *build)
|
||||
if err != nil {
|
||||
c.AbortWithError(500, err)
|
||||
return
|
||||
}
|
||||
|
||||
// For pending builds, we stream the UI the latest state.
|
||||
// For running builds, the UI will be updated when the agents acknowledge the cancel
|
||||
if build.Status == model.StatusPending {
|
||||
procs, err = store.FromContext(c).ProcList(killedBuild)
|
||||
if err != nil {
|
||||
c.AbortWithError(404, err)
|
||||
return
|
||||
}
|
||||
killedBuild.Procs = model.Tree(procs)
|
||||
publishToTopic(c, killedBuild, repo, model.Cancelled)
|
||||
}
|
||||
|
||||
c.String(204, "")
|
||||
}
|
||||
|
||||
@ -353,7 +338,7 @@ func PostApproval(c *gin.Context) {
|
||||
}
|
||||
}()
|
||||
|
||||
publishToTopic(c, build, repo)
|
||||
publishToTopic(c, build, repo, model.Enqueued)
|
||||
queueBuild(build, repo, buildItems)
|
||||
}
|
||||
|
||||
@ -557,7 +542,7 @@ func PostBuild(c *gin.Context) {
|
||||
}
|
||||
c.JSON(202, build)
|
||||
|
||||
publishToTopic(c, build, repo)
|
||||
publishToTopic(c, build, repo, model.Enqueued)
|
||||
queueBuild(build, repo, buildItems)
|
||||
}
|
||||
|
||||
|
@ -285,7 +285,7 @@ func PostHook(c *gin.Context) {
|
||||
}
|
||||
}()
|
||||
|
||||
publishToTopic(c, build, repo)
|
||||
publishToTopic(c, build, repo, model.Enqueued)
|
||||
queueBuild(build, repo, buildItems)
|
||||
}
|
||||
|
||||
@ -360,7 +360,7 @@ func findOrPersistPipelineConfig(build *model.Build, remoteYamlConfig *remote.Fi
|
||||
}
|
||||
|
||||
// publishes message to UI clients
|
||||
func publishToTopic(c *gin.Context, build *model.Build, repo *model.Repo) {
|
||||
func publishToTopic(c *gin.Context, build *model.Build, repo *model.Repo, event model.EventType) {
|
||||
message := pubsub.Message{
|
||||
Labels: map[string]string{
|
||||
"repo": repo.FullName,
|
||||
|
Loading…
Reference in New Issue
Block a user