diff --git a/cncd/queue/fifo.go b/cncd/queue/fifo.go index c0428de9a..bc8936ba6 100644 --- a/cncd/queue/fifo.go +++ b/cncd/queue/fifo.go @@ -31,6 +31,7 @@ type fifo struct { running map[string]*entry pending *list.List extension time.Duration + paused bool } // New returns a new fifo queue. @@ -40,6 +41,7 @@ func New() Queue { running: map[string]*entry{}, pending: list.New(), extension: time.Minute * 10, + paused: false, } } @@ -167,14 +169,32 @@ func (q *fifo) Info(c context.Context) InfoT { for _, entry := range q.running { stats.Running = append(stats.Running, entry.item) } + stats.Paused = q.paused q.Unlock() return stats } +func (q *fifo) Pause() { + q.Lock() + q.paused = true + q.Unlock() +} + +func (q *fifo) Resume() { + q.Lock() + q.paused = false + q.Unlock() + go q.process() +} + // helper function that loops through the queue and attempts to // match the item to a single subscriber. func (q *fifo) process() { + if q.paused { + return + } + defer func() { // the risk of panic is low. This code can probably be removed // once the code has been used in real world installs without issue. diff --git a/cncd/queue/fifo_test.go b/cncd/queue/fifo_test.go index a5642ba38..2252bbb7f 100644 --- a/cncd/queue/fifo_test.go +++ b/cncd/queue/fifo_test.go @@ -238,6 +238,52 @@ func TestFifoCancel(t *testing.T) { } } +func TestFifoPause(t *testing.T) { + task1 := &Task{ + ID: "1", + } + + q := New().(*fifo) + var wg sync.WaitGroup + wg.Add(1) + go func() { + _, _ = q.Poll(noContext, func(*Task) bool { return true }) + wg.Done() + }() + + + q.Pause() + t0 := time.Now() + q.Push(noContext, task1) + time.Sleep(20 * time.Millisecond) + q.Resume() + + wg.Wait() + t1 := time.Now() + + if t1.Sub(t0) < 20 * time.Millisecond { + t.Errorf("Should have waited til resume") + } + + q.Pause() + q.Push(noContext, task1) + q.Resume() + _, _ = q.Poll(noContext, func(*Task) bool { return true }) +} + +func TestFifoPauseResume(t *testing.T) { + task1 := &Task{ + ID: "1", + } + + q := New().(*fifo) + q.Pause() + q.Push(noContext, task1) + q.Resume() + + _, _ = q.Poll(noContext, func(*Task) bool { return true }) +} + func TestShouldRun(t *testing.T) { task := &Task{ ID: "2", diff --git a/cncd/queue/queue.go b/cncd/queue/queue.go index 2b70d3285..fc1641c36 100644 --- a/cncd/queue/queue.go +++ b/cncd/queue/queue.go @@ -93,6 +93,7 @@ type InfoT struct { Running int `json:"running_count"` Complete int `json:"completed_count"` } `json:"stats"` + Paused bool } // Filter filters tasks in the queue. If the Filter returns false, @@ -128,4 +129,10 @@ type Queue interface { // Info returns internal queue information. Info(c context.Context) InfoT + + // Stops the queue from handing out new work items in Poll + Pause() + + // Starts the queue again, Poll returns new items + Resume() } diff --git a/router/router.go b/router/router.go index e9956c709..2fa37b3b2 100644 --- a/router/router.go +++ b/router/router.go @@ -148,6 +148,22 @@ func Load(mux *httptreemux.ContextMux, middleware ...gin.HandlerFunc) http.Handl ) } + queue := e.Group("/api/queue") + { + queue.GET("/pause", + session.MustAdmin(), + server.PauseQueue, + ) + queue.GET("/resume", + session.MustAdmin(), + server.ResumeQueue, + ) + queue.GET("/norunningbuilds", + session.MustAdmin(), + server.BlockTilQueueHasRunningItem, + ) + } + auth := e.Group("/authorize") { auth.GET("", server.HandleAuth) diff --git a/server/hook.go b/server/hook.go index c5f0af973..0becc690c 100644 --- a/server/hook.go +++ b/server/hook.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "math/rand" + "net/http" "regexp" "strconv" "time" @@ -51,6 +52,26 @@ func GetQueueInfo(c *gin.Context) { ) } +func PauseQueue(c *gin.Context) { + Config.Services.Queue.Pause() + c.Status(http.StatusOK) +} + +func ResumeQueue(c *gin.Context) { + Config.Services.Queue.Resume() + c.Status(http.StatusOK) +} + +func BlockTilQueueHasRunningItem(c *gin.Context) { + for { + info := Config.Services.Queue.Info(c) + if info.Stats.Running == 0 { + break + } + } + c.Status(http.StatusOK) +} + func PostHook(c *gin.Context) { remote_ := remote.FromContext(c) diff --git a/server/stream.go b/server/stream.go index 56915e332..c4d42341c 100644 --- a/server/stream.go +++ b/server/stream.go @@ -77,7 +77,6 @@ func EventStreamSSE(c *gin.Context) { }() go func() { - // TODO remove this from global config Config.Services.Pubsub.Subscribe(ctx, "topic/events", func(m pubsub.Message) { defer func() { recover() // fix #2480