From 0a007a36c3f1a45fac638d45bfdaef71f1f6b702 Mon Sep 17 00:00:00 2001 From: Tim Voronov Date: Mon, 9 Jun 2025 16:14:34 -0400 Subject: [PATCH] Introduce CollectorType enumeration and refactor loop handling for aggregation support --- pkg/compiler/internal/loops.go | 11 ++++- pkg/compiler/internal/visitor.go | 76 +++++++++++++++++++++----------- pkg/vm/vm.go | 2 +- 3 files changed, 61 insertions(+), 28 deletions(-) diff --git a/pkg/compiler/internal/loops.go b/pkg/compiler/internal/loops.go index 5e0f7e93..ba987626 100644 --- a/pkg/compiler/internal/loops.go +++ b/pkg/compiler/internal/loops.go @@ -9,6 +9,8 @@ type ( LoopKind int + CollectorType int + Loop struct { Type LoopType Kind LoopKind @@ -44,6 +46,13 @@ const ( DoWhileLoop ) +const ( + CollectorTypeCounter CollectorType = iota + CollectorTypeKey + CollectorTypeKeyCounter + CollectorTypeKeyGroup +) + func NewLoopTable(registers *RegisterAllocator) *LoopTable { return &LoopTable{ loops: make([]*Loop, 0), @@ -71,7 +80,7 @@ func (lt *LoopTable) EnterLoop(loopType LoopType, kind LoopKind, distinct bool) state = lt.loops[len(lt.loops)-1].Result } - if allocate { + if allocate && loopType != TemporalLoop { state = lt.registers.Allocate(Result) } diff --git a/pkg/compiler/internal/visitor.go b/pkg/compiler/internal/visitor.go index 697957fa..6053a591 100644 --- a/pkg/compiler/internal/visitor.go +++ b/pkg/compiler/internal/visitor.go @@ -422,8 +422,7 @@ func (v *Visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{} v.emitIterValue(loop, kvValReg) var projectionVariableName string - // TODO: Create enum for better readability - collectorType := 1 + collectorType := CollectorTypeKey // If we have a collect group variable, we need to project it if groupVar := ctx.CollectGroupVariable(); groupVar != nil { @@ -434,19 +433,27 @@ func (v *Visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{} projectionVariableName = v.emitCollectCustomGroupProjection(loop, kvValReg, selector) } - collectorType = 3 + collectorType = CollectorTypeKeyGroup } else if countVar := ctx.CollectCounter(); countVar != nil { projectionVariableName = v.emitCollectCountProjection(loop, kvValReg, countVar) if isGrouping { - collectorType = 2 + collectorType = CollectorTypeKeyCounter } else { - collectorType = 0 + collectorType = CollectorTypeCounter } } + aggregateCtx := ctx.CollectAggregator() + + // If we use aggregators, we need to collect group items by key + if aggregateCtx != nil && collectorType != CollectorTypeKeyGroup { + // We need to patch the loop result to be a collector + collectorType = CollectorTypeKeyGroup + } + // We replace DataSet initialization with Collector initialization - v.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, collectorType) + v.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, int(collectorType)) v.Emitter.EmitABC(vm.OpPushKV, loop.Result, kvKeyReg, kvValReg) v.emitIterJumpOrClose(loop) @@ -459,12 +466,13 @@ func (v *Visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{} v.emitIterKey(loop, kvValReg) v.emitIterValue(loop, v.Symbols.DefineVariable(projectionVariableName)) } else { + v.emitIterKey(loop, kvKeyReg) v.emitIterValue(loop, kvValReg) } // Aggregation loop - if c := ctx.CollectAggregator(); c != nil { - v.emitCollectAggregator(c, loop) + if aggregateCtx != nil { + v.emitCollectAggregator(aggregateCtx, loop) } // TODO: Reuse the Registers @@ -474,13 +482,14 @@ func (v *Visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{} loop.Key = vm.NoopOperand if isGrouping { - v.emitCollectGroupKeySelectorVariables(groupSelectors, kvValReg) + // Now we are defining new variables for the group selectors + v.emitCollectGroupKeySelectorVariables(groupSelectors, kvKeyReg, kvValReg, aggregateCtx != nil) } return nil } -func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, loop *Loop) { +func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, parentLoop *Loop) { // First of all, we allocate registers for accumulators selectors := c.AllCollectAggregateSelector() accums := make([]vm.Operand, len(selectors)) @@ -493,21 +502,22 @@ func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, loop *L v.Emitter.EmitA(vm.OpList, reg) } - // Now we iterate over the grouped items - aggrIter := v.Registers.Allocate(Temp) - v.emitIterValue(loop, aggrIter) - - // We just re-use the same register - v.Emitter.EmitAB(vm.OpIter, aggrIter, aggrIter) - // jumpPlaceholder is a placeholder for the exit aggrIterJump position - aggrIterJump := v.Emitter.EmitJumpc(vm.OpIterNext, jumpPlaceholder, loop.Iterator) - // Store upper scope for aggregators mainScope := v.Symbols.Scope() // Nested scope for aggregators v.Symbols.EnterScope() - aggrIterVal := v.Symbols.DefineVariable(loop.ValueName) - v.Emitter.EmitAB(vm.OpIterValue, aggrIterVal, aggrIter) + loop := v.Loops.EnterLoop(TemporalLoop, ForLoop, false) + + // Now we iterate over the grouped items + v.emitIterValue(parentLoop, loop.Iterator) + + // We just re-use the same register + v.Emitter.EmitAB(vm.OpIter, loop.Iterator, loop.Iterator) + // jumpPlaceholder is a placeholder for the exit aggrIterJump position + loop.Jump = v.Emitter.EmitJumpc(vm.OpIterNext, jumpPlaceholder, loop.Iterator) + + aggrIterVal := v.Symbols.DefineVariable(parentLoop.ValueName) + v.Emitter.EmitAB(vm.OpIterValue, aggrIterVal, loop.Iterator) // Now we add value selectors to the accumulators for i := 0; i < len(selectors); i++ { @@ -531,8 +541,7 @@ func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, loop *L } // Now we can iterate over the grouped items - v.Emitter.EmitJump(vm.OpJump, aggrIterJump) - v.Emitter.EmitA(vm.OpClose, aggrIter) + v.emitIterJumpOrClose(loop) // Now we can iterate over the selectors and execute the aggregation functions by passing the accumulators // And define variables for each accumulator result @@ -552,8 +561,10 @@ func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, loop *L v.Registers.Free(result) } + v.Loops.ExitLoop() // Now close the aggregators scope v.Symbols.ExitScope() + // Free the registers for accumulators for _, reg := range accums { v.Registers.Free(reg) @@ -588,7 +599,7 @@ func (v *Visitor) emitCollectGroupKeySelectors(selectors []fql.ICollectSelectorC return kvKeyReg } -func (v *Visitor) emitCollectGroupKeySelectorVariables(selectors []fql.ICollectSelectorContext, kvValReg vm.Operand) { +func (v *Visitor) emitCollectGroupKeySelectorVariables(selectors []fql.ICollectSelectorContext, kvKeyReg, kvValReg vm.Operand, isAggregation bool) { if len(selectors) > 1 { variables := make([]vm.Operand, len(selectors)) @@ -599,7 +610,13 @@ func (v *Visitor) emitCollectGroupKeySelectorVariables(selectors []fql.ICollectS variables[i] = v.Symbols.DefineVariable(name) } - v.Emitter.EmitABC(vm.OpLoadIndex, variables[i], kvValReg, v.loadConstant(runtime.Int(i))) + reg := kvValReg + + if isAggregation { + reg = kvKeyReg + } + + v.Emitter.EmitABC(vm.OpLoadIndex, variables[i], reg, v.loadConstant(runtime.Int(i))) } // Free the register after moving its value to the variable @@ -611,8 +628,15 @@ func (v *Visitor) emitCollectGroupKeySelectorVariables(selectors []fql.ICollectS name := selectors[0].Identifier().GetText() // Define a variable for each selector varReg := v.Symbols.DefineVariable(name) + + reg := kvValReg + + if isAggregation { + reg = kvKeyReg + } + // If we have a single selector, we can just move the value - v.Emitter.EmitAB(vm.OpMove, varReg, kvValReg) + v.Emitter.EmitAB(vm.OpMove, varReg, reg) } } diff --git a/pkg/vm/vm.go b/pkg/vm/vm.go index 99f467c2..ddca4b0d 100644 --- a/pkg/vm/vm.go +++ b/pkg/vm/vm.go @@ -371,7 +371,7 @@ loop: case OpDataSetCollector: reg[dst] = internal.NewCollector(internal.CollectorType(src1)) case OpPush: - ds := reg[dst].(*internal.DataSet) + ds := reg[dst].(runtime.List) if err := ds.Add(ctx, reg[src1]); err != nil { if _, catch := tryCatch(vm.pc); catch {