diff --git a/agent/agent.go b/agent/agent.go index afe0e14df..651a616ad 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -11,7 +11,6 @@ import ( "github.com/drone/drone/build" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/drone/version" "github.com/drone/drone/yaml" "github.com/drone/drone/yaml/expander" @@ -48,7 +47,7 @@ func (a *Agent) Poll() error { return nil } -func (a *Agent) Run(payload *queue.Work, cancel <-chan bool) error { +func (a *Agent) Run(payload *model.Work, cancel <-chan bool) error { payload.Job.Status = model.StatusRunning payload.Job.Started = time.Now().Unix() @@ -90,7 +89,7 @@ func (a *Agent) Run(payload *queue.Work, cancel <-chan bool) error { return err } -func (a *Agent) prep(w *queue.Work) (*yaml.Config, error) { +func (a *Agent) prep(w *model.Work) (*yaml.Config, error) { envs := toEnv(w) w.Yaml = expander.ExpandString(w.Yaml, envs) @@ -172,7 +171,7 @@ func (a *Agent) prep(w *queue.Work) (*yaml.Config, error) { return conf, nil } -func (a *Agent) exec(spec *yaml.Config, payload *queue.Work, cancel <-chan bool) error { +func (a *Agent) exec(spec *yaml.Config, payload *model.Work, cancel <-chan bool) error { conf := build.Config{ Engine: a.Engine, @@ -231,7 +230,7 @@ func (a *Agent) exec(spec *yaml.Config, payload *queue.Work, cancel <-chan bool) } } -func toEnv(w *queue.Work) map[string]string { +func toEnv(w *model.Work) map[string]string { envs := map[string]string{ "CI": "drone", "DRONE": "true", diff --git a/agent/updater.go b/agent/updater.go index ba5c1aa4d..e2e1f43ad 100644 --- a/agent/updater.go +++ b/agent/updater.go @@ -6,17 +6,16 @@ import ( "github.com/Sirupsen/logrus" "github.com/drone/drone/build" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/mq/stomp" ) // UpdateFunc handles buid pipeline status updates. -type UpdateFunc func(*queue.Work) +type UpdateFunc func(*model.Work) // LoggerFunc handles buid pipeline logging updates. type LoggerFunc func(*build.Line) -var NoopUpdateFunc = func(*queue.Work) {} +var NoopUpdateFunc = func(*model.Work) {} var TermLoggerFunc = func(line *build.Line) { fmt.Println(line) @@ -25,7 +24,7 @@ var TermLoggerFunc = func(line *build.Line) { // NewClientUpdater returns an updater that sends updated build details // to the drone server. func NewClientUpdater(client *stomp.Client) UpdateFunc { - return func(w *queue.Work) { + return func(w *model.Work) { err := client.SendJSON("/queue/updates", w) if err != nil { logrus.Errorf("Error updating %s/%s#%d.%d. %s", diff --git a/bus/bus.go b/bus/bus.go deleted file mode 100644 index 5f858b954..000000000 --- a/bus/bus.go +++ /dev/null @@ -1,39 +0,0 @@ -package bus - -//go:generate mockery -name Bus -output mock -case=underscore - -// Bus represents an event bus implementation that -// allows a publisher to broadcast Event notifications -// to a list of subscribers. -type Bus interface { - // Publish broadcasts an event to all subscribers. - Publish(*Event) - - // Subscribe adds the channel to the list of - // subscribers. Each subscriber in the list will - // receive broadcast events. - Subscribe(chan *Event) - - // Unsubscribe removes the channel from the list - // of subscribers. - Unsubscribe(chan *Event) -} - -// -// // Publish broadcasts an event to all subscribers. -// func Publish(c context.Context, event *Event) { -// FromContext(c).Publish(event) -// } -// -// // Subscribe adds the channel to the list of -// // subscribers. Each subscriber in the list will -// // receive broadcast events. -// func Subscribe(c context.Context, eventc chan *Event) { -// FromContext(c).Subscribe(eventc) -// } -// -// // Unsubscribe removes the channel from the -// // list of subscribers. -// func Unsubscribe(c context.Context, eventc chan *Event) { -// FromContext(c).Unsubscribe(eventc) -// } diff --git a/bus/bus_impl.go b/bus/bus_impl.go deleted file mode 100644 index d0f0e6a64..000000000 --- a/bus/bus_impl.go +++ /dev/null @@ -1,46 +0,0 @@ -package bus - -import ( - "sync" -) - -type eventbus struct { - sync.Mutex - subs map[chan *Event]bool -} - -// New creates a simple event bus that manages a list of -// subscribers to which events are published. -func New() Bus { - return newEventbus() -} - -func newEventbus() *eventbus { - return &eventbus{ - subs: make(map[chan *Event]bool), - } -} - -func (b *eventbus) Subscribe(c chan *Event) { - b.Lock() - b.subs[c] = true - b.Unlock() -} - -func (b *eventbus) Unsubscribe(c chan *Event) { - b.Lock() - delete(b.subs, c) - b.Unlock() -} - -func (b *eventbus) Publish(event *Event) { - b.Lock() - defer b.Unlock() - - for s := range b.subs { - go func(c chan *Event) { - defer recover() - c <- event - }(s) - } -} diff --git a/bus/bus_impl_test.go b/bus/bus_impl_test.go deleted file mode 100644 index ffcb1e563..000000000 --- a/bus/bus_impl_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package bus - -import ( - "sync" - "testing" - - "github.com/drone/drone/model" - . "github.com/franela/goblin" - "github.com/gin-gonic/gin" -) - -func TestBus(t *testing.T) { - g := Goblin(t) - g.Describe("Event bus", func() { - - g.It("Should unsubscribe", func() { - c := new(gin.Context) - b := newEventbus() - ToContext(c, b) - - c1 := make(chan *Event) - c2 := make(chan *Event) - Subscribe(c, c1) - Subscribe(c, c2) - - g.Assert(len(b.subs)).Equal(2) - }) - - g.It("Should subscribe", func() { - c := new(gin.Context) - b := newEventbus() - ToContext(c, b) - - c1 := make(chan *Event) - c2 := make(chan *Event) - Subscribe(c, c1) - Subscribe(c, c2) - - g.Assert(len(b.subs)).Equal(2) - - Unsubscribe(c, c1) - Unsubscribe(c, c2) - - g.Assert(len(b.subs)).Equal(0) - }) - - g.It("Should publish", func() { - c := new(gin.Context) - b := New() - ToContext(c, b) - - e1 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{}) - e2 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{}) - c1 := make(chan *Event) - - Subscribe(c, c1) - - var wg sync.WaitGroup - wg.Add(1) - - var r1, r2 *Event - go func() { - r1 = <-c1 - r2 = <-c1 - wg.Done() - }() - Publish(c, e1) - Publish(c, e2) - wg.Wait() - }) - }) - -} diff --git a/bus/context.go b/bus/context.go deleted file mode 100644 index 4eccfa7f0..000000000 --- a/bus/context.go +++ /dev/null @@ -1,21 +0,0 @@ -package bus - -import "golang.org/x/net/context" - -const key = "bus" - -// Setter defines a context that enables setting values. -type Setter interface { - Set(string, interface{}) -} - -// FromContext returns the Bus associated with this context. -func FromContext(c context.Context) Bus { - return c.Value(key).(Bus) -} - -// ToContext adds the Bus to this context if it supports -// the Setter interface. -func ToContext(c Setter, b Bus) { - c.Set(key, b) -} diff --git a/drone/agent/agent.go b/drone/agent/agent.go index ec94eada4..b3037a701 100644 --- a/drone/agent/agent.go +++ b/drone/agent/agent.go @@ -8,7 +8,7 @@ import ( "syscall" "time" - "github.com/drone/drone/queue" + "github.com/drone/drone/model" "github.com/drone/mq/stomp" "github.com/Sirupsen/logrus" @@ -190,7 +190,7 @@ func start(c *cli.Context) { }, } - work := new(queue.Work) + work := new(model.Work) m.Unmarshal(work) r.run(work) } diff --git a/drone/agent/exec.go b/drone/agent/exec.go index 269a6754f..7ad0b458c 100644 --- a/drone/agent/exec.go +++ b/drone/agent/exec.go @@ -6,7 +6,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/drone/drone/agent" "github.com/drone/drone/build/docker" - "github.com/drone/drone/queue" + "github.com/drone/drone/model" "github.com/drone/mq/stomp" "github.com/samalba/dockerclient" @@ -27,7 +27,7 @@ type pipeline struct { config config } -func (r *pipeline) run(w *queue.Work) { +func (r *pipeline) run(w *model.Work) { // defer func() { // // r.drone.Ack(id, opts) diff --git a/drone/exec.go b/drone/exec.go index c766707df..f061f1bc9 100644 --- a/drone/exec.go +++ b/drone/exec.go @@ -13,7 +13,6 @@ import ( "github.com/drone/drone/agent" "github.com/drone/drone/build/docker" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/drone/yaml" "github.com/codegangsta/cli" @@ -340,7 +339,7 @@ func exec(c *cli.Context) error { Pull: c.Bool("pull"), } - payload := &queue.Work{ + payload := &model.Work{ Yaml: string(file), Verified: c.BoolT("yaml.verified"), Signed: c.BoolT("yaml.signed"), diff --git a/drone/server.go b/drone/server.go index 94c7cfe01..465595ce3 100644 --- a/drone/server.go +++ b/drone/server.go @@ -288,9 +288,6 @@ func server(c *cli.Context) error { ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), middleware.Version, middleware.Config(c), - middleware.Queue(c), - middleware.Stream(c), - middleware.Bus(c), middleware.Cache(c), middleware.Store(c), middleware.Remote(c), diff --git a/bus/types.go b/model/event.go similarity index 61% rename from bus/types.go rename to model/event.go index 6be95a3d7..efee037cc 100644 --- a/bus/types.go +++ b/model/event.go @@ -1,6 +1,4 @@ -package bus - -import "github.com/drone/drone/model" +package model // EventType defines the possible types of build events. type EventType string @@ -14,15 +12,15 @@ const ( // Event represents a build event. type Event struct { - Type EventType `json:"type"` - Repo model.Repo `json:"repo"` - Build model.Build `json:"build"` - Job model.Job `json:"job"` + Type EventType `json:"type"` + Repo Repo `json:"repo"` + Build Build `json:"build"` + Job Job `json:"job"` } // NewEvent creates a new Event for the build, using copies of // the build data to avoid possible mutation or race conditions. -func NewEvent(t EventType, r *model.Repo, b *model.Build, j *model.Job) *Event { +func NewEvent(t EventType, r *Repo, b *Build, j *Job) *Event { return &Event{ Type: t, Repo: *r, @@ -31,7 +29,7 @@ func NewEvent(t EventType, r *model.Repo, b *model.Build, j *model.Job) *Event { } } -func NewBuildEvent(t EventType, r *model.Repo, b *model.Build) *Event { +func NewBuildEvent(t EventType, r *Repo, b *Build) *Event { return &Event{ Type: t, Repo: *r, diff --git a/model/work.go b/model/work.go new file mode 100644 index 000000000..06c69d91c --- /dev/null +++ b/model/work.go @@ -0,0 +1,19 @@ +package model + +// Work represents an item for work to be +// processed by a worker. +type Work struct { + Signed bool `json:"signed"` + Verified bool `json:"verified"` + Yaml string `json:"config"` + YamlEnc string `json:"secret"` + Repo *Repo `json:"repo"` + Build *Build `json:"build"` + BuildLast *Build `json:"build_last"` + Job *Job `json:"job"` + Netrc *Netrc `json:"netrc"` + Keys *Key `json:"keys"` + System *System `json:"system"` + Secrets []*Secret `json:"secrets"` + User *User `json:"user"` +} diff --git a/queue/context.go b/queue/context.go deleted file mode 100644 index 98a78e9ec..000000000 --- a/queue/context.go +++ /dev/null @@ -1,23 +0,0 @@ -package queue - -import ( - "golang.org/x/net/context" -) - -const key = "queue" - -// Setter defines a context that enables setting values. -type Setter interface { - Set(string, interface{}) -} - -// FromContext returns the Queue associated with this context. -func FromContext(c context.Context) Queue { - return c.Value(key).(Queue) -} - -// ToContext adds the Queue to this context if it supports -// the Setter interface. -func ToContext(c Setter, q Queue) { - c.Set(key, q) -} diff --git a/queue/queue.go b/queue/queue.go deleted file mode 100644 index 0cdf2e6fc..000000000 --- a/queue/queue.go +++ /dev/null @@ -1,67 +0,0 @@ -package queue - -//go:generate mockery -name Queue -output mock -case=underscore - -import ( - "errors" - - "golang.org/x/net/context" -) - -// ErrNotFound indicates the requested work item does not -// exist in the queue. -var ErrNotFound = errors.New("queue item not found") - -type Queue interface { - // Publish inserts work at the tail of this queue, waiting for - // space to become available if the queue is full. - Publish(*Work) error - - // Remove removes the specified work item from this queue, - // if it is present. - Remove(*Work) error - - // PullClose retrieves and removes the head of this queue, - // waiting if necessary until work becomes available. - Pull() *Work - - // PullClose retrieves and removes the head of this queue, - // waiting if necessary until work becomes available. The - // CloseNotifier should be provided to clone the channel - // if the subscribing client terminates its connection. - PullClose(CloseNotifier) *Work -} - -// Publish inserts work at the tail of this queue, waiting for -// space to become available if the queue is full. -func Publish(c context.Context, w *Work) error { - return FromContext(c).Publish(w) -} - -// Remove removes the specified work item from this queue, -// if it is present. -func Remove(c context.Context, w *Work) error { - return FromContext(c).Remove(w) -} - -// Pull retrieves and removes the head of this queue, -// waiting if necessary until work becomes available. -func Pull(c context.Context) *Work { - return FromContext(c).Pull() -} - -// PullClose retrieves and removes the head of this queue, -// waiting if necessary until work becomes available. The -// CloseNotifier should be provided to clone the channel -// if the subscribing client terminates its connection. -func PullClose(c context.Context, cn CloseNotifier) *Work { - return FromContext(c).PullClose(cn) -} - -// CloseNotifier defines a datastructure that is capable of notifying -// a subscriber when its connection is closed. -type CloseNotifier interface { - // CloseNotify returns a channel that receives a single value - // when the client connection has gone away. - CloseNotify() <-chan bool -} diff --git a/queue/queue_impl.go b/queue/queue_impl.go deleted file mode 100644 index 8882bc24d..000000000 --- a/queue/queue_impl.go +++ /dev/null @@ -1,85 +0,0 @@ -package queue - -import "sync" - -type queue struct { - sync.Mutex - - items map[*Work]struct{} - itemc chan *Work -} - -func New() Queue { - return newQueue() -} - -func newQueue() *queue { - return &queue{ - items: make(map[*Work]struct{}), - itemc: make(chan *Work, 999), - } -} - -func (q *queue) Publish(work *Work) error { - q.Lock() - q.items[work] = struct{}{} - q.Unlock() - q.itemc <- work - return nil -} - -func (q *queue) Remove(work *Work) error { - q.Lock() - defer q.Unlock() - - _, ok := q.items[work] - if !ok { - return ErrNotFound - } - var items []*Work - - // loop through and drain all items - // from the -drain: - for { - select { - case item := <-q.itemc: - items = append(items, item) - default: - break drain - } - } - - // re-add all items to the queue except - // the item we're trying to remove - for _, item := range items { - if item == work { - delete(q.items, work) - continue - } - q.itemc <- item - } - return nil -} - -func (q *queue) Pull() *Work { - work := <-q.itemc - q.Lock() - delete(q.items, work) - q.Unlock() - return work -} - -func (q *queue) PullClose(cn CloseNotifier) *Work { - for { - select { - case <-cn.CloseNotify(): - return nil - case work := <-q.itemc: - q.Lock() - delete(q.items, work) - q.Unlock() - return work - } - } -} diff --git a/queue/queue_impl_test.go b/queue/queue_impl_test.go deleted file mode 100644 index 45f38bff6..000000000 --- a/queue/queue_impl_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package queue - -// -// import ( -// "sync" -// "testing" -// -// . "github.com/franela/goblin" -// "github.com/gin-gonic/gin" -// ) -// -// func TestBuild(t *testing.T) { -// g := Goblin(t) -// g.Describe("Queue", func() { -// -// g.It("Should publish item", func() { -// c := new(gin.Context) -// q := newQueue() -// ToContext(c, q) -// -// w1 := &Work{} -// w2 := &Work{} -// Publish(c, w1) -// Publish(c, w2) -// g.Assert(len(q.items)).Equal(2) -// g.Assert(len(q.itemc)).Equal(2) -// }) -// -// g.It("Should remove item", func() { -// c := new(gin.Context) -// q := newQueue() -// ToContext(c, q) -// -// w1 := &Work{} -// w2 := &Work{} -// w3 := &Work{} -// Publish(c, w1) -// Publish(c, w2) -// Publish(c, w3) -// Remove(c, w2) -// g.Assert(len(q.items)).Equal(2) -// g.Assert(len(q.itemc)).Equal(2) -// -// g.Assert(Pull(c)).Equal(w1) -// g.Assert(Pull(c)).Equal(w3) -// g.Assert(Remove(c, w2)).Equal(ErrNotFound) -// }) -// -// g.It("Should pull item", func() { -// c := new(gin.Context) -// q := New() -// ToContext(c, q) -// -// cn := new(closeNotifier) -// cn.closec = make(chan bool, 1) -// w1 := &Work{} -// w2 := &Work{} -// -// Publish(c, w1) -// g.Assert(Pull(c)).Equal(w1) -// -// Publish(c, w2) -// g.Assert(PullClose(c, cn)).Equal(w2) -// }) -// -// g.It("Should cancel pulling item", func() { -// c := new(gin.Context) -// q := New() -// ToContext(c, q) -// -// cn := new(closeNotifier) -// cn.closec = make(chan bool, 1) -// var wg sync.WaitGroup -// go func() { -// wg.Add(1) -// g.Assert(PullClose(c, cn) == nil).IsTrue() -// wg.Done() -// }() -// go func() { -// cn.closec <- true -// }() -// wg.Wait() -// -// }) -// }) -// } -// -// type closeNotifier struct { -// closec chan bool -// } -// -// func (c *closeNotifier) CloseNotify() <-chan bool { -// return c.closec -// } diff --git a/queue/types.go b/queue/types.go deleted file mode 100644 index 48fc41942..000000000 --- a/queue/types.go +++ /dev/null @@ -1,21 +0,0 @@ -package queue - -import "github.com/drone/drone/model" - -// Work represents an item for work to be -// processed by a worker. -type Work struct { - Signed bool `json:"signed"` - Verified bool `json:"verified"` - Yaml string `json:"config"` - YamlEnc string `json:"secret"` - Repo *model.Repo `json:"repo"` - Build *model.Build `json:"build"` - BuildLast *model.Build `json:"build_last"` - Job *model.Job `json:"job"` - Netrc *model.Netrc `json:"netrc"` - Keys *model.Key `json:"keys"` - System *model.System `json:"system"` - Secrets []*model.Secret `json:"secrets"` - User *model.User `json:"user"` -} diff --git a/router/middleware/bus.go b/router/middleware/bus.go deleted file mode 100644 index 25665da1d..000000000 --- a/router/middleware/bus.go +++ /dev/null @@ -1,17 +0,0 @@ -package middleware - -import ( - "github.com/drone/drone/bus" - - "github.com/codegangsta/cli" - "github.com/gin-gonic/gin" -) - -// Bus is a middleware function that initializes the Event Bus and attaches to -// the context of every http.Request. -func Bus(cli *cli.Context) gin.HandlerFunc { - v := bus.New() - return func(c *gin.Context) { - bus.ToContext(c, v) - } -} diff --git a/router/middleware/queue.go b/router/middleware/queue.go deleted file mode 100644 index d2791033e..000000000 --- a/router/middleware/queue.go +++ /dev/null @@ -1,17 +0,0 @@ -package middleware - -import ( - "github.com/drone/drone/queue" - - "github.com/codegangsta/cli" - "github.com/gin-gonic/gin" -) - -// Queue is a middleware function that initializes the Queue and attaches to -// the context of every http.Request. -func Queue(cli *cli.Context) gin.HandlerFunc { - v := queue.New() - return func(c *gin.Context) { - queue.ToContext(c, v) - } -} diff --git a/router/middleware/stream.go b/router/middleware/stream.go deleted file mode 100644 index d78a119c2..000000000 --- a/router/middleware/stream.go +++ /dev/null @@ -1,17 +0,0 @@ -package middleware - -import ( - "github.com/drone/drone/stream" - - "github.com/codegangsta/cli" - "github.com/gin-gonic/gin" -) - -// Stream is a middleware function that initializes the Stream and attaches to -// the context of every http.Request. -func Stream(cli *cli.Context) gin.HandlerFunc { - v := stream.New() - return func(c *gin.Context) { - stream.ToContext(c, v) - } -} diff --git a/server/build.go b/server/build.go index 5a35f126d..e8e883b35 100644 --- a/server/build.go +++ b/server/build.go @@ -1,17 +1,16 @@ package server import ( + "bufio" + "io" "net/http" "strconv" "time" log "github.com/Sirupsen/logrus" - "github.com/drone/drone/bus" - "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/shared/httputil" "github.com/drone/drone/store" - "github.com/drone/drone/stream" "github.com/drone/drone/yaml" "github.com/gin-gonic/gin" "github.com/square/go-jose" @@ -114,7 +113,7 @@ func GetBuildLogs(c *gin.Context) { } c.Header("Content-Type", "application/json") - stream.Copy(c.Writer, r) + copyLogs(c.Writer, r) } func DeleteBuild(c *gin.Context) { @@ -151,8 +150,8 @@ func DeleteBuild(c *gin.Context) { store.UpdateBuildJob(c, build, job) client := stomp.MustFromContext(c) - client.SendJSON("/topic/cancel", bus.Event{ - Type: bus.Cancelled, + client.SendJSON("/topic/cancel", model.Event{ + Type: model.Cancelled, Repo: *repo, Build: *build, Job: *job, @@ -328,8 +327,8 @@ func PostBuild(c *gin.Context) { log.Debugf(".drone.yml is signed=%v and verified=%v", signed, verified) client := stomp.MustFromContext(c) - client.SendJSON("/topic/events", bus.Event{ - Type: bus.Enqueued, + client.SendJSON("/topic/events", model.Event{ + Type: model.Enqueued, Repo: *repo, Build: *build, }, @@ -339,7 +338,7 @@ func PostBuild(c *gin.Context) { for _, job := range jobs { broker, _ := stomp.FromContext(c) - broker.SendJSON("/queue/pending", &queue.Work{ + broker.SendJSON("/queue/pending", &model.Work{ Signed: signed, Verified: verified, User: user, @@ -371,3 +370,20 @@ func GetBuildQueue(c *gin.Context) { } c.JSON(200, out) } + +// copyLogs copies the stream from the source to the destination in valid JSON +// format. This converts the logs, which are per-line JSON objects, to a +// proper JSON array. +func copyLogs(dest io.Writer, src io.Reader) error { + io.WriteString(dest, "[") + + scanner := bufio.NewScanner(src) + for scanner.Scan() { + io.WriteString(dest, scanner.Text()) + io.WriteString(dest, ",\n") + } + + io.WriteString(dest, "{}]") + + return nil +} diff --git a/server/hook.go b/server/hook.go index ef167f231..9f87437e6 100644 --- a/server/hook.go +++ b/server/hook.go @@ -9,9 +9,7 @@ import ( "github.com/square/go-jose" log "github.com/Sirupsen/logrus" - "github.com/drone/drone/bus" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/shared/httputil" "github.com/drone/drone/shared/token" @@ -214,8 +212,8 @@ func PostHook(c *gin.Context) { } client := stomp.MustFromContext(c) - client.SendJSON("/topic/events", bus.Event{ - Type: bus.Enqueued, + client.SendJSON("/topic/events", model.Event{ + Type: model.Enqueued, Repo: *repo, Build: *build, }, @@ -225,7 +223,7 @@ func PostHook(c *gin.Context) { for _, job := range jobs { broker, _ := stomp.FromContext(c) - broker.SendJSON("/queue/pending", &queue.Work{ + broker.SendJSON("/queue/pending", &model.Work{ Signed: build.Signed, Verified: build.Verified, User: user, diff --git a/server/queue.go b/server/queue.go index d9ca743c7..25d44cb82 100644 --- a/server/queue.go +++ b/server/queue.go @@ -10,289 +10,13 @@ import ( "golang.org/x/net/context" "github.com/Sirupsen/logrus" - "github.com/drone/drone/bus" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/store" "github.com/drone/mq/stomp" "github.com/gorilla/websocket" ) -// -// // Pull is a long request that polls and attemts to pull work off the queue stack. -// func Pull(c *gin.Context) { -// logrus.Debugf("Agent %s connected.", c.ClientIP()) -// -// w := queue.PullClose(c, c.Writer) -// if w == nil { -// logrus.Debugf("Agent %s could not pull work.", c.ClientIP()) -// } else { -// -// // setup the channel to stream logs -// if err := stream.Create(c, stream.ToKey(w.Job.ID)); err != nil { -// logrus.Errorf("Unable to create stream. %s", err) -// } -// -// c.JSON(202, w) -// -// logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d", -// c.ClientIP(), -// w.Repo.Owner, -// w.Repo.Name, -// w.Build.Number, -// w.Job.Number, -// ) -// } -// } -// -// // Wait is a long request that polls and waits for cancelled build requests. -// func Wait(c *gin.Context) { -// id, err := strconv.ParseInt(c.Param("id"), 10, 64) -// if err != nil { -// c.String(500, "Invalid input. %s", err) -// return -// } -// -// eventc := make(chan *bus.Event, 1) -// -// bus.Subscribe(c, eventc) -// defer bus.Unsubscribe(c, eventc) -// -// for { -// select { -// case event := <-eventc: -// if event.Job.ID == id && event.Type == bus.Cancelled { -// c.JSON(200, event.Job) -// return -// } -// case <-c.Writer.CloseNotify(): -// return -// } -// } -// } -// -// // Update handles build updates from the agent and persists to the database. -// func Update(c *gin.Context) { -// work := &queue.Work{} -// if err := c.BindJSON(work); err != nil { -// logrus.Errorf("Invalid input. %s", err) -// return -// } -// -// // TODO(bradrydzewski) it is really annoying that we have to do this lookup -// // and I'd prefer not to. The reason we do this is because the Build and Job -// // have fields that aren't serialized to json and would be reset to their -// // empty values if we just saved what was coming in the http.Request body. -// build, err := store.GetBuild(c, work.Build.ID) -// if err != nil { -// c.String(404, "Unable to find build. %s", err) -// return -// } -// job, err := store.GetJob(c, work.Job.ID) -// if err != nil { -// c.String(404, "Unable to find job. %s", err) -// return -// } -// build.Started = work.Build.Started -// build.Finished = work.Build.Finished -// build.Status = work.Build.Status -// job.Started = work.Job.Started -// job.Finished = work.Job.Finished -// job.Status = work.Job.Status -// job.ExitCode = work.Job.ExitCode -// job.Error = work.Job.Error -// -// if build.Status == model.StatusPending { -// build.Started = work.Job.Started -// build.Status = model.StatusRunning -// store.UpdateBuild(c, build) -// } -// -// // if job.Status == model.StatusRunning { -// // err := stream.Create(c, stream.ToKey(job.ID)) -// // if err != nil { -// // logrus.Errorf("Unable to create stream. %s", err) -// // } -// // } -// -// ok, err := store.UpdateBuildJob(c, build, job) -// if err != nil { -// c.String(500, "Unable to update job. %s", err) -// return -// } -// -// if ok && build.Status != model.StatusRunning { -// // get the user because we transfer the user form the server to agent -// // and back we lose the token which does not get serialized to json. -// user, err := store.GetUser(c, work.User.ID) -// if err != nil { -// c.String(500, "Unable to find user. %s", err) -// return -// } -// remote.Status(c, user, work.Repo, build, -// fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number)) -// } -// -// if build.Status == model.StatusRunning { -// bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job)) -// } else { -// bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job)) -// } -// -// c.JSON(200, work) -// } -// -// // Stream streams the logs to disk or memory for broadcasing to listeners. Once -// // the stream is closed it is moved to permanent storage in the database. -// func Stream(c *gin.Context) { -// id, err := strconv.ParseInt(c.Param("id"), 10, 64) -// if err != nil { -// c.String(500, "Invalid input. %s", err) -// return -// } -// -// key := c.Param("id") -// logrus.Infof("Agent %s creating stream %s.", c.ClientIP(), key) -// -// wc, err := stream.Writer(c, key) -// if err != nil { -// c.String(500, "Failed to create stream writer. %s", err) -// return -// } -// -// defer func() { -// wc.Close() -// stream.Delete(c, key) -// }() -// -// io.Copy(wc, c.Request.Body) -// -// rc, err := stream.Reader(c, key) -// if err != nil { -// c.String(500, "Failed to create stream reader. %s", err) -// return -// } -// -// wg := sync.WaitGroup{} -// wg.Add(1) -// -// go func() { -// defer recover() -// store.WriteLog(c, &model.Job{ID: id}, rc) -// wg.Done() -// }() -// -// wc.Close() -// wg.Wait() -// c.String(200, "") -// -// logrus.Debugf("Agent %s wrote stream to database", c.ClientIP()) -// } -// -// func Ping(c *gin.Context) { -// agent, err := store.GetAgentAddr(c, c.ClientIP()) -// if err == nil { -// agent.Updated = time.Now().Unix() -// err = store.UpdateAgent(c, agent) -// } else { -// err = store.CreateAgent(c, &model.Agent{ -// Address: c.ClientIP(), -// Platform: "linux/amd64", -// Capacity: 2, -// Created: time.Now().Unix(), -// Updated: time.Now().Unix(), -// }) -// } -// if err != nil { -// logrus.Errorf("Unable to register agent. %s", err.Error()) -// } -// c.String(200, "PONG") -// } - -// -// -// Below are alternate implementations for the Queue that use websockets. -// -// -// -// // PostLogs handles an http request from the agent to post build logs. These -// // logs are posted at the end of the build process. -// func PostLogs(c *gin.Context) { -// id, _ := strconv.ParseInt(c.Param("id"), 10, 64) -// job, err := store.GetJob(c, id) -// if err != nil { -// c.String(404, "Cannot upload logs. %s", err) -// return -// } -// if err := store.WriteLog(c, job, c.Request.Body); err != nil { -// c.String(500, "Cannot persist logs", err) -// return -// } -// c.String(200, "") -// } -// -// // WriteLogs handles an http request from the agent to stream build logs from -// // the agent to the server to enable real time streamings to the client. -// func WriteLogs(c *gin.Context) { -// id, err := strconv.ParseInt(c.Param("id"), 10, 64) -// if err != nil { -// c.String(500, "Invalid input. %s", err) -// return -// } -// -// conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) -// if err != nil { -// c.String(500, "Cannot upgrade to websocket. %s", err) -// return -// } -// defer conn.Close() -// -// wc, err := stream.Writer(c, stream.ToKey(id)) -// if err != nil { -// c.String(500, "Cannot create stream writer. %s", err) -// return -// } -// defer func() { -// wc.Close() -// stream.Delete(c, stream.ToKey(id)) -// }() -// -// var msg []byte -// for { -// _, msg, err = conn.ReadMessage() -// if err != nil { -// break -// } -// wc.Write(msg) -// wc.Write(newline) -// } -// -// if err != nil && err != io.EOF { -// c.String(500, "Error reading logs. %s", err) -// return -// } -// // -// // rc, err := stream.Reader(c, stream.ToKey(id)) -// // if err != nil { -// // c.String(500, "Failed to create stream reader. %s", err) -// // return -// // } -// // -// // wg := sync.WaitGroup{} -// // wg.Add(1) -// // -// // go func() { -// // defer recover() -// // store.WriteLog(c, &model.Job{ID: id}, rc) -// // wg.Done() -// // }() -// // -// // wc.Close() -// // wg.Wait() -// -// } - // newline defines a newline constant to separate lines in the build output var newline = []byte{'\n'} @@ -315,7 +39,7 @@ func HandleUpdate(c context.Context, message *stomp.Message) { } }() - work := new(queue.Work) + work := new(model.Work) if err := message.Unmarshal(work); err != nil { logrus.Errorf("Invalid input. %s", err) return @@ -376,8 +100,8 @@ func HandleUpdate(c context.Context, message *stomp.Message) { } client := stomp.MustFromContext(c) - err = client.SendJSON("/topic/events", bus.Event{ - Type: bus.Started, + err = client.SendJSON("/topic/events", model.Event{ + Type: model.Started, Repo: *work.Repo, Build: *build, Job: *job, diff --git a/stream/context.go b/stream/context.go deleted file mode 100644 index e1202cd1b..000000000 --- a/stream/context.go +++ /dev/null @@ -1,21 +0,0 @@ -package stream - -import "golang.org/x/net/context" - -const key = "stream" - -// Setter defines a context that enables setting values. -type Setter interface { - Set(string, interface{}) -} - -// FromContext returns the Stream associated with this context. -func FromContext(c context.Context) Stream { - return c.Value(key).(Stream) -} - -// ToContext adds the Stream to this context if it supports the -// Setter interface. -func ToContext(c Setter, s Stream) { - c.Set(key, s) -} diff --git a/stream/reader.go b/stream/reader.go deleted file mode 100644 index 935f0f93c..000000000 --- a/stream/reader.go +++ /dev/null @@ -1,54 +0,0 @@ -package stream - -import ( - "bytes" - "io" - "sync/atomic" -) - -type reader struct { - w *writer - off int - closed uint32 -} - -// Read reads from the Buffer -func (r *reader) Read(p []byte) (n int, err error) { - r.w.RLock() - defer r.w.RUnlock() - - var m int - - for len(p) > 0 { - - m, _ = bytes.NewReader(r.w.buffer.Bytes()[r.off:]).Read(p) - n += m - r.off += n - - if n > 0 { - break - } - - if r.w.Closed() { - err = io.EOF - break - } - if r.Closed() { - err = io.EOF - break - } - - r.w.Wait() - } - - return -} - -func (r *reader) Close() error { - atomic.StoreUint32(&r.closed, 1) - return nil -} - -func (r *reader) Closed() bool { - return atomic.LoadUint32(&r.closed) != 0 -} diff --git a/stream/reader_test.go b/stream/reader_test.go deleted file mode 100644 index e113cc4a4..000000000 --- a/stream/reader_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package stream - -import "testing" - -func TetsReader(t *testing.T) { - t.Skip() //TODO(bradrydzewski) implement reader tests -} diff --git a/stream/stream.go b/stream/stream.go deleted file mode 100644 index 2619b53cb..000000000 --- a/stream/stream.go +++ /dev/null @@ -1,60 +0,0 @@ -package stream - -import ( - "bufio" - "io" - "strconv" - - "golang.org/x/net/context" -) - -// Stream manages the stream of build logs. -type Stream interface { - Create(string) error - Delete(string) error - Reader(string) (io.ReadCloser, error) - Writer(string) (io.WriteCloser, error) -} - -// Create creates a new stream. -func Create(c context.Context, key string) error { - return FromContext(c).Create(key) -} - -// Reader opens the stream for reading. -func Reader(c context.Context, key string) (io.ReadCloser, error) { - return FromContext(c).Reader(key) -} - -// Writer opens the stream for writing. -func Writer(c context.Context, key string) (io.WriteCloser, error) { - return FromContext(c).Writer(key) -} - -// Delete deletes the stream by key. -func Delete(c context.Context, key string) error { - return FromContext(c).Delete(key) -} - -// ToKey is a helper function that converts a unique identifier -// of type int64 into a string. -func ToKey(i int64) string { - return strconv.FormatInt(i, 10) -} - -// Copy copies the stream from the source to the destination in valid JSON -// format. This converts the logs, which are per-line JSON objects, to a -// proper JSON array. -func Copy(dest io.Writer, src io.Reader) error { - io.WriteString(dest, "[") - - scanner := bufio.NewScanner(src) - for scanner.Scan() { - io.WriteString(dest, scanner.Text()) - io.WriteString(dest, ",\n") - } - - io.WriteString(dest, "{}]") - - return nil -} diff --git a/stream/stream_impl.go b/stream/stream_impl.go deleted file mode 100644 index 8e21aaf9b..000000000 --- a/stream/stream_impl.go +++ /dev/null @@ -1,72 +0,0 @@ -package stream - -import ( - "fmt" - "io" - "sync" -) - -type stream struct { - sync.Mutex - writers map[string]*writer -} - -// New returns a new in-memory implementation of Stream. -func New() Stream { - return &stream{ - writers: map[string]*writer{}, - } -} - -// Reader returns an io.Reader for reading from to the stream. -func (s *stream) Reader(name string) (io.ReadCloser, error) { - s.Lock() - defer s.Unlock() - - if !s.exists(name) { - return nil, fmt.Errorf("stream: cannot read stream %s, not found", name) - } - return s.writers[name].Reader() -} - -// Writer returns an io.WriteCloser for writing to the stream. -func (s *stream) Writer(name string) (io.WriteCloser, error) { - s.Lock() - defer s.Unlock() - - if !s.exists(name) { - return nil, fmt.Errorf("stream: cannot write stream %s, not found", name) - } - return s.writers[name], nil -} - -// Create creates a new stream. -func (s *stream) Create(name string) error { - s.Lock() - defer s.Unlock() - - if s.exists(name) { - return fmt.Errorf("stream: cannot create stream %s, already exists", name) - } - - s.writers[name] = newWriter() - return nil -} - -// Delete deletes the stream by key. -func (s *stream) Delete(name string) error { - s.Lock() - defer s.Unlock() - - if !s.exists(name) { - return fmt.Errorf("stream: cannot delete stream %s, not found", name) - } - w := s.writers[name] - delete(s.writers, name) - return w.Close() -} - -func (s *stream) exists(name string) bool { - _, exists := s.writers[name] - return exists -} diff --git a/stream/stream_impl_test.go b/stream/stream_impl_test.go deleted file mode 100644 index fdc29fce4..000000000 --- a/stream/stream_impl_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package stream - -import "testing" - -func TetsStream(t *testing.T) { - t.Skip() //TODO(bradrydzewski) implement stream tests -} diff --git a/stream/writer.go b/stream/writer.go deleted file mode 100644 index 15a873604..000000000 --- a/stream/writer.go +++ /dev/null @@ -1,52 +0,0 @@ -package stream - -import ( - "bytes" - "io" - "sync" - "sync/atomic" -) - -type writer struct { - sync.RWMutex - *sync.Cond - - buffer bytes.Buffer - closed uint32 -} - -func newWriter() *writer { - var w writer - w.Cond = sync.NewCond(w.RWMutex.RLocker()) - return &w -} - -func (w *writer) Write(p []byte) (n int, err error) { - defer w.Broadcast() - w.Lock() - defer w.Unlock() - if w.Closed() { - return 0, io.EOF - } - return w.buffer.Write(p) -} - -func (w *writer) Reader() (io.ReadCloser, error) { - return &reader{w: w}, nil -} - -func (w *writer) Wait() { - if !w.Closed() { - w.Cond.Wait() - } -} - -func (w *writer) Close() error { - atomic.StoreUint32(&w.closed, 1) - w.Cond.Broadcast() - return nil -} - -func (w *writer) Closed() bool { - return atomic.LoadUint32(&w.closed) != 0 -} diff --git a/stream/writer_test.go b/stream/writer_test.go deleted file mode 100644 index c0c757e10..000000000 --- a/stream/writer_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package stream - -import "testing" - -func TetsWriter(t *testing.T) { - t.Skip() //TODO(bradrydzewski) implement writer tests -}