1
0
mirror of https://github.com/go-micro/go-micro.git synced 2026-06-15 19:35:13 +02:00
Files
Asim Aslam e416ea4a75 Enhance agent workflows with guardrails and documentation updates (#2952)
* docs: map go-micro onto Anthropic's workflows-vs-agents taxonomy

- new guide 'Agents and Workflows': adopts Anthropic's Building Effective
  Agents vocabulary — workflow (predefined path) = flow, agent (dynamic
  self-direction) = agent — maps the augmented-LLM building block and the
  five workflow patterns onto go-micro, and shows routing (chat router)
  and orchestrator-workers (conductor + plan/delegate) are already native.
- flow package doc reframed as a workflow (predefined path) per the same
  taxonomy, with guidance on flow vs agent.
- nav + README link the new guide.

* feat: agent guardrails — step limit and tool approval hook

Anthropic's Building Effective Agents stresses stopping conditions and
human-in-the-loop checkpoints for autonomous agents. Add both as plain
options enforced at the tool-handler choke point — no provider changes,
no new abstraction:

- MaxSteps(n): bound tool executions per Ask; beyond the limit, actions
  are refused and the model is told to stop and summarize.
- ApproveTool(fn): gate each action before it runs; returning false
  blocks it and surfaces the reason to the model. The internal plan tool
  is never gated.

Exposed at the micro package (AgentMaxSteps, AgentApproveTool, ApproveFunc).
Tests cover the limit, blocking, and that plan is not gated. Guardrails
section of the agents-and-workflows guide updated from 'active work' to
documented options.

* feat: flow can dispatch to an agent (flow triggers, agent reasons)

Unify the engine without collapsing the workflow/agent distinction. A
Flow with Agent set hands each event's rendered prompt to a named
registered agent over RPC (Agent.Chat) instead of running its own LLM
step — so the workflow stays the deterministic trigger and the agent is
the reasoning engine, with its plan, delegate, memory, and guardrails.
A plain flow is unchanged (single augmented-LLM step).

- flow.Agent(name) / micro.FlowAgent(name); flow stores the client and
  skips model setup when dispatching.
- test: dispatch routes to comms.Agent.Chat with the rendered prompt and
  records the reply.
- guide: 'Flow triggers, Agent reasons' section.

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-08 08:32:32 +01:00

264 lines
7.2 KiB
Go

// Package flow provides event-driven workflows for go-micro services.
//
// A Flow is a workflow in the sense of Anthropic's "Building Effective
// Agents": LLMs and tools orchestrated through a predefined path. It
// subscribes to a broker topic and, for each event, runs one augmented
// LLM step — the registered services as tools, a fixed prompt — and
// lets the model decide which RPCs to call. Use a Flow when the task is
// well-defined and you want a deterministic trigger; use an Agent (see
// the agent package) when the work needs to direct itself dynamically.
//
// Usage:
//
// f := flow.New("onboard-user",
// flow.Trigger("events.user.created"),
// flow.Prompt("New user created: {{.Data}}. Send welcome email and create workspace."),
// flow.Provider("anthropic"),
// flow.APIKey(key),
// )
// err := f.Register(reg, br, cl) // the service's registry, broker and client
// service.Run()
package flow
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"
"text/template"
"time"
"go-micro.dev/v5/ai"
"go-micro.dev/v5/broker"
"go-micro.dev/v5/client"
codecbytes "go-micro.dev/v5/codec/bytes"
"go-micro.dev/v5/logger"
"go-micro.dev/v5/registry"
// Register default providers.
_ "go-micro.dev/v5/ai/anthropic"
_ "go-micro.dev/v5/ai/atlascloud"
_ "go-micro.dev/v5/ai/gemini"
_ "go-micro.dev/v5/ai/groq"
_ "go-micro.dev/v5/ai/mistral"
_ "go-micro.dev/v5/ai/openai"
_ "go-micro.dev/v5/ai/together"
)
// Flow is an event-driven LLM orchestration unit. Each event arriving on
// its trigger topic is turned into a prompt and then either run through
// the flow's own model, with the discovered services offered as tools, or
// handed to a named agent over RPC.
type Flow struct {
	// name identifies the flow in results and log lines.
	name string

	// opts is the configuration assembled by New.
	opts Options

	// model is the LLM used when the flow runs its own step. Register
	// leaves it unset when the flow dispatches to an agent.
	model ai.Model

	// toolSet discovers the registered services that are offered to the
	// model as tools.
	toolSet *ai.Tools

	// client carries the RPC call to an agent's Agent.Chat endpoint.
	client client.Client

	// tmpl renders the prompt from the event data. It is nil when no
	// prompt is configured, in which case the raw data is the prompt.
	tmpl *template.Template

	// log receives the flow's informational and error messages.
	log logger.Logger

	// mu guards results.
	mu sync.Mutex

	// results holds one entry per execution, in the order recorded.
	results []Result
}
// Result captures the outcome of a single flow execution, whether it
// succeeded or failed. Field order is significant: it is the order in
// which the fields appear when a Result is encoded as JSON.
type Result struct {
	// FlowName is the name of the flow that ran.
	FlowName string `json:"flow"`

	// Trigger is the broker topic the flow is configured to listen on.
	Trigger string `json:"trigger"`

	// Prompt is the prompt that was produced for this execution.
	Prompt string `json:"prompt"`

	// Reply is the reply returned by the model or by the agent.
	Reply string `json:"reply,omitempty"`

	// Answer is copied from the model response's Answer field.
	Answer string `json:"answer,omitempty"`

	// ToolCalls lists each tool call as "name(json-arguments)".
	ToolCalls []string `json:"tool_calls,omitempty"`

	// Error is the error message when the execution failed.
	Error string `json:"error,omitempty"`

	// Timestamp is when the execution started.
	Timestamp time.Time `json:"timestamp"`

	// Duration is the elapsed time of the execution, in seconds.
	Duration float64 `json:"duration_seconds"`
}
// New creates a Flow with the given name and options.
//
// Defaults applied before the options run are: provider "openai", a
// generic service-orchestrator system prompt, and a history limit of 20.
// When a prompt is configured it is parsed as a text/template in which
// the event payload is available as {{.Data}}.
//
// New has no error return, so a prompt that fails to parse is reported on
// the default logger and replaced by the pass-through template
// "{{.Data}}", which forwards the raw event data as the prompt.
func New(name string, opts ...Option) *Flow {
	o := Options{
		Provider:     "openai",
		SystemPrompt: "You are a service orchestrator. Use the available tools to fulfill the request. Explain what you do.",
		HistoryLimit: 20,
	}
	for _, opt := range opts {
		opt(&o)
	}

	log := logger.DefaultLogger

	var tmpl *template.Template
	if o.Prompt != "" {
		parsed, err := template.New(name).Parse(o.Prompt)
		if err != nil {
			// Keep the best-effort fallback, but make the bad template
			// visible: without this the configured prompt is dropped
			// with no trace and every event is sent to the model raw.
			log.Logf(logger.ErrorLevel, "Flow %s: invalid prompt template, falling back to raw event data: %v", name, err)
			parsed = template.Must(template.New(name).Parse("{{.Data}}"))
		}
		tmpl = parsed
	}

	return &Flow{
		name: name,
		opts: o,
		tmpl: tmpl,
		log:  log,
	}
}
// Register wires the flow to the given registry, broker and client. Call
// this before service.Run().
//
// It stores the client, builds the tool set on top of the registry, and,
// unless the flow dispatches to an agent, creates the model for the
// configured provider. When a trigger topic is configured it also
// subscribes to that topic so that every event body is passed to Execute.
// Tool discovery itself happens on each Execute, not here.
//
// An error is returned when the provider is unknown or the subscription
// fails. Failures of individual executions are logged by the event
// handler and are not reported back to the broker.
func (f *Flow) Register(reg registry.Registry, br broker.Broker, cl client.Client) error {
	f.client = cl
	f.toolSet = ai.NewTools(reg, ai.ToolClient(cl))

	// Only a flow that runs its own LLM step needs a model; a flow that
	// dispatches to an agent leaves the reasoning to that agent.
	if f.opts.Agent == "" {
		settings := make([]ai.Option, 0, 4)
		if key := f.opts.APIKey; key != "" {
			settings = append(settings, ai.WithAPIKey(key))
		}
		if model := f.opts.Model; model != "" {
			settings = append(settings, ai.WithModel(model))
		}
		if base := f.opts.BaseURL; base != "" {
			settings = append(settings, ai.WithBaseURL(base))
		}
		settings = append(settings, ai.WithTools(f.toolSet))

		if f.model = ai.New(f.opts.Provider, settings...); f.model == nil {
			return fmt.Errorf("unknown provider: %s", f.opts.Provider)
		}
	}

	topic := f.opts.TriggerTopic
	if topic == "" {
		// No trigger: the flow only runs when Execute is called directly.
		return nil
	}

	// onEvent runs the flow for one broker event. It always acknowledges
	// the event; a failed execution is only logged.
	onEvent := func(ev broker.Event) error {
		payload := string(ev.Message().Body)
		if err := f.Execute(context.Background(), payload); err != nil {
			f.log.Logf(logger.ErrorLevel, "Flow %s failed: %v", f.name, err)
		}
		return nil
	}
	if _, err := br.Subscribe(topic, onEvent); err != nil {
		return fmt.Errorf("subscribe to %s: %w", topic, err)
	}

	f.log.Logf(logger.InfoLevel, "Flow %s subscribed to %s", f.name, topic)
	return nil
}
// Execute runs the flow once with the given input data. It is called
// automatically for each broker event, and can also be invoked directly
// for testing or one-shot use.
//
// When a prompt template is configured, data is rendered through it
// (available as {{.Data}}); otherwise data is used as the prompt
// verbatim. If an agent is configured the prompt is sent to that agent.
// Otherwise the flow discovers the registered services and runs a single
// model generation with them offered as tools.
//
// Every attempt, successful or not, is recorded as a Result. Execute
// returns an error when the prompt cannot be rendered, when the flow has
// not been wired up by Register, when tool discovery fails, or when the
// agent or model call fails. If rendering fails, the recorded Result
// carries the raw data as its Prompt.
func (f *Flow) Execute(ctx context.Context, data string) error {
	start := time.Now()
	result := Result{
		FlowName:  f.name,
		Trigger:   f.opts.TriggerTopic,
		Prompt:    data,
		Timestamp: start,
	}

	// fail stamps the elapsed time and the error message on the result
	// and records it.
	fail := func(err error) {
		result.Duration = time.Since(start).Seconds()
		result.Error = err.Error()
		f.record(result)
	}

	if f.tmpl != nil {
		var buf bytes.Buffer
		// A failed render may leave partial output in buf; sending that
		// truncated prompt on would be worse than failing the execution.
		if err := f.tmpl.Execute(&buf, map[string]string{"Data": data}); err != nil {
			err = fmt.Errorf("render prompt: %w", err)
			fail(err)
			return err
		}
		result.Prompt = buf.String()
	}
	prompt := result.Prompt

	// Flow triggers, Agent reasons: hand the event to the named agent.
	if f.opts.Agent != "" {
		if f.client == nil {
			// Register has not run, so there is no client to call with.
			err := fmt.Errorf("flow %s: no client, call Register first", f.name)
			fail(err)
			return err
		}
		reply, err := f.callAgent(ctx, f.opts.Agent, prompt)
		if err != nil {
			fail(err)
			return err
		}
		result.Duration = time.Since(start).Seconds()
		result.Reply = reply
		f.record(result)
		f.log.Logf(logger.InfoLevel, "Flow %s dispatched to agent %s in %.1fs",
			f.name, f.opts.Agent, result.Duration)
		return nil
	}

	// Otherwise run a single augmented-LLM step with the services as tools.
	if f.model == nil {
		// Register has not run (or failed), so there is no model to use.
		err := fmt.Errorf("flow %s: no model, call Register first", f.name)
		fail(err)
		return err
	}

	discovered, err := f.toolSet.Discover()
	if err != nil {
		// The result keeps the underlying message; the caller gets it
		// wrapped with the step that failed.
		fail(err)
		return fmt.Errorf("discover tools: %w", err)
	}

	resp, err := f.model.Generate(ctx, &ai.Request{
		Prompt:       prompt,
		SystemPrompt: f.opts.SystemPrompt,
		Tools:        discovered,
	})
	if err != nil {
		fail(err)
		return err
	}
	result.Duration = time.Since(start).Seconds()
	result.Reply = resp.Reply
	result.Answer = resp.Answer
	for _, tc := range resp.ToolCalls {
		// Best effort: the encoded arguments only feed the readable
		// summary, so an encoding failure leaves them empty.
		args, _ := json.Marshal(tc.Input)
		result.ToolCalls = append(result.ToolCalls, fmt.Sprintf("%s(%s)", tc.Name, args))
	}
	f.record(result)

	f.log.Logf(logger.InfoLevel, "Flow %s completed in %.1fs: %d tool calls",
		f.name, result.Duration, len(result.ToolCalls))
	return nil
}
// callAgent sends message to the Agent.Chat endpoint of the service
// registered as name and returns the "reply" field of its JSON response.
// The request body is the JSON object {"message": ...} carried in a raw
// bytes frame. Errors from the RPC call and from decoding the response
// are returned unchanged, with an empty reply.
func (f *Flow) callAgent(ctx context.Context, name, message string) (string, error) {
	type chatRequest struct {
		Message string `json:"message"`
	}
	type chatReply struct {
		Reply string `json:"reply"`
	}

	// The error is discarded: a struct holding a single string contains
	// nothing the encoder can reject.
	payload, _ := json.Marshal(chatRequest{Message: message})
	call := f.client.NewRequest(name, "Agent.Chat", &codecbytes.Frame{Data: payload})

	frame := new(codecbytes.Frame)
	if err := f.client.Call(ctx, call, frame); err != nil {
		return "", err
	}

	var decoded chatReply
	err := json.Unmarshal(frame.Data, &decoded)
	if err != nil {
		return "", err
	}
	return decoded.Reply, nil
}
// Results returns a snapshot of every execution recorded so far, in the
// order they were recorded. The returned slice is a fresh copy, so the
// caller may reorder or truncate it without affecting the flow. The copy
// is shallow: the ToolCalls slice inside each Result still shares its
// backing storage with the recorded entry.
func (f *Flow) Results() []Result {
	f.mu.Lock()
	defer f.mu.Unlock()

	snapshot := make([]Result, 0, len(f.results))
	return append(snapshot, f.results...)
}
// Name reports the name the flow was created with.
func (f *Flow) Name() string {
	return f.name
}
// record appends r to the flow's results and then passes it to the
// OnResult callback, if one is configured. The callback is invoked after
// the lock has been released, so it may call back into the flow.
//
// NOTE(review): results is append-only in this file; nothing here trims
// it, and the HistoryLimit default set in New is not consulted. Confirm
// whether a cap is intended.
func (f *Flow) record(r Result) {
	func() {
		f.mu.Lock()
		defer f.mu.Unlock()
		f.results = append(f.results, r)
	}()

	if notify := f.opts.OnResult; notify != nil {
		notify(r)
	}
}