You've already forked woodpecker
mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2026-06-03 16:35:37 +02:00
Show human readable information in queue info (#5516)
Co-authored-by: qwerty287 <80460567+qwerty287@users.noreply.github.com>
This commit is contained in:
+104
-5
@@ -37,16 +37,115 @@ import (
|
||||
// GetQueueInfo
|
||||
//
|
||||
// @Summary Get pipeline queue information
|
||||
// @Description TODO: link the InfoT response object - this is blocked, until the `swaggo/swag` tool dependency is v1.18.12 or newer
|
||||
// @Description Returns pipeline queue information with agent details
|
||||
// @Router /queue/info [get]
|
||||
// @Produce json
|
||||
// @Success 200 {object} map[string]string
|
||||
// @Success 200 {object} QueueInfo
|
||||
// @Tags Pipeline queues
|
||||
// @Param Authorization header string true "Insert your personal access token" default(Bearer <personal access token>)
|
||||
func GetQueueInfo(c *gin.Context) {
|
||||
c.IndentedJSON(http.StatusOK,
|
||||
server.Config.Services.Queue.Info(c),
|
||||
)
|
||||
info := server.Config.Services.Queue.Info(c)
|
||||
_store := store.FromContext(c)
|
||||
|
||||
// Create a map to store agent names by ID
|
||||
agentNameMap := make(map[int64]string)
|
||||
|
||||
// Process tasks and add agent names
|
||||
pendingWithAgents, err := processQueueTasks(_store, info.Pending, agentNameMap)
|
||||
if err != nil {
|
||||
c.String(http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
waitingWithAgents, err := processQueueTasks(_store, info.WaitingOnDeps, agentNameMap)
|
||||
if err != nil {
|
||||
c.String(http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
runningWithAgents, err := processQueueTasks(_store, info.Running, agentNameMap)
|
||||
if err != nil {
|
||||
c.String(http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Create response with agent-enhanced tasks
|
||||
response := model.QueueInfo{
|
||||
Pending: pendingWithAgents,
|
||||
WaitingOnDeps: waitingWithAgents,
|
||||
Running: runningWithAgents,
|
||||
Stats: struct {
|
||||
WorkerCount int `json:"worker_count"`
|
||||
PendingCount int `json:"pending_count"`
|
||||
WaitingOnDepsCount int `json:"waiting_on_deps_count"`
|
||||
RunningCount int `json:"running_count"`
|
||||
}{
|
||||
WorkerCount: info.Stats.Workers,
|
||||
PendingCount: info.Stats.Pending,
|
||||
WaitingOnDepsCount: info.Stats.WaitingOnDeps,
|
||||
RunningCount: info.Stats.Running,
|
||||
},
|
||||
Paused: info.Paused,
|
||||
}
|
||||
|
||||
c.IndentedJSON(http.StatusOK, response)
|
||||
}
|
||||
|
||||
// getAgentName finds an agent's name, utilizing a map as a cache.
|
||||
func getAgentName(store store.Store, agentNameMap map[int64]string, agentID int64) (string, bool) {
|
||||
// 1. Check the cache first.
|
||||
name, exists := agentNameMap[agentID]
|
||||
if exists {
|
||||
return name, true
|
||||
}
|
||||
|
||||
// 2. If not in cache, query the store.
|
||||
agent, err := store.AgentFind(agentID)
|
||||
if err != nil || agent == nil {
|
||||
// Agent not found or an error occurred.
|
||||
return "", false
|
||||
}
|
||||
|
||||
// 3. Found the agent, update the cache and return the name.
|
||||
if agent.Name != "" {
|
||||
agentNameMap[agentID] = agent.Name
|
||||
return agent.Name, true
|
||||
}
|
||||
|
||||
return "", false
|
||||
}
|
||||
|
||||
// processQueueTasks converts tasks to QueueTask structs and adds agent names.
|
||||
func processQueueTasks(store store.Store, tasks []*model.Task, agentNameMap map[int64]string) ([]model.QueueTask, error) {
|
||||
result := make([]model.QueueTask, 0, len(tasks))
|
||||
|
||||
for _, task := range tasks {
|
||||
taskResponse := model.QueueTask{
|
||||
Task: *task,
|
||||
}
|
||||
|
||||
if task.AgentID == 0 {
|
||||
result = append(result, taskResponse)
|
||||
continue
|
||||
}
|
||||
|
||||
name, ok := getAgentName(store, agentNameMap, task.AgentID)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("agent not found for task %s", task.ID)
|
||||
}
|
||||
|
||||
taskResponse.AgentName = name
|
||||
|
||||
p, err := store.GetPipeline(task.PipelineID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
taskResponse.PipelineNumber = p.Number
|
||||
|
||||
result = append(result, taskResponse)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// PauseQueue
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"maps"
|
||||
"strings"
|
||||
|
||||
pipelineConsts "go.woodpecker-ci.org/woodpecker/v3/pipeline"
|
||||
@@ -25,15 +26,18 @@ import (
|
||||
|
||||
func createFilterFunc(agentFilter rpc.Filter) queue.FilterFn {
|
||||
return func(task *model.Task) (bool, int) {
|
||||
// Create a copy of the labels for filtering to avoid modifying the original task
|
||||
labels := maps.Clone(task.Labels)
|
||||
|
||||
// ignore internal labels for filtering
|
||||
for k := range task.Labels {
|
||||
for k := range labels {
|
||||
if strings.HasPrefix(k, pipelineConsts.InternalLabelPrefix) {
|
||||
delete(task.Labels, k)
|
||||
delete(labels, k)
|
||||
}
|
||||
}
|
||||
|
||||
score := 0
|
||||
for taskLabel, taskLabelValue := range task.Labels {
|
||||
for taskLabel, taskLabelValue := range labels {
|
||||
// if a task label is empty it will be ignored
|
||||
if taskLabelValue == "" {
|
||||
continue
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
package model
|
||||
|
||||
// QueueTask represents a task in the queue with additional API-specific fields.
|
||||
type QueueTask struct {
|
||||
Task
|
||||
PipelineNumber int64 `json:"pipeline_number"`
|
||||
AgentName string `json:"agent_name"`
|
||||
}
|
||||
|
||||
// QueueInfo represents the response structure for queue information API.
|
||||
type QueueInfo struct {
|
||||
Pending []QueueTask `json:"pending"`
|
||||
WaitingOnDeps []QueueTask `json:"waiting_on_deps"`
|
||||
Running []QueueTask `json:"running"`
|
||||
Stats struct {
|
||||
WorkerCount int `json:"worker_count"`
|
||||
PendingCount int `json:"pending_count"`
|
||||
WaitingOnDepsCount int `json:"waiting_on_deps_count"`
|
||||
RunningCount int `json:"running_count"`
|
||||
} `json:"stats"`
|
||||
Paused bool `json:"paused"`
|
||||
} // @name QueueInfo
|
||||
@@ -25,12 +25,16 @@ import (
|
||||
// Task defines scheduled pipeline Task.
|
||||
type Task struct {
|
||||
ID string `json:"id" xorm:"PK UNIQUE 'id'"`
|
||||
PID int `json:"pid" xorm:"'pid'"`
|
||||
Name string `json:"name" xorm:"'name'"`
|
||||
Data []byte `json:"-" xorm:"LONGBLOB 'data'"`
|
||||
Labels map[string]string `json:"labels" xorm:"json 'labels'"`
|
||||
Dependencies []string `json:"dependencies" xorm:"json 'dependencies'"`
|
||||
RunOn []string `json:"run_on" xorm:"json 'run_on'"`
|
||||
DepStatus map[string]StatusValue `json:"dep_status" xorm:"json 'dependencies_status'"`
|
||||
AgentID int64 `json:"agent_id" xorm:"'agent_id'"`
|
||||
PipelineID int64 `json:"pipeline_id" xorm:"'pipeline_id'"`
|
||||
RepoID int64 `json:"repo_id" xorm:"'repo_id'"`
|
||||
} // @name Task
|
||||
|
||||
// TableName return database table name for xorm.
|
||||
|
||||
@@ -33,8 +33,12 @@ func queuePipeline(ctx context.Context, repo *model.Repo, pipelineItems []*stepb
|
||||
continue
|
||||
}
|
||||
task := &model.Task{
|
||||
ID: fmt.Sprint(item.Workflow.ID),
|
||||
Labels: make(map[string]string),
|
||||
ID: fmt.Sprint(item.Workflow.ID),
|
||||
PID: item.Workflow.PID,
|
||||
Name: item.Workflow.Name,
|
||||
Labels: make(map[string]string),
|
||||
PipelineID: item.Workflow.PipelineID,
|
||||
RepoID: repo.ID,
|
||||
}
|
||||
maps.Copy(task.Labels, item.Labels)
|
||||
err := task.ApplyLabelsFromRepo(repo)
|
||||
|
||||
Reference in New Issue
Block a user