From 2e78bc4c64b40acc6207ddd85faf40ae98033d93 Mon Sep 17 00:00:00 2001 From: Tim Voronov Date: Wed, 18 Jun 2025 17:22:39 -0400 Subject: [PATCH] Refactor collect handling; restructure group and aggregation logic, unify loop operations, and improve method clarity --- pkg/compiler/internal/loop_collect.go | 187 +++++++++++++------------- 1 file changed, 94 insertions(+), 93 deletions(-) diff --git a/pkg/compiler/internal/loop_collect.go b/pkg/compiler/internal/loop_collect.go index 7dd255ee..a22043f1 100644 --- a/pkg/compiler/internal/loop_collect.go +++ b/pkg/compiler/internal/loop_collect.go @@ -18,110 +18,110 @@ func NewCollectCompiler(ctx *CompilerContext) *CollectCompiler { } func (cc *CollectCompiler) Compile(ctx fql.ICollectClauseContext) { - // TODO: Undefine original loop variables - loop := cc.ctx.Loops.Current() - - // We collect the aggregation keys - // And wrap each loop element by a KeyValuePair - // Where a key is either a single value or a list of values - // These KeyValuePairs are then added to the dataset - var kvKeyReg, kvValReg vm.Operand - var groupSelectors []fql.ICollectSelectorContext - var isGrouping bool - grouping := ctx.CollectGrouping() - counter := ctx.CollectCounter() aggregator := ctx.CollectAggregator() - - isCollecting := grouping != nil || counter != nil - - if isCollecting { - if grouping != nil { - isGrouping = true - groupSelectors = grouping.AllCollectSelector() - kvKeyReg = cc.compileCollectGroupKeySelectors(groupSelectors) - } - - kvValReg = cc.ctx.Registers.Allocate(core.Temp) - loop.EmitValue(kvValReg, cc.ctx.Emitter) - - var projectionVariableName string - collectorType := core.CollectorTypeKey - - // If we have a collect group variable, we need to project it - if groupVar := ctx.CollectGroupVariable(); groupVar != nil { - // Projection can be either a default projection (identifier) or a custom projection (selector expression) - if identifier := groupVar.Identifier(); identifier != nil { - projectionVariableName = cc.compileDefaultGroupProjection(loop, kvValReg, identifier, groupVar.CollectGroupVariableKeeper()) - } else if selector := groupVar.CollectSelector(); selector != nil { - projectionVariableName = cc.compileCustomGroupProjection(loop, kvValReg, selector) - } - - collectorType = core.CollectorTypeKeyGroup - } else if counter != nil { - projectionVariableName = counter.Identifier().GetText() - - if isGrouping { - collectorType = core.CollectorTypeKeyCounter - } else { - collectorType = core.CollectorTypeCounter - } - } - - // If we use aggregators, we need to collect group items by key - if aggregator != nil && collectorType != core.CollectorTypeKeyGroup { - // We need to patch the loop result to be a collector - collectorType = core.CollectorTypeKeyGroup - } - - // We replace DataSet initialization with Collector initialization - cc.ctx.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, int(collectorType)) - cc.ctx.Emitter.EmitABC(vm.OpPushKV, loop.Result, kvKeyReg, kvValReg) - loop.EmitFinalization(cc.ctx.Emitter) - - cc.ctx.Emitter.EmitMove(loop.Src, loop.Result) - - cc.ctx.Registers.Free(loop.Value) - cc.ctx.Registers.Free(loop.Key) - loop.Value = kvValReg - loop.Key = vm.NoopOperand - - // If the projection is used, we allocate a new register for the variable and put the iterator's value into it - if projectionVariableName != "" { - // Now we need to expand group variables from the dataset - loop.DeclareValueVar(projectionVariableName, cc.ctx.Symbols) - - cc.ctx.LoopCompiler.EmitLoopBegin(loop) - - loop.EmitKey(kvValReg, cc.ctx.Emitter) - loop.BindValueVar(cc.ctx.Emitter) - } else { - cc.ctx.LoopCompiler.EmitLoopBegin(loop) - - loop.EmitKey(kvKeyReg, cc.ctx.Emitter) - //loop.EmitValue(kvValReg, cc.ctx.Emitter) - } - } + kvKeyReg, kvValReg, groupSelectors := cc.compileGrouping(ctx, aggregator != nil) // Aggregation loop if aggregator != nil { - cc.compileAggregation(aggregator, loop, isCollecting) + cc.compileAggregation(aggregator, len(groupSelectors) > 0) } - if isCollecting && isGrouping { + if len(groupSelectors) > 0 { // Now we are defining new variables for the group selectors - cc.compileCollectGroupKeySelectorVariables(groupSelectors, kvKeyReg, kvValReg, aggregator != nil) + cc.compileGroupSelectorVariables(groupSelectors, kvKeyReg, kvValReg, aggregator != nil) } } -func (cc *CollectCompiler) compileAggregation(c fql.ICollectAggregatorContext, parentLoop *core.Loop, isCollected bool) { - if isCollected { - cc.compileGroupedAggregation(c, parentLoop) +func (cc *CollectCompiler) compileGrouping(ctx fql.ICollectClauseContext, aggregation bool) (vm.Operand, vm.Operand, []fql.ICollectSelectorContext) { + var kvKeyReg, kvValReg vm.Operand + var groupSelectors []fql.ICollectSelectorContext + grouping := ctx.CollectGrouping() + counter := ctx.CollectCounter() + + if grouping == nil && counter == nil { + return kvKeyReg, kvValReg, groupSelectors + } + + loop := cc.ctx.Loops.Current() + + if grouping != nil { + groupSelectors = grouping.AllCollectSelector() + kvKeyReg = cc.compileGroupSelectors(groupSelectors) + } + + kvValReg = cc.ctx.Registers.Allocate(core.Temp) + loop.EmitValue(kvValReg, cc.ctx.Emitter) + + var projectionVariableName string + collectorType := core.CollectorTypeKey + + // If we have a collect group variable, we need to project it + if groupVar := ctx.CollectGroupVariable(); groupVar != nil { + // Projection can be either a default projection (identifier) or a custom projection (selector expression) + if identifier := groupVar.Identifier(); identifier != nil { + projectionVariableName = cc.compileDefaultGroupProjection(loop, kvValReg, identifier, groupVar.CollectGroupVariableKeeper()) + } else if selector := groupVar.CollectSelector(); selector != nil { + projectionVariableName = cc.compileCustomGroupProjection(loop, kvValReg, selector) + } + + collectorType = core.CollectorTypeKeyGroup + } else if counter != nil { + projectionVariableName = counter.Identifier().GetText() + + if grouping != nil { + collectorType = core.CollectorTypeKeyCounter + } else { + collectorType = core.CollectorTypeCounter + } + } + + // If we use aggregators, we need to collect group items by key + if aggregation && collectorType != core.CollectorTypeKeyGroup { + // We need to patch the loop result to be a collector + collectorType = core.CollectorTypeKeyGroup + } + + // We replace DataSet initialization with Collector initialization + cc.ctx.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, int(collectorType)) + cc.ctx.Emitter.EmitABC(vm.OpPushKV, loop.Result, kvKeyReg, kvValReg) + loop.EmitFinalization(cc.ctx.Emitter) + + cc.ctx.Emitter.EmitMove(loop.Src, loop.Result) + + cc.ctx.Registers.Free(loop.Value) + cc.ctx.Registers.Free(loop.Key) + loop.Value = kvValReg + loop.Key = vm.NoopOperand + + // If the projection is used, we allocate a new register for the variable and put the iterator's value into it + if projectionVariableName != "" { + // Now we need to expand group variables from the dataset + loop.DeclareValueVar(projectionVariableName, cc.ctx.Symbols) + + cc.ctx.LoopCompiler.EmitLoopBegin(loop) + + loop.EmitKey(kvValReg, cc.ctx.Emitter) + loop.BindValueVar(cc.ctx.Emitter) } else { - cc.compileGlobalAggregation(c, parentLoop) + cc.ctx.LoopCompiler.EmitLoopBegin(loop) + + loop.EmitKey(kvKeyReg, cc.ctx.Emitter) + //loop.EmitValue(kvValReg, cc.ctx.Emitter) + } + + return kvKeyReg, kvValReg, groupSelectors +} + +func (cc *CollectCompiler) compileAggregation(c fql.ICollectAggregatorContext, isGrouped bool) { + if isGrouped { + cc.compileGroupedAggregation(c) + } else { + cc.compileGlobalAggregation(c) } } -func (cc *CollectCompiler) compileGroupedAggregation(c fql.ICollectAggregatorContext, parentLoop *core.Loop) { +func (cc *CollectCompiler) compileGroupedAggregation(c fql.ICollectAggregatorContext) { + parentLoop := cc.ctx.Loops.Current() // We need to allocate a temporary accumulators to store aggregation results selectors := c.AllCollectAggregateSelector() accums := cc.initAggrAccumulators(selectors) @@ -164,7 +164,8 @@ func (cc *CollectCompiler) compileGroupedAggregation(c fql.ICollectAggregatorCon // cc.ctx.Registers.Free(aggrIterVal) } -func (cc *CollectCompiler) compileGlobalAggregation(c fql.ICollectAggregatorContext, parentLoop *core.Loop) { +func (cc *CollectCompiler) compileGlobalAggregation(c fql.ICollectAggregatorContext) { + parentLoop := cc.ctx.Loops.Current() loop := parentLoop // we create a custom collector for aggregators cc.ctx.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, int(core.CollectorTypeKeyGroup)) @@ -269,7 +270,7 @@ func (cc *CollectCompiler) compileAggregationFuncCall(selectors []fql.ICollectAg } } -func (cc *CollectCompiler) compileCollectGroupKeySelectors(selectors []fql.ICollectSelectorContext) vm.Operand { +func (cc *CollectCompiler) compileGroupSelectors(selectors []fql.ICollectSelectorContext) vm.Operand { if len(selectors) == 0 { return vm.NoopOperand } @@ -298,7 +299,7 @@ func (cc *CollectCompiler) compileCollectGroupKeySelectors(selectors []fql.IColl return kvKeyReg } -func (cc *CollectCompiler) compileCollectGroupKeySelectorVariables(selectors []fql.ICollectSelectorContext, kvKeyReg, kvValReg vm.Operand, isAggregation bool) { +func (cc *CollectCompiler) compileGroupSelectorVariables(selectors []fql.ICollectSelectorContext, kvKeyReg, kvValReg vm.Operand, isAggregation bool) { if len(selectors) > 1 { variables := make([]vm.Operand, len(selectors))