1
0
mirror of https://github.com/woodpecker-ci/woodpecker.git synced 2026-06-03 16:35:37 +02:00

Refactor server/pubsub into interface (#6318)

This commit is contained in:
6543
2026-03-30 19:32:44 +02:00
committed by GitHub
parent 098ada7c55
commit 662575630a
16 changed files with 324 additions and 183 deletions
+5 -5
View File
@@ -31,7 +31,7 @@ import (
forge_mocks "go.woodpecker-ci.org/woodpecker/v3/server/forge/mocks"
forge_types "go.woodpecker-ci.org/woodpecker/v3/server/forge/types"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory"
queue_mocks "go.woodpecker-ci.org/woodpecker/v3/server/queue/mocks"
config_service_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/config/mocks"
manager_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/mocks"
@@ -264,7 +264,7 @@ func TestCancelPipeline(t *testing.T) {
mockManager := manager_mocks.NewMockManager(t)
mockManager.On("ForgeFromRepo", fakeRepo).Return(mockForge, nil)
server.Config.Services.Manager = mockManager
server.Config.Services.Pubsub = pubsub.New()
server.Config.Services.Pubsub = memory.New()
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -315,7 +315,7 @@ func TestCreatePipeline(t *testing.T) {
mockManager.On("EnvironmentService").Return(nil).Maybe()
server.Config.Services.Manager = mockManager
server.Config.Services.Pubsub = pubsub.New()
server.Config.Services.Pubsub = memory.New()
mockQueue := queue_mocks.NewMockQueue(t)
mockQueue.On("Push", mock.Anything, mock.Anything).Return(nil).Maybe()
mockQueue.On("PushAtOnce", mock.Anything, mock.Anything).Return(nil).Maybe()
@@ -388,7 +388,7 @@ func TestCreatePipeline(t *testing.T) {
mockManager.On("EnvironmentService").Return(nil).Maybe()
server.Config.Services.Manager = mockManager
server.Config.Services.Pubsub = pubsub.New()
server.Config.Services.Pubsub = memory.New()
mockQueue := queue_mocks.NewMockQueue(t)
mockQueue.On("Push", mock.Anything, mock.Anything).Return(nil).Maybe()
mockQueue.On("PushAtOnce", mock.Anything, mock.Anything).Return(nil).Maybe()
@@ -444,7 +444,7 @@ func TestCreatePipeline(t *testing.T) {
mockManager.On("ForgeFromRepo", fakeRepo).Return(mockForge, nil)
mockManager.On("ConfigServiceFromRepo", fakeRepo).Return(mockConfigService)
server.Config.Services.Manager = mockManager
server.Config.Services.Pubsub = pubsub.New()
server.Config.Services.Pubsub = memory.New()
// return nil config with error
mockConfigService.On("Fetch", mock.Anything, mockForge, fakeUser, fakeRepo, mock.Anything, mock.Anything, false).Return(nil, http.ErrHandlerTimeout)
+9 -13
View File
@@ -71,11 +71,14 @@ func EventStreamSSE(c *gin.Context) {
log.Debug().Msg("user feed: connection opened")
user := session.User(c)
repo := map[string]bool{}
subTopics := make(map[string]struct{})
// subscribe to all public state changes
subTopics[pubsub.PublicTopic] = struct{}{}
// subscribe to all private state changes or repos the user owns
if user != nil {
repos, _ := store.FromContext(c).RepoList(user, false, true, nil)
for _, r := range repos {
repo[r.FullName] = true
subTopics[pubsub.GetRepoTopic(r)] = struct{}{}
}
}
@@ -92,23 +95,16 @@ func EventStreamSSE(c *gin.Context) {
}()
go func() {
server.Config.Services.Pubsub.Subscribe(ctx, func(m pubsub.Message) {
defer func() {
obj := recover() // fix #2480 // TODO: check if it's still needed
log.Trace().Msgf("pubsub subscribe recover return: %v", obj)
}()
name := m.Labels["repo"]
priv := m.Labels["private"]
if repo[name] || priv == "false" {
err := server.Config.Services.Pubsub.Subscribe(ctx, subTopics,
func(m pubsub.Message) {
select {
case <-ctx.Done():
return
default:
eventChan <- m.Data
}
}
})
cancel(nil)
})
cancel(err)
}()
for {
+1 -1
View File
@@ -30,7 +30,7 @@ import (
var Config = struct {
Services struct {
Pubsub *pubsub.Publisher
Pubsub pubsub.PubSub
Queue queue.Queue
Logs logging.Log
Membership cache.MembershipService
+4 -1
View File
@@ -89,7 +89,10 @@ func Cancel(ctx context.Context, _forge forge.Forge, store store.Store, repo *mo
if killedPipeline.Workflows, err = store.WorkflowGetTree(killedPipeline); err != nil {
return err
}
publishToTopic(killedPipeline, repo)
if err := publishToTopic(ctx, killedPipeline, repo); err != nil {
log.Error().Err(err).Msg("could not push pipeline status change to pubsub provider")
}
return nil
}
+3 -1
View File
@@ -64,7 +64,9 @@ func Decline(ctx context.Context, store store.Store, pipeline *model.Pipeline, u
updatePipelineStatus(ctx, forge, pipeline, repo, user)
publishToTopic(pipeline, repo)
if err := publishToTopic(ctx, pipeline, repo); err != nil {
log.Error().Err(err).Msg("could not push pipeline status change to pubsub provider")
}
return pipeline, nil
}
+3 -1
View File
@@ -54,6 +54,8 @@ func prepareStart(ctx context.Context, forge forge.Forge, store store.Store, act
}
func publishPipeline(ctx context.Context, forge forge.Forge, pipeline *model.Pipeline, repo *model.Repo, repoUser *model.User) {
publishToTopic(pipeline, repo)
if err := publishToTopic(ctx, pipeline, repo); err != nil {
log.Error().Err(err).Msg("could not push pipeline status change to pubsub provider")
}
updatePipelineStatus(ctx, forge, pipeline, repo, repoUser)
}
+17 -16
View File
@@ -15,10 +15,11 @@
package pipeline
import (
"context"
"encoding/json"
"strconv"
"fmt"
"github.com/rs/zerolog/log"
"github.com/oklog/ulid/v2"
"go.woodpecker-ci.org/woodpecker/v3/server"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
@@ -26,23 +27,23 @@ import (
)
// publishToTopic publishes message to UI clients.
func publishToTopic(pipeline *model.Pipeline, repo *model.Repo) {
message := pubsub.Message{
Labels: map[string]string{
"repo": repo.FullName,
"private": strconv.FormatBool(repo.IsSCMPrivate),
},
}
pipelineCopy := *pipeline
var err error
func publishToTopic(c context.Context, pipeline *model.Pipeline, repo *model.Repo) (err error) {
message := pubsub.Message{ID: ulid.Make().String()}
message.Data, err = json.Marshal(model.Event{
Repo: *repo,
Pipeline: pipelineCopy,
Pipeline: *pipeline,
})
if err != nil {
log.Error().Err(err).Msg("can't marshal JSON")
return
return fmt.Errorf("can't marshal JSON: %w", err)
}
server.Config.Services.Pubsub.Publish(message)
subTopics := make(map[string]struct{})
// if repo is public, push to public topic
if !repo.IsSCMPrivate {
subTopics[pubsub.PublicTopic] = struct{}{}
}
// publish to repo specific topic
subTopics[pubsub.GetRepoTopic(repo)] = struct{}{}
return server.Config.Services.Pubsub.Publish(c, subTopics, message)
}
-29
View File
@@ -1,29 +0,0 @@
BSD 3-Clause License
Copyright (c) 2017, Brad Rydzewski
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+89
View File
@@ -0,0 +1,89 @@
// Copyright 2026 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memory
import (
"context"
"fmt"
"slices"
"sync"
"github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub"
)
type publisher struct {
sync.RWMutex
subs map[*pubsub.Receiver][]string
}
// New creates an in-memory publisher.
func New() pubsub.PubSub {
return &publisher{
subs: make(map[*pubsub.Receiver][]string),
}
}
func (p *publisher) Publish(_ context.Context, topics pubsub.Topics, message pubsub.Message) error {
if len(topics) == 0 {
return fmt.Errorf("%w: specify at least one", pubsub.ErrNoTopic)
}
p.RLock()
defer p.RUnlock()
for s, tl := range p.subs {
// callback is from outside so just make sure it still exists
if s == nil || *s == nil {
log.Error().Msg("found nil callback func in subscribers!")
continue
}
for t := range topics {
if slices.Contains(tl, t) {
go (*s)(message)
break
}
}
}
return nil
}
func (p *publisher) Subscribe(c context.Context, topics pubsub.Topics, receiver pubsub.Receiver) error {
if len(topics) == 0 {
return fmt.Errorf("%w: subscribe to at least one", pubsub.ErrNoTopic)
}
var tl []string
for k := range topics {
tl = append(tl, k)
}
defer func() {
p.Lock()
delete(p.subs, &receiver)
p.Unlock()
}()
p.Lock()
p.subs[&receiver] = tl
p.Unlock()
<-c.Done()
return nil
}
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
package memory
import (
"context"
@@ -21,13 +21,17 @@ import (
"time"
"github.com/stretchr/testify/assert"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub"
)
func TestPubsub(t *testing.T) {
var (
wg sync.WaitGroup
testMessage = Message{
testTopic = map[string]struct{}{"test": {}}
testMessage = pubsub.Message{
Data: []byte("test"),
}
)
@@ -35,20 +39,21 @@ func TestPubsub(t *testing.T) {
ctx, cancel := context.WithCancelCause(
t.Context(),
)
broker := New()
assert.Error(t, broker.Subscribe(ctx, nil, func(pubsub.Message) {}))
go func() {
broker.Subscribe(ctx, func(message Message) { assert.Equal(t, testMessage, message); wg.Done() })
assert.NoError(t, broker.Subscribe(ctx, testTopic, func(message pubsub.Message) { assert.Equal(t, testMessage, message); wg.Done() }))
}()
go func() {
broker.Subscribe(ctx, func(_ Message) { wg.Done() })
assert.NoError(t, broker.Subscribe(ctx, testTopic, func(pubsub.Message) { wg.Done() }))
}()
<-time.After(500 * time.Millisecond)
wg.Add(2)
go func() {
broker.Publish(testMessage)
assert.NoError(t, broker.Publish(ctx, testTopic, testMessage))
}()
wg.Wait()
-66
View File
@@ -1,66 +0,0 @@
// Copyright 2023 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"sync"
)
// Message defines a published message.
type Message struct {
// ID identifies this message.
ID string `json:"id,omitempty"`
// Data is the actual data in the entry.
Data []byte `json:"data"`
// Labels represents the key-value pairs the entry is labeled with.
Labels map[string]string `json:"labels,omitempty"`
}
// Receiver receives published messages.
type Receiver func(Message)
type Publisher struct {
sync.Mutex
subs map[*Receiver]struct{}
}
// New creates an in-memory publisher.
func New() *Publisher {
return &Publisher{
subs: make(map[*Receiver]struct{}),
}
}
func (p *Publisher) Publish(message Message) {
p.Lock()
for s := range p.subs {
go (*s)(message)
}
p.Unlock()
}
func (p *Publisher) Subscribe(c context.Context, receiver Receiver) {
p.Lock()
p.subs[&receiver] = struct{}{}
p.Unlock()
<-c.Done()
p.Lock()
delete(p.subs, &receiver)
p.Unlock()
}
+58
View File
@@ -0,0 +1,58 @@
// Copyright 2026 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"errors"
"fmt"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
)
// PubSub provider interface, used to signal pipeline state changes to WebUI.
type PubSub interface {
// Publish pushes a state change to all subscribers,
// that have at least subscribed to one of the topics we publish it under.
Publish(context.Context, Topics, Message) error
// Subscribe gets all state changes that match the same topic.
// If multiple topics are subscribed, and a message also match multiple,
// the implementation takes care of deduplication.
Subscribe(context.Context, Topics, Receiver) error
}
// Message defines a published message.
type Message struct {
// ID identifies this message.
ID string `json:"id,omitempty"`
// Data is the actual data in the entry.
Data []byte `json:"data"`
}
// Receiver receives published messages.
type Receiver func(Message)
// Topics are key-value pairs, messages are filtered upon
// the the key is the base-key and the value to the sub-key.
type Topics map[string]struct{}
func GetRepoTopic(r *model.Repo) string {
return fmt.Sprintf("repo.id.%d", r.ID)
}
const PublicTopic = "public"
var ErrNoTopic = errors.New("no topic specified")
+100
View File
@@ -0,0 +1,100 @@
// Copyright 2026 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub_test
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory"
)
func TestPubSub(t *testing.T) {
// for each pubsub adapter (currently we have only one)
t.Run("in_memory", func(t *testing.T) {
testPubSub(t, memory.New())
})
}
func testPubSub(t *testing.T, adapter pubsub.PubSub) {
assert.NoError(t,
adapter.Publish(t.Context(), pubsub.Topics{"a": {}}, pubsub.Message{ID: "1", Data: []byte(`dummy`)}),
"expect no issue publish to a pubsub with no subscribers",
)
t.Run("test deduplication asumptions", func(t *testing.T) {
treeTopicCloser, treeTopicGetMSGs := genTestSub(t, adapter, pubsub.Topics{"tree": {}})
t.Cleanup(treeTopicCloser)
closer, getMSGs := genTestSub(t, adapter, pubsub.Topics{"apples": {}, "tree": {}, "raspberry": {}})
t.Cleanup(closer)
assert.Len(t, getMSGs(), 0)
time.Sleep(10 * time.Millisecond)
assert.NoError(t, adapter.Publish(t.Context(), pubsub.Topics{"tree": {}, "raspberry": {}, "tails": {}}, pubsub.Message{ID: "2"}))
assert.NoError(t, adapter.Publish(t.Context(), pubsub.Topics{"apples": {}, "raspberry": {}, "tails": {}}, pubsub.Message{ID: "3"}))
time.Sleep(100 * time.Millisecond)
if assert.Len(t, getMSGs(), 2) {
assert.ElementsMatch(t, []string{"2", "3"}, messagesToIDs(getMSGs()))
}
assert.EqualValues(t, "2", treeTopicGetMSGs()[0].ID)
})
t.Run("test adapters calc for strange input", func(t *testing.T) {
t.Run("empty topic", func(t *testing.T) {
assert.Error(t, adapter.Subscribe(t.Context(), nil, func(pubsub.Message) {}))
assert.Error(t, adapter.Subscribe(t.Context(), pubsub.Topics{}, func(pubsub.Message) {}))
assert.Error(t, adapter.Publish(t.Context(), nil, pubsub.Message{}))
assert.Error(t, adapter.Publish(t.Context(), pubsub.Topics{}, pubsub.Message{}))
})
})
}
func genTestSub(t *testing.T, adapter pubsub.PubSub, topics pubsub.Topics) (close func(), getMSGs func() []pubsub.Message) {
ctx, closer := context.WithCancelCause(t.Context())
var mu sync.Mutex
var messages []pubsub.Message
go func() {
err := adapter.Subscribe(ctx, topics, func(m pubsub.Message) {
mu.Lock()
messages = append(messages, m)
mu.Unlock()
})
assert.NoError(t, err)
}()
return func() { closer(nil) }, func() []pubsub.Message {
mu.Lock()
defer mu.Unlock()
cp := make([]pubsub.Message, len(messages))
copy(cp, messages)
return cp
}
}
func messagesToIDs(msgs []pubsub.Message) (ids []string) {
for i := range msgs {
ids = append(ids, msgs[i].ID)
}
return ids
}
+21 -41
View File
@@ -25,6 +25,7 @@ import (
"strconv"
"time"
"github.com/oklog/ulid/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog/log"
grpc_metadata "google.golang.org/grpc/metadata"
@@ -45,7 +46,7 @@ const updateAgentLastWorkDelay = time.Minute
type RPC struct {
queue queue.Queue
pubsub *pubsub.Publisher
pubsub pubsub.PubSub
logger logging.Log
store store.Store
pipelineTime *prometheus.GaugeVec
@@ -207,22 +208,8 @@ func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepStat
log.Error().Err(err).Msg("cannot build tree from step list")
return err
}
message := pubsub.Message{
Labels: map[string]string{
"repo": repo.FullName,
"private": strconv.FormatBool(repo.IsSCMPrivate),
},
}
message.Data, err = json.Marshal(model.Event{
Repo: *repo,
Pipeline: *currentPipeline,
})
if err != nil {
return err
}
s.pubsub.Publish(message)
return nil
return s.notify(c, repo, currentPipeline)
}
// Init signals the workflow is initialized.
@@ -272,21 +259,10 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt
defer func() {
currentPipeline.Workflows, _ = s.store.WorkflowGetTree(currentPipeline)
message := pubsub.Message{
Labels: map[string]string{
"repo": repo.FullName,
"private": strconv.FormatBool(repo.IsSCMPrivate),
},
if err := s.notify(c, repo, currentPipeline); err != nil {
log.Error().Err(err).Msg("could not publish pipeline state change to pubsub")
}
message.Data, err = json.Marshal(model.Event{
Repo: *repo,
Pipeline: *currentPipeline,
})
if err != nil {
log.Error().Err(err).Msg("could not marshal JSON")
return
}
s.pubsub.Publish(message)
}()
workflow, err = pipeline.UpdateWorkflowStatusToRunning(s.store, *workflow, state)
@@ -394,7 +370,7 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt
}
}()
if err := s.notify(repo, currentPipeline); err != nil {
if err := s.notify(c, repo, currentPipeline); err != nil {
return err
}
@@ -603,22 +579,26 @@ func (s *RPC) updateForgeStatus(ctx context.Context, repo *model.Repo, pipeline
}
}
func (s *RPC) notify(repo *model.Repo, pipeline *model.Pipeline) (err error) {
message := pubsub.Message{
Labels: map[string]string{
"repo": repo.FullName,
"private": strconv.FormatBool(repo.IsSCMPrivate),
},
}
// Notify push to our pubsub infra pipeline state changes.
func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeline) (err error) {
message := pubsub.Message{ID: ulid.Make().String()}
message.Data, err = json.Marshal(model.Event{
Repo: *repo,
Pipeline: *pipeline,
})
if err != nil {
return err
return fmt.Errorf("can't marshal JSON: %w", err)
}
s.pubsub.Publish(message)
return nil
subTopics := make(map[string]struct{})
// if repo is public, push to public topic
if !repo.IsSCMPrivate {
subTopics[pubsub.PublicTopic] = struct{}{}
}
// publish to repo specific topic
subTopics[pubsub.GetRepoTopic(repo)] = struct{}{}
return s.pubsub.Publish(c, subTopics, message)
}
func (s *RPC) getAgentFromContext(ctx context.Context) (*model.Agent, error) {
+1 -1
View File
@@ -37,7 +37,7 @@ type WoodpeckerServer struct {
peer RPC
}
func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub *pubsub.Publisher, store store.Store) proto.WoodpeckerServer {
func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub pubsub.PubSub, store store.Store) proto.WoodpeckerServer {
pipelineTime := promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "pipeline_time",