From 16137db876530b69b6c58e3d4d60fee60126a328 Mon Sep 17 00:00:00 2001 From: Tim Voronov Date: Wed, 18 Jun 2025 16:55:46 -0400 Subject: [PATCH] Refactor collect aggregation; improve loop handling, add initialization/binding methods, and restructure aggregation logic for grouped and global cases --- pkg/compiler/internal/core/loop.go | 34 +- pkg/compiler/internal/loop_collect.go | 300 ++++++++++-------- .../bytecode/bytecode_for_collect_agg_test.go | 26 ++ ...t_test.go => bytecode_for_collect_test.go} | 0 ...sort_test.go => bytecode_for_sort_test.go} | 0 test/integration/vm/vm_for_collect_test.go | 16 - 6 files changed, 227 insertions(+), 149 deletions(-) create mode 100644 test/integration/bytecode/bytecode_for_collect_agg_test.go rename test/integration/bytecode/{bytecode_collect_test.go => bytecode_for_collect_test.go} (100%) rename test/integration/bytecode/{bytecode_sort_test.go => bytecode_for_sort_test.go} (100%) diff --git a/pkg/compiler/internal/core/loop.go b/pkg/compiler/internal/core/loop.go index 3247f9d7..3a88d80d 100644 --- a/pkg/compiler/internal/core/loop.go +++ b/pkg/compiler/internal/core/loop.go @@ -50,19 +50,31 @@ type Loop struct { } func (l *Loop) DeclareKeyVar(name string, st *SymbolTable) { - if l.canBindVar(name) { + if l.canDeclareVar(name) { l.KeyName = name l.Key = st.DeclareLocal(name) } } func (l *Loop) DeclareValueVar(name string, st *SymbolTable) { - if l.canBindVar(name) { + if l.canDeclareVar(name) { l.ValueName = name l.Value = st.DeclareLocal(name) } } +func (l *Loop) EmitInitialization(alloc *RegisterAllocator, emitter *Emitter) { + if l.Iterator == vm.NoopOperand { + l.Iterator = alloc.Allocate(Temp) + } + + emitter.EmitIter(l.Iterator, l.Src) +} + +func (l *Loop) EmitNext(emitter *Emitter) { + l.Jump = emitter.EmitJumpc(vm.OpIterNext, JumpPlaceholder, l.Iterator) +} + func (l *Loop) EmitValue(dst vm.Operand, emitter *Emitter) { emitter.EmitIterValue(dst, l.Iterator) } @@ -71,12 +83,28 @@ func (l *Loop) EmitKey(dst vm.Operand, 
emitter *Emitter) { emitter.EmitIterKey(dst, l.Iterator) } +func (l *Loop) BindValueVar(emitter *Emitter) { + if l.canBindVar(l.Value) { + l.EmitValue(l.Value, emitter) + } +} + +func (l *Loop) BindKeyVar(emitter *Emitter) { + if l.canBindVar(l.Key) { + l.EmitKey(l.Key, emitter) + } +} + func (l *Loop) EmitFinalization(emitter *Emitter) { emitter.EmitJump(l.Jump - l.JumpOffset) emitter.EmitA(vm.OpClose, l.Iterator) emitter.PatchJump(l.Jump) } -func (l *Loop) canBindVar(name string) bool { +func (l *Loop) canDeclareVar(name string) bool { return name != "" && name != IgnorePseudoVariable } + +func (l *Loop) canBindVar(op vm.Operand) bool { + return op != vm.NoopOperand +} diff --git a/pkg/compiler/internal/loop_collect.go b/pkg/compiler/internal/loop_collect.go index 3bba58b0..7dd255ee 100644 --- a/pkg/compiler/internal/loop_collect.go +++ b/pkg/compiler/internal/loop_collect.go @@ -89,20 +89,22 @@ func (cc *CollectCompiler) Compile(ctx fql.ICollectClauseContext) { if projectionVariableName != "" { // Now we need to expand group variables from the dataset loop.DeclareValueVar(projectionVariableName, cc.ctx.Symbols) + cc.ctx.LoopCompiler.EmitLoopBegin(loop) + loop.EmitKey(kvValReg, cc.ctx.Emitter) - loop.EmitValue(cc.ctx.Symbols.DeclareLocal(projectionVariableName), cc.ctx.Emitter) + loop.BindValueVar(cc.ctx.Emitter) } else { cc.ctx.LoopCompiler.EmitLoopBegin(loop) - // - //loop.EmitKey(kvKeyReg, cc.ctx.Emitter) + + loop.EmitKey(kvKeyReg, cc.ctx.Emitter) //loop.EmitValue(kvValReg, cc.ctx.Emitter) } } // Aggregation loop if aggregator != nil { - cc.compileAggregator(aggregator, loop, isCollecting) + cc.compileAggregation(aggregator, loop, isCollecting) } if isCollecting && isGrouping { @@ -111,48 +113,123 @@ func (cc *CollectCompiler) Compile(ctx fql.ICollectClauseContext) { } } -func (cc *CollectCompiler) compileAggregator(c fql.ICollectAggregatorContext, parentLoop *core.Loop, isCollected bool) { - var accums []vm.Operand - var loop *core.Loop - selectors := 
c.AllCollectAggregateSelector() - - // If data is collected, we need to allocate a temporary accumulators to store aggregation results +func (cc *CollectCompiler) compileAggregation(c fql.ICollectAggregatorContext, parentLoop *core.Loop, isCollected bool) { if isCollected { - // First of all, we allocate registers for accumulators - accums = make([]vm.Operand, len(selectors)) - - // We need to allocate a register for each accumulator - for i := 0; i < len(selectors); i++ { - reg := cc.ctx.Registers.Allocate(core.Temp) - accums[i] = reg - // TODO: Select persistent List type, we do not know how many items we will have - cc.ctx.Emitter.EmitA(vm.OpList, reg) - } - - loop = cc.ctx.Loops.NewLoop(core.TemporalLoop, core.ForLoop, false) - - // Now we iterate over the grouped items - parentLoop.EmitValue(loop.Iterator, cc.ctx.Emitter) - // We just re-use the same register - cc.ctx.Emitter.EmitAB(vm.OpIter, loop.Iterator, loop.Iterator) - // jumpPlaceholder is a placeholder for the exit aggrIterJump position - loop.Jump = cc.ctx.Emitter.EmitJumpc(vm.OpIterNext, core.JumpPlaceholder, loop.Iterator) - loop.ValueName = parentLoop.ValueName + cc.compileGroupedAggregation(c, parentLoop) } else { - loop = parentLoop - // Otherwise, we create a custom collector for aggregators - cc.ctx.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, int(core.CollectorTypeKeyGroup)) + cc.compileGlobalAggregation(c, parentLoop) } +} + +func (cc *CollectCompiler) compileGroupedAggregation(c fql.ICollectAggregatorContext, parentLoop *core.Loop) { + // We need to allocate temporary accumulators to store aggregation results + selectors := c.AllCollectAggregateSelector() + accums := cc.initAggrAccumulators(selectors) + loop := cc.ctx.Loops.NewLoop(core.TemporalLoop, core.ForLoop, false) + loop.Src = cc.ctx.Registers.Allocate(core.Temp) + + // Now we iterate over the grouped items + parentLoop.EmitValue(loop.Src, cc.ctx.Emitter) + loop.EmitInitialization(cc.ctx.Registers, 
cc.ctx.Emitter) + loop.EmitNext(cc.ctx.Emitter) + loop.ValueName = parentLoop.ValueName - // Store upper scope for aggregators - //mainScope := cc.ctx.Symbols.Scope() // Nested scope for aggregators cc.ctx.Symbols.EnterScope() - - aggrIterVal := cc.ctx.Symbols.DeclareLocal(loop.ValueName) - cc.ctx.Emitter.EmitAB(vm.OpIterValue, aggrIterVal, loop.Iterator) + loop.DeclareValueVar(loop.ValueName, cc.ctx.Symbols) + loop.BindValueVar(cc.ctx.Emitter) // Now we add value selectors to the accumulators + cc.collectAggregationFuncArgs(selectors, func(i int, resultReg vm.Operand) { + cc.ctx.Emitter.EmitAB(vm.OpPush, accums[i], resultReg) + }) + + // Now we can iterate over the grouped items + loop.EmitFinalization(cc.ctx.Emitter) + // Now close the aggregators scope + cc.ctx.Symbols.ExitScope() + + // Now we can iterate over the selectors and execute the aggregation functions by passing the accumulators + // And define variables for each accumulator result + cc.compileAggregationFuncCall(selectors, func(i int, _ string) core.RegisterSequence { + return core.RegisterSequence{accums[i]} + }, func(i int) {}) + + // Free the registers for accumulators + for _, reg := range accums { + cc.ctx.Registers.Free(reg) + } + + // Free the register for the iterator value + // cc.ctx.Registers.Free(aggrIterVal) +} + +func (cc *CollectCompiler) compileGlobalAggregation(c fql.ICollectAggregatorContext, parentLoop *core.Loop) { + loop := parentLoop + // we create a custom collector for aggregators + cc.ctx.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, int(core.CollectorTypeKeyGroup)) + + // Nested scope for aggregators + cc.ctx.Symbols.EnterScope() + loop.DeclareValueVar(loop.ValueName, cc.ctx.Symbols) + loop.BindValueVar(cc.ctx.Emitter) + + // Now we add value selectors to the accumulators + selectors := c.AllCollectAggregateSelector() + cc.collectAggregationFuncArgs(selectors, func(i int, resultReg vm.Operand) { + aggrKeyName := selectors[i].Identifier().GetText() + 
aggrKeyReg := loadConstant(cc.ctx, runtime.String(aggrKeyName)) + cc.ctx.Emitter.EmitABC(vm.OpPushKV, loop.Result, aggrKeyReg, resultReg) + cc.ctx.Registers.Free(aggrKeyReg) + }) + + // Now we can iterate over the grouped items + loop.EmitFinalization(cc.ctx.Emitter) + + // Now close the aggregators scope + cc.ctx.Symbols.ExitScope() + + parentLoop.ValueName = "" + parentLoop.KeyName = "" + + // Since we are in the middle of the loop, we need to patch the loop result + // Now we just create a range with 1 item to push the aggregated values to the dataset + // Replace the loop source with that range + zero := loadConstant(cc.ctx, runtime.Int(0)) + one := loadConstant(cc.ctx, runtime.Int(1)) + aggregator := cc.ctx.Registers.Allocate(core.Temp) + cc.ctx.Emitter.EmitAB(vm.OpMove, aggregator, loop.Result) + cc.ctx.Symbols.ExitScope() + + cc.ctx.Symbols.EnterScope() + + // Create new for loop + cc.ctx.Emitter.EmitABC(vm.OpRange, loop.Src, zero, one) + cc.ctx.Emitter.EmitAb(vm.OpDataSet, loop.Result, loop.Distinct) + + // In case of non-collected aggregators, we just iterate over the grouped items + // Retrieve the grouped values by key, execute aggregation funcs and assign variable names to the results + var key vm.Operand + var value vm.Operand + cc.compileAggregationFuncCall(selectors, func(i int, selectorVarName string) core.RegisterSequence { + // We execute the function call with the accumulator as an argument + key = loadConstant(cc.ctx, runtime.String(selectorVarName)) + value = cc.ctx.Registers.Allocate(core.Temp) + cc.ctx.Emitter.EmitABC(vm.OpLoadKey, value, aggregator, key) + + return core.RegisterSequence{value} + }, func(_ int) { + cc.ctx.Registers.Free(value) + cc.ctx.Registers.Free(key) + }) + + cc.ctx.Registers.Free(aggregator) + + // Free the register for the iterator value + // cc.ctx.Registers.Free(aggrIterVal) +} + +func (cc *CollectCompiler) collectAggregationFuncArgs(selectors []fql.ICollectAggregateSelectorContext, collector func(int, vm.Operand)) { for i 
:= 0; i < len(selectors); i++ { selector := selectors[i] fcx := selector.FunctionCallExpression() @@ -169,99 +246,27 @@ func (cc *CollectCompiler) compileAggregator(c fql.ICollectAggregatorContext, pa } resultReg := args[0] - - if isCollected { - cc.ctx.Emitter.EmitAB(vm.OpPush, accums[i], resultReg) - } else { - aggrKeyName := selector.Identifier().GetText() - aggrKeyReg := loadConstant(cc.ctx, runtime.String(aggrKeyName)) - cc.ctx.Emitter.EmitABC(vm.OpPushKV, loop.Result, aggrKeyReg, resultReg) - cc.ctx.Registers.Free(aggrKeyReg) - } - + collector(i, resultReg) cc.ctx.Registers.Free(resultReg) } +} - // Now we can iterate over the grouped items - loop.EmitFinalization(cc.ctx.Emitter) +func (cc *CollectCompiler) compileAggregationFuncCall(selectors []fql.ICollectAggregateSelectorContext, provider func(int, string) core.RegisterSequence, cleanup func(int)) { + for i, selector := range selectors { + fcx := selector.FunctionCallExpression() + // We won't make any checks here, as we already did it before + selectorVarName := selector.Identifier().GetText() - // Now we can iterate over the selectors and execute the aggregation functions by passing the accumulators - // And define variables for each accumulator result - if isCollected { - for i, selector := range selectors { - fcx := selector.FunctionCallExpression() - // We won't make any checks here, as we already did it before - selectorVarName := selector.Identifier().GetText() + result := cc.ctx.ExprCompiler.CompileFunctionCallWith(fcx.FunctionCall(), fcx.ErrorOperator() != nil, provider(i, selectorVarName)) - // We execute the function call with the accumulator as an argument - accum := accums[i] - result := cc.ctx.ExprCompiler.CompileFunctionCallWith(fcx.FunctionCall(), fcx.ErrorOperator() != nil, core.RegisterSequence{accum}) + // We define the variable for the selector result in the upper scope + // Since this temporary scope is only for aggregators and will be closed after the aggregation + varReg := 
cc.ctx.Symbols.DeclareLocal(selectorVarName) + cc.ctx.Emitter.EmitAB(vm.OpMove, varReg, result) + cc.ctx.Registers.Free(result) - // We define the variable for the selector result in the upper scope - // Since this temporary scope is only for aggregators and will be closed after the aggregation - varReg := cc.ctx.Symbols.DeclareLocal(selectorVarName) - cc.ctx.Emitter.EmitAB(vm.OpMove, varReg, result) - cc.ctx.Registers.Free(result) - } - - cc.ctx.Loops.Pop() - // Now close the aggregators scope - cc.ctx.Symbols.ExitScope() - } else { - // Now close the aggregators scope - cc.ctx.Symbols.ExitScope() - - parentLoop.ValueName = "" - parentLoop.KeyName = "" - - // Since we we in the middle of the loop, we need to patch the loop result - // Now we just create a range with 1 item to push the aggregated values to the dataset - // Replace source with sorted array - zero := loadConstant(cc.ctx, runtime.Int(0)) - one := loadConstant(cc.ctx, runtime.Int(1)) - aggregator := cc.ctx.Registers.Allocate(core.Temp) - cc.ctx.Emitter.EmitAB(vm.OpMove, aggregator, loop.Result) - cc.ctx.Symbols.ExitScope() - - cc.ctx.Symbols.EnterScope() - - // Create new for loop - cc.ctx.Emitter.EmitABC(vm.OpRange, loop.Src, zero, one) - cc.ctx.Emitter.EmitAb(vm.OpDataSet, loop.Result, loop.Distinct) - - // In case of non-collected aggregators, we just iterate over the grouped items - // Retrieve the grouped values by key, execute aggregation funcs and assign variable names to the results - for _, selector := range selectors { - fcx := selector.FunctionCallExpression() - // We won't make any checks here, as we already did it before - selectorVarName := selector.Identifier().GetText() - - // We execute the function call with the accumulator as an argument - key := loadConstant(cc.ctx, runtime.String(selectorVarName)) - value := cc.ctx.Registers.Allocate(core.Temp) - cc.ctx.Emitter.EmitABC(vm.OpLoadKey, value, aggregator, key) - - result := 
cc.ctx.ExprCompiler.CompileFunctionCallWith(fcx.FunctionCall(), fcx.ErrorOperator() != nil, core.RegisterSequence{value}) - - // We define the variable for the selector result in the upper scope - // Since this temporary scope is only for aggregators and will be closed after the aggregation - varReg := cc.ctx.Symbols.DeclareLocal(selectorVarName) - cc.ctx.Emitter.EmitAB(vm.OpMove, varReg, result) - cc.ctx.Registers.Free(result) - cc.ctx.Registers.Free(value) - cc.ctx.Registers.Free(key) - } - - cc.ctx.Registers.Free(aggregator) + cleanup(i) } - - // Free the registers for accumulators - for _, reg := range accums { - cc.ctx.Registers.Free(reg) - } - - // Free the register for the iterator value - cc.ctx.Registers.Free(aggrIterVal) } func (cc *CollectCompiler) compileCollectGroupKeySelectors(selectors []fql.ICollectSelectorContext) vm.Operand { @@ -322,12 +327,7 @@ func (cc *CollectCompiler) compileCollectGroupKeySelectorVariables(selectors []f name := selectors[0].Identifier().GetText() // Define a variable for each selector varReg := cc.ctx.Symbols.DeclareLocal(name) - - reg := kvValReg - - if isAggregation { - reg = kvKeyReg - } + reg := cc.selectGroupKey(isAggregation, kvKeyReg, kvValReg) // If we have a single selector, we can just move the value cc.ctx.Emitter.EmitAB(vm.OpMove, varReg, reg) @@ -376,3 +376,43 @@ func (cc *CollectCompiler) compileCustomGroupProjection(_ *core.Loop, kvValReg v return selector.Identifier().GetText() } + +func (cc *CollectCompiler) selectGroupKey(isAggregation bool, kvKeyReg, kvValReg vm.Operand) vm.Operand { + if isAggregation { + return kvKeyReg + } + + return kvValReg +} + +func (cc *CollectCompiler) initAggrAccumulators(selectors []fql.ICollectAggregateSelectorContext) []vm.Operand { + accums := make([]vm.Operand, len(selectors)) + + // First of all, we allocate registers for accumulators + accums = make([]vm.Operand, len(selectors)) + + // We need to allocate a register for each accumulator + for i := 0; i < len(selectors); 
i++ { + reg := cc.ctx.Registers.Allocate(core.Temp) + accums[i] = reg + + // TODO: Select persistent List type, we do not know how many items we will have + cc.ctx.Emitter.EmitA(vm.OpList, reg) + } + + return accums +} + +func (cc *CollectCompiler) emitPushToAggrAccumulators(accums []vm.Operand, selectors []fql.ICollectAggregateSelectorContext, loop *core.Loop) { + for i, selector := range selectors { + fcx := selector.FunctionCallExpression() + args := cc.ctx.ExprCompiler.CompileArgumentList(fcx.FunctionCall().ArgumentList()) + + if len(args) != 1 { + panic("aggregate function must have exactly one argument") + } + + cc.ctx.Emitter.EmitAB(vm.OpPush, accums[i], args[0]) + cc.ctx.Registers.Free(args[0]) + } +} diff --git a/test/integration/bytecode/bytecode_for_collect_agg_test.go b/test/integration/bytecode/bytecode_for_collect_agg_test.go new file mode 100644 index 00000000..5556117e --- /dev/null +++ b/test/integration/bytecode/bytecode_for_collect_agg_test.go @@ -0,0 +1,26 @@ +package bytecode_test + +import ( + "testing" + + "github.com/MontFerret/ferret/pkg/vm" +) + +func TestCollectAggregate(t *testing.T) { + RunUseCases(t, []UseCase{ + ByteCodeCase(` +LET users = [] +FOR u IN users + COLLECT genderGroup = u.gender + AGGREGATE minAge = MIN(u.age), maxAge = MAX(u.age) + + RETURN { + genderGroup, + minAge, + maxAge + } +`, BC{ + I(vm.OpReturn, 0, 7), + }), + }) +} diff --git a/test/integration/bytecode/bytecode_collect_test.go b/test/integration/bytecode/bytecode_for_collect_test.go similarity index 100% rename from test/integration/bytecode/bytecode_collect_test.go rename to test/integration/bytecode/bytecode_for_collect_test.go diff --git a/test/integration/bytecode/bytecode_sort_test.go b/test/integration/bytecode/bytecode_for_sort_test.go similarity index 100% rename from test/integration/bytecode/bytecode_sort_test.go rename to test/integration/bytecode/bytecode_for_sort_test.go diff --git a/test/integration/vm/vm_for_collect_test.go 
b/test/integration/vm/vm_for_collect_test.go index 434761e7..d4eda812 100644 --- a/test/integration/vm/vm_for_collect_test.go +++ b/test/integration/vm/vm_for_collect_test.go @@ -6,22 +6,6 @@ import ( . "github.com/MontFerret/ferret/test/integration/base" ) -// COLLECT vs. RETURN DISTINCT -// -// In order to make a result set unique, one can either use COLLECT or RETURN DISTINCT. -// -// FOR u IN users -// RETURN DISTINCT u.age -// FOR u IN users -// COLLECT age = u.age -// RETURN age -// Behind the scenes, both variants create a CollectNode. However, they use different implementations of COLLECT that have different properties: -// -// RETURN DISTINCT maintains the order of results, but it is limited to a single value. -// -// COLLECT changes the order of results (sorted or undefined), but it supports multiple values and is more flexible than RETURN DISTINCT. -// -// Aside from COLLECTs sophisticated grouping and aggregation capabilities, it allows you to place a LIMIT operation before RETURN to potentially stop the COLLECT operation early. func TestCollect(t *testing.T) { RunUseCases(t, []UseCase{ SkipCaseCompilationError(`