From 81b7cfd2eba3cf5bc9696f5640865fc162d5dce5 Mon Sep 17 00:00:00 2001 From: Tim Voronov Date: Fri, 25 Jul 2025 09:50:56 -0400 Subject: [PATCH] Refactor loop collection: enhance `CollectorSpec` with destination handling, streamline projection logic, improve aggregation handling, optimize register allocation, and update `COLLECT` integration tests. --- pkg/compiler/internal/core/collector_spec.go | 16 +++++++---- pkg/compiler/internal/loop_collect.go | 28 ++++++++++---------- pkg/compiler/internal/loop_collect_agg.go | 16 +++++++++-- pkg/compiler/internal/loop_collect_prj.go | 25 ++++++++++------- 4 files changed, 54 insertions(+), 31 deletions(-) diff --git a/pkg/compiler/internal/core/collector_spec.go b/pkg/compiler/internal/core/collector_spec.go index e2f88f8f..efc4b1b9 100644 --- a/pkg/compiler/internal/core/collector_spec.go +++ b/pkg/compiler/internal/core/collector_spec.go @@ -1,10 +1,13 @@ package core +import "github.com/MontFerret/ferret/pkg/vm" + type ( CollectorType int CollectorSpec struct { typ CollectorType + dst vm.Operand projection *CollectorProjection groupSelectors []*CollectSelector aggregationSelectors []*AggregateSelector @@ -18,20 +21,19 @@ const ( CollectorTypeKeyGroup ) -func NewCollectorSpec(type_ CollectorType, projection *CollectorProjection, groupSelectors []*CollectSelector, aggregationSelectors []*AggregateSelector) *CollectorSpec { +func NewCollectorSpec(type_ CollectorType, dst vm.Operand, projection *CollectorProjection, groupSelectors []*CollectSelector, aggregationSelectors []*AggregateSelector) *CollectorSpec { return &CollectorSpec{ typ: type_, + dst: dst, projection: projection, groupSelectors: groupSelectors, aggregationSelectors: aggregationSelectors, } } -func DetermineCollectorType(withGrouping, withAggregation bool, projection *CollectorProjection) CollectorType { - withProjection := projection != nil - +func DetermineCollectorType(withGrouping, withAggregation, withProjection, withCounter bool) CollectorType { if withGrouping { - if withProjection && projection.IsCounted() { + if withCounter { return CollectorTypeKeyCounter } @@ -49,6 +51,10 @@ func (c *CollectorSpec) Type() CollectorType { return c.typ } +func (c *CollectorSpec) Destination() vm.Operand { + return c.dst +} + func (c *CollectorSpec) Projection() *CollectorProjection { return c.projection } diff --git a/pkg/compiler/internal/loop_collect.go b/pkg/compiler/internal/loop_collect.go index da200f34..01970cc6 100644 --- a/pkg/compiler/internal/loop_collect.go +++ b/pkg/compiler/internal/loop_collect.go @@ -23,16 +23,16 @@ func (c *LoopCollectCompiler) Compile(ctx fql.ICollectClauseContext) { func (c *LoopCollectCompiler) compileCollector(ctx fql.ICollectClauseContext) *core.CollectorSpec { grouping := ctx.CollectGrouping() + projection := ctx.CollectGroupProjection() counter := ctx.CollectCounter() aggregation := ctx.CollectAggregator() // We gather keys and values for the collector. kv, groupSelectors := c.initializeGrouping(grouping) - projection := c.initializeProjection(ctx, kv, counter) - loop := c.ctx.Loops.Current() - collectorType := core.DetermineCollectorType(len(groupSelectors) > 0, aggregation != nil, projection) + collectorType := core.DetermineCollectorType(len(groupSelectors) > 0, aggregation != nil, projection != nil, counter != nil) // We replace DataSet initialization with Collector initialization + loop := c.ctx.Loops.Current() dst := loop.PatchDestinationAx(c.ctx.Registers, c.ctx.Emitter, vm.OpDataSetCollector, int(collectorType)) var aggregationSelectors []*core.AggregateSelector @@ -42,15 +42,17 @@ func (c *LoopCollectCompiler) compileCollector(ctx fql.ICollectClauseContext) *c aggregationSelectors = c.initializeAggregation(aggregation, dst, kv, len(groupSelectors) > 0) } - scope := core.NewCollectorSpec(collectorType, projection, groupSelectors, aggregationSelectors) + groupProjection := c.initializeProjection(kv, projection, counter) - c.finalizeCollector(dst, kv, scope) + spec := core.NewCollectorSpec(collectorType, dst, groupProjection, groupSelectors, aggregationSelectors) + + c.finalizeCollector(dst, kv, spec) // We no longer need KV, so we free registers c.ctx.Registers.Free(kv.Key) c.ctx.Registers.Free(kv.Value) - return scope + return spec } func (c *LoopCollectCompiler) finalizeCollector(dst vm.Operand, kv *core.KV, spec *core.CollectorSpec) { @@ -67,9 +69,6 @@ func (c *LoopCollectCompiler) finalizeCollector(dst vm.Operand, kv *core.KV, spe } loop.EmitFinalization(c.ctx.Emitter) - - // Move the collector to the next loop source - c.ctx.Emitter.EmitMove(loop.Src, dst) } func (c *LoopCollectCompiler) compileLoop(spec *core.CollectorSpec) { @@ -91,6 +90,8 @@ func (c *LoopCollectCompiler) compileLoop(spec *core.CollectorSpec) { doInit := spec.HasGrouping() || !spec.HasAggregation() if doInit { + // Move the collector to the next loop source + c.ctx.Emitter.EmitMove(loop.Src, spec.Destination()) loop.EmitInitialization(c.ctx.Registers, c.ctx.Emitter, c.ctx.Loops.Depth()) } @@ -99,12 +100,11 @@ func (c *LoopCollectCompiler) compileLoop(spec *core.CollectorSpec) { c.compileAggregation(spec) } - // If the projection is used, we allocate a new register for the variable and put the iterator's value into it - if spec.HasProjection() { - c.finalizeProjection(spec) - } - if spec.HasGrouping() { c.compileGrouping(spec) } + + if spec.HasProjection() && !spec.HasAggregation() { + c.finalizeProjection(spec, loop.Value) + } } diff --git a/pkg/compiler/internal/loop_collect_agg.go b/pkg/compiler/internal/loop_collect_agg.go index c9537a79..51853a09 100644 --- a/pkg/compiler/internal/loop_collect_agg.go +++ b/pkg/compiler/internal/loop_collect_agg.go @@ -199,12 +199,12 @@ func (c *LoopCollectCompiler) compileGlobalAggregation(spec *core.CollectorSpec) loop.EmitInitialization(c.ctx.Registers, c.ctx.Emitter, c.ctx.Loops.Depth()) // We just need to take the grouped values and call aggregation functions using them as args - c.compileAggregationFuncCalls(spec.AggregationSelectors(), prevLoop.Dst) + c.compileAggregationFuncCalls(spec, prevLoop.Dst) c.ctx.Registers.Free(prevLoop.Dst) } -func (c *LoopCollectCompiler) compileAggregationFuncCalls(selectors []*core.AggregateSelector, aggregator vm.Operand) { +func (c *LoopCollectCompiler) compileAggregationFuncCalls(spec *core.CollectorSpec, aggregator vm.Operand) { // Gets the number of records in the accumulator cond := c.ctx.Registers.Allocate(core.Temp) c.ctx.Emitter.EmitAB(vm.OpLength, cond, aggregator) @@ -218,6 +218,7 @@ func (c *LoopCollectCompiler) compileAggregationFuncCalls(selectors []*core.Aggr // We skip the key retrieval and function call of there are no records in the accumulator c.ctx.Emitter.EmitJumpIfTrue(cond, elseLabel) + selectors := spec.AggregationSelectors() selectorVarRegs := make([]vm.Operand, len(selectors)) for i, selector := range selectors { @@ -251,6 +252,13 @@ func (c *LoopCollectCompiler) compileAggregationFuncCalls(selectors []*core.Aggr c.ctx.Registers.Free(result) } + var projVar vm.Operand + + // If the projection is used, we allocate a new register for the variable and put the iterator's value into it + if spec.HasProjection() { + projVar = c.finalizeProjection(spec, aggregator) + } + c.ctx.Emitter.EmitJump(endLabel) c.ctx.Emitter.MarkLabel(elseLabel) @@ -258,6 +266,10 @@ func (c *LoopCollectCompiler) compileAggregationFuncCalls(selectors []*core.Aggr c.ctx.Emitter.EmitA(vm.OpLoadNone, varReg) } + if projVar != vm.NoopOperand { + c.ctx.Emitter.EmitA(vm.OpLoadNone, projVar) + } + c.ctx.Emitter.MarkLabel(endLabel) c.ctx.Registers.Free(cond) } diff --git a/pkg/compiler/internal/loop_collect_prj.go b/pkg/compiler/internal/loop_collect_prj.go index eb0cd869..9861a977 100644 --- a/pkg/compiler/internal/loop_collect_prj.go +++ b/pkg/compiler/internal/loop_collect_prj.go @@ -11,10 +11,11 @@ import ( // initializeProjection handles the projection setup for group variables and counters. // Returns the projection variable name and the appropriate collector type. -func (c *LoopCollectCompiler) initializeProjection(ctx fql.ICollectClauseContext, kv *core.KV, counter fql.ICollectCounterContext) *core.CollectorProjection { +func (c *LoopCollectCompiler) initializeProjection(kv *core.KV, projection fql.ICollectGroupProjectionContext, counter fql.ICollectCounterContext) *core.CollectorProjection { // Handle group variable projection - if groupVar := ctx.CollectGroupProjection(); groupVar != nil { - varName := c.compileGroupVariableProjection(kv, groupVar) + if projection != nil { + varName := c.compileGroupVariableProjection(kv, projection) + return core.NewCollectorGroupProjection(varName) } @@ -28,20 +29,24 @@ func (c *LoopCollectCompiler) initializeProjection(ctx fql.ICollectClauseContext return nil } -func (c *LoopCollectCompiler) finalizeProjection(spec *core.CollectorSpec) { +func (c *LoopCollectCompiler) finalizeProjection(spec *core.CollectorSpec, aggregator vm.Operand) vm.Operand { loop := c.ctx.Loops.Current() varName := spec.Projection().VariableName() if spec.HasGrouping() || !spec.HasAggregation() { // Now we need to expand group variables from the dataset loop.ValueName = varName - c.ctx.Symbols.AssignLocal(loop.ValueName, core.TypeUnknown, loop.Value) - } else { - key := loadConstant(c.ctx, runtime.String(varName)) - val := c.ctx.Symbols.DeclareLocal(varName, core.TypeUnknown) - c.ctx.Emitter.EmitABC(vm.OpLoadKey, val, loop.Dst, key) - c.ctx.Registers.Free(key) + c.ctx.Symbols.AssignLocal(loop.ValueName, core.TypeUnknown, aggregator) + + return loop.Value } + + key := loadConstant(c.ctx, runtime.String(varName)) + val := c.ctx.Symbols.DeclareLocal(varName, core.TypeUnknown) + c.ctx.Emitter.EmitABC(vm.OpLoadKey, val, aggregator, key) + c.ctx.Registers.Free(key) + + return val } // compileGroupVariableProjection processes group variable projections (both default and custom).