mirror of
https://github.com/MontFerret/ferret.git
synced 2025-08-15 20:02:56 +02:00
Introduce CollectorType enumeration and refactor loop handling for aggregation support
This commit is contained in:
@@ -9,6 +9,8 @@ type (
|
||||
|
||||
LoopKind int
|
||||
|
||||
CollectorType int
|
||||
|
||||
Loop struct {
|
||||
Type LoopType
|
||||
Kind LoopKind
|
||||
@@ -44,6 +46,13 @@ const (
|
||||
DoWhileLoop
|
||||
)
|
||||
|
||||
const (
|
||||
CollectorTypeCounter CollectorType = iota
|
||||
CollectorTypeKey
|
||||
CollectorTypeKeyCounter
|
||||
CollectorTypeKeyGroup
|
||||
)
|
||||
|
||||
func NewLoopTable(registers *RegisterAllocator) *LoopTable {
|
||||
return &LoopTable{
|
||||
loops: make([]*Loop, 0),
|
||||
@@ -71,7 +80,7 @@ func (lt *LoopTable) EnterLoop(loopType LoopType, kind LoopKind, distinct bool)
|
||||
state = lt.loops[len(lt.loops)-1].Result
|
||||
}
|
||||
|
||||
if allocate {
|
||||
if allocate && loopType != TemporalLoop {
|
||||
state = lt.registers.Allocate(Result)
|
||||
}
|
||||
|
||||
|
@@ -422,8 +422,7 @@ func (v *Visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{}
|
||||
v.emitIterValue(loop, kvValReg)
|
||||
|
||||
var projectionVariableName string
|
||||
// TODO: Create enum for better readability
|
||||
collectorType := 1
|
||||
collectorType := CollectorTypeKey
|
||||
|
||||
// If we have a collect group variable, we need to project it
|
||||
if groupVar := ctx.CollectGroupVariable(); groupVar != nil {
|
||||
@@ -434,19 +433,27 @@ func (v *Visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{}
|
||||
projectionVariableName = v.emitCollectCustomGroupProjection(loop, kvValReg, selector)
|
||||
}
|
||||
|
||||
collectorType = 3
|
||||
collectorType = CollectorTypeKeyGroup
|
||||
} else if countVar := ctx.CollectCounter(); countVar != nil {
|
||||
projectionVariableName = v.emitCollectCountProjection(loop, kvValReg, countVar)
|
||||
|
||||
if isGrouping {
|
||||
collectorType = 2
|
||||
collectorType = CollectorTypeKeyCounter
|
||||
} else {
|
||||
collectorType = 0
|
||||
collectorType = CollectorTypeCounter
|
||||
}
|
||||
}
|
||||
|
||||
aggregateCtx := ctx.CollectAggregator()
|
||||
|
||||
// If we use aggregators, we need to collect group items by key
|
||||
if aggregateCtx != nil && collectorType != CollectorTypeKeyGroup {
|
||||
// We need to patch the loop result to be a collector
|
||||
collectorType = CollectorTypeKeyGroup
|
||||
}
|
||||
|
||||
// We replace DataSet initialization with Collector initialization
|
||||
v.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, collectorType)
|
||||
v.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, int(collectorType))
|
||||
v.Emitter.EmitABC(vm.OpPushKV, loop.Result, kvKeyReg, kvValReg)
|
||||
v.emitIterJumpOrClose(loop)
|
||||
|
||||
@@ -459,12 +466,13 @@ func (v *Visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{}
|
||||
v.emitIterKey(loop, kvValReg)
|
||||
v.emitIterValue(loop, v.Symbols.DefineVariable(projectionVariableName))
|
||||
} else {
|
||||
v.emitIterKey(loop, kvKeyReg)
|
||||
v.emitIterValue(loop, kvValReg)
|
||||
}
|
||||
|
||||
// Aggregation loop
|
||||
if c := ctx.CollectAggregator(); c != nil {
|
||||
v.emitCollectAggregator(c, loop)
|
||||
if aggregateCtx != nil {
|
||||
v.emitCollectAggregator(aggregateCtx, loop)
|
||||
}
|
||||
|
||||
// TODO: Reuse the Registers
|
||||
@@ -474,13 +482,14 @@ func (v *Visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{}
|
||||
loop.Key = vm.NoopOperand
|
||||
|
||||
if isGrouping {
|
||||
v.emitCollectGroupKeySelectorVariables(groupSelectors, kvValReg)
|
||||
// Now we are defining new variables for the group selectors
|
||||
v.emitCollectGroupKeySelectorVariables(groupSelectors, kvKeyReg, kvValReg, aggregateCtx != nil)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, loop *Loop) {
|
||||
func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, parentLoop *Loop) {
|
||||
// First of all, we allocate registers for accumulators
|
||||
selectors := c.AllCollectAggregateSelector()
|
||||
accums := make([]vm.Operand, len(selectors))
|
||||
@@ -493,21 +502,22 @@ func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, loop *L
|
||||
v.Emitter.EmitA(vm.OpList, reg)
|
||||
}
|
||||
|
||||
// Now we iterate over the grouped items
|
||||
aggrIter := v.Registers.Allocate(Temp)
|
||||
v.emitIterValue(loop, aggrIter)
|
||||
|
||||
// We just re-use the same register
|
||||
v.Emitter.EmitAB(vm.OpIter, aggrIter, aggrIter)
|
||||
// jumpPlaceholder is a placeholder for the exit aggrIterJump position
|
||||
aggrIterJump := v.Emitter.EmitJumpc(vm.OpIterNext, jumpPlaceholder, loop.Iterator)
|
||||
|
||||
// Store upper scope for aggregators
|
||||
mainScope := v.Symbols.Scope()
|
||||
// Nested scope for aggregators
|
||||
v.Symbols.EnterScope()
|
||||
aggrIterVal := v.Symbols.DefineVariable(loop.ValueName)
|
||||
v.Emitter.EmitAB(vm.OpIterValue, aggrIterVal, aggrIter)
|
||||
loop := v.Loops.EnterLoop(TemporalLoop, ForLoop, false)
|
||||
|
||||
// Now we iterate over the grouped items
|
||||
v.emitIterValue(parentLoop, loop.Iterator)
|
||||
|
||||
// We just re-use the same register
|
||||
v.Emitter.EmitAB(vm.OpIter, loop.Iterator, loop.Iterator)
|
||||
// jumpPlaceholder is a placeholder for the exit aggrIterJump position
|
||||
loop.Jump = v.Emitter.EmitJumpc(vm.OpIterNext, jumpPlaceholder, loop.Iterator)
|
||||
|
||||
aggrIterVal := v.Symbols.DefineVariable(parentLoop.ValueName)
|
||||
v.Emitter.EmitAB(vm.OpIterValue, aggrIterVal, loop.Iterator)
|
||||
|
||||
// Now we add value selectors to the accumulators
|
||||
for i := 0; i < len(selectors); i++ {
|
||||
@@ -531,8 +541,7 @@ func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, loop *L
|
||||
}
|
||||
|
||||
// Now we can iterate over the grouped items
|
||||
v.Emitter.EmitJump(vm.OpJump, aggrIterJump)
|
||||
v.Emitter.EmitA(vm.OpClose, aggrIter)
|
||||
v.emitIterJumpOrClose(loop)
|
||||
|
||||
// Now we can iterate over the selectors and execute the aggregation functions by passing the accumulators
|
||||
// And define variables for each accumulator result
|
||||
@@ -552,8 +561,10 @@ func (v *Visitor) emitCollectAggregator(c fql.ICollectAggregatorContext, loop *L
|
||||
v.Registers.Free(result)
|
||||
}
|
||||
|
||||
v.Loops.ExitLoop()
|
||||
// Now close the aggregators scope
|
||||
v.Symbols.ExitScope()
|
||||
|
||||
// Free the registers for accumulators
|
||||
for _, reg := range accums {
|
||||
v.Registers.Free(reg)
|
||||
@@ -588,7 +599,7 @@ func (v *Visitor) emitCollectGroupKeySelectors(selectors []fql.ICollectSelectorC
|
||||
return kvKeyReg
|
||||
}
|
||||
|
||||
func (v *Visitor) emitCollectGroupKeySelectorVariables(selectors []fql.ICollectSelectorContext, kvValReg vm.Operand) {
|
||||
func (v *Visitor) emitCollectGroupKeySelectorVariables(selectors []fql.ICollectSelectorContext, kvKeyReg, kvValReg vm.Operand, isAggregation bool) {
|
||||
if len(selectors) > 1 {
|
||||
variables := make([]vm.Operand, len(selectors))
|
||||
|
||||
@@ -599,7 +610,13 @@ func (v *Visitor) emitCollectGroupKeySelectorVariables(selectors []fql.ICollectS
|
||||
variables[i] = v.Symbols.DefineVariable(name)
|
||||
}
|
||||
|
||||
v.Emitter.EmitABC(vm.OpLoadIndex, variables[i], kvValReg, v.loadConstant(runtime.Int(i)))
|
||||
reg := kvValReg
|
||||
|
||||
if isAggregation {
|
||||
reg = kvKeyReg
|
||||
}
|
||||
|
||||
v.Emitter.EmitABC(vm.OpLoadIndex, variables[i], reg, v.loadConstant(runtime.Int(i)))
|
||||
}
|
||||
|
||||
// Free the register after moving its value to the variable
|
||||
@@ -611,8 +628,15 @@ func (v *Visitor) emitCollectGroupKeySelectorVariables(selectors []fql.ICollectS
|
||||
name := selectors[0].Identifier().GetText()
|
||||
// Define a variable for each selector
|
||||
varReg := v.Symbols.DefineVariable(name)
|
||||
|
||||
reg := kvValReg
|
||||
|
||||
if isAggregation {
|
||||
reg = kvKeyReg
|
||||
}
|
||||
|
||||
// If we have a single selector, we can just move the value
|
||||
v.Emitter.EmitAB(vm.OpMove, varReg, kvValReg)
|
||||
v.Emitter.EmitAB(vm.OpMove, varReg, reg)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -371,7 +371,7 @@ loop:
|
||||
case OpDataSetCollector:
|
||||
reg[dst] = internal.NewCollector(internal.CollectorType(src1))
|
||||
case OpPush:
|
||||
ds := reg[dst].(*internal.DataSet)
|
||||
ds := reg[dst].(runtime.List)
|
||||
|
||||
if err := ds.Add(ctx, reg[src1]); err != nil {
|
||||
if _, catch := tryCatch(vm.pc); catch {
|
||||
|
Reference in New Issue
Block a user