1
0
mirror of https://github.com/SAP/jenkins-library.git synced 2025-02-17 14:11:24 +02:00
Googlom af05acad58
feat(events): Publish events to GCP PubSub by each step ()
* test

* test

* draft

* generator

* some polishing

* go mod tidy

* fix unit

* fix unit

* fix unit

* fix unit

* fix unit

* resolve review comments

* resolve review comments

* add debug message on successful publish

* refactor to use global vault client

* cleanup

* rename

* clenup

* refactor

* remove token revocation

* handle nil vaultClient and add comments

* feat(events): Publish events (generated part) ()

* add generated

* add generated

* refactor vaultClient usage

* fix unit tests

* fix unit tests

* fix
2024-10-11 14:55:39 +05:00

81 lines
2.4 KiB
Go

package gcp
import (
"cloud.google.com/go/pubsub"
"context"
piperConfig "github.com/SAP/jenkins-library/pkg/config"
"github.com/SAP/jenkins-library/pkg/log"
"github.com/pkg/errors"
"golang.org/x/oauth2"
"google.golang.org/api/option"
)
type PubsubClient interface {
Publish(topic string, data []byte) error
}
type pubsubClient struct {
vaultClient piperConfig.VaultClient
projectNumber string
pool string
provider string
orderingKey string
oidcRoleId string
}
func NewGcpPubsubClient(vaultClient piperConfig.VaultClient, projectNumber, pool, provider, orderingKey, oidcRoleId string) PubsubClient {
return &pubsubClient{
vaultClient: vaultClient,
projectNumber: projectNumber,
pool: pool,
provider: provider,
orderingKey: orderingKey,
oidcRoleId: oidcRoleId,
}
}
func (cl *pubsubClient) Publish(topic string, data []byte) error {
ctx := context.Background()
psClient, err := cl.getAuthorizedGCPClient(ctx)
if err != nil {
return errors.Wrap(err, "could not get authorized pubsub client token")
}
return cl.publish(ctx, psClient, topic, cl.orderingKey, data)
}
func (cl *pubsubClient) getAuthorizedGCPClient(ctx context.Context) (*pubsub.Client, error) {
if cl.vaultClient == nil {
return nil, errors.New("Vault client is not configured")
}
oidcToken, err := cl.vaultClient.GetOIDCTokenByValidation(cl.oidcRoleId)
if err != nil {
return nil, errors.Wrap(err, "could not get oidc token")
}
accessToken, err := getFederatedToken(cl.projectNumber, cl.pool, cl.provider, oidcToken)
if err != nil {
return nil, errors.Wrap(err, "could not get federated token")
}
staticTokenSource := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: accessToken})
return pubsub.NewClient(ctx, cl.projectNumber, option.WithTokenSource(staticTokenSource))
}
func (cl *pubsubClient) publish(ctx context.Context, psClient *pubsub.Client, topic, orderingKey string, data []byte) error {
t := psClient.Topic(topic)
t.EnableMessageOrdering = true
publishResult := t.Publish(ctx, &pubsub.Message{Data: data, OrderingKey: orderingKey})
// publishResult.Get() will make API call synchronous by awaiting messageId or error.
// By removing .Get() method call we can make publishing asynchronous, but without ability to catch errors
msgID, err := publishResult.Get(context.Background())
if err != nil {
return errors.Wrap(err, "event publish failed")
}
log.Entry().Debugf("Event published with ID: %s", msgID)
return nil
}