You've already forked woodpecker
mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2026-06-03 16:35:37 +02:00
Rework log streaming and related functions (#1802)
closes #1801 closes #1815 closes #1144 closes #983 closes #557 closes #1827 regression of #1791 # TODO - [x] adjust log model - [x] add migration for logs - [x] send log line via grpc using step-id - [x] save log-line to db - [x] stream log-lines to UI - [x] use less structs for log-data - [x] make web UI work - [x] display logs loaded from db - [x] display streaming logs - [ ] ~~make migration work~~ -> dedicated pull (#1828) # TESTED - [x] new logs are stored in database - [x] log retrieval via cli (of new logs) works - [x] log streaming works (tested via curl & webui) - [x] log retrieval via web (of new logs) works --------- Co-authored-by: 6543 <6543@obermui.de>
This commit is contained in:
+28
-82
@@ -19,17 +19,14 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"github.com/woodpecker-ci/woodpecker/server"
|
||||
"github.com/woodpecker-ci/woodpecker/server/model"
|
||||
@@ -190,75 +187,29 @@ func GetPipelineLast(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, pl)
|
||||
}
|
||||
|
||||
// GetPipelineLogs
|
||||
//
|
||||
// @Summary Log information per step
|
||||
// @Router /repos/{owner}/{name}/logs/{number}/{pid}/{step} [get]
|
||||
// @Produce plain
|
||||
// @Success 200
|
||||
// @Tags Pipeline logs
|
||||
// @Param Authorization header string true "Insert your personal access token" default(Bearer <personal access token>)
|
||||
// @Param owner path string true "the repository owner's name"
|
||||
// @Param name path string true "the repository name"
|
||||
// @Param number path int true "the number of the pipeline"
|
||||
// @Param pid path int true "the pipeline id"
|
||||
// @Param step path int true "the step name"
|
||||
func GetPipelineLogs(c *gin.Context) {
|
||||
_store := store.FromContext(c)
|
||||
repo := session.Repo(c)
|
||||
|
||||
// parse the pipeline number and step sequence number from
|
||||
// the request parameter.
|
||||
num, _ := strconv.ParseInt(c.Params.ByName("number"), 10, 64)
|
||||
ppid, _ := strconv.Atoi(c.Params.ByName("pid"))
|
||||
name := c.Params.ByName("step")
|
||||
|
||||
pl, err := _store.GetPipelineNumber(repo, num)
|
||||
if err != nil {
|
||||
handleDbGetError(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
step, err := _store.StepChild(pl, ppid, name)
|
||||
if err != nil {
|
||||
handleDbGetError(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
rc, err := _store.LogFind(step)
|
||||
if err != nil {
|
||||
handleDbGetError(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
defer rc.Close()
|
||||
|
||||
c.Header("Content-Type", "application/json")
|
||||
if _, err := io.Copy(c.Writer, rc); err != nil {
|
||||
log.Error().Err(err).Msg("could not copy log to http response")
|
||||
}
|
||||
}
|
||||
|
||||
// GetStepLogs
|
||||
//
|
||||
// @Summary Log information
|
||||
// @Router /repos/{owner}/{name}/logs/{number}/{pid} [get]
|
||||
// @Produce plain
|
||||
// @Success 200
|
||||
// @Tags Pipeline logs
|
||||
// @Router /repos/{owner}/{name}/logs/{number}/{stepID} [get]
|
||||
// @Produce json
|
||||
// @Success 200 {array} LogEntry
|
||||
// @Tags Pipeline logs
|
||||
// @Param Authorization header string true "Insert your personal access token" default(Bearer <personal access token>)
|
||||
// @Param owner path string true "the repository owner's name"
|
||||
// @Param name path string true "the repository name"
|
||||
// @Param number path int true "the number of the pipeline"
|
||||
// @Param pid path int true "the pipeline id"
|
||||
// @Param number path int true "the number of the pipeline"
|
||||
// @Param stepID path int true "the step id"
|
||||
func GetStepLogs(c *gin.Context) {
|
||||
_store := store.FromContext(c)
|
||||
repo := session.Repo(c)
|
||||
|
||||
// parse the pipeline number and step sequence number from
|
||||
// the request parameter.
|
||||
num, _ := strconv.ParseInt(c.Params.ByName("number"), 10, 64)
|
||||
pid, _ := strconv.Atoi(c.Params.ByName("pid"))
|
||||
num, err := strconv.ParseInt(c.Params.ByName("number"), 10, 64)
|
||||
if err != nil {
|
||||
_ = c.AbortWithError(http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
|
||||
pl, err := _store.GetPipelineNumber(repo, num)
|
||||
if err != nil {
|
||||
@@ -266,24 +217,31 @@ func GetStepLogs(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
step, err := _store.StepFind(pl, pid)
|
||||
stepID, err := strconv.ParseInt(c.Params.ByName("stepId"), 10, 64)
|
||||
if err != nil {
|
||||
_ = c.AbortWithError(http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
|
||||
step, err := _store.StepLoad(stepID)
|
||||
if err != nil {
|
||||
handleDbGetError(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
rc, err := _store.LogFind(step)
|
||||
if step.PipelineID != pl.ID {
|
||||
// make sure we can not read arbitrary logs by id
|
||||
_ = c.AbortWithError(http.StatusBadRequest, fmt.Errorf("step with id %d is not part of repo %s", stepID, repo.FullName))
|
||||
return
|
||||
}
|
||||
|
||||
logs, err := _store.LogFind(step)
|
||||
if err != nil {
|
||||
handleDbGetError(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
defer rc.Close()
|
||||
|
||||
c.Header("Content-Type", "application/json")
|
||||
if _, err := io.Copy(c.Writer, rc); err != nil {
|
||||
log.Error().Err(err).Msg("could not copy log to http response")
|
||||
}
|
||||
c.JSON(http.StatusOK, logs)
|
||||
}
|
||||
|
||||
// GetPipelineConfig
|
||||
@@ -532,7 +490,6 @@ func DeletePipelineLogs(c *gin.Context) {
|
||||
_store := store.FromContext(c)
|
||||
|
||||
repo := session.Repo(c)
|
||||
user := session.User(c)
|
||||
num, _ := strconv.ParseInt(c.Params.ByName("number"), 10, 64)
|
||||
|
||||
pl, err := _store.GetPipelineNumber(repo, num)
|
||||
@@ -554,11 +511,8 @@ func DeletePipelineLogs(c *gin.Context) {
|
||||
}
|
||||
|
||||
for _, step := range steps {
|
||||
t := time.Now().UTC()
|
||||
buf := bytes.NewBufferString(fmt.Sprintf(deleteStr, step.Name, user.Login, t.Format(time.UnixDate)))
|
||||
lerr := _store.LogSave(step, buf)
|
||||
if lerr != nil {
|
||||
err = lerr
|
||||
if lErr := _store.LogDelete(step); err != nil {
|
||||
err = errors.Join(err, lErr)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
@@ -568,11 +522,3 @@ func DeletePipelineLogs(c *gin.Context) {
|
||||
|
||||
c.String(http.StatusNoContent, "")
|
||||
}
|
||||
|
||||
var deleteStr = `[
|
||||
{
|
||||
"step": %q,
|
||||
"pos": 0,
|
||||
"out": "logs purged by %s on %s\n"
|
||||
}
|
||||
]`
|
||||
|
||||
+40
-16
@@ -17,6 +17,7 @@ package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@@ -27,7 +28,6 @@ import (
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"github.com/woodpecker-ci/woodpecker/server"
|
||||
"github.com/woodpecker-ci/woodpecker/server/logging"
|
||||
"github.com/woodpecker-ci/woodpecker/server/model"
|
||||
"github.com/woodpecker-ci/woodpecker/server/pubsub"
|
||||
"github.com/woodpecker-ci/woodpecker/server/router/middleware/session"
|
||||
@@ -121,6 +121,17 @@ func EventStreamSSE(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// LogStream
|
||||
//
|
||||
// @Summary Log stream
|
||||
// @Router /logs/{owner}/{name}/{pipeline}/{stepID} [get]
|
||||
// @Produce plain
|
||||
// @Success 200
|
||||
// @Tags Pipeline logs
|
||||
// @Param owner path string true "the repository owner's name"
|
||||
// @Param name path string true "the repository name"
|
||||
// @Param pipeline path int true "the number of the pipeline"
|
||||
// @Param stepID path int true "the step id"
|
||||
func LogStreamSSE(c *gin.Context) {
|
||||
c.Header("Content-Type", "text/event-stream")
|
||||
c.Header("Cache-Control", "no-cache")
|
||||
@@ -138,26 +149,43 @@ func LogStreamSSE(c *gin.Context) {
|
||||
logWriteStringErr(io.WriteString(rw, ": ping\n\n"))
|
||||
flusher.Flush()
|
||||
|
||||
repo := session.Repo(c)
|
||||
_store := store.FromContext(c)
|
||||
repo := session.Repo(c)
|
||||
|
||||
// // parse the pipeline number and step sequence number from
|
||||
// // the request parameter.
|
||||
pipelinen, _ := strconv.ParseInt(c.Param("pipeline"), 10, 64)
|
||||
stepn, _ := strconv.Atoi(c.Param("number"))
|
||||
|
||||
pipeline, err := _store.GetPipelineNumber(repo, pipelinen)
|
||||
pipeline, err := strconv.ParseInt(c.Param("pipeline"), 10, 64)
|
||||
if err != nil {
|
||||
log.Debug().Err(err).Msg("pipeline number invalid")
|
||||
logWriteStringErr(io.WriteString(rw, "event: error\ndata: pipeline number invalid\n\n"))
|
||||
return
|
||||
}
|
||||
pl, err := _store.GetPipelineNumber(repo, pipeline)
|
||||
if err != nil {
|
||||
log.Debug().Msgf("stream cannot get pipeline number: %v", err)
|
||||
logWriteStringErr(io.WriteString(rw, "event: error\ndata: pipeline not found\n\n"))
|
||||
return
|
||||
}
|
||||
step, err := _store.StepFind(pipeline, stepn)
|
||||
|
||||
stepID, err := strconv.ParseInt(c.Param("stepId"), 10, 64)
|
||||
if err != nil {
|
||||
log.Debug().Err(err).Msg("step id invalid")
|
||||
logWriteStringErr(io.WriteString(rw, "event: error\ndata: step id invalid\n\n"))
|
||||
return
|
||||
}
|
||||
step, err := _store.StepLoad(stepID)
|
||||
if err != nil {
|
||||
log.Debug().Msgf("stream cannot get step number: %v", err)
|
||||
logWriteStringErr(io.WriteString(rw, "event: error\ndata: process not found\n\n"))
|
||||
return
|
||||
}
|
||||
|
||||
if step.PipelineID != pl.ID {
|
||||
// make sure we can not read arbitrary logs by id
|
||||
err = fmt.Errorf("step with id %d is not part of repo %s", stepID, repo.FullName)
|
||||
log.Debug().Err(err).Msg("event error")
|
||||
logWriteStringErr(io.WriteString(rw, "event: error\ndata: "+err.Error()+"\n\n"))
|
||||
return
|
||||
}
|
||||
|
||||
if step.State != model.StatusRunning {
|
||||
log.Debug().Msg("stream not found.")
|
||||
logWriteStringErr(io.WriteString(rw, "event: error\ndata: stream not found\n\n"))
|
||||
@@ -178,18 +206,14 @@ func LogStreamSSE(c *gin.Context) {
|
||||
}()
|
||||
|
||||
go func() {
|
||||
// TODO remove global variable
|
||||
err := server.Config.Services.Logs.Tail(ctx, fmt.Sprint(step.ID), func(entries ...*logging.Entry) {
|
||||
defer func() {
|
||||
obj := recover() // fix #2480 // TODO: check if it's still needed
|
||||
log.Trace().Msgf("pubsub subscribe recover return: %v", obj)
|
||||
}()
|
||||
err := server.Config.Services.Logs.Tail(ctx, step.ID, func(entries ...*model.LogEntry) {
|
||||
for _, entry := range entries {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
logc <- entry.Data
|
||||
ee, _ := json.Marshal(entry)
|
||||
logc <- ee
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
+41
-20
@@ -238,40 +238,41 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
|
||||
|
||||
workflow, err := s.store.StepLoad(workflowID)
|
||||
if err != nil {
|
||||
log.Error().Msgf("error: cannot find step with id %d: %s", workflowID, err)
|
||||
log.Error().Err(err).Msgf("cannot find step with id %d", workflowID)
|
||||
return err
|
||||
}
|
||||
|
||||
currentPipeline, err := s.store.GetPipeline(workflow.PipelineID)
|
||||
if err != nil {
|
||||
log.Error().Msgf("error: cannot find pipeline with id %d: %s", workflow.PipelineID, err)
|
||||
log.Error().Err(err).Msgf("cannot find pipeline with id %d", workflow.PipelineID)
|
||||
return err
|
||||
}
|
||||
|
||||
repo, err := s.store.GetRepo(currentPipeline.RepoID)
|
||||
if err != nil {
|
||||
log.Error().Msgf("error: cannot find repo with id %d: %s", currentPipeline.RepoID, err)
|
||||
log.Error().Err(err).Msgf("cannot find repo with id %d", currentPipeline.RepoID)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Trace().
|
||||
logger := log.With().
|
||||
Str("repo_id", fmt.Sprint(repo.ID)).
|
||||
Str("build_id", fmt.Sprint(currentPipeline.ID)).
|
||||
Str("step_id", id).
|
||||
Msgf("gRPC Done with state: %#v", state)
|
||||
Str("pipeline_id", fmt.Sprint(currentPipeline.ID)).
|
||||
Str("workflow_id", id).Logger()
|
||||
|
||||
logger.Trace().Msgf("gRPC Done with state: %#v", state)
|
||||
|
||||
if workflow, err = pipeline.UpdateStepStatusToDone(s.store, *workflow, state); err != nil {
|
||||
log.Error().Msgf("error: done: cannot update step_id %d state: %s", workflow.ID, err)
|
||||
logger.Error().Err(err).Msgf("pipeline.UpdateStepStatusToDone: cannot update workflow state: %s", err)
|
||||
}
|
||||
|
||||
var queueErr error
|
||||
if workflow.Failing() {
|
||||
queueErr = s.queue.Error(c, id, fmt.Errorf("Step finished with exitcode %d, %s", state.ExitCode, state.Error))
|
||||
queueErr = s.queue.Error(c, id, fmt.Errorf("Step finished with exit code %d, %s", state.ExitCode, state.Error))
|
||||
} else {
|
||||
queueErr = s.queue.Done(c, id, workflow.State)
|
||||
}
|
||||
if queueErr != nil {
|
||||
log.Error().Msgf("error: done: cannot ack step_id %d: %s", workflowID, err)
|
||||
logger.Error().Err(queueErr).Msg("queue.Done: cannot ack workflow")
|
||||
}
|
||||
|
||||
steps, err := s.store.StepList(currentPipeline)
|
||||
@@ -282,15 +283,20 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
|
||||
|
||||
if !model.IsThereRunningStage(steps) {
|
||||
if currentPipeline, err = pipeline.UpdateStatusToDone(s.store, *currentPipeline, model.PipelineStatus(steps), workflow.Stopped); err != nil {
|
||||
log.Error().Err(err).Msgf("error: done: cannot update build_id %d final state", currentPipeline.ID)
|
||||
logger.Error().Err(err).Msgf("pipeline.UpdateStatusToDone: cannot update workflow final state")
|
||||
}
|
||||
}
|
||||
|
||||
s.updateForgeStatus(c, repo, currentPipeline, workflow)
|
||||
|
||||
if err := s.logger.Close(c, id); err != nil {
|
||||
log.Error().Err(err).Msgf("done: cannot close build_id %d logger", workflow.ID)
|
||||
}
|
||||
// make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9)
|
||||
go func() {
|
||||
for _, step := range steps {
|
||||
if err := s.logger.Close(c, step.ID); err != nil {
|
||||
logger.Error().Err(err).Msgf("done: cannot close log stream for step %d", step.ID)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if err := s.notify(c, repo, currentPipeline, steps); err != nil {
|
||||
return err
|
||||
@@ -308,13 +314,28 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
|
||||
}
|
||||
|
||||
// Log implements the rpc.Log function
|
||||
func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error {
|
||||
entry := new(logging.Entry)
|
||||
entry.Data, _ = json.Marshal(line)
|
||||
if err := s.logger.Write(c, id, entry); err != nil {
|
||||
log.Error().Err(err).Msgf("rpc server could not write to logger")
|
||||
func (s *RPC) Log(c context.Context, _logEntry *rpc.LogEntry) error {
|
||||
// convert rpc log_entry to model.log_entry
|
||||
step, err := s.store.StepByUUID(_logEntry.StepUUID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not find step with uuid %s in store: %w", _logEntry.StepUUID, err)
|
||||
}
|
||||
return nil
|
||||
logEntry := &model.LogEntry{
|
||||
StepID: step.ID,
|
||||
Time: _logEntry.Time,
|
||||
Line: _logEntry.Line,
|
||||
Data: []byte(_logEntry.Data),
|
||||
Type: model.LogEntryType(_logEntry.Type),
|
||||
}
|
||||
// make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9)
|
||||
go func() {
|
||||
// write line to listening web clients
|
||||
if err := s.logger.Write(c, logEntry.StepID, logEntry); err != nil {
|
||||
log.Error().Err(err).Msgf("rpc server could not write to logger")
|
||||
}
|
||||
}()
|
||||
// make line persistent in database
|
||||
return s.store.LogAppend(logEntry)
|
||||
}
|
||||
|
||||
func (s *RPC) RegisterAgent(ctx context.Context, platform, backend, version string, capacity int32) (int64, error) {
|
||||
|
||||
@@ -145,14 +145,15 @@ func (s *WoodpeckerServer) Extend(c context.Context, req *proto.ExtendRequest) (
|
||||
}
|
||||
|
||||
func (s *WoodpeckerServer) Log(c context.Context, req *proto.LogRequest) (*proto.Empty, error) {
|
||||
line := &rpc.Line{
|
||||
Out: req.GetLine().GetOut(),
|
||||
Pos: int(req.GetLine().GetPos()),
|
||||
Time: req.GetLine().GetTime(),
|
||||
Step: req.GetLine().GetStep(),
|
||||
logEntry := &rpc.LogEntry{
|
||||
Data: req.GetLogEntry().GetData(),
|
||||
Line: int(req.GetLogEntry().GetLine()),
|
||||
Time: req.GetLogEntry().GetTime(),
|
||||
StepUUID: req.GetLogEntry().GetStepUuid(),
|
||||
Type: int(req.GetLogEntry().GetType()),
|
||||
}
|
||||
res := new(proto.Empty)
|
||||
err := s.peer.Log(c, req.GetId(), line)
|
||||
err := s.peer.Log(c, logEntry)
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
||||
+23
-44
@@ -2,8 +2,9 @@ package logging
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/woodpecker-ci/woodpecker/server/model"
|
||||
)
|
||||
|
||||
// TODO (bradrydzewski) writing to subscribers is currently a blocking
|
||||
@@ -27,58 +28,58 @@ type subscriber struct {
|
||||
type stream struct {
|
||||
sync.Mutex
|
||||
|
||||
path string
|
||||
list []*Entry
|
||||
subs map[*subscriber]struct{}
|
||||
done chan struct{}
|
||||
stepID int64
|
||||
list []*model.LogEntry
|
||||
subs map[*subscriber]struct{}
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
type log struct {
|
||||
sync.Mutex
|
||||
|
||||
streams map[string]*stream
|
||||
streams map[int64]*stream
|
||||
}
|
||||
|
||||
// New returns a new logger.
|
||||
func New() Log {
|
||||
return &log{
|
||||
streams: map[string]*stream{},
|
||||
streams: map[int64]*stream{},
|
||||
}
|
||||
}
|
||||
|
||||
func (l *log) Open(_ context.Context, path string) error {
|
||||
func (l *log) Open(_ context.Context, stepID int64) error {
|
||||
l.Lock()
|
||||
_, ok := l.streams[path]
|
||||
_, ok := l.streams[stepID]
|
||||
if !ok {
|
||||
l.streams[path] = &stream{
|
||||
path: path,
|
||||
subs: make(map[*subscriber]struct{}),
|
||||
done: make(chan struct{}),
|
||||
l.streams[stepID] = &stream{
|
||||
stepID: stepID,
|
||||
subs: make(map[*subscriber]struct{}),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
l.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *log) Write(_ context.Context, path string, entry *Entry) error {
|
||||
func (l *log) Write(_ context.Context, stepID int64, logEntry *model.LogEntry) error {
|
||||
l.Lock()
|
||||
s, ok := l.streams[path]
|
||||
s, ok := l.streams[stepID]
|
||||
l.Unlock()
|
||||
if !ok {
|
||||
return ErrNotFound
|
||||
}
|
||||
s.Lock()
|
||||
s.list = append(s.list, entry)
|
||||
s.list = append(s.list, logEntry)
|
||||
for sub := range s.subs {
|
||||
go sub.handler(entry)
|
||||
go sub.handler(logEntry)
|
||||
}
|
||||
s.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *log) Tail(c context.Context, path string, handler Handler) error {
|
||||
func (l *log) Tail(c context.Context, stepID int64, handler Handler) error {
|
||||
l.Lock()
|
||||
s, ok := l.streams[path]
|
||||
s, ok := l.streams[stepID]
|
||||
l.Unlock()
|
||||
if !ok {
|
||||
return ErrNotFound
|
||||
@@ -105,9 +106,9 @@ func (l *log) Tail(c context.Context, path string, handler Handler) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *log) Close(_ context.Context, path string) error {
|
||||
func (l *log) Close(_ context.Context, stepID int64) error {
|
||||
l.Lock()
|
||||
s, ok := l.streams[path]
|
||||
s, ok := l.streams[stepID]
|
||||
l.Unlock()
|
||||
if !ok {
|
||||
return ErrNotFound
|
||||
@@ -118,29 +119,7 @@ func (l *log) Close(_ context.Context, path string) error {
|
||||
s.Unlock()
|
||||
|
||||
l.Lock()
|
||||
delete(l.streams, path)
|
||||
delete(l.streams, stepID)
|
||||
l.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *log) Snapshot(_ context.Context, path string, w io.Writer) error {
|
||||
l.Lock()
|
||||
s, ok := l.streams[path]
|
||||
l.Unlock()
|
||||
if !ok {
|
||||
return ErrNotFound
|
||||
}
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for _, entry := range s.list {
|
||||
if _, err := w.Write(entry.Data); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write(cr); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var cr = []byte{'\n'}
|
||||
|
||||
@@ -7,14 +7,15 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/woodpecker-ci/woodpecker/server/model"
|
||||
)
|
||||
|
||||
func TestLogging(t *testing.T) {
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
|
||||
testPath = "test"
|
||||
testEntry = &Entry{
|
||||
testStepID = int64(123)
|
||||
testEntry = &model.LogEntry{
|
||||
Data: []byte("test"),
|
||||
}
|
||||
)
|
||||
@@ -24,27 +25,27 @@ func TestLogging(t *testing.T) {
|
||||
)
|
||||
|
||||
logger := New()
|
||||
assert.NoError(t, logger.Open(ctx, testPath))
|
||||
assert.NoError(t, logger.Open(ctx, testStepID))
|
||||
go func() {
|
||||
assert.NoError(t, logger.Tail(ctx, testPath, func(entry ...*Entry) { wg.Done() }))
|
||||
assert.NoError(t, logger.Tail(ctx, testStepID, func(entry ...*model.LogEntry) { wg.Done() }))
|
||||
}()
|
||||
go func() {
|
||||
assert.NoError(t, logger.Tail(ctx, testPath, func(entry ...*Entry) { wg.Done() }))
|
||||
assert.NoError(t, logger.Tail(ctx, testStepID, func(entry ...*model.LogEntry) { wg.Done() }))
|
||||
}()
|
||||
|
||||
<-time.After(500 * time.Millisecond)
|
||||
|
||||
wg.Add(4)
|
||||
go func() {
|
||||
assert.NoError(t, logger.Write(ctx, testPath, testEntry))
|
||||
assert.NoError(t, logger.Write(ctx, testPath, testEntry))
|
||||
assert.NoError(t, logger.Write(ctx, testStepID, testEntry))
|
||||
assert.NoError(t, logger.Write(ctx, testStepID, testEntry))
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
assert.NoError(t, logger.Tail(ctx, testPath, func(entry ...*Entry) { wg.Done() }))
|
||||
assert.NoError(t, logger.Tail(ctx, testStepID, func(entry ...*model.LogEntry) { wg.Done() }))
|
||||
}()
|
||||
|
||||
<-time.After(500 * time.Millisecond)
|
||||
|
||||
@@ -3,78 +3,27 @@ package logging
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
"github.com/woodpecker-ci/woodpecker/server/model"
|
||||
)
|
||||
|
||||
// ErrNotFound is returned when the log does not exist.
|
||||
var ErrNotFound = errors.New("stream: not found")
|
||||
|
||||
// Entry defines a log entry.
|
||||
type Entry struct {
|
||||
// ID identifies this message.
|
||||
ID string `json:"id,omitempty"`
|
||||
|
||||
// Data is the actual data in the entry.
|
||||
Data []byte `json:"data"`
|
||||
|
||||
// Tags represents the key-value pairs the
|
||||
// entry is tagged with.
|
||||
Tags map[string]string `json:"tags,omitempty"`
|
||||
}
|
||||
|
||||
// Handler defines a callback function for handling log entries.
|
||||
type Handler func(...*Entry)
|
||||
type Handler func(...*model.LogEntry)
|
||||
|
||||
// Log defines a log multiplexer.
|
||||
type Log interface {
|
||||
// Open opens the log.
|
||||
Open(c context.Context, path string) error
|
||||
Open(c context.Context, stepID int64) error
|
||||
|
||||
// Write writes the entry to the log.
|
||||
Write(c context.Context, path string, entry *Entry) error
|
||||
Write(c context.Context, stepID int64, entry *model.LogEntry) error
|
||||
|
||||
// Tail tails the log.
|
||||
Tail(c context.Context, path string, handler Handler) error
|
||||
Tail(c context.Context, stepID int64, handler Handler) error
|
||||
|
||||
// Close closes the log.
|
||||
Close(c context.Context, path string) error
|
||||
|
||||
// Snapshot snapshots the stream to Writer w.
|
||||
Snapshot(c context.Context, path string, w io.Writer) error
|
||||
|
||||
// Info returns runtime information about the multiplexer.
|
||||
// Info(c context.Context) (interface{}, error)
|
||||
Close(c context.Context, stepID int64) error
|
||||
}
|
||||
|
||||
// // global streamer
|
||||
// var global = New()
|
||||
//
|
||||
// // Set sets a default global logger.
|
||||
// func Set(log Log) {
|
||||
// global = log
|
||||
// }
|
||||
//
|
||||
// // Open opens the log stream.
|
||||
// func Open(c context.Context, path string) error {
|
||||
// return global.Open(c, path)
|
||||
// }
|
||||
//
|
||||
// // Write writes the log entry to the stream.
|
||||
// func Write(c context.Context, path string, entry *Entry) error {
|
||||
// return global.Write(c, path, entry)
|
||||
// }
|
||||
//
|
||||
// // Tail tails the log stream.
|
||||
// func Tail(c context.Context, path string, handler Handler) error {
|
||||
// return global.Tail(c, path, handler)
|
||||
// }
|
||||
//
|
||||
// // Close closes the log stream.
|
||||
// func Close(c context.Context, path string) error {
|
||||
// return global.Close(c, path)
|
||||
// }
|
||||
//
|
||||
// // Snapshot snapshots the stream to Writer w.
|
||||
// func Snapshot(c context.Context, path string, w io.Writer) error {
|
||||
// return global.Snapshot(c, path, w)
|
||||
// }
|
||||
|
||||
+25
-5
@@ -14,9 +14,29 @@
|
||||
|
||||
package model
|
||||
|
||||
type Logs struct {
|
||||
ID int64 `xorm:"pk autoincr 'log_id'"`
|
||||
StepID int64 `xorm:"UNIQUE 'log_step_id'"`
|
||||
Data []byte `xorm:"LONGBLOB 'log_data'"`
|
||||
// TODO: add create timestamp
|
||||
// LogEntryType identifies the type of line in the logs.
|
||||
type LogEntryType int // @name LogEntryType
|
||||
|
||||
const (
|
||||
LogEntryStdout LogEntryType = iota
|
||||
LogEntryStderr
|
||||
LogEntryExitCode
|
||||
LogEntryMetadata
|
||||
LogEntryProgress
|
||||
)
|
||||
|
||||
type LogEntry struct {
|
||||
ID int64 `json:"id" xorm:"pk autoincr 'id'"`
|
||||
StepID int64 `json:"step_id" xorm:"'step_id'"`
|
||||
Time int64 `json:"time"`
|
||||
Line int `json:"line"`
|
||||
Data []byte `json:"data" xorm:"LONGBLOB"`
|
||||
Created int64 `json:"-" xorm:"created"`
|
||||
Type LogEntryType `json:"type"`
|
||||
} // @name LogEntry
|
||||
|
||||
// TODO: store info what specific command the line belongs to (must be optional and impl. by backend)
|
||||
|
||||
func (LogEntry) TableName() string {
|
||||
return "log_entries"
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ type StepStore interface {
|
||||
// Step represents a process in the pipeline.
|
||||
type Step struct {
|
||||
ID int64 `json:"id" xorm:"pk autoincr 'step_id'"`
|
||||
UUID string `json:"uuid" xorm:"UNIQUE INDEX 'step_uuid'"`
|
||||
PipelineID int64 `json:"pipeline_id" xorm:"UNIQUE(s) INDEX 'step_pipeline_id'"`
|
||||
PID int `json:"pid" xorm:"UNIQUE(s) 'step_pid'"`
|
||||
PPID int `json:"ppid" xorm:"step_ppid"`
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
func TestTree(t *testing.T) {
|
||||
steps := []*Step{{
|
||||
ID: 25,
|
||||
UUID: "f80df0bb-77a7-4964-9412-2e1049872d57",
|
||||
PID: 2,
|
||||
PipelineID: 6,
|
||||
PPID: 1,
|
||||
@@ -32,6 +33,7 @@ func TestTree(t *testing.T) {
|
||||
Error: "0",
|
||||
}, {
|
||||
ID: 24,
|
||||
UUID: "c19b49c5-990d-4722-ba9c-1c4fe9db1f91",
|
||||
PipelineID: 6,
|
||||
PID: 1,
|
||||
PPID: 0,
|
||||
@@ -41,6 +43,7 @@ func TestTree(t *testing.T) {
|
||||
Error: "1",
|
||||
}, {
|
||||
ID: 26,
|
||||
UUID: "4380146f-c0ff-4482-8107-c90937d1faba",
|
||||
PipelineID: 6,
|
||||
PID: 3,
|
||||
PPID: 1,
|
||||
@@ -56,6 +59,7 @@ func TestTree(t *testing.T) {
|
||||
|
||||
steps = []*Step{{
|
||||
ID: 25,
|
||||
UUID: "f80df0bb-77a7-4964-9412-2e1049872d57",
|
||||
PID: 2,
|
||||
PipelineID: 6,
|
||||
PPID: 1,
|
||||
|
||||
@@ -131,18 +131,18 @@ func cancelPreviousPipelines(
|
||||
return err
|
||||
}
|
||||
|
||||
pipelineNeedsCancel := func(active *model.Pipeline) (bool, error) {
|
||||
pipelineNeedsCancel := func(active *model.Pipeline) bool {
|
||||
// always filter on same event
|
||||
if active.Event != pipeline.Event {
|
||||
return false, nil
|
||||
return false
|
||||
}
|
||||
|
||||
// find events for the same context
|
||||
switch pipeline.Event {
|
||||
case model.EventPush:
|
||||
return pipeline.Branch == active.Branch, nil
|
||||
return pipeline.Branch == active.Branch
|
||||
default:
|
||||
return pipeline.Refspec == active.Refspec, nil
|
||||
return pipeline.Refspec == active.Refspec
|
||||
}
|
||||
}
|
||||
|
||||
@@ -152,14 +152,7 @@ func cancelPreviousPipelines(
|
||||
continue
|
||||
}
|
||||
|
||||
cancel, err := pipelineNeedsCancel(active)
|
||||
if err != nil {
|
||||
log.Error().
|
||||
Err(err).
|
||||
Str("Ref", active.Ref).
|
||||
Msg("Error while trying to cancel pipeline, skipping")
|
||||
continue
|
||||
}
|
||||
cancel := pipelineNeedsCancel(active)
|
||||
|
||||
if !cancel {
|
||||
continue
|
||||
|
||||
@@ -51,7 +51,7 @@ func zeroSteps(currentPipeline *model.Pipeline, forgeYamlConfigs []*forge_types.
|
||||
return false
|
||||
}
|
||||
|
||||
// TODO: parse yaml once and not for each filter function
|
||||
// TODO: parse yaml once and not for each filter function (-> move server/pipeline/filter* into pipeline/step_builder)
|
||||
// Check if at least one pipeline step will be execute otherwise we will just ignore this webhook
|
||||
func checkIfFiltered(repo *model.Repo, p *model.Pipeline, forgeYamlConfigs []*forge_types.FileMeta) (bool, error) {
|
||||
log.Trace().Msgf("hook.branchFiltered(): pipeline branch: '%s' pipeline event: '%s' config count: %d", p.Branch, p.Event, len(forgeYamlConfigs))
|
||||
|
||||
@@ -49,9 +49,6 @@ func queuePipeline(repo *model.Repo, pipelineItems []*pipeline.Item) error {
|
||||
Timeout: repo.Timeout,
|
||||
})
|
||||
|
||||
if err := server.Config.Services.Logs.Open(context.Background(), task.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
tasks = append(tasks, task)
|
||||
}
|
||||
return server.Config.Services.Queue.PushAtOnce(context.Background(), tasks)
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"github.com/woodpecker-ci/woodpecker/pipeline"
|
||||
"github.com/woodpecker-ci/woodpecker/server"
|
||||
"github.com/woodpecker-ci/woodpecker/server/model"
|
||||
"github.com/woodpecker-ci/woodpecker/server/store"
|
||||
)
|
||||
@@ -46,6 +47,16 @@ func start(ctx context.Context, store store.Store, activePipeline *model.Pipelin
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// open logs streamer for each step
|
||||
go func() {
|
||||
steps := activePipeline.Steps
|
||||
for _, step := range steps {
|
||||
if err := server.Config.Services.Logs.Open(context.Background(), step.ID); err != nil {
|
||||
log.Error().Err(err).Msgf("could not open log stream for step %d", step.ID)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
updatePipelineStatus(ctx, activePipeline, repo, user)
|
||||
|
||||
return activePipeline, nil
|
||||
|
||||
+10
-8
@@ -89,8 +89,7 @@ func apiRoutes(e *gin.Engine) {
|
||||
repo.POST("/pipelines/:number/approve", session.MustPush, api.PostApproval)
|
||||
repo.POST("/pipelines/:number/decline", session.MustPush, api.PostDecline)
|
||||
|
||||
repo.GET("/logs/:number/:pid", api.GetStepLogs)
|
||||
repo.GET("/logs/:number/:pid/:step", api.GetPipelineLogs)
|
||||
repo.GET("/logs/:number/:stepId", api.GetStepLogs)
|
||||
|
||||
// requires push permissions
|
||||
repo.DELETE("/logs/:number", session.MustPush, api.DeletePipelineLogs)
|
||||
@@ -179,6 +178,15 @@ func apiRoutes(e *gin.Engine) {
|
||||
|
||||
apiBase.POST("/hook", api.PostHook)
|
||||
|
||||
stream := apiBase.Group("/stream")
|
||||
{
|
||||
stream.GET("/logs/:owner/:name/:pipeline/:stepId",
|
||||
session.SetRepo(),
|
||||
session.SetPerm(),
|
||||
session.MustPull,
|
||||
api.LogStreamSSE)
|
||||
}
|
||||
|
||||
if zerolog.GlobalLevel() <= zerolog.DebugLevel {
|
||||
debugger := apiBase.Group("/debug")
|
||||
{
|
||||
@@ -204,11 +212,5 @@ func apiRoutes(e *gin.Engine) {
|
||||
sse := e.Group("/stream")
|
||||
{
|
||||
sse.GET("/events", api.EventStreamSSE)
|
||||
sse.GET("/logs/:owner/:name/:pipeline/:number",
|
||||
session.SetRepo(),
|
||||
session.SetPerm(),
|
||||
session.MustPull,
|
||||
api.LogStreamSSE,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,50 +15,41 @@
|
||||
package datastore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"fmt"
|
||||
|
||||
"github.com/woodpecker-ci/woodpecker/server/model"
|
||||
)
|
||||
|
||||
func (s storage) LogFind(step *model.Step) (io.ReadCloser, error) {
|
||||
logs := &model.Logs{
|
||||
StepID: step.ID,
|
||||
}
|
||||
if err := wrapGet(s.engine.Get(logs)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
buf := bytes.NewBuffer(logs.Data)
|
||||
return io.NopCloser(buf), nil
|
||||
func (s storage) LogFind(step *model.Step) ([]*model.LogEntry, error) {
|
||||
var logEntries []*model.LogEntry
|
||||
return logEntries, s.engine.Asc("id").Where("step_id = ?", step.ID).Find(&logEntries)
|
||||
}
|
||||
|
||||
func (s storage) LogSave(step *model.Step, reader io.Reader) error {
|
||||
data, _ := io.ReadAll(reader)
|
||||
|
||||
func (s storage) LogSave(step *model.Step, logEntries []*model.LogEntry) error {
|
||||
sess := s.engine.NewSession()
|
||||
defer sess.Close()
|
||||
if err := sess.Begin(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logs := new(model.Logs)
|
||||
exist, err := sess.Where("log_step_id = ?", step.ID).Get(logs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if exist {
|
||||
if _, err := sess.ID(logs.ID).Cols("log_data").Update(&model.Logs{Data: data}); err != nil {
|
||||
return err
|
||||
for _, logEntry := range logEntries {
|
||||
if logEntry.StepID != step.ID {
|
||||
return fmt.Errorf("got a log-entry with step id '%d' but expected '%d'", logEntry.StepID, step.ID)
|
||||
}
|
||||
} else {
|
||||
if _, err := sess.Insert(&model.Logs{
|
||||
StepID: step.ID,
|
||||
Data: data,
|
||||
}); err != nil {
|
||||
if _, err := sess.Insert(logEntry); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return sess.Commit()
|
||||
}
|
||||
|
||||
func (s storage) LogAppend(logEntry *model.LogEntry) error {
|
||||
_, err := s.engine.Insert(logEntry)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s storage) LogDelete(step *model.Step) error {
|
||||
_, err := s.engine.Where("step_id = ?", step.ID).Delete(new(model.LogEntry))
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -15,64 +15,84 @@
|
||||
package datastore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/woodpecker-ci/woodpecker/server/model"
|
||||
)
|
||||
|
||||
func TestLogCreateFind(t *testing.T) {
|
||||
store, closer := newTestStore(t, new(model.Step), new(model.Logs))
|
||||
func TestLogCreateFindDelete(t *testing.T) {
|
||||
store, closer := newTestStore(t, new(model.Step), new(model.LogEntry))
|
||||
defer closer()
|
||||
|
||||
step := model.Step{
|
||||
ID: 1,
|
||||
}
|
||||
buf := bytes.NewBufferString("echo hi")
|
||||
err := store.LogSave(&step, buf)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: log create: %s", err)
|
||||
|
||||
logEntries := []*model.LogEntry{
|
||||
{
|
||||
StepID: step.ID,
|
||||
Data: []byte("hello"),
|
||||
Line: 1,
|
||||
Time: 0,
|
||||
},
|
||||
{
|
||||
StepID: step.ID,
|
||||
Data: []byte("world"),
|
||||
Line: 2,
|
||||
Time: 10,
|
||||
},
|
||||
}
|
||||
|
||||
rc, err := store.LogFind(&step)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: log create: %s", err)
|
||||
}
|
||||
// first insert should just work
|
||||
assert.NoError(t, store.LogSave(&step, logEntries))
|
||||
|
||||
defer rc.Close()
|
||||
out, _ := io.ReadAll(rc)
|
||||
if got, want := string(out), "echo hi"; got != want {
|
||||
t.Errorf("Want log data %s, got %s", want, got)
|
||||
}
|
||||
// we want to find our inserted logs
|
||||
_logEntries, err := store.LogFind(&step)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, _logEntries, len(logEntries))
|
||||
|
||||
// delete and check
|
||||
assert.NoError(t, store.LogDelete(&step))
|
||||
_logEntries, err = store.LogFind(&step)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, _logEntries, 0)
|
||||
}
|
||||
|
||||
func TestLogUpdate(t *testing.T) {
|
||||
store, closer := newTestStore(t, new(model.Step), new(model.Logs))
|
||||
func TestLogAppend(t *testing.T) {
|
||||
store, closer := newTestStore(t, new(model.Step), new(model.LogEntry))
|
||||
defer closer()
|
||||
|
||||
step := model.Step{
|
||||
ID: 1,
|
||||
}
|
||||
buf1 := bytes.NewBufferString("echo hi")
|
||||
buf2 := bytes.NewBufferString("echo allo?")
|
||||
err1 := store.LogSave(&step, buf1)
|
||||
err2 := store.LogSave(&step, buf2)
|
||||
if err1 != nil {
|
||||
t.Errorf("Unexpected error: log create: %s", err1)
|
||||
}
|
||||
if err2 != nil {
|
||||
t.Errorf("Unexpected error: log update: %s", err2)
|
||||
logEntries := []*model.LogEntry{
|
||||
{
|
||||
StepID: step.ID,
|
||||
Data: []byte("hello"),
|
||||
Line: 1,
|
||||
Time: 0,
|
||||
},
|
||||
{
|
||||
StepID: step.ID,
|
||||
Data: []byte("world"),
|
||||
Line: 2,
|
||||
Time: 10,
|
||||
},
|
||||
}
|
||||
|
||||
rc, err := store.LogFind(&step)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: log create: %s", err)
|
||||
assert.NoError(t, store.LogSave(&step, logEntries))
|
||||
|
||||
logEntry := &model.LogEntry{
|
||||
StepID: step.ID,
|
||||
Data: []byte("allo?"),
|
||||
Line: 3,
|
||||
Time: 20,
|
||||
}
|
||||
|
||||
defer rc.Close()
|
||||
out, _ := io.ReadAll(rc)
|
||||
if got, want := string(out), "echo allo?"; got != want {
|
||||
t.Errorf("Want log data %s, got %s", want, got)
|
||||
}
|
||||
assert.NoError(t, store.LogAppend(logEntry))
|
||||
|
||||
_logEntries, err := store.LogFind(&step)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, _logEntries, len(logEntries)+1)
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ var allBeans = []interface{}{
|
||||
new(model.Pipeline),
|
||||
new(model.PipelineConfig),
|
||||
new(model.Config),
|
||||
new(model.Logs),
|
||||
new(model.LogEntry),
|
||||
new(model.Perm),
|
||||
new(model.Step),
|
||||
new(model.Registry),
|
||||
|
||||
@@ -299,7 +299,7 @@ func TestRepoCrud(t *testing.T) {
|
||||
new(model.Perm),
|
||||
new(model.Pipeline),
|
||||
new(model.PipelineConfig),
|
||||
new(model.Logs),
|
||||
new(model.LogEntry),
|
||||
new(model.Step),
|
||||
new(model.Secret),
|
||||
new(model.Registry),
|
||||
@@ -334,6 +334,7 @@ func TestRepoCrud(t *testing.T) {
|
||||
RepoID: repoUnrelated.ID,
|
||||
}
|
||||
stepUnrelated := model.Step{
|
||||
UUID: "44c0de71-a6be-41c9-b860-e3716d1dfcef",
|
||||
Name: "a unrelated step",
|
||||
}
|
||||
assert.NoError(t, store.CreatePipeline(&pipelineUnrelated, &stepUnrelated))
|
||||
|
||||
@@ -33,6 +33,11 @@ func (s storage) StepFind(pipeline *model.Pipeline, pid int) (*model.Step, error
|
||||
return step, wrapGet(s.engine.Get(step))
|
||||
}
|
||||
|
||||
func (s storage) StepByUUID(uuid string) (*model.Step, error) {
|
||||
step := new(model.Step)
|
||||
return step, wrapGet(s.engine.Where("step_uuid = ?", uuid).Get(step))
|
||||
}
|
||||
|
||||
func (s storage) StepChild(pipeline *model.Pipeline, ppid int, child string) (*model.Step, error) {
|
||||
step := &model.Step{
|
||||
PipelineID: pipeline.ID,
|
||||
@@ -87,7 +92,7 @@ func (s storage) StepClear(pipeline *model.Pipeline) error {
|
||||
}
|
||||
|
||||
func deleteStep(sess *xorm.Session, stepID int64) error {
|
||||
if _, err := sess.Where("log_step_id = ?", stepID).Delete(new(model.Logs)); err != nil {
|
||||
if _, err := sess.Where("step_id = ?", stepID).Delete(new(model.LogEntry)); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := sess.ID(stepID).Delete(new(model.Step))
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/woodpecker-ci/woodpecker/server/model"
|
||||
"github.com/woodpecker-ci/woodpecker/server/store/types"
|
||||
)
|
||||
|
||||
func TestStepFind(t *testing.T) {
|
||||
@@ -29,6 +30,7 @@ func TestStepFind(t *testing.T) {
|
||||
|
||||
steps := []*model.Step{
|
||||
{
|
||||
UUID: "8d89104f-d44e-4b45-b86e-17f8b5e74a0e",
|
||||
PipelineID: 1000,
|
||||
PID: 1,
|
||||
PPID: 2,
|
||||
@@ -59,6 +61,7 @@ func TestStepChild(t *testing.T) {
|
||||
|
||||
err := store.StepCreate([]*model.Step{
|
||||
{
|
||||
UUID: "ea6d4008-8ace-4f8a-ad03-53f1756465d9",
|
||||
PipelineID: 1,
|
||||
PID: 1,
|
||||
PPID: 1,
|
||||
@@ -66,6 +69,7 @@ func TestStepChild(t *testing.T) {
|
||||
State: "success",
|
||||
},
|
||||
{
|
||||
UUID: "2bf387f7-2913-4907-814c-c9ada88707c0",
|
||||
PipelineID: 1,
|
||||
PID: 2,
|
||||
PGID: 2,
|
||||
@@ -98,6 +102,7 @@ func TestStepList(t *testing.T) {
|
||||
|
||||
err := store.StepCreate([]*model.Step{
|
||||
{
|
||||
UUID: "2bf387f7-2913-4907-814c-c9ada88707c0",
|
||||
PipelineID: 2,
|
||||
PID: 1,
|
||||
PPID: 1,
|
||||
@@ -105,6 +110,7 @@ func TestStepList(t *testing.T) {
|
||||
State: "success",
|
||||
},
|
||||
{
|
||||
UUID: "4b04073c-1827-4aa4-a5f5-c7b21c5e44a6",
|
||||
PipelineID: 1,
|
||||
PID: 1,
|
||||
PPID: 1,
|
||||
@@ -112,6 +118,7 @@ func TestStepList(t *testing.T) {
|
||||
State: "success",
|
||||
},
|
||||
{
|
||||
UUID: "40aab045-970b-4892-b6df-6f825a7ec97a",
|
||||
PipelineID: 1,
|
||||
PID: 2,
|
||||
PGID: 2,
|
||||
@@ -139,6 +146,7 @@ func TestStepUpdate(t *testing.T) {
|
||||
defer closer()
|
||||
|
||||
step := &model.Step{
|
||||
UUID: "fc7c7fd6-553e-480b-8ed7-30d8563d0b79",
|
||||
PipelineID: 1,
|
||||
PID: 1,
|
||||
PPID: 2,
|
||||
@@ -176,6 +184,7 @@ func TestStepIndexes(t *testing.T) {
|
||||
|
||||
if err := store.StepCreate([]*model.Step{
|
||||
{
|
||||
UUID: "4db7e5fc-5312-4d02-9e14-b51b9e3242cc",
|
||||
PipelineID: 1,
|
||||
PID: 1,
|
||||
PPID: 1,
|
||||
@@ -191,6 +200,7 @@ func TestStepIndexes(t *testing.T) {
|
||||
// fail due to duplicate pid
|
||||
if err := store.StepCreate([]*model.Step{
|
||||
{
|
||||
UUID: "c1f33a9e-2a02-4579-95ec-90255d785a12",
|
||||
PipelineID: 1,
|
||||
PID: 1,
|
||||
PPID: 1,
|
||||
@@ -201,6 +211,60 @@ func TestStepIndexes(t *testing.T) {
|
||||
}); err == nil {
|
||||
t.Errorf("Unexpected error: duplicate pid")
|
||||
}
|
||||
|
||||
// fail due to duplicate uuid
|
||||
if err := store.StepCreate([]*model.Step{
|
||||
{
|
||||
UUID: "4db7e5fc-5312-4d02-9e14-b51b9e3242cc",
|
||||
PipelineID: 5,
|
||||
PID: 4,
|
||||
PPID: 3,
|
||||
PGID: 2,
|
||||
State: "success",
|
||||
Name: "clone",
|
||||
},
|
||||
}); err == nil {
|
||||
t.Errorf("Unexpected error: duplicate pid")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStepByUUID(t *testing.T) {
|
||||
store, closer := newTestStore(t, new(model.Step), new(model.Pipeline))
|
||||
defer closer()
|
||||
|
||||
assert.NoError(t, store.StepCreate([]*model.Step{
|
||||
{
|
||||
UUID: "4db7e5fc-5312-4d02-9e14-b51b9e3242cc",
|
||||
PipelineID: 1,
|
||||
PID: 1,
|
||||
PPID: 1,
|
||||
PGID: 1,
|
||||
State: "running",
|
||||
Name: "build",
|
||||
},
|
||||
{
|
||||
UUID: "fc7c7fd6-553e-480b-8ed7-30d8563d0b79",
|
||||
PipelineID: 4,
|
||||
PID: 6,
|
||||
PPID: 7,
|
||||
PGID: 8,
|
||||
Name: "build",
|
||||
State: "pending",
|
||||
Error: "pc load letter",
|
||||
ExitCode: 255,
|
||||
AgentID: 1,
|
||||
Platform: "linux/amd64",
|
||||
Environ: map[string]string{"GOLANG": "tip"},
|
||||
},
|
||||
}))
|
||||
|
||||
step, err := store.StepByUUID("4db7e5fc-5312-4d02-9e14-b51b9e3242cc")
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, step)
|
||||
|
||||
step, err = store.StepByUUID("52feb6f5-8ce2-40c0-9937-9d0e3349c98c")
|
||||
assert.ErrorIs(t, err, types.RecordNotExist)
|
||||
assert.Empty(t, step)
|
||||
}
|
||||
|
||||
// TODO: func TestStepCascade(t *testing.T) {}
|
||||
|
||||
+62
-10
@@ -3,8 +3,6 @@
|
||||
package mocks
|
||||
|
||||
import (
|
||||
io "io"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
model "github.com/woodpecker-ci/woodpecker/server/model"
|
||||
)
|
||||
@@ -1089,20 +1087,48 @@ func (_m *Store) HasRedirectionForRepo(_a0 int64, _a1 string) (bool, error) {
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// LogFind provides a mock function with given fields: _a0
|
||||
func (_m *Store) LogFind(_a0 *model.Step) (io.ReadCloser, error) {
|
||||
// LogAppend provides a mock function with given fields: logEntry
|
||||
func (_m *Store) LogAppend(logEntry *model.LogEntry) error {
|
||||
ret := _m.Called(logEntry)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(*model.LogEntry) error); ok {
|
||||
r0 = rf(logEntry)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// LogDelete provides a mock function with given fields: _a0
|
||||
func (_m *Store) LogDelete(_a0 *model.Step) error {
|
||||
ret := _m.Called(_a0)
|
||||
|
||||
var r0 io.ReadCloser
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(*model.Step) error); ok {
|
||||
r0 = rf(_a0)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// LogFind provides a mock function with given fields: _a0
|
||||
func (_m *Store) LogFind(_a0 *model.Step) ([]*model.LogEntry, error) {
|
||||
ret := _m.Called(_a0)
|
||||
|
||||
var r0 []*model.LogEntry
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(*model.Step) (io.ReadCloser, error)); ok {
|
||||
if rf, ok := ret.Get(0).(func(*model.Step) ([]*model.LogEntry, error)); ok {
|
||||
return rf(_a0)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(*model.Step) io.ReadCloser); ok {
|
||||
if rf, ok := ret.Get(0).(func(*model.Step) []*model.LogEntry); ok {
|
||||
r0 = rf(_a0)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(io.ReadCloser)
|
||||
r0 = ret.Get(0).([]*model.LogEntry)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1116,11 +1142,11 @@ func (_m *Store) LogFind(_a0 *model.Step) (io.ReadCloser, error) {
|
||||
}
|
||||
|
||||
// LogSave provides a mock function with given fields: _a0, _a1
|
||||
func (_m *Store) LogSave(_a0 *model.Step, _a1 io.Reader) error {
|
||||
func (_m *Store) LogSave(_a0 *model.Step, _a1 []*model.LogEntry) error {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(*model.Step, io.Reader) error); ok {
|
||||
if rf, ok := ret.Get(0).(func(*model.Step, []*model.LogEntry) error); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
@@ -1609,6 +1635,32 @@ func (_m *Store) ServerConfigSet(_a0 string, _a1 string) error {
|
||||
return r0
|
||||
}
|
||||
|
||||
// StepByUUID provides a mock function with given fields: _a0
|
||||
func (_m *Store) StepByUUID(_a0 string) (*model.Step, error) {
|
||||
ret := _m.Called(_a0)
|
||||
|
||||
var r0 *model.Step
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(string) (*model.Step, error)); ok {
|
||||
return rf(_a0)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(string) *model.Step); ok {
|
||||
r0 = rf(_a0)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*model.Step)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(string) error); ok {
|
||||
r1 = rf(_a0)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// StepChild provides a mock function with given fields: _a0, _a1, _a2
|
||||
func (_m *Store) StepChild(_a0 *model.Pipeline, _a1 int, _a2 string) (*model.Step, error) {
|
||||
ret := _m.Called(_a0, _a1, _a2)
|
||||
|
||||
@@ -18,8 +18,6 @@ package store
|
||||
//go:generate mockery --name Store --output mocks --case underscore
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/woodpecker-ci/woodpecker/server/model"
|
||||
)
|
||||
|
||||
@@ -138,6 +136,7 @@ type Store interface {
|
||||
// Steps
|
||||
StepLoad(int64) (*model.Step, error)
|
||||
StepFind(*model.Pipeline, int) (*model.Step, error)
|
||||
StepByUUID(string) (*model.Step, error)
|
||||
StepChild(*model.Pipeline, int, string) (*model.Step, error)
|
||||
StepList(*model.Pipeline) ([]*model.Step, error)
|
||||
StepCreate([]*model.Step) error
|
||||
@@ -145,10 +144,10 @@ type Store interface {
|
||||
StepClear(*model.Pipeline) error
|
||||
|
||||
// Logs
|
||||
LogFind(*model.Step) (io.ReadCloser, error)
|
||||
// TODO: since we do ReadAll in any case a ioReader is not the best idea
|
||||
// so either find a way to write log in chunks by xorm ...
|
||||
LogSave(*model.Step, io.Reader) error
|
||||
LogFind(*model.Step) ([]*model.LogEntry, error)
|
||||
LogSave(*model.Step, []*model.LogEntry) error
|
||||
LogAppend(logEntry *model.LogEntry) error
|
||||
LogDelete(*model.Step) error
|
||||
|
||||
// Tasks
|
||||
// TaskList TODO: paginate & opt filter
|
||||
|
||||
Reference in New Issue
Block a user