mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2024-12-24 10:07:21 +02:00
queue integrated with server, but not agent
This commit is contained in:
parent
524fba6253
commit
f2c1d46f9e
37
bus/bus.go
37
bus/bus.go
@ -2,8 +2,6 @@ package bus
|
||||
|
||||
//go:generate mockery -name Bus -output mock -case=underscore
|
||||
|
||||
import "golang.org/x/net/context"
|
||||
|
||||
// Bus represents an event bus implementation that
|
||||
// allows a publisher to broadcast Event notifications
|
||||
// to a list of subscribers.
|
||||
@ -21,20 +19,21 @@ type Bus interface {
|
||||
Unsubscribe(chan *Event)
|
||||
}
|
||||
|
||||
// Publish broadcasts an event to all subscribers.
|
||||
func Publish(c context.Context, event *Event) {
|
||||
FromContext(c).Publish(event)
|
||||
}
|
||||
|
||||
// Subscribe adds the channel to the list of
|
||||
// subscribers. Each subscriber in the list will
|
||||
// receive broadcast events.
|
||||
func Subscribe(c context.Context, eventc chan *Event) {
|
||||
FromContext(c).Subscribe(eventc)
|
||||
}
|
||||
|
||||
// Unsubscribe removes the channel from the
|
||||
// list of subscribers.
|
||||
func Unsubscribe(c context.Context, eventc chan *Event) {
|
||||
FromContext(c).Unsubscribe(eventc)
|
||||
}
|
||||
//
|
||||
// // Publish broadcasts an event to all subscribers.
|
||||
// func Publish(c context.Context, event *Event) {
|
||||
// FromContext(c).Publish(event)
|
||||
// }
|
||||
//
|
||||
// // Subscribe adds the channel to the list of
|
||||
// // subscribers. Each subscriber in the list will
|
||||
// // receive broadcast events.
|
||||
// func Subscribe(c context.Context, eventc chan *Event) {
|
||||
// FromContext(c).Subscribe(eventc)
|
||||
// }
|
||||
//
|
||||
// // Unsubscribe removes the channel from the
|
||||
// // list of subscribers.
|
||||
// func Unsubscribe(c context.Context, eventc chan *Event) {
|
||||
// FromContext(c).Unsubscribe(eventc)
|
||||
// }
|
||||
|
@ -295,6 +295,7 @@ func server(c *cli.Context) error {
|
||||
middleware.Store(c),
|
||||
middleware.Remote(c),
|
||||
middleware.Agents(c),
|
||||
middleware.Broker(c),
|
||||
)
|
||||
|
||||
// start the server with tls enabled
|
||||
|
@ -1,93 +1,94 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
. "github.com/franela/goblin"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func TestBuild(t *testing.T) {
|
||||
g := Goblin(t)
|
||||
g.Describe("Queue", func() {
|
||||
|
||||
g.It("Should publish item", func() {
|
||||
c := new(gin.Context)
|
||||
q := newQueue()
|
||||
ToContext(c, q)
|
||||
|
||||
w1 := &Work{}
|
||||
w2 := &Work{}
|
||||
Publish(c, w1)
|
||||
Publish(c, w2)
|
||||
g.Assert(len(q.items)).Equal(2)
|
||||
g.Assert(len(q.itemc)).Equal(2)
|
||||
})
|
||||
|
||||
g.It("Should remove item", func() {
|
||||
c := new(gin.Context)
|
||||
q := newQueue()
|
||||
ToContext(c, q)
|
||||
|
||||
w1 := &Work{}
|
||||
w2 := &Work{}
|
||||
w3 := &Work{}
|
||||
Publish(c, w1)
|
||||
Publish(c, w2)
|
||||
Publish(c, w3)
|
||||
Remove(c, w2)
|
||||
g.Assert(len(q.items)).Equal(2)
|
||||
g.Assert(len(q.itemc)).Equal(2)
|
||||
|
||||
g.Assert(Pull(c)).Equal(w1)
|
||||
g.Assert(Pull(c)).Equal(w3)
|
||||
g.Assert(Remove(c, w2)).Equal(ErrNotFound)
|
||||
})
|
||||
|
||||
g.It("Should pull item", func() {
|
||||
c := new(gin.Context)
|
||||
q := New()
|
||||
ToContext(c, q)
|
||||
|
||||
cn := new(closeNotifier)
|
||||
cn.closec = make(chan bool, 1)
|
||||
w1 := &Work{}
|
||||
w2 := &Work{}
|
||||
|
||||
Publish(c, w1)
|
||||
g.Assert(Pull(c)).Equal(w1)
|
||||
|
||||
Publish(c, w2)
|
||||
g.Assert(PullClose(c, cn)).Equal(w2)
|
||||
})
|
||||
|
||||
g.It("Should cancel pulling item", func() {
|
||||
c := new(gin.Context)
|
||||
q := New()
|
||||
ToContext(c, q)
|
||||
|
||||
cn := new(closeNotifier)
|
||||
cn.closec = make(chan bool, 1)
|
||||
var wg sync.WaitGroup
|
||||
go func() {
|
||||
wg.Add(1)
|
||||
g.Assert(PullClose(c, cn) == nil).IsTrue()
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
cn.closec <- true
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
type closeNotifier struct {
|
||||
closec chan bool
|
||||
}
|
||||
|
||||
func (c *closeNotifier) CloseNotify() <-chan bool {
|
||||
return c.closec
|
||||
}
|
||||
//
|
||||
// import (
|
||||
// "sync"
|
||||
// "testing"
|
||||
//
|
||||
// . "github.com/franela/goblin"
|
||||
// "github.com/gin-gonic/gin"
|
||||
// )
|
||||
//
|
||||
// func TestBuild(t *testing.T) {
|
||||
// g := Goblin(t)
|
||||
// g.Describe("Queue", func() {
|
||||
//
|
||||
// g.It("Should publish item", func() {
|
||||
// c := new(gin.Context)
|
||||
// q := newQueue()
|
||||
// ToContext(c, q)
|
||||
//
|
||||
// w1 := &Work{}
|
||||
// w2 := &Work{}
|
||||
// Publish(c, w1)
|
||||
// Publish(c, w2)
|
||||
// g.Assert(len(q.items)).Equal(2)
|
||||
// g.Assert(len(q.itemc)).Equal(2)
|
||||
// })
|
||||
//
|
||||
// g.It("Should remove item", func() {
|
||||
// c := new(gin.Context)
|
||||
// q := newQueue()
|
||||
// ToContext(c, q)
|
||||
//
|
||||
// w1 := &Work{}
|
||||
// w2 := &Work{}
|
||||
// w3 := &Work{}
|
||||
// Publish(c, w1)
|
||||
// Publish(c, w2)
|
||||
// Publish(c, w3)
|
||||
// Remove(c, w2)
|
||||
// g.Assert(len(q.items)).Equal(2)
|
||||
// g.Assert(len(q.itemc)).Equal(2)
|
||||
//
|
||||
// g.Assert(Pull(c)).Equal(w1)
|
||||
// g.Assert(Pull(c)).Equal(w3)
|
||||
// g.Assert(Remove(c, w2)).Equal(ErrNotFound)
|
||||
// })
|
||||
//
|
||||
// g.It("Should pull item", func() {
|
||||
// c := new(gin.Context)
|
||||
// q := New()
|
||||
// ToContext(c, q)
|
||||
//
|
||||
// cn := new(closeNotifier)
|
||||
// cn.closec = make(chan bool, 1)
|
||||
// w1 := &Work{}
|
||||
// w2 := &Work{}
|
||||
//
|
||||
// Publish(c, w1)
|
||||
// g.Assert(Pull(c)).Equal(w1)
|
||||
//
|
||||
// Publish(c, w2)
|
||||
// g.Assert(PullClose(c, cn)).Equal(w2)
|
||||
// })
|
||||
//
|
||||
// g.It("Should cancel pulling item", func() {
|
||||
// c := new(gin.Context)
|
||||
// q := New()
|
||||
// ToContext(c, q)
|
||||
//
|
||||
// cn := new(closeNotifier)
|
||||
// cn.closec = make(chan bool, 1)
|
||||
// var wg sync.WaitGroup
|
||||
// go func() {
|
||||
// wg.Add(1)
|
||||
// g.Assert(PullClose(c, cn) == nil).IsTrue()
|
||||
// wg.Done()
|
||||
// }()
|
||||
// go func() {
|
||||
// cn.closec <- true
|
||||
// }()
|
||||
// wg.Wait()
|
||||
//
|
||||
// })
|
||||
// })
|
||||
// }
|
||||
//
|
||||
// type closeNotifier struct {
|
||||
// closec chan bool
|
||||
// }
|
||||
//
|
||||
// func (c *closeNotifier) CloseNotify() <-chan bool {
|
||||
// return c.closec
|
||||
// }
|
||||
|
52
router/middleware/broker.go
Normal file
52
router/middleware/broker.go
Normal file
@ -0,0 +1,52 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
handlers "github.com/drone/drone/server"
|
||||
|
||||
"github.com/codegangsta/cli"
|
||||
"github.com/drone/mq/server"
|
||||
"github.com/drone/mq/stomp"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
const (
|
||||
serverKey = "broker"
|
||||
clientKey = "stomp.client" // mirrored from stomp/context
|
||||
)
|
||||
|
||||
// Broker is a middleware function that initializes the broker
|
||||
// and adds the broker client to the request context.
|
||||
func Broker(cli *cli.Context) gin.HandlerFunc {
|
||||
secret := cli.String("agent-secret")
|
||||
if secret == "" {
|
||||
logrus.Fatalf("failed to generate token from DRONE_SECRET")
|
||||
}
|
||||
|
||||
broker := server.NewServer(
|
||||
server.WithCredentials("x-token", secret),
|
||||
)
|
||||
client := broker.Client()
|
||||
|
||||
var once sync.Once
|
||||
return func(c *gin.Context) {
|
||||
c.Set(serverKey, broker)
|
||||
c.Set(clientKey, client)
|
||||
once.Do(func() {
|
||||
// this is some really hacky stuff
|
||||
// turns out I need to do some refactoring
|
||||
// don't judge!
|
||||
// will fix in 0.6 release
|
||||
ctx := c.Copy()
|
||||
client.Connect(
|
||||
stomp.WithCredentials("x-token", secret),
|
||||
)
|
||||
client.Subscribe("/queue/updates", stomp.HandlerFunc(func(m *stomp.Message) {
|
||||
go handlers.HandleUpdate(ctx, m.Copy())
|
||||
}))
|
||||
})
|
||||
}
|
||||
}
|
@ -113,17 +113,9 @@ func Load(middleware ...gin.HandlerFunc) http.Handler {
|
||||
e.POST("/hook", server.PostHook)
|
||||
e.POST("/api/hook", server.PostHook)
|
||||
|
||||
stream := e.Group("/api/stream")
|
||||
{
|
||||
stream.Use(session.SetRepo())
|
||||
stream.Use(session.SetPerm())
|
||||
stream.Use(session.MustPull)
|
||||
|
||||
stream.GET("/:owner/:name", server.GetRepoEvents)
|
||||
stream.GET("/:owner/:name/:build/:number", server.GetStream)
|
||||
}
|
||||
ws := e.Group("/ws")
|
||||
{
|
||||
ws.GET("/broker", server.Broker)
|
||||
ws.GET("/feed", server.EventStream)
|
||||
ws.GET("/logs/:owner/:name/:build/:number",
|
||||
session.SetRepo(),
|
||||
@ -152,20 +144,6 @@ func Load(middleware ...gin.HandlerFunc) http.Handler {
|
||||
agents.GET("", server.GetAgents)
|
||||
}
|
||||
|
||||
queue := e.Group("/api/queue")
|
||||
{
|
||||
queue.Use(session.AuthorizeAgent)
|
||||
queue.POST("/pull", server.Pull)
|
||||
queue.POST("/pull/:os/:arch", server.Pull)
|
||||
queue.POST("/wait/:id", server.Wait)
|
||||
queue.POST("/stream/:id", server.Stream)
|
||||
queue.POST("/status/:id", server.Update)
|
||||
queue.POST("/ping", server.Ping)
|
||||
|
||||
queue.POST("/logs/:id", server.PostLogs)
|
||||
queue.GET("/logs/:id", server.WriteLogs)
|
||||
}
|
||||
|
||||
// DELETE THESE
|
||||
// gitlab := e.Group("/gitlab/:owner/:name")
|
||||
// {
|
||||
|
13
server/broker.go
Normal file
13
server/broker.go
Normal file
@ -0,0 +1,13 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// Broker handles connections to the embedded message broker.
|
||||
func Broker(c *gin.Context) {
|
||||
broker := c.MustGet("broker").(http.Handler)
|
||||
broker.ServeHTTP(c.Writer, c.Request)
|
||||
}
|
@ -12,11 +12,13 @@ import (
|
||||
"github.com/drone/drone/shared/httputil"
|
||||
"github.com/drone/drone/store"
|
||||
"github.com/drone/drone/stream"
|
||||
"github.com/drone/drone/yaml"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/square/go-jose"
|
||||
|
||||
"github.com/drone/drone/model"
|
||||
"github.com/drone/drone/router/middleware/session"
|
||||
"github.com/drone/mq/stomp"
|
||||
)
|
||||
|
||||
func GetBuilds(c *gin.Context) {
|
||||
@ -148,7 +150,14 @@ func DeleteBuild(c *gin.Context) {
|
||||
job.ExitCode = 137
|
||||
store.UpdateBuildJob(c, build, job)
|
||||
|
||||
bus.Publish(c, bus.NewEvent(bus.Cancelled, repo, build, job))
|
||||
client := stomp.MustFromContext(c)
|
||||
client.SendJSON("/topic/cancel", bus.Event{
|
||||
Type: bus.Cancelled,
|
||||
Repo: *repo,
|
||||
Build: *build,
|
||||
Job: *job,
|
||||
})
|
||||
|
||||
c.String(204, "")
|
||||
}
|
||||
|
||||
@ -293,7 +302,7 @@ func PostBuild(c *gin.Context) {
|
||||
last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID)
|
||||
secs, err := store.GetMergedSecretList(c, repo)
|
||||
if err != nil {
|
||||
log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err)
|
||||
log.Debugf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err)
|
||||
}
|
||||
|
||||
var signed bool
|
||||
@ -318,9 +327,19 @@ func PostBuild(c *gin.Context) {
|
||||
|
||||
log.Debugf(".drone.yml is signed=%v and verified=%v", signed, verified)
|
||||
|
||||
bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build))
|
||||
client := stomp.MustFromContext(c)
|
||||
client.SendJSON("/topic/events", bus.Event{
|
||||
Type: bus.Enqueued,
|
||||
Repo: *repo,
|
||||
Build: *build,
|
||||
},
|
||||
stomp.WithHeader("repo", repo.FullName),
|
||||
stomp.WithHeader("private", strconv.FormatBool(repo.IsPrivate)),
|
||||
)
|
||||
|
||||
for _, job := range jobs {
|
||||
queue.Publish(c, &queue.Work{
|
||||
broker, _ := stomp.FromContext(c)
|
||||
broker.SendJSON("/queue/pending", &queue.Work{
|
||||
Signed: signed,
|
||||
Verified: verified,
|
||||
User: user,
|
||||
@ -332,7 +351,7 @@ func PostBuild(c *gin.Context) {
|
||||
Yaml: string(raw),
|
||||
Secrets: secs,
|
||||
System: &model.System{Link: httputil.GetURL(c.Request)},
|
||||
})
|
||||
}, stomp.WithHeaders(yaml.ParseLabel(raw)))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package server
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/square/go-jose"
|
||||
@ -16,6 +17,7 @@ import (
|
||||
"github.com/drone/drone/shared/token"
|
||||
"github.com/drone/drone/store"
|
||||
"github.com/drone/drone/yaml"
|
||||
"github.com/drone/mq/stomp"
|
||||
)
|
||||
|
||||
var skipRe = regexp.MustCompile(`\[(?i:ci *skip|skip *ci)\]`)
|
||||
@ -208,12 +210,22 @@ func PostHook(c *gin.Context) {
|
||||
last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID)
|
||||
secs, err := store.GetMergedSecretList(c, repo)
|
||||
if err != nil {
|
||||
log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err)
|
||||
log.Debugf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err)
|
||||
}
|
||||
|
||||
bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build))
|
||||
client := stomp.MustFromContext(c)
|
||||
client.SendJSON("/topic/events", bus.Event{
|
||||
Type: bus.Enqueued,
|
||||
Repo: *repo,
|
||||
Build: *build,
|
||||
},
|
||||
stomp.WithHeader("repo", repo.FullName),
|
||||
stomp.WithHeader("private", strconv.FormatBool(repo.IsPrivate)),
|
||||
)
|
||||
|
||||
for _, job := range jobs {
|
||||
queue.Publish(c, &queue.Work{
|
||||
broker, _ := stomp.FromContext(c)
|
||||
broker.SendJSON("/queue/pending", &queue.Work{
|
||||
Signed: build.Signed,
|
||||
Verified: build.Verified,
|
||||
User: user,
|
||||
@ -225,7 +237,7 @@ func PostHook(c *gin.Context) {
|
||||
Yaml: string(raw),
|
||||
Secrets: secs,
|
||||
System: &model.System{Link: httputil.GetURL(c.Request)},
|
||||
})
|
||||
}, stomp.WithHeaders(yaml.ParseLabel(raw)))
|
||||
}
|
||||
|
||||
}
|
||||
|
552
server/queue.go
552
server/queue.go
@ -1,80 +1,319 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/drone/drone/bus"
|
||||
"github.com/drone/drone/model"
|
||||
"github.com/drone/drone/queue"
|
||||
"github.com/drone/drone/remote"
|
||||
"github.com/drone/drone/store"
|
||||
"github.com/drone/drone/stream"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/drone/mq/stomp"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
// Pull is a long request that polls and attemts to pull work off the queue stack.
|
||||
func Pull(c *gin.Context) {
|
||||
logrus.Debugf("Agent %s connected.", c.ClientIP())
|
||||
//
|
||||
// // Pull is a long request that polls and attemts to pull work off the queue stack.
|
||||
// func Pull(c *gin.Context) {
|
||||
// logrus.Debugf("Agent %s connected.", c.ClientIP())
|
||||
//
|
||||
// w := queue.PullClose(c, c.Writer)
|
||||
// if w == nil {
|
||||
// logrus.Debugf("Agent %s could not pull work.", c.ClientIP())
|
||||
// } else {
|
||||
//
|
||||
// // setup the channel to stream logs
|
||||
// if err := stream.Create(c, stream.ToKey(w.Job.ID)); err != nil {
|
||||
// logrus.Errorf("Unable to create stream. %s", err)
|
||||
// }
|
||||
//
|
||||
// c.JSON(202, w)
|
||||
//
|
||||
// logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d",
|
||||
// c.ClientIP(),
|
||||
// w.Repo.Owner,
|
||||
// w.Repo.Name,
|
||||
// w.Build.Number,
|
||||
// w.Job.Number,
|
||||
// )
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // Wait is a long request that polls and waits for cancelled build requests.
|
||||
// func Wait(c *gin.Context) {
|
||||
// id, err := strconv.ParseInt(c.Param("id"), 10, 64)
|
||||
// if err != nil {
|
||||
// c.String(500, "Invalid input. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// eventc := make(chan *bus.Event, 1)
|
||||
//
|
||||
// bus.Subscribe(c, eventc)
|
||||
// defer bus.Unsubscribe(c, eventc)
|
||||
//
|
||||
// for {
|
||||
// select {
|
||||
// case event := <-eventc:
|
||||
// if event.Job.ID == id && event.Type == bus.Cancelled {
|
||||
// c.JSON(200, event.Job)
|
||||
// return
|
||||
// }
|
||||
// case <-c.Writer.CloseNotify():
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // Update handles build updates from the agent and persists to the database.
|
||||
// func Update(c *gin.Context) {
|
||||
// work := &queue.Work{}
|
||||
// if err := c.BindJSON(work); err != nil {
|
||||
// logrus.Errorf("Invalid input. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// // TODO(bradrydzewski) it is really annoying that we have to do this lookup
|
||||
// // and I'd prefer not to. The reason we do this is because the Build and Job
|
||||
// // have fields that aren't serialized to json and would be reset to their
|
||||
// // empty values if we just saved what was coming in the http.Request body.
|
||||
// build, err := store.GetBuild(c, work.Build.ID)
|
||||
// if err != nil {
|
||||
// c.String(404, "Unable to find build. %s", err)
|
||||
// return
|
||||
// }
|
||||
// job, err := store.GetJob(c, work.Job.ID)
|
||||
// if err != nil {
|
||||
// c.String(404, "Unable to find job. %s", err)
|
||||
// return
|
||||
// }
|
||||
// build.Started = work.Build.Started
|
||||
// build.Finished = work.Build.Finished
|
||||
// build.Status = work.Build.Status
|
||||
// job.Started = work.Job.Started
|
||||
// job.Finished = work.Job.Finished
|
||||
// job.Status = work.Job.Status
|
||||
// job.ExitCode = work.Job.ExitCode
|
||||
// job.Error = work.Job.Error
|
||||
//
|
||||
// if build.Status == model.StatusPending {
|
||||
// build.Started = work.Job.Started
|
||||
// build.Status = model.StatusRunning
|
||||
// store.UpdateBuild(c, build)
|
||||
// }
|
||||
//
|
||||
// // if job.Status == model.StatusRunning {
|
||||
// // err := stream.Create(c, stream.ToKey(job.ID))
|
||||
// // if err != nil {
|
||||
// // logrus.Errorf("Unable to create stream. %s", err)
|
||||
// // }
|
||||
// // }
|
||||
//
|
||||
// ok, err := store.UpdateBuildJob(c, build, job)
|
||||
// if err != nil {
|
||||
// c.String(500, "Unable to update job. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// if ok && build.Status != model.StatusRunning {
|
||||
// // get the user because we transfer the user form the server to agent
|
||||
// // and back we lose the token which does not get serialized to json.
|
||||
// user, err := store.GetUser(c, work.User.ID)
|
||||
// if err != nil {
|
||||
// c.String(500, "Unable to find user. %s", err)
|
||||
// return
|
||||
// }
|
||||
// remote.Status(c, user, work.Repo, build,
|
||||
// fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number))
|
||||
// }
|
||||
//
|
||||
// if build.Status == model.StatusRunning {
|
||||
// bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job))
|
||||
// } else {
|
||||
// bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job))
|
||||
// }
|
||||
//
|
||||
// c.JSON(200, work)
|
||||
// }
|
||||
//
|
||||
// // Stream streams the logs to disk or memory for broadcasing to listeners. Once
|
||||
// // the stream is closed it is moved to permanent storage in the database.
|
||||
// func Stream(c *gin.Context) {
|
||||
// id, err := strconv.ParseInt(c.Param("id"), 10, 64)
|
||||
// if err != nil {
|
||||
// c.String(500, "Invalid input. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// key := c.Param("id")
|
||||
// logrus.Infof("Agent %s creating stream %s.", c.ClientIP(), key)
|
||||
//
|
||||
// wc, err := stream.Writer(c, key)
|
||||
// if err != nil {
|
||||
// c.String(500, "Failed to create stream writer. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// defer func() {
|
||||
// wc.Close()
|
||||
// stream.Delete(c, key)
|
||||
// }()
|
||||
//
|
||||
// io.Copy(wc, c.Request.Body)
|
||||
//
|
||||
// rc, err := stream.Reader(c, key)
|
||||
// if err != nil {
|
||||
// c.String(500, "Failed to create stream reader. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// wg := sync.WaitGroup{}
|
||||
// wg.Add(1)
|
||||
//
|
||||
// go func() {
|
||||
// defer recover()
|
||||
// store.WriteLog(c, &model.Job{ID: id}, rc)
|
||||
// wg.Done()
|
||||
// }()
|
||||
//
|
||||
// wc.Close()
|
||||
// wg.Wait()
|
||||
// c.String(200, "")
|
||||
//
|
||||
// logrus.Debugf("Agent %s wrote stream to database", c.ClientIP())
|
||||
// }
|
||||
//
|
||||
// func Ping(c *gin.Context) {
|
||||
// agent, err := store.GetAgentAddr(c, c.ClientIP())
|
||||
// if err == nil {
|
||||
// agent.Updated = time.Now().Unix()
|
||||
// err = store.UpdateAgent(c, agent)
|
||||
// } else {
|
||||
// err = store.CreateAgent(c, &model.Agent{
|
||||
// Address: c.ClientIP(),
|
||||
// Platform: "linux/amd64",
|
||||
// Capacity: 2,
|
||||
// Created: time.Now().Unix(),
|
||||
// Updated: time.Now().Unix(),
|
||||
// })
|
||||
// }
|
||||
// if err != nil {
|
||||
// logrus.Errorf("Unable to register agent. %s", err.Error())
|
||||
// }
|
||||
// c.String(200, "PONG")
|
||||
// }
|
||||
|
||||
w := queue.PullClose(c, c.Writer)
|
||||
if w == nil {
|
||||
logrus.Debugf("Agent %s could not pull work.", c.ClientIP())
|
||||
} else {
|
||||
//
|
||||
//
|
||||
// Below are alternate implementations for the Queue that use websockets.
|
||||
//
|
||||
//
|
||||
//
|
||||
// // PostLogs handles an http request from the agent to post build logs. These
|
||||
// // logs are posted at the end of the build process.
|
||||
// func PostLogs(c *gin.Context) {
|
||||
// id, _ := strconv.ParseInt(c.Param("id"), 10, 64)
|
||||
// job, err := store.GetJob(c, id)
|
||||
// if err != nil {
|
||||
// c.String(404, "Cannot upload logs. %s", err)
|
||||
// return
|
||||
// }
|
||||
// if err := store.WriteLog(c, job, c.Request.Body); err != nil {
|
||||
// c.String(500, "Cannot persist logs", err)
|
||||
// return
|
||||
// }
|
||||
// c.String(200, "")
|
||||
// }
|
||||
//
|
||||
// // WriteLogs handles an http request from the agent to stream build logs from
|
||||
// // the agent to the server to enable real time streamings to the client.
|
||||
// func WriteLogs(c *gin.Context) {
|
||||
// id, err := strconv.ParseInt(c.Param("id"), 10, 64)
|
||||
// if err != nil {
|
||||
// c.String(500, "Invalid input. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||||
// if err != nil {
|
||||
// c.String(500, "Cannot upgrade to websocket. %s", err)
|
||||
// return
|
||||
// }
|
||||
// defer conn.Close()
|
||||
//
|
||||
// wc, err := stream.Writer(c, stream.ToKey(id))
|
||||
// if err != nil {
|
||||
// c.String(500, "Cannot create stream writer. %s", err)
|
||||
// return
|
||||
// }
|
||||
// defer func() {
|
||||
// wc.Close()
|
||||
// stream.Delete(c, stream.ToKey(id))
|
||||
// }()
|
||||
//
|
||||
// var msg []byte
|
||||
// for {
|
||||
// _, msg, err = conn.ReadMessage()
|
||||
// if err != nil {
|
||||
// break
|
||||
// }
|
||||
// wc.Write(msg)
|
||||
// wc.Write(newline)
|
||||
// }
|
||||
//
|
||||
// if err != nil && err != io.EOF {
|
||||
// c.String(500, "Error reading logs. %s", err)
|
||||
// return
|
||||
// }
|
||||
// //
|
||||
// // rc, err := stream.Reader(c, stream.ToKey(id))
|
||||
// // if err != nil {
|
||||
// // c.String(500, "Failed to create stream reader. %s", err)
|
||||
// // return
|
||||
// // }
|
||||
// //
|
||||
// // wg := sync.WaitGroup{}
|
||||
// // wg.Add(1)
|
||||
// //
|
||||
// // go func() {
|
||||
// // defer recover()
|
||||
// // store.WriteLog(c, &model.Job{ID: id}, rc)
|
||||
// // wg.Done()
|
||||
// // }()
|
||||
// //
|
||||
// // wc.Close()
|
||||
// // wg.Wait()
|
||||
//
|
||||
// }
|
||||
|
||||
// setup the channel to stream logs
|
||||
if err := stream.Create(c, stream.ToKey(w.Job.ID)); err != nil {
|
||||
logrus.Errorf("Unable to create stream. %s", err)
|
||||
}
|
||||
// newline defines a newline constant to separate lines in the build output
|
||||
var newline = []byte{'\n'}
|
||||
|
||||
c.JSON(202, w)
|
||||
|
||||
logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d",
|
||||
c.ClientIP(),
|
||||
w.Repo.Owner,
|
||||
w.Repo.Name,
|
||||
w.Build.Number,
|
||||
w.Job.Number,
|
||||
)
|
||||
}
|
||||
// upgrader defines the default behavior for upgrading the websocket.
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
return true
|
||||
},
|
||||
}
|
||||
|
||||
// Wait is a long request that polls and waits for cancelled build requests.
|
||||
func Wait(c *gin.Context) {
|
||||
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
|
||||
if err != nil {
|
||||
c.String(500, "Invalid input. %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
eventc := make(chan *bus.Event, 1)
|
||||
|
||||
bus.Subscribe(c, eventc)
|
||||
defer bus.Unsubscribe(c, eventc)
|
||||
|
||||
for {
|
||||
select {
|
||||
case event := <-eventc:
|
||||
if event.Job.ID == id && event.Type == bus.Cancelled {
|
||||
c.JSON(200, event.Job)
|
||||
return
|
||||
}
|
||||
case <-c.Writer.CloseNotify():
|
||||
return
|
||||
// HandleUpdate handles build updates from the agent and persists to the database.
|
||||
func HandleUpdate(c context.Context, message *stomp.Message) {
|
||||
defer func() {
|
||||
message.Release()
|
||||
if r := recover(); r != nil {
|
||||
err := r.(error)
|
||||
logrus.Errorf("Panic recover: broker update handler: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Update handles build updates from the agent and persists to the database.
|
||||
func Update(c *gin.Context) {
|
||||
work := &queue.Work{}
|
||||
if err := c.BindJSON(work); err != nil {
|
||||
work := new(queue.Work)
|
||||
if err := message.Unmarshal(work); err != nil {
|
||||
logrus.Errorf("Invalid input. %s", err)
|
||||
return
|
||||
}
|
||||
@ -85,12 +324,12 @@ func Update(c *gin.Context) {
|
||||
// empty values if we just saved what was coming in the http.Request body.
|
||||
build, err := store.GetBuild(c, work.Build.ID)
|
||||
if err != nil {
|
||||
c.String(404, "Unable to find build. %s", err)
|
||||
logrus.Errorf("Unable to find build. %s", err)
|
||||
return
|
||||
}
|
||||
job, err := store.GetJob(c, work.Job.ID)
|
||||
if err != nil {
|
||||
c.String(404, "Unable to find job. %s", err)
|
||||
logrus.Errorf("Unable to find job. %s", err)
|
||||
return
|
||||
}
|
||||
build.Started = work.Build.Started
|
||||
@ -117,189 +356,52 @@ func Update(c *gin.Context) {
|
||||
|
||||
ok, err := store.UpdateBuildJob(c, build, job)
|
||||
if err != nil {
|
||||
c.String(500, "Unable to update job. %s", err)
|
||||
logrus.Errorf("Unable to update job. %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if ok && build.Status != model.StatusRunning {
|
||||
// get the user because we transfer the user form the server to agent
|
||||
// and back we lose the token which does not get serialized to json.
|
||||
user, err := store.GetUser(c, work.User.ID)
|
||||
if err != nil {
|
||||
c.String(500, "Unable to find user. %s", err)
|
||||
user, uerr := store.GetUser(c, work.User.ID)
|
||||
if uerr != nil {
|
||||
logrus.Errorf("Unable to find user. %s", err)
|
||||
return
|
||||
}
|
||||
remote.Status(c, user, work.Repo, build,
|
||||
fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number))
|
||||
}
|
||||
|
||||
if build.Status == model.StatusRunning {
|
||||
bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job))
|
||||
} else {
|
||||
bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job))
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
var sub []byte
|
||||
|
||||
c.JSON(200, work)
|
||||
}
|
||||
|
||||
// Stream streams the logs to disk or memory for broadcasing to listeners. Once
|
||||
// the stream is closed it is moved to permanent storage in the database.
|
||||
func Stream(c *gin.Context) {
|
||||
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
|
||||
if err != nil {
|
||||
c.String(500, "Invalid input. %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
key := c.Param("id")
|
||||
logrus.Infof("Agent %s creating stream %s.", c.ClientIP(), key)
|
||||
|
||||
wc, err := stream.Writer(c, key)
|
||||
if err != nil {
|
||||
c.String(500, "Failed to create stream writer. %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
wc.Close()
|
||||
stream.Delete(c, key)
|
||||
}()
|
||||
|
||||
io.Copy(wc, c.Request.Body)
|
||||
|
||||
rc, err := stream.Reader(c, key)
|
||||
if err != nil {
|
||||
c.String(500, "Failed to create stream reader. %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer recover()
|
||||
store.WriteLog(c, &model.Job{ID: id}, rc)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wc.Close()
|
||||
wg.Wait()
|
||||
c.String(200, "")
|
||||
|
||||
logrus.Debugf("Agent %s wrote stream to database", c.ClientIP())
|
||||
}
|
||||
|
||||
func Ping(c *gin.Context) {
|
||||
agent, err := store.GetAgentAddr(c, c.ClientIP())
|
||||
if err == nil {
|
||||
agent.Updated = time.Now().Unix()
|
||||
err = store.UpdateAgent(c, agent)
|
||||
} else {
|
||||
err = store.CreateAgent(c, &model.Agent{
|
||||
Address: c.ClientIP(),
|
||||
Platform: "linux/amd64",
|
||||
Capacity: 2,
|
||||
Created: time.Now().Unix(),
|
||||
Updated: time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
logrus.Errorf("Unable to register agent. %s", err.Error())
|
||||
}
|
||||
c.String(200, "PONG")
|
||||
}
|
||||
|
||||
//
|
||||
//
|
||||
// Below are alternate implementations for the Queue that use websockets.
|
||||
//
|
||||
//
|
||||
|
||||
// PostLogs handles an http request from the agent to post build logs. These
|
||||
// logs are posted at the end of the build process.
|
||||
func PostLogs(c *gin.Context) {
|
||||
id, _ := strconv.ParseInt(c.Param("id"), 10, 64)
|
||||
job, err := store.GetJob(c, id)
|
||||
if err != nil {
|
||||
c.String(404, "Cannot upload logs. %s", err)
|
||||
return
|
||||
}
|
||||
if err := store.WriteLog(c, job, c.Request.Body); err != nil {
|
||||
c.String(500, "Cannot persist logs", err)
|
||||
return
|
||||
}
|
||||
c.String(200, "")
|
||||
}
|
||||
|
||||
// WriteLogs handles an http request from the agent to stream build logs from
|
||||
// the agent to the server to enable real time streamings to the client.
|
||||
func WriteLogs(c *gin.Context) {
|
||||
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
|
||||
if err != nil {
|
||||
c.String(500, "Invalid input. %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||||
if err != nil {
|
||||
c.String(500, "Cannot upgrade to websocket. %s", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
wc, err := stream.Writer(c, stream.ToKey(id))
|
||||
if err != nil {
|
||||
c.String(500, "Cannot create stream writer. %s", err)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
wc.Close()
|
||||
stream.Delete(c, stream.ToKey(id))
|
||||
}()
|
||||
|
||||
var msg []byte
|
||||
for {
|
||||
_, msg, err = conn.ReadMessage()
|
||||
if err != nil {
|
||||
break
|
||||
done := make(chan bool)
|
||||
dest := fmt.Sprintf("/topic/%d", job.ID)
|
||||
client, _ := stomp.FromContext(c)
|
||||
sub, err = client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) {
|
||||
if len(m.Header.Get([]byte("eof"))) != 0 {
|
||||
done <- true
|
||||
}
|
||||
wc.Write(msg)
|
||||
wc.Write(newline)
|
||||
}
|
||||
|
||||
if err != nil && err != io.EOF {
|
||||
c.String(500, "Error reading logs. %s", err)
|
||||
buf.Write(m.Body)
|
||||
buf.WriteByte('\n')
|
||||
m.Release()
|
||||
}))
|
||||
if err != nil {
|
||||
logrus.Errorf("Unable to read logs from broker. %s", err)
|
||||
return
|
||||
}
|
||||
//
|
||||
// rc, err := stream.Reader(c, stream.ToKey(id))
|
||||
// if err != nil {
|
||||
// c.String(500, "Failed to create stream reader. %s", err)
|
||||
// return
|
||||
<-done
|
||||
|
||||
if err := store.WriteLog(c, job, &buf); err != nil {
|
||||
logrus.Errorf("Unable to write logs to store. %s", err)
|
||||
return
|
||||
}
|
||||
// if build.Status == model.StatusRunning {
|
||||
// bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job))
|
||||
// } else {
|
||||
// bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job))
|
||||
// }
|
||||
//
|
||||
// wg := sync.WaitGroup{}
|
||||
// wg.Add(1)
|
||||
//
|
||||
// go func() {
|
||||
// defer recover()
|
||||
// store.WriteLog(c, &model.Job{ID: id}, rc)
|
||||
// wg.Done()
|
||||
// }()
|
||||
//
|
||||
// wc.Close()
|
||||
// wg.Wait()
|
||||
|
||||
}
|
||||
|
||||
// newline defines a newline constant to separate lines in the build output
|
||||
var newline = []byte{'\n'}
|
||||
|
||||
// upgrader defines the default behavior for upgrading the websocket.
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
return true
|
||||
},
|
||||
client.Unsubscribe(sub)
|
||||
client.Send(dest, []byte{}, stomp.WithRetain("remove"))
|
||||
}
|
||||
|
212
server/stream.go
212
server/stream.go
@ -1,121 +1,21 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/drone/drone/bus"
|
||||
"github.com/drone/drone/cache"
|
||||
"github.com/drone/drone/model"
|
||||
"github.com/drone/drone/router/middleware/session"
|
||||
"github.com/drone/drone/store"
|
||||
"github.com/drone/drone/stream"
|
||||
"github.com/drone/mq/stomp"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/manucorporat/sse"
|
||||
)
|
||||
|
||||
// GetRepoEvents will upgrade the connection to a Websocket and will stream
|
||||
// event updates to the browser.
|
||||
func GetRepoEvents(c *gin.Context) {
|
||||
repo := session.Repo(c)
|
||||
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
||||
|
||||
eventc := make(chan *bus.Event, 1)
|
||||
bus.Subscribe(c, eventc)
|
||||
defer func() {
|
||||
bus.Unsubscribe(c, eventc)
|
||||
close(eventc)
|
||||
logrus.Infof("closed event stream")
|
||||
}()
|
||||
|
||||
c.Stream(func(w io.Writer) bool {
|
||||
select {
|
||||
case event := <-eventc:
|
||||
if event == nil {
|
||||
logrus.Infof("nil event received")
|
||||
return false
|
||||
}
|
||||
|
||||
// TODO(bradrydzewski) This is a super hacky workaround until we improve
|
||||
// the actual bus. Having a per-call database event is just plain stupid.
|
||||
if event.Repo.FullName == repo.FullName {
|
||||
|
||||
var payload = struct {
|
||||
model.Build
|
||||
Jobs []*model.Job `json:"jobs"`
|
||||
}{}
|
||||
payload.Build = event.Build
|
||||
payload.Jobs, _ = store.GetJobList(c, &event.Build)
|
||||
data, _ := json.Marshal(&payload)
|
||||
|
||||
sse.Encode(w, sse.Event{
|
||||
Event: "message",
|
||||
Data: string(data),
|
||||
})
|
||||
}
|
||||
case <-c.Writer.CloseNotify():
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func GetStream(c *gin.Context) {
|
||||
|
||||
repo := session.Repo(c)
|
||||
buildn, _ := strconv.Atoi(c.Param("build"))
|
||||
jobn, _ := strconv.Atoi(c.Param("number"))
|
||||
|
||||
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
||||
|
||||
build, err := store.GetBuildNumber(c, repo, buildn)
|
||||
if err != nil {
|
||||
logrus.Debugln("stream cannot get build number.", err)
|
||||
c.AbortWithError(404, err)
|
||||
return
|
||||
}
|
||||
job, err := store.GetJobNumber(c, build, jobn)
|
||||
if err != nil {
|
||||
logrus.Debugln("stream cannot get job number.", err)
|
||||
c.AbortWithError(404, err)
|
||||
return
|
||||
}
|
||||
|
||||
rc, err := stream.Reader(c, stream.ToKey(job.ID))
|
||||
if err != nil {
|
||||
c.AbortWithError(404, err)
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-c.Writer.CloseNotify()
|
||||
rc.Close()
|
||||
}()
|
||||
|
||||
var line int
|
||||
var scanner = bufio.NewScanner(rc)
|
||||
for scanner.Scan() {
|
||||
line++
|
||||
var err = sse.Encode(c.Writer, sse.Event{
|
||||
Id: strconv.Itoa(line),
|
||||
Event: "message",
|
||||
Data: scanner.Text(),
|
||||
})
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
c.Writer.Flush()
|
||||
}
|
||||
|
||||
logrus.Debugf("Closed stream %s#%d", repo.FullName, build.Number)
|
||||
}
|
||||
|
||||
var (
|
||||
// Time allowed to write the file to the client.
|
||||
writeWait = 5 * time.Second
|
||||
@ -165,47 +65,35 @@ func LogStream(c *gin.Context) {
|
||||
ticker := time.NewTicker(pingPeriod)
|
||||
defer ticker.Stop()
|
||||
|
||||
rc, err := stream.Reader(c, stream.ToKey(job.ID))
|
||||
if err != nil {
|
||||
c.AbortWithError(404, err)
|
||||
return
|
||||
}
|
||||
|
||||
quitc := make(chan bool)
|
||||
defer func() {
|
||||
quitc <- true
|
||||
close(quitc)
|
||||
rc.Close()
|
||||
ws.Close()
|
||||
logrus.Debug("Successfully closed websocket")
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
recover()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-quitc:
|
||||
return
|
||||
case <-ticker.C:
|
||||
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
var scanner = bufio.NewScanner(rc)
|
||||
var b []byte
|
||||
for scanner.Scan() {
|
||||
b = scanner.Bytes()
|
||||
if len(b) == 0 {
|
||||
continue
|
||||
done := make(chan bool)
|
||||
dest := fmt.Sprintf("/topic/%d", job.ID)
|
||||
client, _ := stomp.FromContext(c)
|
||||
sub, err := client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) {
|
||||
if len(m.Header.Get([]byte("eof"))) != 0 {
|
||||
done <- true
|
||||
}
|
||||
ws.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
ws.WriteMessage(websocket.TextMessage, b)
|
||||
ws.WriteMessage(websocket.TextMessage, m.Body)
|
||||
m.Release()
|
||||
}))
|
||||
if err != nil {
|
||||
logrus.Errorf("Unable to read logs from broker. %s", err)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
client.Unsubscribe(sub)
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -227,20 +115,34 @@ func EventStream(c *gin.Context) {
|
||||
repo, _ = cache.GetRepoMap(c, user)
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(pingPeriod)
|
||||
eventc := make(chan []byte, 10)
|
||||
quitc := make(chan bool)
|
||||
eventc := make(chan *bus.Event, 10)
|
||||
bus.Subscribe(c, eventc)
|
||||
tick := time.NewTicker(pingPeriod)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
bus.Unsubscribe(c, eventc)
|
||||
quitc <- true
|
||||
close(quitc)
|
||||
close(eventc)
|
||||
tick.Stop()
|
||||
ws.Close()
|
||||
logrus.Debug("Successfully closed websocket")
|
||||
}()
|
||||
|
||||
client := stomp.MustFromContext(c)
|
||||
sub, err := client.Subscribe("/topic/events", stomp.HandlerFunc(func(m *stomp.Message) {
|
||||
name := m.Header.GetString("repo")
|
||||
priv := m.Header.GetBool("private")
|
||||
if repo[name] || !priv {
|
||||
eventc <- m.Body
|
||||
}
|
||||
m.Release()
|
||||
}))
|
||||
if err != nil {
|
||||
logrus.Errorf("Unable to read logs from broker. %s", err)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
close(quitc)
|
||||
close(eventc)
|
||||
client.Unsubscribe(sub)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
recover()
|
||||
@ -249,15 +151,13 @@ func EventStream(c *gin.Context) {
|
||||
select {
|
||||
case <-quitc:
|
||||
return
|
||||
case event := <-eventc:
|
||||
if event == nil {
|
||||
case event, ok := <-eventc:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if repo[event.Repo.FullName] || !event.Repo.IsPrivate {
|
||||
ws.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
ws.WriteJSON(event)
|
||||
}
|
||||
case <-ticker.C:
|
||||
ws.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
ws.WriteMessage(websocket.TextMessage, event)
|
||||
case <-tick.C:
|
||||
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait))
|
||||
if err != nil {
|
||||
return
|
||||
|
26
yaml/label.go
Normal file
26
yaml/label.go
Normal file
@ -0,0 +1,26 @@
|
||||
package yaml
|
||||
|
||||
import (
|
||||
"gopkg.in/yaml.v2"
|
||||
|
||||
"github.com/drone/drone/yaml/types"
|
||||
)
|
||||
|
||||
// ParseLabel parses the labels section of the Yaml document.
|
||||
func ParseLabel(in []byte) map[string]string {
|
||||
out := struct {
|
||||
Labels types.MapEqualSlice `yaml:"labels"`
|
||||
}{}
|
||||
|
||||
yaml.Unmarshal(in, &out)
|
||||
labels := out.Labels.Map()
|
||||
if labels == nil {
|
||||
labels = make(map[string]string)
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
// ParseLabelString parses the labels section of the Yaml document.
|
||||
func ParseLabelString(in string) map[string]string {
|
||||
return ParseLabel([]byte(in))
|
||||
}
|
32
yaml/label_test.go
Normal file
32
yaml/label_test.go
Normal file
@ -0,0 +1,32 @@
|
||||
package yaml
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/franela/goblin"
|
||||
)
|
||||
|
||||
func TestLabel(t *testing.T) {
|
||||
|
||||
g := goblin.Goblin(t)
|
||||
g.Describe("Label parser", func() {
|
||||
|
||||
g.It("Should parse empty yaml", func() {
|
||||
labels := ParseLabelString("")
|
||||
g.Assert(len(labels)).Equal(0)
|
||||
})
|
||||
|
||||
g.It("Should parse slice", func() {
|
||||
labels := ParseLabelString("labels: [foo=bar, baz=boo]")
|
||||
g.Assert(len(labels)).Equal(2)
|
||||
g.Assert(labels["foo"]).Equal("bar")
|
||||
g.Assert(labels["baz"]).Equal("boo")
|
||||
})
|
||||
|
||||
g.It("Should parse map", func() {
|
||||
labels := ParseLabelString("labels: {foo: bar, baz: boo}")
|
||||
g.Assert(labels["foo"]).Equal("bar")
|
||||
g.Assert(labels["baz"]).Equal("boo")
|
||||
})
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue
Block a user