From 65105d06c2de37055366acbb8f243d4bd7a6512f Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Sun, 26 Apr 2015 23:49:38 -0700 Subject: [PATCH] queue polling not correctly handles client disconnect --- queue/builtin/queue.go | 18 ++++++++++++++++++ queue/queue.go | 12 ++++++++++++ server/queue.go | 8 ++++++-- 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/queue/builtin/queue.go b/queue/builtin/queue.go index d680d4ac8..4e6d2c89d 100644 --- a/queue/builtin/queue.go +++ b/queue/builtin/queue.go @@ -82,6 +82,24 @@ func (q *Queue) Pull() *queue.Work { return work } +// PullClose retrieves and removes the head of this queue, +// waiting if necessary until work becomes available. The +// CloseNotifier should be provided to clone the channel +// if the subscribing client terminates its connection. +func (q *Queue) PullClose(cn queue.CloseNotifier) *queue.Work { + for { + select { + case <-cn.CloseNotify(): + return nil + case work := <-q.itemc: + q.Lock() + delete(q.items, work) + q.Unlock() + return work + } + } +} + // PullAck retrieves and removes the head of this queue, waiting // if necessary until work becomes available. Items pull from the // queue that aren't acknowledged will be pushed back to the queue diff --git a/queue/queue.go b/queue/queue.go index 73df7a70c..54ba17c95 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -13,6 +13,12 @@ type Queue interface { // if necessary until work becomes available. Pull() *Work + // PullClose retrieves and removes the head of this queue, + // waiting if necessary until work becomes available. The + // CloseNotifier should be provided to clone the channel + // if the subscribing client terminates its connection. + PullClose(CloseNotifier) *Work + // PullAck retrieves and removes the head of this queue, waiting // if necessary until work becomes available. Items pull from the // queue that aren't acknowledged will be pushed back to the queue @@ -26,3 +32,9 @@ type Queue interface { // queue, in proper sequence. Items() []*Work } + +type CloseNotifier interface { + // CloseNotify returns a channel that receives a single value + // when the client connection has gone away. + CloseNotify() <-chan bool +} diff --git a/server/queue.go b/server/queue.go index 674bdd6b5..1bdbdf2d9 100644 --- a/server/queue.go +++ b/server/queue.go @@ -19,8 +19,12 @@ import ( // GET /queue/pull func PollBuild(c *gin.Context) { queue := ToQueue(c) - work := queue.Pull() - c.JSON(200, work) + work := queue.PullClose(c.Writer) + if work == nil { + c.AbortWithStatus(500) + } else { + c.JSON(200, work) + } } // GET /queue/push/:owner/:repo