diff --git a/agent/updater.go b/agent/updater.go index 67afee5a6..15922e0e2 100644 --- a/agent/updater.go +++ b/agent/updater.go @@ -42,6 +42,28 @@ func NewClientUpdater(client client.Client) UpdateFunc { } } +func NewStreamLogger(stream client.StreamWriter, w io.Writer, limit int64) LoggerFunc { + var err error + var size int64 + return func(line *build.Line) { + + if size > limit { + return + } + + // TODO remove this double-serialization + linejson, _ := json.Marshal(line) + w.Write(linejson) + w.Write([]byte{'\n'}) + + if err = stream.WriteJSON(line); err != nil { + logrus.Errorf("Error streaming build logs. %s", err) + } + + size += int64(len(line.Out)) + } +} + func NewClientLogger(client client.Client, id int64, rc io.ReadCloser, wc io.WriteCloser, limit int64) LoggerFunc { var once sync.Once var size int64 diff --git a/client/client.go b/client/client.go index 37b6e99ee..783f1fc5d 100644 --- a/client/client.go +++ b/client/client.go @@ -102,6 +102,10 @@ type Client interface { // Stream streams the build logs to the server. Stream(int64, io.ReadCloser) error + LogStream(int64) (StreamWriter, error) + + LogPost(int64, io.ReadCloser) error + // Wait waits for the job to the complete. Wait(int64) *Wait diff --git a/client/client_impl.go b/client/client_impl.go index 96d6814e5..780ffc81b 100644 --- a/client/client_impl.go +++ b/client/client_impl.go @@ -13,17 +13,20 @@ import ( "github.com/drone/drone/model" "github.com/drone/drone/queue" + "github.com/gorilla/websocket" "golang.org/x/net/context" "golang.org/x/net/context/ctxhttp" "golang.org/x/oauth2" ) const ( - pathPull = "%s/api/queue/pull/%s/%s" - pathWait = "%s/api/queue/wait/%d" - pathStream = "%s/api/queue/stream/%d" - pathPush = "%s/api/queue/status/%d" - pathPing = "%s/api/queue/ping" + pathPull = "%s/api/queue/pull/%s/%s" + pathWait = "%s/api/queue/wait/%d" + pathStream = "%s/api/queue/stream/%d" + pathPush = "%s/api/queue/status/%d" + pathPing = "%s/api/queue/ping" + pathLogs = "%s/api/queue/logs/%d" + pathLogsAuth = "%s/api/queue/logs/%d?access_token=%s" pathSelf = "%s/api/user" pathFeed = "%s/api/user/feed" @@ -48,12 +51,13 @@ const ( type client struct { client *http.Client + token string // auth token base string // base url } // NewClient returns a client at the specified url. func NewClient(uri string) Client { - return &client{http.DefaultClient, uri} + return &client{client: http.DefaultClient, base: uri} } // NewClientToken returns a client at the specified url that authenticates all @@ -61,7 +65,7 @@ func NewClient(uri string) Client { func NewClientToken(uri, token string) Client { config := new(oauth2.Config) auther := config.Client(oauth2.NoContext, &oauth2.Token{AccessToken: token}) - return &client{auther, uri} + return &client{client: auther, base: uri, token: token} } // NewClientTokenTLS returns a client at the specified url that authenticates @@ -74,7 +78,7 @@ func NewClientTokenTLS(uri, token string, c *tls.Config) Client { trans.Base = &http.Transport{TLSClientConfig: c} } } - return &client{auther, uri} + return &client{client: auther, base: uri, token: token} } // Self returns the currently authenticated user. @@ -304,9 +308,42 @@ func (c *client) Ping() error { func (c *client) Stream(id int64, rc io.ReadCloser) error { uri := fmt.Sprintf(pathStream, c.base, id) err := c.post(uri, rc, nil) + return err } +// LogPost sends the full build logs to the server. +func (c *client) LogPost(id int64, rc io.ReadCloser) error { + uri := fmt.Sprintf(pathLogs, c.base, id) + return c.post(uri, rc, nil) +} + +// StreamWriter implements a special writer for streaming log entries to the +// central Drone server. The standard implementation is the gorilla.Connection. +type StreamWriter interface { + Close() error + WriteJSON(interface{}) error +} + +// LogStream streams the build logs to the server. +func (c *client) LogStream(id int64) (StreamWriter, error) { + rawurl := fmt.Sprintf(pathLogsAuth, c.base, id, c.token) + uri, err := url.Parse(rawurl) + if err != nil { + return nil, err + } + if uri.Scheme == "https" { + uri.Scheme = "wss" + } else { + uri.Scheme = "ws" + } + + // TODO need TLS client configuration + + conn, _, err := websocket.DefaultDialer.Dial(uri.String(), nil) + return conn, err +} + // Wait watches and waits for the build to cancel or finish. func (c *client) Wait(id int64) *Wait { ctx, cancel := context.WithCancel(context.Background()) diff --git a/drone/agent/exec.go b/drone/agent/exec.go index b179dd312..fdd5c9fb2 100644 --- a/drone/agent/exec.go +++ b/drone/agent/exec.go @@ -1,7 +1,8 @@ package agent import ( - "io" + "bytes" + "io/ioutil" "time" "github.com/Sirupsen/logrus" @@ -40,15 +41,23 @@ func (r *pipeline) run() error { engine := docker.NewClient(r.docker) // streaming the logs - rc, wc := io.Pipe() - defer func() { - wc.Close() - rc.Close() - }() + // rc, wc := io.Pipe() + // defer func() { + // wc.Close() + // rc.Close() + // }() + + var buf bytes.Buffer + + stream, err := r.drone.LogStream(w.Job.ID) + if err != nil { + return err + } a := agent.Agent{ - Update: agent.NewClientUpdater(r.drone), - Logger: agent.NewClientLogger(r.drone, w.Job.ID, rc, wc, r.config.logs), + Update: agent.NewClientUpdater(r.drone), + // Logger: agent.NewClientLogger(r.drone, w.Job.ID, rc, wc, r.config.logs), + Logger: agent.NewStreamLogger(stream, &buf, r.config.logs), Engine: engine, Timeout: r.config.timeout, Platform: r.config.platform, @@ -70,8 +79,11 @@ func (r *pipeline) run() error { a.Run(w, cancel) - wc.Close() - rc.Close() + if err := r.drone.LogPost(w.Job.ID, ioutil.NopCloser(&buf)); err != nil { + logrus.Errorf("Error sending logs for %s/%s#%d.%d", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) + } + stream.Close() logrus.Infof("Finished build %s/%s#%d.%d", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) diff --git a/router/router.go b/router/router.go index 86ccd8ac0..370d7b8f3 100644 --- a/router/router.go +++ b/router/router.go @@ -156,6 +156,9 @@ func Load(middleware ...gin.HandlerFunc) http.Handler { queue.POST("/stream/:id", server.Stream) queue.POST("/status/:id", server.Update) queue.POST("/ping", server.Ping) + + queue.POST("/logs/:id", server.PostLogs) + queue.GET("/logs/:id", server.WriteLogs) } // DELETE THESE diff --git a/server/queue.go b/server/queue.go index 7570ff33a..52fc4bf82 100644 --- a/server/queue.go +++ b/server/queue.go @@ -3,6 +3,7 @@ package server import ( "fmt" "io" + "net/http" "strconv" "sync" "time" @@ -15,6 +16,7 @@ import ( "github.com/drone/drone/store" "github.com/drone/drone/stream" "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" ) // Pull is a long request that polls and attemts to pull work off the queue stack. @@ -25,6 +27,12 @@ func Pull(c *gin.Context) { if w == nil { logrus.Debugf("Agent %s could not pull work.", c.ClientIP()) } else { + + // setup the channel to stream logs + if err := stream.Create(c, stream.ToKey(w.Job.ID)); err != nil { + logrus.Errorf("Unable to create stream. %s", err) + } + c.JSON(202, w) logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d", @@ -63,7 +71,7 @@ func Wait(c *gin.Context) { } } -// Update handles build updates from the agent and persists to the database. +// Update handles build updates from the agent and persists to the database. func Update(c *gin.Context) { work := &queue.Work{} if err := c.BindJSON(work); err != nil { @@ -99,12 +107,12 @@ func Update(c *gin.Context) { store.UpdateBuild(c, build) } - if job.Status == model.StatusRunning { - err := stream.Create(c, stream.ToKey(job.ID)) - if err != nil { - logrus.Errorf("Unable to create stream. %s", err) - } - } + // if job.Status == model.StatusRunning { + // err := stream.Create(c, stream.ToKey(job.ID)) + // if err != nil { + // logrus.Errorf("Unable to create stream. %s", err) + // } + // } ok, err := store.UpdateBuildJob(c, build, job) if err != nil { @@ -199,3 +207,98 @@ func Ping(c *gin.Context) { } c.String(200, "PONG") } + +// +// +// Below are alternate implementations for the Queue that use websockets. +// +// + +// PostLogs handles an http request from the agent to post build logs. These +// logs are posted at the end of the build process. +func PostLogs(c *gin.Context) { + id, _ := strconv.ParseInt(c.Param("id"), 10, 64) + job, err := store.GetJob(c, id) + if err != nil { + c.String(404, "Cannot upload logs. %s", err) + return + } + if err := store.WriteLog(c, job, c.Request.Body); err != nil { + c.String(500, "Cannot persist logs", err) + return + } + c.String(200, "") +} + +// WriteLogs handles an http request from the agent to stream build logs from +// the agent to the server to enable real time streamings to the client. +func WriteLogs(c *gin.Context) { + id, err := strconv.ParseInt(c.Param("id"), 10, 64) + if err != nil { + c.String(500, "Invalid input. %s", err) + return + } + + conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + c.String(500, "Cannot upgrade to websocket. %s", err) + return + } + defer conn.Close() + + wc, err := stream.Writer(c, stream.ToKey(id)) + if err != nil { + c.String(500, "Cannot create stream writer. %s", err) + return + } + defer func() { + wc.Close() + stream.Delete(c, stream.ToKey(id)) + }() + + var msg []byte + for { + _, msg, err = conn.ReadMessage() + if err != nil { + break + } + wc.Write(msg) + wc.Write(newline) + } + + if err != nil && err != io.EOF { + c.String(500, "Error reading logs. %s", err) + return + } + // + // rc, err := stream.Reader(c, stream.ToKey(id)) + // if err != nil { + // c.String(500, "Failed to create stream reader. %s", err) + // return + // } + // + // wg := sync.WaitGroup{} + // wg.Add(1) + // + // go func() { + // defer recover() + // store.WriteLog(c, &model.Job{ID: id}, rc) + // wg.Done() + // }() + // + // wc.Close() + // wg.Wait() + +} + +// newline defines a newline constant to separate lines in the build output +var newline = []byte{'\n'} + +// upgrader defines the default behavior for upgrading the websocket. +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} diff --git a/yaml/transform/clone.go b/yaml/transform/clone.go index b3e20f430..06674a06a 100644 --- a/yaml/transform/clone.go +++ b/yaml/transform/clone.go @@ -11,6 +11,9 @@ func Clone(c *yaml.Config, plugin string) error { return nil } } + if plugin == "" { + plugin = "git" + } s := &yaml.Container{ Image: plugin,