You've already forked woodpecker
							
							
				mirror of
				https://github.com/woodpecker-ci/woodpecker.git
				synced 2025-10-30 23:27:39 +02:00 
			
		
		
		
	added 0.5 yaml and runtime support through feature flag
This commit is contained in:
		
							
								
								
									
										31
									
								
								api/build.go
									
									
									
									
									
								
							
							
						
						
									
										31
									
								
								api/build.go
									
									
									
									
									
								
							| @@ -12,6 +12,7 @@ import ( | ||||
|  | ||||
| 	log "github.com/Sirupsen/logrus" | ||||
| 	"github.com/drone/drone/engine" | ||||
| 	"github.com/drone/drone/queue" | ||||
| 	"github.com/drone/drone/remote" | ||||
| 	"github.com/drone/drone/shared/httputil" | ||||
| 	"github.com/drone/drone/store" | ||||
| @@ -280,6 +281,36 @@ func PostBuild(c *gin.Context) { | ||||
| 	// on status change notifications | ||||
| 	last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID) | ||||
|  | ||||
|  | ||||
| 	// IMPORTANT. PLEASE READ | ||||
| 	// | ||||
| 	// The below code uses a feature flag to switch between the current | ||||
| 	// build engine and the exerimental 0.5 build engine. This can be | ||||
| 	// enabled using with the environment variable CANARY=true | ||||
|  | ||||
| 	if os.Getenv("CANARY") == "true" { | ||||
| 		for _, job := range jobs { | ||||
| 			queue.Publish(c, &queue.Work{ | ||||
| 				User:      user, | ||||
| 				Repo:      repo, | ||||
| 				Build:     build, | ||||
| 				BuildLast: last, | ||||
| 				Job:       job, | ||||
| 				Keys:      key, | ||||
| 				Netrc:     netrc, | ||||
| 				Yaml:      string(raw), | ||||
| 				YamlEnc:   string(sec), | ||||
| 				System: &model.System{ | ||||
| 					Link:      httputil.GetURL(c.Request), | ||||
| 					Plugins:   strings.Split(os.Getenv("PLUGIN_FILTER"), " "), | ||||
| 					Globals:   strings.Split(os.Getenv("PLUGIN_PARAMS"), " "), | ||||
| 					Escalates: strings.Split(os.Getenv("ESCALATE_FILTER"), " "), | ||||
| 				}, | ||||
| 			}) | ||||
| 		} | ||||
| 		return // EXIT NOT TO AVOID THE 0.4 ENGINE CODE BELOW | ||||
| 	} | ||||
|  | ||||
| 	engine_ := engine.FromContext(c) | ||||
| 	go engine_.Schedule(c.Copy(), &engine.Task{ | ||||
| 		User:      user, | ||||
|   | ||||
| @@ -1,4 +1,4 @@ | ||||
| package libyaml | ||||
| package compiler | ||||
|  | ||||
| import ( | ||||
| 	"github.com/drone/drone/engine/runner" | ||||
| @@ -143,4 +143,4 @@ func (c *Compiler) walk(node yaml.Node) (err error) { | ||||
| 		} | ||||
| 	} | ||||
| 	return err | ||||
| } | ||||
| } | ||||
|   | ||||
| @@ -1 +1 @@ | ||||
| package libyaml | ||||
| package compiler | ||||
|   | ||||
| @@ -1,4 +1,4 @@ | ||||
| package libyaml | ||||
| package compiler | ||||
|  | ||||
| import "github.com/drone/drone/engine/compiler/parse" | ||||
|  | ||||
|   | ||||
							
								
								
									
										158
									
								
								engine/engine/engine.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										158
									
								
								engine/engine/engine.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,158 @@ | ||||
| package engine | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/Sirupsen/logrus" | ||||
| 	"github.com/drone/drone/bus" | ||||
| 	"github.com/drone/drone/engine/compiler" | ||||
| 	"github.com/drone/drone/engine/runner" | ||||
| 	"github.com/drone/drone/engine/runner/docker" | ||||
| 	"github.com/drone/drone/model" | ||||
| 	"github.com/drone/drone/queue" | ||||
| 	"github.com/drone/drone/store" | ||||
| 	"github.com/drone/drone/stream" | ||||
| 	"golang.org/x/net/context" | ||||
| ) | ||||
|  | ||||
| // Poll polls the build queue for build jobs. | ||||
| func Poll(c context.Context) { | ||||
| 	for { | ||||
| 		pollRecover(c) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func pollRecover(c context.Context) { | ||||
| 	defer recover() | ||||
| 	poll(c) | ||||
| } | ||||
|  | ||||
| func poll(c context.Context) { | ||||
| 	w := queue.Pull(c) | ||||
|  | ||||
| 	logrus.Infof("Starting build %s/%s#%d.%d", | ||||
| 		w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) | ||||
|  | ||||
| 	rc, wc, err := stream.Create(c, stream.ToKey(w.Job.ID)) | ||||
| 	if err != nil { | ||||
| 		logrus.Errorf("Error opening build stream %s/%s#%d.%d. %s", | ||||
| 			w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) | ||||
| 	} | ||||
|  | ||||
| 	defer func() { | ||||
| 		wc.Close() | ||||
| 		rc.Close() | ||||
| 		stream.Remove(c, stream.ToKey(w.Job.ID)) | ||||
| 	}() | ||||
|  | ||||
| 	w.Job.Status = model.StatusRunning | ||||
| 	w.Job.Started = time.Now().Unix() | ||||
|  | ||||
| 	quitc := make(chan bool, 1) | ||||
| 	eventc := make(chan *bus.Event, 1) | ||||
| 	bus.Subscribe(c, eventc) | ||||
|  | ||||
| 	compile := compiler.New() | ||||
| 	compile.Transforms(nil) | ||||
| 	spec, err := compile.CompileString(w.Yaml) | ||||
| 	if err != nil { | ||||
| 		// TODO handle error | ||||
| 		logrus.Infof("Error compiling Yaml %s/%s#%d %s", | ||||
| 			w.Repo.Owner, w.Repo.Name, w.Build.Number, err.Error()) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	defer func() { | ||||
| 		bus.Unsubscribe(c, eventc) | ||||
| 		quitc <- true | ||||
| 	}() | ||||
|  | ||||
| 	ctx := context.TODO() | ||||
| 	ctx, cancel := context.WithCancel(ctx) | ||||
|  | ||||
| 	// TODO store the started build in the database | ||||
| 	// TODO publish the started build | ||||
| 	store.UpdateJob(c, w.Job) | ||||
| 	//store.Write(c, w.Job, rc) | ||||
| 	bus.Publish(c, bus.NewEvent(bus.Started, w.Repo, w.Build, w.Job)) | ||||
|  | ||||
| 	conf := runner.Config{ | ||||
| 		Engine: docker.FromContext(c), | ||||
| 	} | ||||
|  | ||||
| 	run := conf.Runner(ctx, spec) | ||||
| 	run.Run() | ||||
| 	defer cancel() | ||||
|  | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			select { | ||||
| 			case event := <-eventc: | ||||
| 				if event.Type == bus.Cancelled && event.Job.ID == w.Job.ID { | ||||
| 					logrus.Infof("Cancel build %s/%s#%d.%d", | ||||
| 						w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) | ||||
| 					cancel() | ||||
| 				} | ||||
| 			case <-quitc: | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	pipe := run.Pipe() | ||||
| 	for { | ||||
| 		line := pipe.Next() | ||||
| 		if line == nil { | ||||
| 			break | ||||
| 		} | ||||
| 		fmt.Println(line) | ||||
| 	} | ||||
|  | ||||
| 	err = run.Wait() | ||||
|  | ||||
| 	// catch the build result | ||||
| 	if err != nil { | ||||
| 		w.Job.ExitCode = 255 | ||||
| 	} | ||||
| 	if exitErr, ok := err.(*runner.ExitError); ok { | ||||
| 		w.Job.ExitCode = exitErr.Code | ||||
| 	} | ||||
|  | ||||
| 	w.Job.Finished = time.Now().Unix() | ||||
|  | ||||
| 	switch w.Job.ExitCode { | ||||
| 	case 128, 130: | ||||
| 		w.Job.Status = model.StatusKilled | ||||
| 	case 0: | ||||
| 		w.Job.Status = model.StatusSuccess | ||||
| 	default: | ||||
| 		w.Job.Status = model.StatusFailure | ||||
| 	} | ||||
|  | ||||
| 	// store the finished build in the database | ||||
| 	logs, _, err := stream.Open(c, stream.ToKey(w.Job.ID)) | ||||
| 	if err != nil { | ||||
| 		logrus.Errorf("Error reading build stream %s/%s#%d.%d", | ||||
| 			w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) | ||||
| 	} | ||||
| 	defer func() { | ||||
| 		if logs != nil { | ||||
| 			logs.Close() | ||||
| 		} | ||||
| 	}() | ||||
| 	if err := store.WriteLog(c, w.Job, logs); err != nil { | ||||
| 		logrus.Errorf("Error persisting build stream %s/%s#%d.%d", | ||||
| 			w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) | ||||
| 	} | ||||
| 	if logs != nil { | ||||
| 		logs.Close() | ||||
| 	} | ||||
|  | ||||
| 	// TODO publish the finished build | ||||
| 	store.UpdateJob(c, w.Job) | ||||
| 	bus.Publish(c, bus.NewEvent(bus.Finished, w.Repo, w.Build, w.Job)) | ||||
|  | ||||
| 	logrus.Infof("Finished build %s/%s#%d.%d", | ||||
| 		w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) | ||||
| } | ||||
							
								
								
									
										24
									
								
								engine/runner/docker/context.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								engine/runner/docker/context.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,24 @@ | ||||
| package docker | ||||
|  | ||||
| import ( | ||||
| 	"github.com/drone/drone/engine/runner" | ||||
| 	"golang.org/x/net/context" | ||||
| ) | ||||
|  | ||||
| const key = "docker" | ||||
|  | ||||
| // Setter defines a context that enables setting values. | ||||
| type Setter interface { | ||||
| 	Set(string, interface{}) | ||||
| } | ||||
|  | ||||
| // FromContext returns the Engine associated with this context. | ||||
| func FromContext(c context.Context) runner.Engine { | ||||
| 	return c.Value(key).(runner.Engine) | ||||
| } | ||||
|  | ||||
| // ToContext adds the Engine to this context if it supports the | ||||
| // Setter interface. | ||||
| func ToContext(c Setter, d runner.Engine) { | ||||
| 	c.Set(key, d) | ||||
| } | ||||
							
								
								
									
										111
									
								
								engine/runner/docker/docker.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										111
									
								
								engine/runner/docker/docker.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,111 @@ | ||||
| package docker | ||||
|  | ||||
| import ( | ||||
| 	"io" | ||||
|  | ||||
| 	"github.com/drone/drone/engine/runner" | ||||
| 	"github.com/drone/drone/engine/runner/docker/internal" | ||||
|  | ||||
| 	"github.com/samalba/dockerclient" | ||||
| ) | ||||
|  | ||||
| type dockerEngine struct { | ||||
| 	client dockerclient.Client | ||||
| } | ||||
|  | ||||
| func (e *dockerEngine) ContainerStart(container *runner.Container) (string, error) { | ||||
| 	conf := toContainerConfig(container) | ||||
| 	auth := toAuthConfig(container) | ||||
|  | ||||
| 	// pull the image if it does not exists or if the Container | ||||
| 	// is configured to always pull a new image. | ||||
| 	_, err := e.client.InspectImage(container.Image) | ||||
| 	if err != nil || container.Pull { | ||||
| 		e.client.PullImage(container.Image, auth) | ||||
| 	} | ||||
|  | ||||
| 	// create and start the container and return the Container ID. | ||||
| 	id, err := e.client.CreateContainer(conf, container.Name, auth) | ||||
| 	if err != nil { | ||||
| 		return id, err | ||||
| 	} | ||||
| 	err = e.client.StartContainer(id, &conf.HostConfig) | ||||
| 	if err != nil { | ||||
|  | ||||
| 		// remove the container if it cannot be started | ||||
| 		e.client.RemoveContainer(id, true, true) | ||||
| 		return id, err | ||||
| 	} | ||||
| 	return id, nil | ||||
| } | ||||
|  | ||||
| func (e *dockerEngine) ContainerStop(id string) error { | ||||
| 	e.client.StopContainer(id, 1) | ||||
| 	e.client.KillContainer(id, "9") | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (e *dockerEngine) ContainerRemove(id string) error { | ||||
| 	e.client.StopContainer(id, 1) | ||||
| 	e.client.KillContainer(id, "9") | ||||
| 	e.client.RemoveContainer(id, true, true) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (e *dockerEngine) ContainerWait(id string) (*runner.State, error) { | ||||
| 	// wait for the container to exit | ||||
| 	// | ||||
| 	// TODO(bradrydzewski) we should have a for loop here | ||||
| 	// to re-connect and wait if this channel returns a | ||||
| 	// result even though the container is still running. | ||||
| 	// | ||||
| 	<-e.client.Wait(id) | ||||
| 	v, err := e.client.InspectContainer(id) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &runner.State{ | ||||
| 		ExitCode:  v.State.ExitCode, | ||||
| 		OOMKilled: v.State.OOMKilled, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (e *dockerEngine) ContainerLogs(id string) (io.ReadCloser, error) { | ||||
| 	opts := &dockerclient.LogOptions{ | ||||
| 		Follow: true, | ||||
| 		Stdout: true, | ||||
| 		Stderr: true, | ||||
| 	} | ||||
|  | ||||
| 	piper, pipew := io.Pipe() | ||||
| 	go func() { | ||||
| 		defer pipew.Close() | ||||
|  | ||||
| 		// sometimes the docker logs fails due to parsing errors. this | ||||
| 		// routine will check for such a failure and attempt to resume | ||||
| 		// if necessary. | ||||
| 		for i := 0; i < 5; i++ { | ||||
| 			if i > 0 { | ||||
| 				opts.Tail = 1 | ||||
| 			} | ||||
|  | ||||
| 			rc, err := e.client.ContainerLogs(id, opts) | ||||
| 			if err != nil { | ||||
| 				return | ||||
| 			} | ||||
| 			defer rc.Close() | ||||
|  | ||||
| 			// use Docker StdCopy | ||||
| 			internal.StdCopy(pipew, pipew, rc) | ||||
|  | ||||
| 			// check to see if the container is still running. If not, | ||||
| 			// we can safely exit and assume there are no more logs left | ||||
| 			// to stream. | ||||
| 			v, err := e.client.InspectContainer(id) | ||||
| 			if err != nil || !v.State.Running { | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 	return piper, nil | ||||
| } | ||||
							
								
								
									
										1
									
								
								engine/runner/docker/docker_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								engine/runner/docker/docker_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1 @@ | ||||
| package docker | ||||
							
								
								
									
										49
									
								
								engine/runner/docker/helper.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										49
									
								
								engine/runner/docker/helper.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,49 @@ | ||||
| package docker | ||||
|  | ||||
| import ( | ||||
| 	"os" | ||||
|  | ||||
| 	"github.com/drone/drone/engine/runner" | ||||
| 	"github.com/samalba/dockerclient" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	dockerHost = os.Getenv("DOCKER_HOST") | ||||
| 	dockerCert = os.Getenv("DOCKER_CERT_PATH") | ||||
| 	dockerTLS  = os.Getenv("DOCKER_TLS_VERIFY") | ||||
| ) | ||||
|  | ||||
| func init() { | ||||
| 	if dockerHost == "" { | ||||
| 		dockerHost = "unix:///var/run/docker.sock" | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // New returns a new Docker engine using the provided Docker client. | ||||
| func New(client dockerclient.Client) runner.Engine { | ||||
| 	return &dockerEngine{client} | ||||
| } | ||||
|  | ||||
| // NewEnv returns a new Docker engine from the DOCKER_HOST and DOCKER_CERT_PATH | ||||
| // environment variables. | ||||
| func NewEnv() (runner.Engine, error) { | ||||
| 	config, err := dockerclient.TLSConfigFromCertPath(dockerCert) | ||||
| 	if err == nil && dockerTLS != "1" { | ||||
| 		config.InsecureSkipVerify = true | ||||
| 	} | ||||
| 	client, err := dockerclient.NewDockerClient(dockerHost, config) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return New(client), nil | ||||
| } | ||||
|  | ||||
| // MustEnv returns a new Docker engine from the DOCKER_HOST and DOCKER_CERT_PATH | ||||
| // environment variables. Errors creating the Docker engine will panic. | ||||
| func MustEnv() runner.Engine { | ||||
| 	engine, err := NewEnv() | ||||
| 	if err != nil { | ||||
| 		panic(err) | ||||
| 	} | ||||
| 	return engine | ||||
| } | ||||
							
								
								
									
										1
									
								
								engine/runner/docker/helper_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								engine/runner/docker/helper_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1 @@ | ||||
| package docker | ||||
							
								
								
									
										1
									
								
								engine/runner/docker/internal/README
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								engine/runner/docker/internal/README
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1 @@ | ||||
| This is an internal copy of the Docker stdcopy package that removes the logrus debug logging. The original package is found at https://github.com/docker/docker/tree/master/pkg/stdcopy | ||||
							
								
								
									
										167
									
								
								engine/runner/docker/internal/stdcopy.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										167
									
								
								engine/runner/docker/internal/stdcopy.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,167 @@ | ||||
| package internal | ||||
|  | ||||
| import ( | ||||
| 	"encoding/binary" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| ) | ||||
|  | ||||
| // StdType is the type of standard stream | ||||
| // a writer can multiplex to. | ||||
| type StdType byte | ||||
|  | ||||
| const ( | ||||
| 	// Stdin represents standard input stream type. | ||||
| 	Stdin StdType = iota | ||||
| 	// Stdout represents standard output stream type. | ||||
| 	Stdout | ||||
| 	// Stderr represents standard error steam type. | ||||
| 	Stderr | ||||
|  | ||||
| 	stdWriterPrefixLen = 8 | ||||
| 	stdWriterFdIndex   = 0 | ||||
| 	stdWriterSizeIndex = 4 | ||||
|  | ||||
| 	startingBufLen = 32*1024 + stdWriterPrefixLen + 1 | ||||
| ) | ||||
|  | ||||
| // stdWriter is wrapper of io.Writer with extra customized info. | ||||
| type stdWriter struct { | ||||
| 	io.Writer | ||||
| 	prefix byte | ||||
| } | ||||
|  | ||||
| // Write sends the buffer to the underneath writer. | ||||
| // It insert the prefix header before the buffer, | ||||
| // so stdcopy.StdCopy knows where to multiplex the output. | ||||
| // It makes stdWriter to implement io.Writer. | ||||
| func (w *stdWriter) Write(buf []byte) (n int, err error) { | ||||
| 	if w == nil || w.Writer == nil { | ||||
| 		return 0, errors.New("Writer not instantiated") | ||||
| 	} | ||||
| 	if buf == nil { | ||||
| 		return 0, nil | ||||
| 	} | ||||
|  | ||||
| 	header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix} | ||||
| 	binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(buf))) | ||||
|  | ||||
| 	line := append(header[:], buf...) | ||||
|  | ||||
| 	n, err = w.Writer.Write(line) | ||||
| 	n -= stdWriterPrefixLen | ||||
|  | ||||
| 	if n < 0 { | ||||
| 		n = 0 | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
|  | ||||
| // NewStdWriter instantiates a new Writer. | ||||
| // Everything written to it will be encapsulated using a custom format, | ||||
| // and written to the underlying `w` stream. | ||||
| // This allows multiple write streams (e.g. stdout and stderr) to be muxed into a single connection. | ||||
| // `t` indicates the id of the stream to encapsulate. | ||||
| // It can be stdcopy.Stdin, stdcopy.Stdout, stdcopy.Stderr. | ||||
| func NewStdWriter(w io.Writer, t StdType) io.Writer { | ||||
| 	return &stdWriter{ | ||||
| 		Writer: w, | ||||
| 		prefix: byte(t), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // StdCopy is a modified version of io.Copy. | ||||
| // | ||||
| // StdCopy will demultiplex `src`, assuming that it contains two streams, | ||||
| // previously multiplexed together using a StdWriter instance. | ||||
| // As it reads from `src`, StdCopy will write to `dstout` and `dsterr`. | ||||
| // | ||||
| // StdCopy will read until it hits EOF on `src`. It will then return a nil error. | ||||
| // In other words: if `err` is non nil, it indicates a real underlying error. | ||||
| // | ||||
| // `written` will hold the total number of bytes written to `dstout` and `dsterr`. | ||||
| func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) { | ||||
| 	var ( | ||||
| 		buf       = make([]byte, startingBufLen) | ||||
| 		bufLen    = len(buf) | ||||
| 		nr, nw    int | ||||
| 		er, ew    error | ||||
| 		out       io.Writer | ||||
| 		frameSize int | ||||
| 	) | ||||
|  | ||||
| 	for { | ||||
| 		// Make sure we have at least a full header | ||||
| 		for nr < stdWriterPrefixLen { | ||||
| 			var nr2 int | ||||
| 			nr2, er = src.Read(buf[nr:]) | ||||
| 			nr += nr2 | ||||
| 			if er == io.EOF { | ||||
| 				if nr < stdWriterPrefixLen { | ||||
| 					return written, nil | ||||
| 				} | ||||
| 				break | ||||
| 			} | ||||
| 			if er != nil { | ||||
| 				return 0, er | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// Check the first byte to know where to write | ||||
| 		switch StdType(buf[stdWriterFdIndex]) { | ||||
| 		case Stdin: | ||||
| 			fallthrough | ||||
| 		case Stdout: | ||||
| 			// Write on stdout | ||||
| 			out = dstout | ||||
| 		case Stderr: | ||||
| 			// Write on stderr | ||||
| 			out = dsterr | ||||
| 		default: | ||||
| 			return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex]) | ||||
| 		} | ||||
|  | ||||
| 		// Retrieve the size of the frame | ||||
| 		frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4])) | ||||
|  | ||||
| 		// Check if the buffer is big enough to read the frame. | ||||
| 		// Extend it if necessary. | ||||
| 		if frameSize+stdWriterPrefixLen > bufLen { | ||||
| 			buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...) | ||||
| 			bufLen = len(buf) | ||||
| 		} | ||||
|  | ||||
| 		// While the amount of bytes read is less than the size of the frame + header, we keep reading | ||||
| 		for nr < frameSize+stdWriterPrefixLen { | ||||
| 			var nr2 int | ||||
| 			nr2, er = src.Read(buf[nr:]) | ||||
| 			nr += nr2 | ||||
| 			if er == io.EOF { | ||||
| 				if nr < frameSize+stdWriterPrefixLen { | ||||
| 					return written, nil | ||||
| 				} | ||||
| 				break | ||||
| 			} | ||||
| 			if er != nil { | ||||
| 				return 0, er | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// Write the retrieved frame (without header) | ||||
| 		nw, ew = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen]) | ||||
| 		if ew != nil { | ||||
| 			return 0, ew | ||||
| 		} | ||||
| 		// If the frame has not been fully written: error | ||||
| 		if nw != frameSize { | ||||
| 			return 0, io.ErrShortWrite | ||||
| 		} | ||||
| 		written += int64(nw) | ||||
|  | ||||
| 		// Move the rest of the buffer to the beginning | ||||
| 		copy(buf, buf[frameSize+stdWriterPrefixLen:]) | ||||
| 		// Move the index | ||||
| 		nr -= frameSize + stdWriterPrefixLen | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										260
									
								
								engine/runner/docker/internal/stdcopy_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										260
									
								
								engine/runner/docker/internal/stdcopy_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,260 @@ | ||||
| package internal | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"errors" | ||||
| 	"io" | ||||
| 	"io/ioutil" | ||||
| 	"strings" | ||||
| 	"testing" | ||||
| ) | ||||
|  | ||||
| func TestNewStdWriter(t *testing.T) { | ||||
| 	writer := NewStdWriter(ioutil.Discard, Stdout) | ||||
| 	if writer == nil { | ||||
| 		t.Fatalf("NewStdWriter with an invalid StdType should not return nil.") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestWriteWithUnitializedStdWriter(t *testing.T) { | ||||
| 	writer := stdWriter{ | ||||
| 		Writer: nil, | ||||
| 		prefix: byte(Stdout), | ||||
| 	} | ||||
| 	n, err := writer.Write([]byte("Something here")) | ||||
| 	if n != 0 || err == nil { | ||||
| 		t.Fatalf("Should fail when given an uncomplete or uninitialized StdWriter") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestWriteWithNilBytes(t *testing.T) { | ||||
| 	writer := NewStdWriter(ioutil.Discard, Stdout) | ||||
| 	n, err := writer.Write(nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Shouldn't have fail when given no data") | ||||
| 	} | ||||
| 	if n > 0 { | ||||
| 		t.Fatalf("Write should have written 0 byte, but has written %d", n) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestWrite(t *testing.T) { | ||||
| 	writer := NewStdWriter(ioutil.Discard, Stdout) | ||||
| 	data := []byte("Test StdWrite.Write") | ||||
| 	n, err := writer.Write(data) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error while writing with StdWrite") | ||||
| 	} | ||||
| 	if n != len(data) { | ||||
| 		t.Fatalf("Write should have written %d byte but wrote %d.", len(data), n) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type errWriter struct { | ||||
| 	n   int | ||||
| 	err error | ||||
| } | ||||
|  | ||||
| func (f *errWriter) Write(buf []byte) (int, error) { | ||||
| 	return f.n, f.err | ||||
| } | ||||
|  | ||||
| func TestWriteWithWriterError(t *testing.T) { | ||||
| 	expectedError := errors.New("expected") | ||||
| 	expectedReturnedBytes := 10 | ||||
| 	writer := NewStdWriter(&errWriter{ | ||||
| 		n:   stdWriterPrefixLen + expectedReturnedBytes, | ||||
| 		err: expectedError}, Stdout) | ||||
| 	data := []byte("This won't get written, sigh") | ||||
| 	n, err := writer.Write(data) | ||||
| 	if err != expectedError { | ||||
| 		t.Fatalf("Didn't get expected error.") | ||||
| 	} | ||||
| 	if n != expectedReturnedBytes { | ||||
| 		t.Fatalf("Didn't get expected written bytes %d, got %d.", | ||||
| 			expectedReturnedBytes, n) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestWriteDoesNotReturnNegativeWrittenBytes(t *testing.T) { | ||||
| 	writer := NewStdWriter(&errWriter{n: -1}, Stdout) | ||||
| 	data := []byte("This won't get written, sigh") | ||||
| 	actual, _ := writer.Write(data) | ||||
| 	if actual != 0 { | ||||
| 		t.Fatalf("Expected returned written bytes equal to 0, got %d", actual) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func getSrcBuffer(stdOutBytes, stdErrBytes []byte) (buffer *bytes.Buffer, err error) { | ||||
| 	buffer = new(bytes.Buffer) | ||||
| 	dstOut := NewStdWriter(buffer, Stdout) | ||||
| 	_, err = dstOut.Write(stdOutBytes) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
| 	dstErr := NewStdWriter(buffer, Stderr) | ||||
| 	_, err = dstErr.Write(stdErrBytes) | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func TestStdCopyWriteAndRead(t *testing.T) { | ||||
| 	stdOutBytes := []byte(strings.Repeat("o", startingBufLen)) | ||||
| 	stdErrBytes := []byte(strings.Repeat("e", startingBufLen)) | ||||
| 	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	written, err := StdCopy(ioutil.Discard, ioutil.Discard, buffer) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	expectedTotalWritten := len(stdOutBytes) + len(stdErrBytes) | ||||
| 	if written != int64(expectedTotalWritten) { | ||||
| 		t.Fatalf("Expected to have total of %d bytes written, got %d", expectedTotalWritten, written) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type customReader struct { | ||||
| 	n            int | ||||
| 	err          error | ||||
| 	totalCalls   int | ||||
| 	correctCalls int | ||||
| 	src          *bytes.Buffer | ||||
| } | ||||
|  | ||||
| func (f *customReader) Read(buf []byte) (int, error) { | ||||
| 	f.totalCalls++ | ||||
| 	if f.totalCalls <= f.correctCalls { | ||||
| 		return f.src.Read(buf) | ||||
| 	} | ||||
| 	return f.n, f.err | ||||
| } | ||||
|  | ||||
| func TestStdCopyReturnsErrorReadingHeader(t *testing.T) { | ||||
| 	expectedError := errors.New("error") | ||||
| 	reader := &customReader{ | ||||
| 		err: expectedError} | ||||
| 	written, err := StdCopy(ioutil.Discard, ioutil.Discard, reader) | ||||
| 	if written != 0 { | ||||
| 		t.Fatalf("Expected 0 bytes read, got %d", written) | ||||
| 	} | ||||
| 	if err != expectedError { | ||||
| 		t.Fatalf("Didn't get expected error") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestStdCopyReturnsErrorReadingFrame(t *testing.T) { | ||||
| 	expectedError := errors.New("error") | ||||
| 	stdOutBytes := []byte(strings.Repeat("o", startingBufLen)) | ||||
| 	stdErrBytes := []byte(strings.Repeat("e", startingBufLen)) | ||||
| 	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	reader := &customReader{ | ||||
| 		correctCalls: 1, | ||||
| 		n:            stdWriterPrefixLen + 1, | ||||
| 		err:          expectedError, | ||||
| 		src:          buffer} | ||||
| 	written, err := StdCopy(ioutil.Discard, ioutil.Discard, reader) | ||||
| 	if written != 0 { | ||||
| 		t.Fatalf("Expected 0 bytes read, got %d", written) | ||||
| 	} | ||||
| 	if err != expectedError { | ||||
| 		t.Fatalf("Didn't get expected error") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestStdCopyDetectsCorruptedFrame(t *testing.T) { | ||||
| 	stdOutBytes := []byte(strings.Repeat("o", startingBufLen)) | ||||
| 	stdErrBytes := []byte(strings.Repeat("e", startingBufLen)) | ||||
| 	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	reader := &customReader{ | ||||
| 		correctCalls: 1, | ||||
| 		n:            stdWriterPrefixLen + 1, | ||||
| 		err:          io.EOF, | ||||
| 		src:          buffer} | ||||
| 	written, err := StdCopy(ioutil.Discard, ioutil.Discard, reader) | ||||
| 	if written != startingBufLen { | ||||
| 		t.Fatalf("Expected %d bytes read, got %d", startingBufLen, written) | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		t.Fatal("Didn't get nil error") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestStdCopyWithInvalidInputHeader(t *testing.T) { | ||||
| 	dstOut := NewStdWriter(ioutil.Discard, Stdout) | ||||
| 	dstErr := NewStdWriter(ioutil.Discard, Stderr) | ||||
| 	src := strings.NewReader("Invalid input") | ||||
| 	_, err := StdCopy(dstOut, dstErr, src) | ||||
| 	if err == nil { | ||||
| 		t.Fatal("StdCopy with invalid input header should fail.") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestStdCopyWithCorruptedPrefix(t *testing.T) { | ||||
| 	data := []byte{0x01, 0x02, 0x03} | ||||
| 	src := bytes.NewReader(data) | ||||
| 	written, err := StdCopy(nil, nil, src) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("StdCopy should not return an error with corrupted prefix.") | ||||
| 	} | ||||
| 	if written != 0 { | ||||
| 		t.Fatalf("StdCopy should have written 0, but has written %d", written) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestStdCopyReturnsWriteErrors(t *testing.T) { | ||||
| 	stdOutBytes := []byte(strings.Repeat("o", startingBufLen)) | ||||
| 	stdErrBytes := []byte(strings.Repeat("e", startingBufLen)) | ||||
| 	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	expectedError := errors.New("expected") | ||||
|  | ||||
| 	dstOut := &errWriter{err: expectedError} | ||||
|  | ||||
| 	written, err := StdCopy(dstOut, ioutil.Discard, buffer) | ||||
| 	if written != 0 { | ||||
| 		t.Fatalf("StdCopy should have written 0, but has written %d", written) | ||||
| 	} | ||||
| 	if err != expectedError { | ||||
| 		t.Fatalf("Didn't get expected error, got %v", err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestStdCopyDetectsNotFullyWrittenFrames(t *testing.T) { | ||||
| 	stdOutBytes := []byte(strings.Repeat("o", startingBufLen)) | ||||
| 	stdErrBytes := []byte(strings.Repeat("e", startingBufLen)) | ||||
| 	buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	dstOut := &errWriter{n: startingBufLen - 10} | ||||
|  | ||||
| 	written, err := StdCopy(dstOut, ioutil.Discard, buffer) | ||||
| 	if written != 0 { | ||||
| 		t.Fatalf("StdCopy should have return 0 written bytes, but returned %d", written) | ||||
| 	} | ||||
| 	if err != io.ErrShortWrite { | ||||
| 		t.Fatalf("Didn't get expected io.ErrShortWrite error") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkWrite(b *testing.B) { | ||||
| 	w := NewStdWriter(ioutil.Discard, Stdout) | ||||
| 	data := []byte("Test line for testing stdwriter performance\n") | ||||
| 	data = bytes.Repeat(data, 100) | ||||
| 	b.SetBytes(int64(len(data))) | ||||
| 	b.ResetTimer() | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		if _, err := w.Write(data); err != nil { | ||||
| 			b.Fatal(err) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										102
									
								
								engine/runner/docker/util.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										102
									
								
								engine/runner/docker/util.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,102 @@ | ||||
| package docker | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"strings" | ||||
|  | ||||
| 	"github.com/drone/drone/engine/runner" | ||||
| 	"github.com/samalba/dockerclient" | ||||
| ) | ||||
|  | ||||
| // helper function that converts the Continer data structure to the exepcted | ||||
| // dockerclient.ContainerConfig. | ||||
| func toContainerConfig(c *runner.Container) *dockerclient.ContainerConfig { | ||||
| 	config := &dockerclient.ContainerConfig{ | ||||
| 		Image:      c.Image, | ||||
| 		Env:        toEnvironmentSlice(c.Environment), | ||||
| 		Cmd:        c.Command, | ||||
| 		Entrypoint: c.Entrypoint, | ||||
| 		WorkingDir: c.WorkingDir, | ||||
| 		HostConfig: dockerclient.HostConfig{ | ||||
| 			Privileged:       c.Privileged, | ||||
| 			NetworkMode:      c.Network, | ||||
| 			Memory:           c.MemLimit, | ||||
| 			CpuShares:        c.CPUShares, | ||||
| 			CpuQuota:         c.CPUQuota, | ||||
| 			CpusetCpus:       c.CPUSet, | ||||
| 			MemorySwappiness: -1, | ||||
| 			OomKillDisable:   c.OomKillDisable, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	if len(config.Entrypoint) == 0 { | ||||
| 		config.Entrypoint = nil | ||||
| 	} | ||||
| 	if len(config.Cmd) == 0 { | ||||
| 		config.Cmd = nil | ||||
| 	} | ||||
| 	if len(c.ExtraHosts) > 0 { | ||||
| 		config.HostConfig.ExtraHosts = c.ExtraHosts | ||||
| 	} | ||||
| 	if len(c.DNS) != 0 { | ||||
| 		config.HostConfig.Dns = c.DNS | ||||
| 	} | ||||
| 	if len(c.DNSSearch) != 0 { | ||||
| 		config.HostConfig.DnsSearch = c.DNSSearch | ||||
| 	} | ||||
| 	if len(c.VolumesFrom) != 0 { | ||||
| 		config.HostConfig.VolumesFrom = c.VolumesFrom | ||||
| 	} | ||||
|  | ||||
| 	config.Volumes = map[string]struct{}{} | ||||
| 	for _, path := range c.Volumes { | ||||
| 		if strings.Index(path, ":") == -1 { | ||||
| 			config.Volumes[path] = struct{}{} | ||||
| 			continue | ||||
| 		} | ||||
| 		parts := strings.Split(path, ":") | ||||
| 		config.Volumes[parts[1]] = struct{}{} | ||||
| 		config.HostConfig.Binds = append(config.HostConfig.Binds, path) | ||||
| 	} | ||||
|  | ||||
| 	for _, path := range c.Devices { | ||||
| 		if strings.Index(path, ":") == -1 { | ||||
| 			continue | ||||
| 		} | ||||
| 		parts := strings.Split(path, ":") | ||||
| 		device := dockerclient.DeviceMapping{ | ||||
| 			PathOnHost:        parts[0], | ||||
| 			PathInContainer:   parts[1], | ||||
| 			CgroupPermissions: "rwm", | ||||
| 		} | ||||
| 		config.HostConfig.Devices = append(config.HostConfig.Devices, device) | ||||
| 	} | ||||
|  | ||||
| 	return config | ||||
| } | ||||
|  | ||||
| // helper function that converts the AuthConfig data structure to the exepcted | ||||
| // dockerclient.AuthConfig. | ||||
| func toAuthConfig(container *runner.Container) *dockerclient.AuthConfig { | ||||
| 	if container.AuthConfig.Username == "" && | ||||
| 		container.AuthConfig.Password == "" && | ||||
| 		container.AuthConfig.Token == "" { | ||||
| 		return nil | ||||
| 	} | ||||
| 	return &dockerclient.AuthConfig{ | ||||
| 		Email:         container.AuthConfig.Email, | ||||
| 		Username:      container.AuthConfig.Username, | ||||
| 		Password:      container.AuthConfig.Password, | ||||
| 		RegistryToken: container.AuthConfig.Token, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // helper function that converts a key value map of environment variables to a | ||||
| // string slice in key=value format. | ||||
| func toEnvironmentSlice(env map[string]string) []string { | ||||
| 	var envs []string | ||||
| 	for k, v := range env { | ||||
| 		envs = append(envs, fmt.Sprintf("%s=%s", k, v)) | ||||
| 	} | ||||
| 	return envs | ||||
| } | ||||
							
								
								
									
										24
									
								
								engine/runner/docker/util_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								engine/runner/docker/util_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,24 @@ | ||||
| package docker | ||||
|  | ||||
| import ( | ||||
| 	"testing" | ||||
| ) | ||||
|  | ||||
| func Test_toContainerConfig(t *testing.T) { | ||||
| 	t.Skip() | ||||
| } | ||||
|  | ||||
| func Test_toAuthConfig(t *testing.T) { | ||||
| 	t.Skip() | ||||
| } | ||||
|  | ||||
| func Test_toEnvironmentSlice(t *testing.T) { | ||||
| 	env := map[string]string{ | ||||
| 		"HOME": "/root", | ||||
| 	} | ||||
| 	envs := toEnvironmentSlice(env) | ||||
| 	want, got := "HOME=/root", envs[0] | ||||
| 	if want != got { | ||||
| 		t.Errorf("Wanted envar %s got %s", want, got) | ||||
| 	} | ||||
| } | ||||
| @@ -2,8 +2,8 @@ package runner | ||||
|  | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"time" | ||||
| 	"fmt" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/drone/drone/engine/runner/parse" | ||||
|  | ||||
| @@ -59,7 +59,7 @@ type Runner struct { | ||||
| // Run starts the build runner but does not wait for it to complete. The Wait | ||||
| // method will return the exit code and release associated resources once the | ||||
| // running containers exit. | ||||
| func (r *Runner) Run() error { | ||||
| func (r *Runner) Run() { | ||||
|  | ||||
| 	go func() { | ||||
| 		r.setup() | ||||
| @@ -74,8 +74,6 @@ func (r *Runner) Run() error { | ||||
| 		<-r.ctx.Done() | ||||
| 		r.cancel() | ||||
| 	}() | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Wait waits for the runner to exit. | ||||
|   | ||||
							
								
								
									
										31
									
								
								web/hook.go
									
									
									
									
									
								
							
							
						
						
									
										31
									
								
								web/hook.go
									
									
									
									
									
								
							| @@ -17,6 +17,7 @@ import ( | ||||
| 	"github.com/drone/drone/shared/httputil" | ||||
| 	"github.com/drone/drone/shared/token" | ||||
| 	"github.com/drone/drone/store" | ||||
| 	"github.com/drone/drone/queue" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| @@ -204,6 +205,35 @@ func PostHook(c *gin.Context) { | ||||
| 	// on status change notifications | ||||
| 	last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID) | ||||
|  | ||||
| 	// IMPORTANT. PLEASE READ | ||||
| 	// | ||||
| 	// The below code uses a feature flag to switch between the current | ||||
| 	// build engine and the exerimental 0.5 build engine. This can be | ||||
| 	// enabled using with the environment variable CANARY=true | ||||
|  | ||||
| 	if os.Getenv("CANARY") == "true" { | ||||
| 		for _, job := range jobs { | ||||
| 			queue.Publish(c, &queue.Work{ | ||||
| 				User:      user, | ||||
| 				Repo:      repo, | ||||
| 				Build:     build, | ||||
| 				BuildLast: last, | ||||
| 				Job:       job, | ||||
| 				Keys:      key, | ||||
| 				Netrc:     netrc, | ||||
| 				Yaml:      string(raw), | ||||
| 				YamlEnc:   string(sec), | ||||
| 				System: &model.System{ | ||||
| 					Link:      httputil.GetURL(c.Request), | ||||
| 					Plugins:   strings.Split(os.Getenv("PLUGIN_FILTER"), " "), | ||||
| 					Globals:   strings.Split(os.Getenv("PLUGIN_PARAMS"), " "), | ||||
| 					Escalates: strings.Split(os.Getenv("ESCALATE_FILTER"), " "), | ||||
| 				}, | ||||
| 			}) | ||||
| 		} | ||||
| 		return // EXIT NOT TO AVOID THE 0.4 ENGINE CODE BELOW | ||||
| 	} | ||||
|  | ||||
| 	engine_ := engine.FromContext(c) | ||||
| 	go engine_.Schedule(c.Copy(), &engine.Task{ | ||||
| 		User:      user, | ||||
| @@ -222,5 +252,4 @@ func PostHook(c *gin.Context) { | ||||
| 			Escalates: strings.Split(os.Getenv("ESCALATE_FILTER"), " "), | ||||
| 		}, | ||||
| 	}) | ||||
|  | ||||
| } | ||||
|   | ||||
							
								
								
									
										126
									
								
								web/stream2.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										126
									
								
								web/stream2.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,126 @@ | ||||
| package web | ||||
|  | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"encoding/json" | ||||
| 	"io" | ||||
| 	"strconv" | ||||
|  | ||||
| 	"github.com/gin-gonic/gin" | ||||
|  | ||||
| 	"github.com/drone/drone/bus" | ||||
| 	"github.com/drone/drone/model" | ||||
| 	"github.com/drone/drone/router/middleware/session" | ||||
| 	"github.com/drone/drone/store" | ||||
| 	"github.com/drone/drone/stream" | ||||
|  | ||||
| 	log "github.com/Sirupsen/logrus" | ||||
|  | ||||
| 	"github.com/manucorporat/sse" | ||||
| ) | ||||
|  | ||||
| // IMPORTANT. PLEASE READ | ||||
| // | ||||
| // This file containers experimental streaming features for the 0.5 | ||||
| // release. These can be enabled with the feature flag CANARY=true | ||||
|  | ||||
| // GetRepoEvents will upgrade the connection to a Websocket and will stream | ||||
| // event updates to the browser. | ||||
| func GetRepoEvents2(c *gin.Context) { | ||||
| 	repo := session.Repo(c) | ||||
| 	c.Writer.Header().Set("Content-Type", "text/event-stream") | ||||
|  | ||||
| 	eventc := make(chan *bus.Event, 1) | ||||
| 	bus.Subscribe(c, eventc) | ||||
| 	defer func() { | ||||
| 		bus.Unsubscribe(c, eventc) | ||||
| 		close(eventc) | ||||
| 		log.Infof("closed event stream") | ||||
| 	}() | ||||
|  | ||||
| 	c.Stream(func(w io.Writer) bool { | ||||
| 		select { | ||||
| 		case event := <-eventc: | ||||
| 			if event == nil { | ||||
| 				log.Infof("nil event received") | ||||
| 				return false | ||||
| 			} | ||||
|  | ||||
| 			if event.Repo.FullName == repo.FullName { | ||||
|  | ||||
| 				var payload = struct { | ||||
| 					model.Build | ||||
| 					Jobs []*model.Job `json:"jobs"` | ||||
| 				}{} | ||||
| 				payload.Build = event.Build | ||||
| 				payload.Jobs, _ = store.GetJobList(c, &event.Build) | ||||
| 				data, _ := json.Marshal(&payload) | ||||
|  | ||||
| 				sse.Encode(w, sse.Event{ | ||||
| 					Event: "message", | ||||
| 					Data:  string(data), | ||||
| 				}) | ||||
| 			} | ||||
| 		case <-c.Writer.CloseNotify(): | ||||
| 			return false | ||||
| 		} | ||||
| 		return true | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func GetStream2(c *gin.Context) { | ||||
|  | ||||
| 	repo := session.Repo(c) | ||||
| 	buildn, _ := strconv.Atoi(c.Param("build")) | ||||
| 	jobn, _ := strconv.Atoi(c.Param("number")) | ||||
|  | ||||
| 	c.Writer.Header().Set("Content-Type", "text/event-stream") | ||||
|  | ||||
| 	build, err := store.GetBuildNumber(c, repo, buildn) | ||||
| 	if err != nil { | ||||
| 		log.Debugln("stream cannot get build number.", err) | ||||
| 		c.AbortWithError(404, err) | ||||
| 		return | ||||
| 	} | ||||
| 	job, err := store.GetJobNumber(c, build, jobn) | ||||
| 	if err != nil { | ||||
| 		log.Debugln("stream cannot get job number.", err) | ||||
| 		c.AbortWithError(404, err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	rc, wc, err := stream.Open(c, stream.ToKey(job.ID)) | ||||
| 	if err != nil { | ||||
| 		c.AbortWithError(404, err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	defer func() { | ||||
| 		if wc != nil { | ||||
| 			wc.Close() | ||||
| 		} | ||||
| 		if rc != nil { | ||||
| 			rc.Close() | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	go func() { | ||||
| 		<-c.Writer.CloseNotify() | ||||
| 		rc.Close() | ||||
| 	}() | ||||
|  | ||||
| 	var line int | ||||
| 	var scanner = bufio.NewScanner(rc) | ||||
| 	for scanner.Scan() { | ||||
| 		line++ | ||||
| 		var err = sse.Encode(c.Writer, sse.Event{ | ||||
| 			Id:    strconv.Itoa(line), | ||||
| 			Event: "message", | ||||
| 			Data:  scanner.Text(), | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			break | ||||
| 		} | ||||
| 		c.Writer.Flush() | ||||
| 	} | ||||
| } | ||||
		Reference in New Issue
	
	Block a user