From 11f493ef794f79bea126048df5ca37e5c3b2b32f Mon Sep 17 00:00:00 2001 From: Tim Voronov Date: Mon, 9 Jun 2025 20:34:04 -0400 Subject: [PATCH] wip --- pkg/compiler/internal/visitor.go | 262 +++++++++++++-------- pkg/vm/internal/collector_key_group.go | 38 +-- pkg/vm/internal/collector_value.go | 34 --- test/integration/vm/vm_for_collect_test.go | 42 ++-- 4 files changed, 209 insertions(+), 167 deletions(-) delete mode 100644 pkg/vm/internal/collector_value.go diff --git a/pkg/compiler/internal/visitor.go b/pkg/compiler/internal/visitor.go index 81ba3097..d75bbb3f 100644 --- a/pkg/compiler/internal/visitor.go +++ b/pkg/compiler/internal/visitor.go @@ -407,72 +407,76 @@ func (v *Visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{} // And wrap each loop element by a KeyValuePair // Where a key is either a single value or a list of values // These KeyValuePairs are then added to the dataset - var kvKeyReg vm.Operand + var kvKeyReg, kvValReg vm.Operand var groupSelectors []fql.ICollectSelectorContext var isGrouping bool grouping := ctx.CollectGrouping() + counter := ctx.CollectCounter() + aggregator := ctx.CollectAggregator() - if grouping != nil { - isGrouping = true - groupSelectors = grouping.AllCollectSelector() - kvKeyReg = v.emitCollectGroupKeySelectors(groupSelectors) - } + isCollecting := grouping != nil || counter != nil - kvValReg := v.Registers.Allocate(Temp) - v.emitIterValue(loop, kvValReg) - - var projectionVariableName string - collectorType := CollectorTypeKey - - // If we have a collect group variable, we need to project it - if groupVar := ctx.CollectGroupVariable(); groupVar != nil { - // Projection can be either a default projection (identifier) or a custom projection (selector expression) - if identifier := groupVar.Identifier(); identifier != nil { - projectionVariableName = v.emitCollectDefaultGroupProjection(loop, kvValReg, identifier, groupVar.CollectGroupVariableKeeper()) - } else if selector := groupVar.CollectSelector(); selector != nil { - projectionVariableName = v.emitCollectCustomGroupProjection(loop, kvValReg, selector) + if isCollecting { + if grouping != nil { + isGrouping = true + groupSelectors = grouping.AllCollectSelector() + kvKeyReg = v.emitCollectGroupKeySelectors(groupSelectors) } - collectorType = CollectorTypeKeyGroup - } else if countVar := ctx.CollectCounter(); countVar != nil { - projectionVariableName = v.emitCollectCountProjection(loop, kvValReg, countVar) - - if isGrouping { - collectorType = CollectorTypeKeyCounter - } else { - collectorType = CollectorTypeCounter - } - } - - aggregateCtx := ctx.CollectAggregator() - - // If we use aggregators, we need to collect group items by key - if aggregateCtx != nil && collectorType != CollectorTypeKeyGroup { - // We need to patch the loop result to be a collector - collectorType = CollectorTypeKeyGroup - } - - // We replace DataSet initialization with Collector initialization - v.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, int(collectorType)) - v.Emitter.EmitABC(vm.OpPushKV, loop.Result, kvKeyReg, kvValReg) - v.emitIterJumpOrClose(loop) - - // Replace the source with the collector - v.patchSwitchLoop(loop) - - // If the projection is used, we allocate a new register for the variable and put the iterator's value into it - if projectionVariableName != "" { - // Now we need to expand group variables from the dataset - v.emitIterKey(loop, kvValReg) - v.emitIterValue(loop, v.Symbols.DefineVariable(projectionVariableName)) - } else { - v.emitIterKey(loop, kvKeyReg) + kvValReg = v.Registers.Allocate(Temp) v.emitIterValue(loop, kvValReg) + + var projectionVariableName string + collectorType := CollectorTypeKey + + // If we have a collect group variable, we need to project it + if groupVar := ctx.CollectGroupVariable(); groupVar != nil { + // Projection can be either a default projection (identifier) or a custom projection (selector expression) + if identifier := groupVar.Identifier(); identifier != nil { + projectionVariableName = v.emitCollectDefaultGroupProjection(loop, kvValReg, identifier, groupVar.CollectGroupVariableKeeper()) + } else if selector := groupVar.CollectSelector(); selector != nil { + projectionVariableName = v.emitCollectCustomGroupProjection(loop, kvValReg, selector) + } + + collectorType = CollectorTypeKeyGroup + } else if counter != nil { + projectionVariableName = v.emitCollectCountProjection(loop, kvValReg, counter) + + if isGrouping { + collectorType = CollectorTypeKeyCounter + } else { + collectorType = CollectorTypeCounter + } + } + + // If we use aggregators, we need to collect group items by key + if aggregator != nil && collectorType != CollectorTypeKeyGroup { + // We need to patch the loop result to be a collector + collectorType = CollectorTypeKeyGroup + } + + // We replace DataSet initialization with Collector initialization + v.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, int(collectorType)) + v.Emitter.EmitABC(vm.OpPushKV, loop.Result, kvKeyReg, kvValReg) + v.emitIterJumpOrClose(loop) + + // Replace the source with the collector + v.emitPatchLoop(loop) + + // If the projection is used, we allocate a new register for the variable and put the iterator's value into it + if projectionVariableName != "" { + // Now we need to expand group variables from the dataset + v.emitIterKey(loop, kvValReg) + v.emitIterValue(loop, v.Symbols.DefineVariable(projectionVariableName)) + } else { + v.emitIterKey(loop, kvKeyReg) + v.emitIterValue(loop, kvValReg) + } } // Aggregation loop - if aggregateCtx != nil { - v.emitCollectAggregator(aggregateCtx, loop) + if aggregator != nil { + v.emitCollectAggregator(aggregator, loop, isCollecting) } // TODO: Reuse the Registers @@ -481,42 +485,53 @@ func (v *Visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{} loop.Value = vm.NoopOperand loop.Key = vm.NoopOperand - if isGrouping { + if isCollecting && isGrouping { // Now we are defining new variables for the group selectors - v.emitCollectGroupKeySelectorVariables(groupSelectors, kvKeyReg, kvValReg, aggregateCtx != nil) + v.emitCollectGroupKeySelectorVariables(groupSelectors, kvKeyReg, kvValReg, aggregator != nil) } return nil } -func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, parentLoop *Loop) { - // First of all, we allocate registers for accumulators +func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, parentLoop *Loop, isCollected bool) { + var accums []vm.Operand + var loop *Loop selectors := c.AllCollectAggregateSelector() - accums := make([]vm.Operand, len(selectors)) - // We need to allocate a register for each accumulator - for i := 0; i < len(selectors); i++ { - reg := v.Registers.Allocate(Temp) - accums[i] = reg - // TODO: Select persistent List type, we do not know how many items we will have - v.Emitter.EmitA(vm.OpList, reg) + // If data is collected, we need to allocate a temporary accumulators to store aggregation results + if isCollected { + // First of all, we allocate registers for accumulators + accums = make([]vm.Operand, len(selectors)) + + // We need to allocate a register for each accumulator + for i := 0; i < len(selectors); i++ { + reg := v.Registers.Allocate(Temp) + accums[i] = reg + // TODO: Select persistent List type, we do not know how many items we will have + v.Emitter.EmitA(vm.OpList, reg) + } + + loop = v.Loops.EnterLoop(TemporalLoop, ForLoop, false) + + // Now we iterate over the grouped items + v.emitIterValue(parentLoop, loop.Iterator) + // We just re-use the same register + v.Emitter.EmitAB(vm.OpIter, loop.Iterator, loop.Iterator) + // jumpPlaceholder is a placeholder for the exit aggrIterJump position + loop.Jump = v.Emitter.EmitJumpc(vm.OpIterNext, jumpPlaceholder, loop.Iterator) + loop.ValueName = parentLoop.ValueName + } else { + loop = parentLoop + // Otherwise, we create a custom collector for aggregators + v.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, int(CollectorTypeKeyGroup)) } // Store upper scope for aggregators mainScope := v.Symbols.Scope() // Nested scope for aggregators v.Symbols.EnterScope() - loop := v.Loops.EnterLoop(TemporalLoop, ForLoop, false) - // Now we iterate over the grouped items - v.emitIterValue(parentLoop, loop.Iterator) - - // We just re-use the same register - v.Emitter.EmitAB(vm.OpIter, loop.Iterator, loop.Iterator) - // jumpPlaceholder is a placeholder for the exit aggrIterJump position - loop.Jump = v.Emitter.EmitJumpc(vm.OpIterNext, jumpPlaceholder, loop.Iterator) - - aggrIterVal := v.Symbols.DefineVariable(parentLoop.ValueName) + aggrIterVal := v.Symbols.DefineVariable(loop.ValueName) v.Emitter.EmitAB(vm.OpIterValue, aggrIterVal, loop.Iterator) // Now we add value selectors to the accumulators @@ -536,7 +551,16 @@ func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, parentL } resultReg := args[0].Accept(v).(vm.Operand) - v.Emitter.EmitAB(vm.OpPush, accums[i], resultReg) + + if isCollected { + v.Emitter.EmitAB(vm.OpPush, accums[i], resultReg) + } else { + aggrKeyName := selector.Identifier().GetText() + aggrKeyReg := v.loadConstant(runtime.String(aggrKeyName)) + v.Emitter.EmitABC(vm.OpPushKV, loop.Result, aggrKeyReg, resultReg) + v.Registers.Free(aggrKeyReg) + } + v.Registers.Free(resultReg) } @@ -545,26 +569,74 @@ func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, parentL // Now we can iterate over the selectors and execute the aggregation functions by passing the accumulators // And define variables for each accumulator result - for i, selector := range selectors { - fcx := selector.FunctionCallExpression() - // We won't make any checks here, as we already did it before - selectorVarName := selector.Identifier().GetText() + if isCollected { + for i, selector := range selectors { + fcx := selector.FunctionCallExpression() + // We won't make any checks here, as we already did it before + selectorVarName := selector.Identifier().GetText() - // We execute the function call with the accumulator as an argument - accum := accums[i] - result := v.emitFunctionCall(fcx.FunctionCall(), fcx.ErrorOperator() != nil, NewRegisterSequence(accum)) + // We execute the function call with the accumulator as an argument + accum := accums[i] + result := v.emitFunctionCall(fcx.FunctionCall(), fcx.ErrorOperator() != nil, NewRegisterSequence(accum)) - // We define the variable for the selector result in the upper scope - // Since this temporary scope is only for aggregators and will be closed after the aggregation - varReg := v.Symbols.DefineVariableInScope(selectorVarName, mainScope) - v.Emitter.EmitAB(vm.OpMove, varReg, result) - v.Registers.Free(result) + // We define the variable for the selector result in the upper scope + // Since this temporary scope is only for aggregators and will be closed after the aggregation + varReg := v.Symbols.DefineVariableInScope(selectorVarName, mainScope) + v.Emitter.EmitAB(vm.OpMove, varReg, result) + v.Registers.Free(result) + } + + v.Loops.ExitLoop() + // Now close the aggregators scope + v.Symbols.ExitScope() + } else { + // Now close the aggregators scope + v.Symbols.ExitScope() + + parentLoop.ValueName = "" + parentLoop.KeyName = "" + + // Since we we in the middle of the loop, we need to patch the loop result + // Now we just create a range with 1 item to push the aggregated values to the dataset + // Replace source with sorted array + zero := v.loadConstant(runtime.Int(0)) + one := v.loadConstant(runtime.Int(1)) + aggregator := v.Registers.Allocate(Temp) + v.Emitter.EmitAB(vm.OpMove, aggregator, loop.Result) + v.Symbols.ExitScope() + + v.Symbols.EnterScope() + + // Create new for loop + v.Emitter.EmitABC(vm.OpRange, loop.Src, zero, one) + v.Emitter.EmitAb(vm.OpDataSet, loop.Result, loop.Distinct) + + // In case of non-collected aggregators, we just iterate over the grouped items + // Retrieve the grouped values by key, execute aggregation funcs and assign variable names to the results + for _, selector := range selectors { + fcx := selector.FunctionCallExpression() + // We won't make any checks here, as we already did it before + selectorVarName := selector.Identifier().GetText() + + // We execute the function call with the accumulator as an argument + key := v.loadConstant(runtime.String(selectorVarName)) + value := v.Registers.Allocate(Temp) + v.Emitter.EmitABC(vm.OpLoadKey, value, aggregator, key) + + result := v.emitFunctionCall(fcx.FunctionCall(), fcx.ErrorOperator() != nil, NewRegisterSequence(value)) + + // We define the variable for the selector result in the upper scope + // Since this temporary scope is only for aggregators and will be closed after the aggregation + varReg := v.Symbols.DefineVariableInScope(selectorVarName, mainScope) + v.Emitter.EmitAB(vm.OpMove, varReg, result) + v.Registers.Free(result) + v.Registers.Free(value) + v.Registers.Free(key) + } + + v.Registers.Free(aggregator) } - v.Loops.ExitLoop() - // Now close the aggregators scope - v.Symbols.ExitScope() - // Free the registers for accumulators for _, reg := range accums { v.Registers.Free(reg) @@ -1600,8 +1672,8 @@ func (v *Visitor) emitIterJumpOrClose(loop *Loop) { } } -// patchSwitchLoop replaces the source of the loop with a modified dataset -func (v *Visitor) patchSwitchLoop(loop *Loop) { +// emitPatchLoop replaces the source of the loop with a modified dataset +func (v *Visitor) emitPatchLoop(loop *Loop) { // Replace source with sorted array v.Emitter.EmitAB(vm.OpMove, loop.Src, loop.Result) diff --git a/pkg/vm/internal/collector_key_group.go b/pkg/vm/internal/collector_key_group.go index 775001e7..006f29b1 100644 --- a/pkg/vm/internal/collector_key_group.go +++ b/pkg/vm/internal/collector_key_group.go @@ -21,6 +21,10 @@ func NewKeyGroupCollector() Transformer { } } +func (c *KeyGroupCollector) Get(_ context.Context, key runtime.Value) (runtime.Value, error) { + return c.grouping[key.String()], nil +} + func (c *KeyGroupCollector) Iterate(ctx context.Context) (runtime.Iterator, error) { if !c.sorted { if err := c.sort(ctx); err != nil { @@ -39,23 +43,6 @@ func (c *KeyGroupCollector) Iterate(ctx context.Context) (runtime.Iterator, erro return NewKVIterator(iter), nil } -func (c *KeyGroupCollector) sort(ctx context.Context) error { - return runtime.SortListWith(ctx, c.Value, func(first, second runtime.Value) int64 { - firstKV, firstOk := first.(*KV) - secondKV, secondOk := second.(*KV) - - var comp int64 - - if firstOk && secondOk { - comp = runtime.CompareValues(firstKV.Key, secondKV.Key) - } else { - comp = runtime.CompareValues(first, second) - } - - return comp - }) -} - func (c *KeyGroupCollector) Add(ctx context.Context, key, value runtime.Value) error { k, err := Stringify(ctx, key) @@ -79,3 +66,20 @@ func (c *KeyGroupCollector) Add(ctx context.Context, key, value runtime.Value) e return group.Add(ctx, value) } + +func (c *KeyGroupCollector) sort(ctx context.Context) error { + return runtime.SortListWith(ctx, c.Value, func(first, second runtime.Value) int64 { + firstKV, firstOk := first.(*KV) + secondKV, secondOk := second.(*KV) + + var comp int64 + + if firstOk && secondOk { + comp = runtime.CompareValues(firstKV.Key, secondKV.Key) + } else { + comp = runtime.CompareValues(first, second) + } + + return comp + }) +} diff --git a/pkg/vm/internal/collector_value.go b/pkg/vm/internal/collector_value.go deleted file mode 100644 index fa52edb7..00000000 --- a/pkg/vm/internal/collector_value.go +++ /dev/null @@ -1,34 +0,0 @@ -package internal - -import ( - "context" - "github.com/MontFerret/ferret/pkg/runtime" -) - -type ValueCollector struct { - *runtime.Box[runtime.List] - sorted bool -} - -func NewValueCollector() Transformer { - return &ValueCollector{ - Box: &runtime.Box[runtime.List]{ - Value: runtime.NewArray(16), - }} -} - -func (c *ValueCollector) Iterate(ctx context.Context) (runtime.Iterator, error) { - if !c.sorted { - if err := runtime.SortAsc(ctx, c.Value); err != nil { - return nil, err - } - - c.sorted = true - } - - return c.Value.Iterate(ctx) -} - -func (c *ValueCollector) Add(ctx context.Context, _, value runtime.Value) error { - return c.Value.Add(ctx, value) -} diff --git a/test/integration/vm/vm_for_collect_test.go b/test/integration/vm/vm_for_collect_test.go index d5082507..29bd3829 100644 --- a/test/integration/vm/vm_for_collect_test.go +++ b/test/integration/vm/vm_for_collect_test.go @@ -23,7 +23,7 @@ import ( // Aside from COLLECTs sophisticated grouping and aggregation capabilities, it allows you to place a LIMIT operation before RETURN to potentially stop the COLLECT operation early. func TestCollect(t *testing.T) { RunUseCases(t, []UseCase{ - SkipCaseCompilationError(` + CaseCompilationError(` LET users = [ { active: true, @@ -63,7 +63,7 @@ func TestCollect(t *testing.T) { gender: gender } `, "Should not have access to initial variables"), - SkipCaseCompilationError(` + CaseCompilationError(` LET users = [ { active: true, @@ -101,7 +101,7 @@ func TestCollect(t *testing.T) { COLLECT gender = i.gender RETURN {x, gender} `, "Should not have access to variables defined before COLLECT"), - SkipCaseArray(` + CaseArray(` LET users = [ { active: true, @@ -138,7 +138,7 @@ LET users = [ COLLECT gender = i.gender RETURN gender `, []any{"f", "m"}, "Should group result by a single key"), - SkipCaseArray(` + CaseArray(` LET users = [ { active: true, @@ -181,7 +181,7 @@ LET users = [ map[string]int{"ageGroup": 9}, map[string]int{"ageGroup": 13}, }, "Should group result by a single key expression"), - SkipCase(` + Case(` LET users = [ { active: true, @@ -219,7 +219,7 @@ LET users = [ RETURN gender) RETURN grouped[0] `, "f", "Should return correct group key by an index"), - SkipCaseArray( + CaseArray( `LET users = [ { active: true, @@ -262,7 +262,7 @@ LET users = [ map[string]any{"age": 36, "gender": "m"}, map[string]any{"age": 69, "gender": "m"}, }, "Should group result by multiple keys"), - SkipCaseArray(` + CaseArray(` LET users = [ { active: true, @@ -353,7 +353,7 @@ LET users = [ }, }, }, "Should create default projection"), - SkipCaseArray(` + CaseArray(` LET users = [] FOR i IN users COLLECT gender = i.gender INTO genders @@ -362,7 +362,7 @@ LET users = [ values: genders } `, []any{}, "COLLECT gender = i.gender INTO genders: should return an empty array when source is empty"), - SkipCaseArray( + CaseArray( `LET users = [ { active: true, @@ -418,7 +418,7 @@ LET users = [ }, }, }, "Should create custom projection"), - SkipCaseArray( + CaseArray( `LET users = [ { active: true, @@ -495,7 +495,7 @@ LET users = [ }, }, }, "Should create custom projection grouped by multiple keys"), - SkipCaseArray(` + CaseArray(` LET users = [ { active: true, @@ -552,7 +552,7 @@ LET users = [ }, }, }, "Should create default projection with default KEEP"), - SkipCaseArray(` + CaseArray(` LET users = [] FOR i IN users LET married = i.married @@ -562,7 +562,7 @@ LET users = [ values: genders } `, []any{}, "COLLECT gender = i.gender INTO genders KEEP married: Should return an empty array when source is empty"), - SkipCaseArray(` + CaseArray(` LET users = [ { active: true, @@ -635,7 +635,7 @@ LET users = [ }, }, }, "Should create default projection with default KEEP using multiple keys"), - SkipCaseArray(` + CaseArray(` LET users = [ { active: true, @@ -692,7 +692,7 @@ LET users = [ }, }, }, "Should create default projection with custom KEEP"), - SkipCaseArray(` + CaseArray(` LET users = [ { active: true, @@ -765,7 +765,7 @@ LET users = [ }, }, }, "Should create default projection with custom KEEP using multiple keys"), - SkipCaseArray(` + CaseArray(` LET users = [ { active: true, @@ -822,7 +822,7 @@ LET users = [ }, }, }, "Should create default projection with custom KEEP with custom name"), - SkipCaseArray(` + CaseArray(` LET users = [ { active: true, @@ -880,7 +880,7 @@ LET users = [ }, }, }, "Should create default projection with custom KEEP with multiple custom names"), - SkipCaseArray( + CaseArray( `LET users = [ { active: true, @@ -930,7 +930,7 @@ LET users = [ }, }, "Should group and count result by a single key"), - SkipCaseArray( + CaseArray( ` LET users = [] FOR i IN users @@ -940,7 +940,7 @@ LET users = [ values: numberOfUsers } `, []any{}, "COLLECT gender = i.gender WITH COUNT INTO numberOfUsers: Should return empty array when source is empty"), - SkipCaseArray( + CaseArray( `LET users = [ { active: true, @@ -979,7 +979,7 @@ LET users = [ `, []any{ 5, }, "Should just count the number of items in the source"), - SkipCaseArray( + CaseArray( `LET users = [] FOR i IN users COLLECT WITH COUNT INTO numberOfUsers