1
0
mirror of https://github.com/MontFerret/ferret.git synced 2025-08-15 20:02:56 +02:00

Refactor collect handling; restructure group and aggregation logic, unify loop operations, and improve method clarity

This commit is contained in:
Tim Voronov
2025-06-18 17:22:39 -04:00
parent 16137db876
commit 2e78bc4c64

View File

@@ -18,110 +18,110 @@ func NewCollectCompiler(ctx *CompilerContext) *CollectCompiler {
}
func (cc *CollectCompiler) Compile(ctx fql.ICollectClauseContext) {
// TODO: Undefine original loop variables
loop := cc.ctx.Loops.Current()
// We collect the aggregation keys
// And wrap each loop element by a KeyValuePair
// Where a key is either a single value or a list of values
// These KeyValuePairs are then added to the dataset
var kvKeyReg, kvValReg vm.Operand
var groupSelectors []fql.ICollectSelectorContext
var isGrouping bool
grouping := ctx.CollectGrouping()
counter := ctx.CollectCounter()
aggregator := ctx.CollectAggregator()
isCollecting := grouping != nil || counter != nil
if isCollecting {
if grouping != nil {
isGrouping = true
groupSelectors = grouping.AllCollectSelector()
kvKeyReg = cc.compileCollectGroupKeySelectors(groupSelectors)
}
kvValReg = cc.ctx.Registers.Allocate(core.Temp)
loop.EmitValue(kvValReg, cc.ctx.Emitter)
var projectionVariableName string
collectorType := core.CollectorTypeKey
// If we have a collect group variable, we need to project it
if groupVar := ctx.CollectGroupVariable(); groupVar != nil {
// Projection can be either a default projection (identifier) or a custom projection (selector expression)
if identifier := groupVar.Identifier(); identifier != nil {
projectionVariableName = cc.compileDefaultGroupProjection(loop, kvValReg, identifier, groupVar.CollectGroupVariableKeeper())
} else if selector := groupVar.CollectSelector(); selector != nil {
projectionVariableName = cc.compileCustomGroupProjection(loop, kvValReg, selector)
}
collectorType = core.CollectorTypeKeyGroup
} else if counter != nil {
projectionVariableName = counter.Identifier().GetText()
if isGrouping {
collectorType = core.CollectorTypeKeyCounter
} else {
collectorType = core.CollectorTypeCounter
}
}
// If we use aggregators, we need to collect group items by key
if aggregator != nil && collectorType != core.CollectorTypeKeyGroup {
// We need to patch the loop result to be a collector
collectorType = core.CollectorTypeKeyGroup
}
// We replace DataSet initialization with Collector initialization
cc.ctx.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, int(collectorType))
cc.ctx.Emitter.EmitABC(vm.OpPushKV, loop.Result, kvKeyReg, kvValReg)
loop.EmitFinalization(cc.ctx.Emitter)
cc.ctx.Emitter.EmitMove(loop.Src, loop.Result)
cc.ctx.Registers.Free(loop.Value)
cc.ctx.Registers.Free(loop.Key)
loop.Value = kvValReg
loop.Key = vm.NoopOperand
// If the projection is used, we allocate a new register for the variable and put the iterator's value into it
if projectionVariableName != "" {
// Now we need to expand group variables from the dataset
loop.DeclareValueVar(projectionVariableName, cc.ctx.Symbols)
cc.ctx.LoopCompiler.EmitLoopBegin(loop)
loop.EmitKey(kvValReg, cc.ctx.Emitter)
loop.BindValueVar(cc.ctx.Emitter)
} else {
cc.ctx.LoopCompiler.EmitLoopBegin(loop)
loop.EmitKey(kvKeyReg, cc.ctx.Emitter)
//loop.EmitValue(kvValReg, cc.ctx.Emitter)
}
}
kvKeyReg, kvValReg, groupSelectors := cc.compileGrouping(ctx, aggregator != nil)
// Aggregation loop
if aggregator != nil {
cc.compileAggregation(aggregator, loop, isCollecting)
cc.compileAggregation(aggregator, len(groupSelectors) > 0)
}
if isCollecting && isGrouping {
if len(groupSelectors) > 0 {
// Now we are defining new variables for the group selectors
cc.compileCollectGroupKeySelectorVariables(groupSelectors, kvKeyReg, kvValReg, aggregator != nil)
cc.compileGroupSelectorVariables(groupSelectors, kvKeyReg, kvValReg, aggregator != nil)
}
}
func (cc *CollectCompiler) compileAggregation(c fql.ICollectAggregatorContext, parentLoop *core.Loop, isCollected bool) {
if isCollected {
cc.compileGroupedAggregation(c, parentLoop)
func (cc *CollectCompiler) compileGrouping(ctx fql.ICollectClauseContext, aggregation bool) (vm.Operand, vm.Operand, []fql.ICollectSelectorContext) {
var kvKeyReg, kvValReg vm.Operand
var groupSelectors []fql.ICollectSelectorContext
grouping := ctx.CollectGrouping()
counter := ctx.CollectCounter()
if grouping == nil && counter == nil {
return kvKeyReg, kvValReg, groupSelectors
}
loop := cc.ctx.Loops.Current()
if grouping != nil {
groupSelectors = grouping.AllCollectSelector()
kvKeyReg = cc.compileGroupSelectors(groupSelectors)
}
kvValReg = cc.ctx.Registers.Allocate(core.Temp)
loop.EmitValue(kvValReg, cc.ctx.Emitter)
var projectionVariableName string
collectorType := core.CollectorTypeKey
// If we have a collect group variable, we need to project it
if groupVar := ctx.CollectGroupVariable(); groupVar != nil {
// Projection can be either a default projection (identifier) or a custom projection (selector expression)
if identifier := groupVar.Identifier(); identifier != nil {
projectionVariableName = cc.compileDefaultGroupProjection(loop, kvValReg, identifier, groupVar.CollectGroupVariableKeeper())
} else if selector := groupVar.CollectSelector(); selector != nil {
projectionVariableName = cc.compileCustomGroupProjection(loop, kvValReg, selector)
}
collectorType = core.CollectorTypeKeyGroup
} else if counter != nil {
projectionVariableName = counter.Identifier().GetText()
if grouping != nil {
collectorType = core.CollectorTypeKeyCounter
} else {
collectorType = core.CollectorTypeCounter
}
}
// If we use aggregators, we need to collect group items by key
if aggregation && collectorType != core.CollectorTypeKeyGroup {
// We need to patch the loop result to be a collector
collectorType = core.CollectorTypeKeyGroup
}
// We replace DataSet initialization with Collector initialization
cc.ctx.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, int(collectorType))
cc.ctx.Emitter.EmitABC(vm.OpPushKV, loop.Result, kvKeyReg, kvValReg)
loop.EmitFinalization(cc.ctx.Emitter)
cc.ctx.Emitter.EmitMove(loop.Src, loop.Result)
cc.ctx.Registers.Free(loop.Value)
cc.ctx.Registers.Free(loop.Key)
loop.Value = kvValReg
loop.Key = vm.NoopOperand
// If the projection is used, we allocate a new register for the variable and put the iterator's value into it
if projectionVariableName != "" {
// Now we need to expand group variables from the dataset
loop.DeclareValueVar(projectionVariableName, cc.ctx.Symbols)
cc.ctx.LoopCompiler.EmitLoopBegin(loop)
loop.EmitKey(kvValReg, cc.ctx.Emitter)
loop.BindValueVar(cc.ctx.Emitter)
} else {
cc.compileGlobalAggregation(c, parentLoop)
cc.ctx.LoopCompiler.EmitLoopBegin(loop)
loop.EmitKey(kvKeyReg, cc.ctx.Emitter)
//loop.EmitValue(kvValReg, cc.ctx.Emitter)
}
return kvKeyReg, kvValReg, groupSelectors
}
func (cc *CollectCompiler) compileAggregation(c fql.ICollectAggregatorContext, isGrouped bool) {
if isGrouped {
cc.compileGroupedAggregation(c)
} else {
cc.compileGlobalAggregation(c)
}
}
func (cc *CollectCompiler) compileGroupedAggregation(c fql.ICollectAggregatorContext, parentLoop *core.Loop) {
func (cc *CollectCompiler) compileGroupedAggregation(c fql.ICollectAggregatorContext) {
parentLoop := cc.ctx.Loops.Current()
// We need to allocate a temporary accumulators to store aggregation results
selectors := c.AllCollectAggregateSelector()
accums := cc.initAggrAccumulators(selectors)
@@ -164,7 +164,8 @@ func (cc *CollectCompiler) compileGroupedAggregation(c fql.ICollectAggregatorCon
// cc.ctx.Registers.Free(aggrIterVal)
}
func (cc *CollectCompiler) compileGlobalAggregation(c fql.ICollectAggregatorContext, parentLoop *core.Loop) {
func (cc *CollectCompiler) compileGlobalAggregation(c fql.ICollectAggregatorContext) {
parentLoop := cc.ctx.Loops.Current()
loop := parentLoop
// we create a custom collector for aggregators
cc.ctx.Emitter.PatchSwapAx(loop.ResultPos, vm.OpDataSetCollector, loop.Result, int(core.CollectorTypeKeyGroup))
@@ -269,7 +270,7 @@ func (cc *CollectCompiler) compileAggregationFuncCall(selectors []fql.ICollectAg
}
}
func (cc *CollectCompiler) compileCollectGroupKeySelectors(selectors []fql.ICollectSelectorContext) vm.Operand {
func (cc *CollectCompiler) compileGroupSelectors(selectors []fql.ICollectSelectorContext) vm.Operand {
if len(selectors) == 0 {
return vm.NoopOperand
}
@@ -298,7 +299,7 @@ func (cc *CollectCompiler) compileCollectGroupKeySelectors(selectors []fql.IColl
return kvKeyReg
}
func (cc *CollectCompiler) compileCollectGroupKeySelectorVariables(selectors []fql.ICollectSelectorContext, kvKeyReg, kvValReg vm.Operand, isAggregation bool) {
func (cc *CollectCompiler) compileGroupSelectorVariables(selectors []fql.ICollectSelectorContext, kvKeyReg, kvValReg vm.Operand, isAggregation bool) {
if len(selectors) > 1 {
variables := make([]vm.Operand, len(selectors))