From 46b73078e97cd7b8620fb449f7429c2c2fc9fe76 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Thu, 23 Apr 2026 09:57:43 +0200 Subject: [PATCH] e2e test wait for grpc server teardown and stop agents (#6479) --- agent/rpc/client_grpc.go | 8 ++------ e2e/scenarios/agent_routing_test.go | 8 ++++---- e2e/scenarios/cancel_test.go | 2 +- e2e/scenarios/infra_test.go | 2 +- e2e/scenarios/matrix_test.go | 8 ++++---- e2e/scenarios/restart_test.go | 2 +- e2e/scenarios/suite_test.go | 2 +- e2e/setup/agent.go | 26 +++++++++++++------------- e2e/setup/server.go | 7 ++++++- 9 files changed, 33 insertions(+), 32 deletions(-) diff --git a/agent/rpc/client_grpc.go b/agent/rpc/client_grpc.go index 9c1c42d9b8..96f864cf25 100644 --- a/agent/rpc/client_grpc.go +++ b/agent/rpc/client_grpc.go @@ -70,11 +70,6 @@ func NewGrpcClient(ctx context.Context, conn *grpc.ClientConn) rpc.Peer { return client } -func (c *client) Close() error { - close(c.logs) - return c.conn.Close() -} - func (c *client) IsConnected() bool { state := c.conn.GetState() connected := state == connectivity.Ready || state == connectivity.Idle @@ -506,9 +501,10 @@ func (c *client) processLogs(ctx context.Context) { bytes = 0 } - // ctx.Done() is covered by the log channel being closed for { select { + case <-ctx.Done(): + return case entry, ok := <-c.logs: if !ok { log.Info().Msg("log drain: channel closed") diff --git a/e2e/scenarios/agent_routing_test.go b/e2e/scenarios/agent_routing_test.go index f831ae9486..e2ff4661b8 100644 --- a/e2e/scenarios/agent_routing_test.go +++ b/e2e/scenarios/agent_routing_test.go @@ -50,12 +50,12 @@ func TestAgentLabelRouting(t *testing.T) { }) // Plain agent: wildcard repo label only — cannot satisfy gpu=true. - plainAgent := setup.StartAgent(t.Context(), t, env.GRPCAddr, + plainAgent := setup.StartAgent(t, env.GRPCAddr, setup.WithHostname("plain-agent"), ) // GPU agent: carries gpu=true — the only agent that can accept the task. - gpuAgent := setup.StartAgent(t.Context(), t, env.GRPCAddr, + gpuAgent := setup.StartAgent(t, env.GRPCAddr, setup.WithHostname("gpu-agent"), setup.WithCustomLabels(map[string]string{"gpu": "true"}), ) @@ -108,12 +108,12 @@ Func TestOrgAgentPreferredOverGlobal(t *testing.T) { }) // Global agent: matches org-id=* (score 1). - globalAgent := setup.StartAgent(t.Context(), t, env.GRPCAddr, + globalAgent := setup.StartAgent(t, env.GRPCAddr, setup.WithHostname("global-agent"), ) // Org agent: will be patched with the repo's OrgID (score 10). - orgAgent := setup.StartAgent(t.Context(), t, env.GRPCAddr, + orgAgent := setup.StartAgent(t, env.GRPCAddr, setup.WithHostname("org-agent"), setup.WithOrgID(env.Fixtures.Repo.OrgID), ) diff --git a/e2e/scenarios/cancel_test.go b/e2e/scenarios/cancel_test.go index d4168bcec3..835073a055 100644 --- a/e2e/scenarios/cancel_test.go +++ b/e2e/scenarios/cancel_test.go @@ -55,7 +55,7 @@ func TestCancelRunningPipeline(t *testing.T) { env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{ {Name: ".woodpecker.yaml", Data: cancelPipelineYAML}, }) - agent := setup.StartAgent(t.Context(), t, env.GRPCAddr) + agent := setup.StartAgent(t, env.GRPCAddr) setup.WaitForAgentRegistered(t, env.Store, agent) created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{ diff --git a/e2e/scenarios/infra_test.go b/e2e/scenarios/infra_test.go index 6c5bef220f..8b0031cefa 100644 --- a/e2e/scenarios/infra_test.go +++ b/e2e/scenarios/infra_test.go @@ -71,7 +71,7 @@ func TestInfraSmoke(t *testing.T) { env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{ {Name: ".woodpecker.yaml", Data: simpleSuccessYAML}, }) - agent := setup.StartAgent(t.Context(), t, env.GRPCAddr) + agent := setup.StartAgent(t, env.GRPCAddr) setup.WaitForAgentRegistered(t, env.Store, agent) draftPipeline := &model.Pipeline{ diff --git a/e2e/scenarios/matrix_test.go b/e2e/scenarios/matrix_test.go index a4d8dd7533..9474395b8f 100644 --- a/e2e/scenarios/matrix_test.go +++ b/e2e/scenarios/matrix_test.go @@ -73,7 +73,7 @@ func TestMatrixPipeline(t *testing.T) { env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{ {Name: ".woodpecker.yaml", Data: matrixPipelineYAML}, }) - agent := setup.StartAgent(t.Context(), t, env.GRPCAddr) + agent := setup.StartAgent(t, env.GRPCAddr) setup.WaitForAgentRegistered(t, env.Store, agent) created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{ @@ -139,7 +139,7 @@ func TestMatrixIncludePipeline(t *testing.T) { env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{ {Name: ".woodpecker.yaml", Data: matrixIncludePipelineYAML}, }) - agent := setup.StartAgent(t.Context(), t, env.GRPCAddr) + agent := setup.StartAgent(t, env.GRPCAddr) setup.WaitForAgentRegistered(t, env.Store, agent) created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{ @@ -211,7 +211,7 @@ steps: env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{ {Name: ".woodpecker.yaml", Data: yaml}, }) - agent := setup.StartAgent(t.Context(), t, env.GRPCAddr) + agent := setup.StartAgent(t, env.GRPCAddr) setup.WaitForAgentRegistered(t, env.Store, agent) created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{ @@ -259,7 +259,7 @@ steps: env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{ {Name: ".woodpecker.yaml", Data: yaml}, }) - agent := setup.StartAgent(t.Context(), t, env.GRPCAddr) + agent := setup.StartAgent(t, env.GRPCAddr) setup.WaitForAgentRegistered(t, env.Store, agent) created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{ diff --git a/e2e/scenarios/restart_test.go b/e2e/scenarios/restart_test.go index 051f022bc6..7301303a43 100644 --- a/e2e/scenarios/restart_test.go +++ b/e2e/scenarios/restart_test.go @@ -35,7 +35,7 @@ func TestRestartPipeline(t *testing.T) { env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{ {Name: ".woodpecker.yaml", Data: simpleSuccessYAML}, }) - agent := setup.StartAgent(t.Context(), t, env.GRPCAddr) + agent := setup.StartAgent(t, env.GRPCAddr) setup.WaitForAgentRegistered(t, env.Store, agent) // First run. diff --git a/e2e/scenarios/suite_test.go b/e2e/scenarios/suite_test.go index ca5c29ae22..93c46d5581 100644 --- a/e2e/scenarios/suite_test.go +++ b/e2e/scenarios/suite_test.go @@ -47,7 +47,7 @@ func runScenario(t *testing.T, sc Scenario) { t.Helper() env := setup.StartServer(t.Context(), t, sc.Files) - agent := setup.StartAgent(t.Context(), t, env.GRPCAddr) + agent := setup.StartAgent(t, env.GRPCAddr) setup.WaitForAgentRegistered(t, env.Store, agent) created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{ diff --git a/e2e/setup/agent.go b/e2e/setup/agent.go index 8d1be17ea2..6fd4425d57 100644 --- a/e2e/setup/agent.go +++ b/e2e/setup/agent.go @@ -108,7 +108,7 @@ func WithOrgID(id int64) AgentOption { // server at grpcAddr and returns an *AgentEnv whose AgentID is populated once // the agent has registered. Pass AgentOption values to configure labels, hostname, // or org-scoping; multiple agents can be started in the same test. -func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...AgentOption) *AgentEnv { //nolint:contextcheck +func StartAgent(t *testing.T, grpcAddr string, opts ...AgentOption) *AgentEnv { t.Helper() cfg := &agentConfig{ @@ -128,8 +128,8 @@ func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...Agen Timeout: shortTimeout, }) - authCtx, authCancel := context.WithCancelCause(context.Background()) - t.Cleanup(func() { authCancel(nil) }) + agentCtx, agentCancel := context.WithCancelCause(t.Context()) + t.Cleanup(func() { agentCancel(nil) }) authConn, err := grpc.NewClient(grpcAddr, transport, keepaliveOpts) if err != nil { @@ -138,7 +138,7 @@ func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...Agen t.Cleanup(func() { authConn.Close() }) authClient := agent_rpc.NewAuthGrpcClient(authConn, TestAgentToken, -1) - authInterceptor, err := agent_rpc.NewAuthInterceptor(authCtx, authClient, agentAuthRefreshEvery) //nolint:contextcheck + authInterceptor, err := agent_rpc.NewAuthInterceptor(agentCtx, authClient, agentAuthRefreshEvery) if err != nil { t.Fatalf("StartAgent(%s): authenticate with server: %v", cfg.hostname, err) } @@ -155,20 +155,20 @@ func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...Agen } t.Cleanup(func() { conn.Close() }) - client := agent_rpc.NewGrpcClient(ctx, conn) + client := agent_rpc.NewGrpcClient(agentCtx, conn) - grpcCtx := metadata.NewOutgoingContext(authCtx, metadata.Pairs("hostname", cfg.hostname)) + grpcCtx := metadata.NewOutgoingContext(agentCtx, metadata.Pairs("hostname", cfg.hostname)) backend := dummy.New() - if !backend.IsAvailable(ctx) { + if !backend.IsAvailable(agentCtx) { t.Fatalf("StartAgent(%s): dummy backend is not available", cfg.hostname) } - engInfo, err := backend.Load(ctx) + engInfo, err := backend.Load(agentCtx) if err != nil { t.Fatalf("StartAgent(%s): load dummy backend: %v", cfg.hostname, err) } - env.AgentID, err = client.RegisterAgent(grpcCtx, rpc.AgentInfo{ //nolint:contextcheck + env.AgentID, err = client.RegisterAgent(grpcCtx, rpc.AgentInfo{ Version: version.String(), Backend: backend.Name(), Platform: engInfo.Platform, @@ -218,16 +218,16 @@ func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...Agen runner := agent.NewRunner(client, filter, cfg.hostname, counter, backend) log.Debug().Int("slot", slot).Str("hostname", cfg.hostname).Msg("test agent: runner started") for { - if ctx.Err() != nil { + if agentCtx.Err() != nil { return } - if err := runner.Run(ctx); err != nil { - if ctx.Err() != nil { + if err := runner.Run(agentCtx); err != nil { + if agentCtx.Err() != nil { return } log.Error().Err(err).Int("slot", slot).Str("hostname", cfg.hostname).Msg("test agent: runner error, retrying") select { - case <-ctx.Done(): + case <-agentCtx.Done(): return case <-time.After(500 * time.Millisecond): } diff --git a/e2e/setup/server.go b/e2e/setup/server.go index 48ad81fb6c..2088254d4d 100644 --- a/e2e/setup/server.go +++ b/e2e/setup/server.go @@ -193,10 +193,12 @@ func startGRPCServer(ctx context.Context, t *testing.T, s store.Store) string { s, )) + stopped := make(chan struct{}) grpcCtx, grpcCancel := context.WithCancelCause(ctx) go func() { <-grpcCtx.Done() grpcServer.GracefulStop() + close(stopped) }() go func() { if err := grpcServer.Serve(lis); err != nil { @@ -204,6 +206,9 @@ func startGRPCServer(ctx context.Context, t *testing.T, s store.Store) string { } }() - t.Cleanup(func() { grpcCancel(nil) }) + t.Cleanup(func() { + grpcCancel(nil) + <-stopped + }) return addr }