diff --git a/cmd/server/setup.go b/cmd/server/setup.go index 69510b2962..9e157e1290 100644 --- a/cmd/server/setup.go +++ b/cmd/server/setup.go @@ -34,7 +34,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/server/forge/setup" "go.woodpecker-ci.org/woodpecker/v3/server/logging" "go.woodpecker-ci.org/woodpecker/v3/server/model" - "go.woodpecker-ci.org/woodpecker/v3/server/pubsub" + "go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory" "go.woodpecker-ci.org/woodpecker/v3/server/queue" "go.woodpecker-ci.org/woodpecker/v3/server/services" service_log "go.woodpecker-ci.org/woodpecker/v3/server/services/log" @@ -159,7 +159,7 @@ func setupJWTSecret(_store store.Store) (string, error) { func setupEvilGlobals(ctx context.Context, c *cli.Command, s store.Store) (err error) { // services server.Config.Services.Logs = logging.New() - server.Config.Services.Pubsub = pubsub.New() + server.Config.Services.Pubsub = memory.New() server.Config.Services.Membership = setupMembershipService(ctx, s) server.Config.Services.Queue, err = setupQueue(ctx, s) if err != nil { diff --git a/server/api/pipeline_test.go b/server/api/pipeline_test.go index 7c8738376c..1164fd2184 100644 --- a/server/api/pipeline_test.go +++ b/server/api/pipeline_test.go @@ -31,7 +31,7 @@ import ( forge_mocks "go.woodpecker-ci.org/woodpecker/v3/server/forge/mocks" forge_types "go.woodpecker-ci.org/woodpecker/v3/server/forge/types" "go.woodpecker-ci.org/woodpecker/v3/server/model" - "go.woodpecker-ci.org/woodpecker/v3/server/pubsub" + "go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory" queue_mocks "go.woodpecker-ci.org/woodpecker/v3/server/queue/mocks" config_service_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/config/mocks" manager_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/mocks" @@ -264,7 +264,7 @@ func TestCancelPipeline(t *testing.T) { mockManager := manager_mocks.NewMockManager(t) mockManager.On("ForgeFromRepo", fakeRepo).Return(mockForge, nil) server.Config.Services.Manager = mockManager - server.Config.Services.Pubsub = pubsub.New() + server.Config.Services.Pubsub = memory.New() w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -315,7 +315,7 @@ func TestCreatePipeline(t *testing.T) { mockManager.On("EnvironmentService").Return(nil).Maybe() server.Config.Services.Manager = mockManager - server.Config.Services.Pubsub = pubsub.New() + server.Config.Services.Pubsub = memory.New() mockQueue := queue_mocks.NewMockQueue(t) mockQueue.On("Push", mock.Anything, mock.Anything).Return(nil).Maybe() mockQueue.On("PushAtOnce", mock.Anything, mock.Anything).Return(nil).Maybe() @@ -388,7 +388,7 @@ func TestCreatePipeline(t *testing.T) { mockManager.On("EnvironmentService").Return(nil).Maybe() server.Config.Services.Manager = mockManager - server.Config.Services.Pubsub = pubsub.New() + server.Config.Services.Pubsub = memory.New() mockQueue := queue_mocks.NewMockQueue(t) mockQueue.On("Push", mock.Anything, mock.Anything).Return(nil).Maybe() mockQueue.On("PushAtOnce", mock.Anything, mock.Anything).Return(nil).Maybe() @@ -444,7 +444,7 @@ func TestCreatePipeline(t *testing.T) { mockManager.On("ForgeFromRepo", fakeRepo).Return(mockForge, nil) mockManager.On("ConfigServiceFromRepo", fakeRepo).Return(mockConfigService) server.Config.Services.Manager = mockManager - server.Config.Services.Pubsub = pubsub.New() + server.Config.Services.Pubsub = memory.New() // return nil config with error mockConfigService.On("Fetch", mock.Anything, mockForge, fakeUser, fakeRepo, mock.Anything, mock.Anything, false).Return(nil, http.ErrHandlerTimeout) diff --git a/server/api/stream.go b/server/api/stream.go index 2d7fd6cd40..9c445c7958 100644 --- a/server/api/stream.go +++ b/server/api/stream.go @@ -71,11 +71,14 @@ func EventStreamSSE(c *gin.Context) { log.Debug().Msg("user feed: connection opened") user := session.User(c) - repo := map[string]bool{} + subTopics := make(map[string]struct{}) + // subscribe to all public state changes + subTopics[pubsub.PublicTopic] = struct{}{} + // subscribe to all private state changes or repos the user owns if user != nil { repos, _ := store.FromContext(c).RepoList(user, false, true, nil) for _, r := range repos { - repo[r.FullName] = true + subTopics[pubsub.GetRepoTopic(r)] = struct{}{} } } @@ -92,23 +95,16 @@ func EventStreamSSE(c *gin.Context) { }() go func() { - server.Config.Services.Pubsub.Subscribe(ctx, func(m pubsub.Message) { - defer func() { - obj := recover() // fix #2480 // TODO: check if it's still needed - log.Trace().Msgf("pubsub subscribe recover return: %v", obj) - }() - name := m.Labels["repo"] - priv := m.Labels["private"] - if repo[name] || priv == "false" { + err := server.Config.Services.Pubsub.Subscribe(ctx, subTopics, + func(m pubsub.Message) { select { case <-ctx.Done(): return default: eventChan <- m.Data } - } - }) - cancel(nil) + }) + cancel(err) }() for { diff --git a/server/config.go b/server/config.go index 29fa452df0..b94b98e743 100644 --- a/server/config.go +++ b/server/config.go @@ -30,7 +30,7 @@ import ( var Config = struct { Services struct { - Pubsub *pubsub.Publisher + Pubsub pubsub.PubSub Queue queue.Queue Logs logging.Log Membership cache.MembershipService diff --git a/server/pipeline/cancel.go b/server/pipeline/cancel.go index e82c2a2584..cff9243e1f 100644 --- a/server/pipeline/cancel.go +++ b/server/pipeline/cancel.go @@ -89,7 +89,10 @@ func Cancel(ctx context.Context, _forge forge.Forge, store store.Store, repo *mo if killedPipeline.Workflows, err = store.WorkflowGetTree(killedPipeline); err != nil { return err } - publishToTopic(killedPipeline, repo) + + if err := publishToTopic(ctx, killedPipeline, repo); err != nil { + log.Error().Err(err).Msg("could not push pipeline status change to pubsub provider") + } return nil } diff --git a/server/pipeline/decline.go b/server/pipeline/decline.go index d717f878f0..56336c043e 100644 --- a/server/pipeline/decline.go +++ b/server/pipeline/decline.go @@ -64,7 +64,9 @@ func Decline(ctx context.Context, store store.Store, pipeline *model.Pipeline, u updatePipelineStatus(ctx, forge, pipeline, repo, user) - publishToTopic(pipeline, repo) + if err := publishToTopic(ctx, pipeline, repo); err != nil { + log.Error().Err(err).Msg("could not push pipeline status change to pubsub provider") + } return pipeline, nil } diff --git a/server/pipeline/start.go b/server/pipeline/start.go index 0316f2b935..cb7b6480f5 100644 --- a/server/pipeline/start.go +++ b/server/pipeline/start.go @@ -54,6 +54,8 @@ func prepareStart(ctx context.Context, forge forge.Forge, store store.Store, act } func publishPipeline(ctx context.Context, forge forge.Forge, pipeline *model.Pipeline, repo *model.Repo, repoUser *model.User) { - publishToTopic(pipeline, repo) + if err := publishToTopic(ctx, pipeline, repo); err != nil { + log.Error().Err(err).Msg("could not push pipeline status change to pubsub provider") + } updatePipelineStatus(ctx, forge, pipeline, repo, repoUser) } diff --git a/server/pipeline/topic.go b/server/pipeline/topic.go index 8bcb52efd0..6477fdc704 100644 --- a/server/pipeline/topic.go +++ b/server/pipeline/topic.go @@ -15,10 +15,11 @@ package pipeline import ( + "context" "encoding/json" - "strconv" + "fmt" - "github.com/rs/zerolog/log" + "github.com/oklog/ulid/v2" "go.woodpecker-ci.org/woodpecker/v3/server" "go.woodpecker-ci.org/woodpecker/v3/server/model" @@ -26,23 +27,23 @@ import ( ) // publishToTopic publishes message to UI clients. -func publishToTopic(pipeline *model.Pipeline, repo *model.Repo) { - message := pubsub.Message{ - Labels: map[string]string{ - "repo": repo.FullName, - "private": strconv.FormatBool(repo.IsSCMPrivate), - }, - } - pipelineCopy := *pipeline - - var err error +func publishToTopic(c context.Context, pipeline *model.Pipeline, repo *model.Repo) (err error) { + message := pubsub.Message{ID: ulid.Make().String()} message.Data, err = json.Marshal(model.Event{ Repo: *repo, - Pipeline: pipelineCopy, + Pipeline: *pipeline, }) if err != nil { - log.Error().Err(err).Msg("can't marshal JSON") - return + return fmt.Errorf("can't marshal JSON: %w", err) } - server.Config.Services.Pubsub.Publish(message) + + subTopics := make(map[string]struct{}) + // if repo is public, push to public topic + if !repo.IsSCMPrivate { + subTopics[pubsub.PublicTopic] = struct{}{} + } + // publish to repo specific topic + subTopics[pubsub.GetRepoTopic(repo)] = struct{}{} + + return server.Config.Services.Pubsub.Publish(c, subTopics, message) } diff --git a/server/pubsub/LICENSE b/server/pubsub/LICENSE deleted file mode 100644 index 64e202179d..0000000000 --- a/server/pubsub/LICENSE +++ /dev/null @@ -1,29 +0,0 @@ -BSD 3-Clause License - -Copyright (c) 2017, Brad Rydzewski -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -* Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -* Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -* Neither the name of the copyright holder nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/server/pubsub/memory/pub.go b/server/pubsub/memory/pub.go new file mode 100644 index 0000000000..d66a2e331e --- /dev/null +++ b/server/pubsub/memory/pub.go @@ -0,0 +1,89 @@ +// Copyright 2026 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "context" + "fmt" + "slices" + "sync" + + "github.com/rs/zerolog/log" + + "go.woodpecker-ci.org/woodpecker/v3/server/pubsub" +) + +type publisher struct { + sync.RWMutex + + subs map[*pubsub.Receiver][]string +} + +// New creates an in-memory publisher. +func New() pubsub.PubSub { + return &publisher{ + subs: make(map[*pubsub.Receiver][]string), + } +} + +func (p *publisher) Publish(_ context.Context, topics pubsub.Topics, message pubsub.Message) error { + if len(topics) == 0 { + return fmt.Errorf("%w: specify at least one", pubsub.ErrNoTopic) + } + + p.RLock() + defer p.RUnlock() + + for s, tl := range p.subs { + // callback is from outside so just make sure it still exists + if s == nil || *s == nil { + log.Error().Msg("found nil callback func in subscribers!") + continue + } + + for t := range topics { + if slices.Contains(tl, t) { + go (*s)(message) + break + } + } + } + + return nil +} + +func (p *publisher) Subscribe(c context.Context, topics pubsub.Topics, receiver pubsub.Receiver) error { + if len(topics) == 0 { + return fmt.Errorf("%w: subscribe to at least one", pubsub.ErrNoTopic) + } + + var tl []string + for k := range topics { + tl = append(tl, k) + } + + defer func() { + p.Lock() + delete(p.subs, &receiver) + p.Unlock() + }() + + p.Lock() + p.subs[&receiver] = tl + p.Unlock() + + <-c.Done() + return nil +} diff --git a/server/pubsub/pub_test.go b/server/pubsub/memory/pub_test.go similarity index 65% rename from server/pubsub/pub_test.go rename to server/pubsub/memory/pub_test.go index b18d34793a..8ab1993c35 100644 --- a/server/pubsub/pub_test.go +++ b/server/pubsub/memory/pub_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pubsub +package memory import ( "context" @@ -21,13 +21,17 @@ import ( "time" "github.com/stretchr/testify/assert" + + "go.woodpecker-ci.org/woodpecker/v3/server/pubsub" ) func TestPubsub(t *testing.T) { var ( wg sync.WaitGroup - testMessage = Message{ + testTopic = map[string]struct{}{"test": {}} + + testMessage = pubsub.Message{ Data: []byte("test"), } ) @@ -35,20 +39,21 @@ func TestPubsub(t *testing.T) { ctx, cancel := context.WithCancelCause( t.Context(), ) - broker := New() + + assert.Error(t, broker.Subscribe(ctx, nil, func(pubsub.Message) {})) go func() { - broker.Subscribe(ctx, func(message Message) { assert.Equal(t, testMessage, message); wg.Done() }) + assert.NoError(t, broker.Subscribe(ctx, testTopic, func(message pubsub.Message) { assert.Equal(t, testMessage, message); wg.Done() })) }() go func() { - broker.Subscribe(ctx, func(_ Message) { wg.Done() }) + assert.NoError(t, broker.Subscribe(ctx, testTopic, func(pubsub.Message) { wg.Done() })) }() <-time.After(500 * time.Millisecond) wg.Add(2) go func() { - broker.Publish(testMessage) + assert.NoError(t, broker.Publish(ctx, testTopic, testMessage)) }() wg.Wait() diff --git a/server/pubsub/pub.go b/server/pubsub/pub.go deleted file mode 100644 index 73d2114c93..0000000000 --- a/server/pubsub/pub.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2023 Woodpecker Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pubsub - -import ( - "context" - "sync" -) - -// Message defines a published message. -type Message struct { - // ID identifies this message. - ID string `json:"id,omitempty"` - - // Data is the actual data in the entry. - Data []byte `json:"data"` - - // Labels represents the key-value pairs the entry is labeled with. - Labels map[string]string `json:"labels,omitempty"` -} - -// Receiver receives published messages. -type Receiver func(Message) - -type Publisher struct { - sync.Mutex - - subs map[*Receiver]struct{} -} - -// New creates an in-memory publisher. -func New() *Publisher { - return &Publisher{ - subs: make(map[*Receiver]struct{}), - } -} - -func (p *Publisher) Publish(message Message) { - p.Lock() - for s := range p.subs { - go (*s)(message) - } - p.Unlock() -} - -func (p *Publisher) Subscribe(c context.Context, receiver Receiver) { - p.Lock() - p.subs[&receiver] = struct{}{} - p.Unlock() - <-c.Done() - p.Lock() - delete(p.subs, &receiver) - p.Unlock() -} diff --git a/server/pubsub/pubsub.go b/server/pubsub/pubsub.go new file mode 100644 index 0000000000..2945c81c71 --- /dev/null +++ b/server/pubsub/pubsub.go @@ -0,0 +1,58 @@ +// Copyright 2026 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pubsub + +import ( + "context" + "errors" + "fmt" + + "go.woodpecker-ci.org/woodpecker/v3/server/model" +) + +// PubSub provider interface, used to signal pipeline state changes to WebUI. +type PubSub interface { + // Publish pushes a state change to all subscribers, + // that have at least subscribed to one of the topics we publish it under. + Publish(context.Context, Topics, Message) error + // Subscribe gets all state changes that match the same topic. + // If multiple topics are subscribed, and a message also match multiple, + // the implementation takes care of deduplication. + Subscribe(context.Context, Topics, Receiver) error +} + +// Message defines a published message. +type Message struct { + // ID identifies this message. + ID string `json:"id,omitempty"` + + // Data is the actual data in the entry. + Data []byte `json:"data"` +} + +// Receiver receives published messages. +type Receiver func(Message) + +// Topics are key-value pairs, messages are filtered upon +// the the key is the base-key and the value to the sub-key. +type Topics map[string]struct{} + +func GetRepoTopic(r *model.Repo) string { + return fmt.Sprintf("repo.id.%d", r.ID) +} + +const PublicTopic = "public" + +var ErrNoTopic = errors.New("no topic specified") diff --git a/server/pubsub/pubsub_test.go b/server/pubsub/pubsub_test.go new file mode 100644 index 0000000000..8905eb9947 --- /dev/null +++ b/server/pubsub/pubsub_test.go @@ -0,0 +1,100 @@ +// Copyright 2026 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pubsub_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "go.woodpecker-ci.org/woodpecker/v3/server/pubsub" + "go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory" +) + +func TestPubSub(t *testing.T) { + // for each pubsub adapter (currently we have only one) + t.Run("in_memory", func(t *testing.T) { + testPubSub(t, memory.New()) + }) +} + +func testPubSub(t *testing.T, adapter pubsub.PubSub) { + assert.NoError(t, + adapter.Publish(t.Context(), pubsub.Topics{"a": {}}, pubsub.Message{ID: "1", Data: []byte(`dummy`)}), + "expect no issue publish to a pubsub with no subscribers", + ) + + t.Run("test deduplication asumptions", func(t *testing.T) { + treeTopicCloser, treeTopicGetMSGs := genTestSub(t, adapter, pubsub.Topics{"tree": {}}) + t.Cleanup(treeTopicCloser) + closer, getMSGs := genTestSub(t, adapter, pubsub.Topics{"apples": {}, "tree": {}, "raspberry": {}}) + t.Cleanup(closer) + assert.Len(t, getMSGs(), 0) + + time.Sleep(10 * time.Millisecond) + assert.NoError(t, adapter.Publish(t.Context(), pubsub.Topics{"tree": {}, "raspberry": {}, "tails": {}}, pubsub.Message{ID: "2"})) + assert.NoError(t, adapter.Publish(t.Context(), pubsub.Topics{"apples": {}, "raspberry": {}, "tails": {}}, pubsub.Message{ID: "3"})) + time.Sleep(100 * time.Millisecond) + + if assert.Len(t, getMSGs(), 2) { + assert.ElementsMatch(t, []string{"2", "3"}, messagesToIDs(getMSGs())) + } + + assert.EqualValues(t, "2", treeTopicGetMSGs()[0].ID) + }) + + t.Run("test adapters calc for strange input", func(t *testing.T) { + t.Run("empty topic", func(t *testing.T) { + assert.Error(t, adapter.Subscribe(t.Context(), nil, func(pubsub.Message) {})) + assert.Error(t, adapter.Subscribe(t.Context(), pubsub.Topics{}, func(pubsub.Message) {})) + + assert.Error(t, adapter.Publish(t.Context(), nil, pubsub.Message{})) + assert.Error(t, adapter.Publish(t.Context(), pubsub.Topics{}, pubsub.Message{})) + }) + }) +} + +func genTestSub(t *testing.T, adapter pubsub.PubSub, topics pubsub.Topics) (close func(), getMSGs func() []pubsub.Message) { + ctx, closer := context.WithCancelCause(t.Context()) + var mu sync.Mutex + var messages []pubsub.Message + + go func() { + err := adapter.Subscribe(ctx, topics, func(m pubsub.Message) { + mu.Lock() + messages = append(messages, m) + mu.Unlock() + }) + assert.NoError(t, err) + }() + + return func() { closer(nil) }, func() []pubsub.Message { + mu.Lock() + defer mu.Unlock() + cp := make([]pubsub.Message, len(messages)) + copy(cp, messages) + return cp + } +} + +func messagesToIDs(msgs []pubsub.Message) (ids []string) { + for i := range msgs { + ids = append(ids, msgs[i].ID) + } + return ids +} diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index 72372589f6..0b97fd8b81 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -25,6 +25,7 @@ import ( "strconv" "time" + "github.com/oklog/ulid/v2" "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog/log" grpc_metadata "google.golang.org/grpc/metadata" @@ -45,7 +46,7 @@ const updateAgentLastWorkDelay = time.Minute type RPC struct { queue queue.Queue - pubsub *pubsub.Publisher + pubsub pubsub.PubSub logger logging.Log store store.Store pipelineTime *prometheus.GaugeVec @@ -207,22 +208,8 @@ func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepStat log.Error().Err(err).Msg("cannot build tree from step list") return err } - message := pubsub.Message{ - Labels: map[string]string{ - "repo": repo.FullName, - "private": strconv.FormatBool(repo.IsSCMPrivate), - }, - } - message.Data, err = json.Marshal(model.Event{ - Repo: *repo, - Pipeline: *currentPipeline, - }) - if err != nil { - return err - } - s.pubsub.Publish(message) - return nil + return s.notify(c, repo, currentPipeline) } // Init signals the workflow is initialized. @@ -272,21 +259,10 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt defer func() { currentPipeline.Workflows, _ = s.store.WorkflowGetTree(currentPipeline) - message := pubsub.Message{ - Labels: map[string]string{ - "repo": repo.FullName, - "private": strconv.FormatBool(repo.IsSCMPrivate), - }, + + if err := s.notify(c, repo, currentPipeline); err != nil { + log.Error().Err(err).Msg("could not publish pipeline state change to pubsub") } - message.Data, err = json.Marshal(model.Event{ - Repo: *repo, - Pipeline: *currentPipeline, - }) - if err != nil { - log.Error().Err(err).Msg("could not marshal JSON") - return - } - s.pubsub.Publish(message) }() workflow, err = pipeline.UpdateWorkflowStatusToRunning(s.store, *workflow, state) @@ -394,7 +370,7 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt } }() - if err := s.notify(repo, currentPipeline); err != nil { + if err := s.notify(c, repo, currentPipeline); err != nil { return err } @@ -603,22 +579,26 @@ func (s *RPC) updateForgeStatus(ctx context.Context, repo *model.Repo, pipeline } } -func (s *RPC) notify(repo *model.Repo, pipeline *model.Pipeline) (err error) { - message := pubsub.Message{ - Labels: map[string]string{ - "repo": repo.FullName, - "private": strconv.FormatBool(repo.IsSCMPrivate), - }, - } +// Notify push to our pubsub infra pipeline state changes. +func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeline) (err error) { + message := pubsub.Message{ID: ulid.Make().String()} message.Data, err = json.Marshal(model.Event{ Repo: *repo, Pipeline: *pipeline, }) if err != nil { - return err + return fmt.Errorf("can't marshal JSON: %w", err) } - s.pubsub.Publish(message) - return nil + + subTopics := make(map[string]struct{}) + // if repo is public, push to public topic + if !repo.IsSCMPrivate { + subTopics[pubsub.PublicTopic] = struct{}{} + } + // publish to repo specific topic + subTopics[pubsub.GetRepoTopic(repo)] = struct{}{} + + return s.pubsub.Publish(c, subTopics, message) } func (s *RPC) getAgentFromContext(ctx context.Context) (*model.Agent, error) { diff --git a/server/rpc/server.go b/server/rpc/server.go index 51b01be13a..d9cee658e3 100644 --- a/server/rpc/server.go +++ b/server/rpc/server.go @@ -37,7 +37,7 @@ type WoodpeckerServer struct { peer RPC } -func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub *pubsub.Publisher, store store.Store) proto.WoodpeckerServer { +func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub pubsub.PubSub, store store.Store) proto.WoodpeckerServer { pipelineTime := promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "woodpecker", Name: "pipeline_time",