mirror of
https://github.com/MontFerret/ferret.git
synced 2025-08-15 20:02:56 +02:00
Refactor loop collection: enhance CollectorSpec
with destination handling, streamline projection logic, improve aggregation handling, optimize register allocation, and update COLLECT
integration tests.
This commit is contained in:
@@ -1,10 +1,13 @@
|
|||||||
package core
|
package core
|
||||||
|
|
||||||
|
import "github.com/MontFerret/ferret/pkg/vm"
|
||||||
|
|
||||||
type (
|
type (
|
||||||
CollectorType int
|
CollectorType int
|
||||||
|
|
||||||
CollectorSpec struct {
|
CollectorSpec struct {
|
||||||
typ CollectorType
|
typ CollectorType
|
||||||
|
dst vm.Operand
|
||||||
projection *CollectorProjection
|
projection *CollectorProjection
|
||||||
groupSelectors []*CollectSelector
|
groupSelectors []*CollectSelector
|
||||||
aggregationSelectors []*AggregateSelector
|
aggregationSelectors []*AggregateSelector
|
||||||
@@ -18,20 +21,19 @@ const (
|
|||||||
CollectorTypeKeyGroup
|
CollectorTypeKeyGroup
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewCollectorSpec(type_ CollectorType, projection *CollectorProjection, groupSelectors []*CollectSelector, aggregationSelectors []*AggregateSelector) *CollectorSpec {
|
func NewCollectorSpec(type_ CollectorType, dst vm.Operand, projection *CollectorProjection, groupSelectors []*CollectSelector, aggregationSelectors []*AggregateSelector) *CollectorSpec {
|
||||||
return &CollectorSpec{
|
return &CollectorSpec{
|
||||||
typ: type_,
|
typ: type_,
|
||||||
|
dst: dst,
|
||||||
projection: projection,
|
projection: projection,
|
||||||
groupSelectors: groupSelectors,
|
groupSelectors: groupSelectors,
|
||||||
aggregationSelectors: aggregationSelectors,
|
aggregationSelectors: aggregationSelectors,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func DetermineCollectorType(withGrouping, withAggregation bool, projection *CollectorProjection) CollectorType {
|
func DetermineCollectorType(withGrouping, withAggregation, withProjection, withCounter bool) CollectorType {
|
||||||
withProjection := projection != nil
|
|
||||||
|
|
||||||
if withGrouping {
|
if withGrouping {
|
||||||
if withProjection && projection.IsCounted() {
|
if withCounter {
|
||||||
return CollectorTypeKeyCounter
|
return CollectorTypeKeyCounter
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -49,6 +51,10 @@ func (c *CollectorSpec) Type() CollectorType {
|
|||||||
return c.typ
|
return c.typ
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *CollectorSpec) Destination() vm.Operand {
|
||||||
|
return c.dst
|
||||||
|
}
|
||||||
|
|
||||||
func (c *CollectorSpec) Projection() *CollectorProjection {
|
func (c *CollectorSpec) Projection() *CollectorProjection {
|
||||||
return c.projection
|
return c.projection
|
||||||
}
|
}
|
||||||
|
@@ -23,16 +23,16 @@ func (c *LoopCollectCompiler) Compile(ctx fql.ICollectClauseContext) {
|
|||||||
|
|
||||||
func (c *LoopCollectCompiler) compileCollector(ctx fql.ICollectClauseContext) *core.CollectorSpec {
|
func (c *LoopCollectCompiler) compileCollector(ctx fql.ICollectClauseContext) *core.CollectorSpec {
|
||||||
grouping := ctx.CollectGrouping()
|
grouping := ctx.CollectGrouping()
|
||||||
|
projection := ctx.CollectGroupProjection()
|
||||||
counter := ctx.CollectCounter()
|
counter := ctx.CollectCounter()
|
||||||
aggregation := ctx.CollectAggregator()
|
aggregation := ctx.CollectAggregator()
|
||||||
|
|
||||||
// We gather keys and values for the collector.
|
// We gather keys and values for the collector.
|
||||||
kv, groupSelectors := c.initializeGrouping(grouping)
|
kv, groupSelectors := c.initializeGrouping(grouping)
|
||||||
projection := c.initializeProjection(ctx, kv, counter)
|
|
||||||
|
|
||||||
loop := c.ctx.Loops.Current()
|
collectorType := core.DetermineCollectorType(len(groupSelectors) > 0, aggregation != nil, projection != nil, counter != nil)
|
||||||
collectorType := core.DetermineCollectorType(len(groupSelectors) > 0, aggregation != nil, projection)
|
|
||||||
// We replace DataSet initialization with Collector initialization
|
// We replace DataSet initialization with Collector initialization
|
||||||
|
loop := c.ctx.Loops.Current()
|
||||||
dst := loop.PatchDestinationAx(c.ctx.Registers, c.ctx.Emitter, vm.OpDataSetCollector, int(collectorType))
|
dst := loop.PatchDestinationAx(c.ctx.Registers, c.ctx.Emitter, vm.OpDataSetCollector, int(collectorType))
|
||||||
|
|
||||||
var aggregationSelectors []*core.AggregateSelector
|
var aggregationSelectors []*core.AggregateSelector
|
||||||
@@ -42,15 +42,17 @@ func (c *LoopCollectCompiler) compileCollector(ctx fql.ICollectClauseContext) *c
|
|||||||
aggregationSelectors = c.initializeAggregation(aggregation, dst, kv, len(groupSelectors) > 0)
|
aggregationSelectors = c.initializeAggregation(aggregation, dst, kv, len(groupSelectors) > 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
scope := core.NewCollectorSpec(collectorType, projection, groupSelectors, aggregationSelectors)
|
groupProjection := c.initializeProjection(kv, projection, counter)
|
||||||
|
|
||||||
c.finalizeCollector(dst, kv, scope)
|
spec := core.NewCollectorSpec(collectorType, dst, groupProjection, groupSelectors, aggregationSelectors)
|
||||||
|
|
||||||
|
c.finalizeCollector(dst, kv, spec)
|
||||||
|
|
||||||
// We no longer need KV, so we free registers
|
// We no longer need KV, so we free registers
|
||||||
c.ctx.Registers.Free(kv.Key)
|
c.ctx.Registers.Free(kv.Key)
|
||||||
c.ctx.Registers.Free(kv.Value)
|
c.ctx.Registers.Free(kv.Value)
|
||||||
|
|
||||||
return scope
|
return spec
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LoopCollectCompiler) finalizeCollector(dst vm.Operand, kv *core.KV, spec *core.CollectorSpec) {
|
func (c *LoopCollectCompiler) finalizeCollector(dst vm.Operand, kv *core.KV, spec *core.CollectorSpec) {
|
||||||
@@ -67,9 +69,6 @@ func (c *LoopCollectCompiler) finalizeCollector(dst vm.Operand, kv *core.KV, spe
|
|||||||
}
|
}
|
||||||
|
|
||||||
loop.EmitFinalization(c.ctx.Emitter)
|
loop.EmitFinalization(c.ctx.Emitter)
|
||||||
|
|
||||||
// Move the collector to the next loop source
|
|
||||||
c.ctx.Emitter.EmitMove(loop.Src, dst)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LoopCollectCompiler) compileLoop(spec *core.CollectorSpec) {
|
func (c *LoopCollectCompiler) compileLoop(spec *core.CollectorSpec) {
|
||||||
@@ -91,6 +90,8 @@ func (c *LoopCollectCompiler) compileLoop(spec *core.CollectorSpec) {
|
|||||||
doInit := spec.HasGrouping() || !spec.HasAggregation()
|
doInit := spec.HasGrouping() || !spec.HasAggregation()
|
||||||
|
|
||||||
if doInit {
|
if doInit {
|
||||||
|
// Move the collector to the next loop source
|
||||||
|
c.ctx.Emitter.EmitMove(loop.Src, spec.Destination())
|
||||||
loop.EmitInitialization(c.ctx.Registers, c.ctx.Emitter, c.ctx.Loops.Depth())
|
loop.EmitInitialization(c.ctx.Registers, c.ctx.Emitter, c.ctx.Loops.Depth())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -99,12 +100,11 @@ func (c *LoopCollectCompiler) compileLoop(spec *core.CollectorSpec) {
|
|||||||
c.compileAggregation(spec)
|
c.compileAggregation(spec)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the projection is used, we allocate a new register for the variable and put the iterator's value into it
|
|
||||||
if spec.HasProjection() {
|
|
||||||
c.finalizeProjection(spec)
|
|
||||||
}
|
|
||||||
|
|
||||||
if spec.HasGrouping() {
|
if spec.HasGrouping() {
|
||||||
c.compileGrouping(spec)
|
c.compileGrouping(spec)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if spec.HasProjection() && !spec.HasAggregation() {
|
||||||
|
c.finalizeProjection(spec, loop.Value)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -199,12 +199,12 @@ func (c *LoopCollectCompiler) compileGlobalAggregation(spec *core.CollectorSpec)
|
|||||||
loop.EmitInitialization(c.ctx.Registers, c.ctx.Emitter, c.ctx.Loops.Depth())
|
loop.EmitInitialization(c.ctx.Registers, c.ctx.Emitter, c.ctx.Loops.Depth())
|
||||||
|
|
||||||
// We just need to take the grouped values and call aggregation functions using them as args
|
// We just need to take the grouped values and call aggregation functions using them as args
|
||||||
c.compileAggregationFuncCalls(spec.AggregationSelectors(), prevLoop.Dst)
|
c.compileAggregationFuncCalls(spec, prevLoop.Dst)
|
||||||
|
|
||||||
c.ctx.Registers.Free(prevLoop.Dst)
|
c.ctx.Registers.Free(prevLoop.Dst)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LoopCollectCompiler) compileAggregationFuncCalls(selectors []*core.AggregateSelector, aggregator vm.Operand) {
|
func (c *LoopCollectCompiler) compileAggregationFuncCalls(spec *core.CollectorSpec, aggregator vm.Operand) {
|
||||||
// Gets the number of records in the accumulator
|
// Gets the number of records in the accumulator
|
||||||
cond := c.ctx.Registers.Allocate(core.Temp)
|
cond := c.ctx.Registers.Allocate(core.Temp)
|
||||||
c.ctx.Emitter.EmitAB(vm.OpLength, cond, aggregator)
|
c.ctx.Emitter.EmitAB(vm.OpLength, cond, aggregator)
|
||||||
@@ -218,6 +218,7 @@ func (c *LoopCollectCompiler) compileAggregationFuncCalls(selectors []*core.Aggr
|
|||||||
// We skip the key retrieval and function call of there are no records in the accumulator
|
// We skip the key retrieval and function call of there are no records in the accumulator
|
||||||
c.ctx.Emitter.EmitJumpIfTrue(cond, elseLabel)
|
c.ctx.Emitter.EmitJumpIfTrue(cond, elseLabel)
|
||||||
|
|
||||||
|
selectors := spec.AggregationSelectors()
|
||||||
selectorVarRegs := make([]vm.Operand, len(selectors))
|
selectorVarRegs := make([]vm.Operand, len(selectors))
|
||||||
|
|
||||||
for i, selector := range selectors {
|
for i, selector := range selectors {
|
||||||
@@ -251,6 +252,13 @@ func (c *LoopCollectCompiler) compileAggregationFuncCalls(selectors []*core.Aggr
|
|||||||
c.ctx.Registers.Free(result)
|
c.ctx.Registers.Free(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var projVar vm.Operand
|
||||||
|
|
||||||
|
// If the projection is used, we allocate a new register for the variable and put the iterator's value into it
|
||||||
|
if spec.HasProjection() {
|
||||||
|
projVar = c.finalizeProjection(spec, aggregator)
|
||||||
|
}
|
||||||
|
|
||||||
c.ctx.Emitter.EmitJump(endLabel)
|
c.ctx.Emitter.EmitJump(endLabel)
|
||||||
c.ctx.Emitter.MarkLabel(elseLabel)
|
c.ctx.Emitter.MarkLabel(elseLabel)
|
||||||
|
|
||||||
@@ -258,6 +266,10 @@ func (c *LoopCollectCompiler) compileAggregationFuncCalls(selectors []*core.Aggr
|
|||||||
c.ctx.Emitter.EmitA(vm.OpLoadNone, varReg)
|
c.ctx.Emitter.EmitA(vm.OpLoadNone, varReg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if projVar != vm.NoopOperand {
|
||||||
|
c.ctx.Emitter.EmitA(vm.OpLoadNone, projVar)
|
||||||
|
}
|
||||||
|
|
||||||
c.ctx.Emitter.MarkLabel(endLabel)
|
c.ctx.Emitter.MarkLabel(endLabel)
|
||||||
c.ctx.Registers.Free(cond)
|
c.ctx.Registers.Free(cond)
|
||||||
}
|
}
|
||||||
|
@@ -11,10 +11,11 @@ import (
|
|||||||
|
|
||||||
// initializeProjection handles the projection setup for group variables and counters.
|
// initializeProjection handles the projection setup for group variables and counters.
|
||||||
// Returns the projection variable name and the appropriate collector type.
|
// Returns the projection variable name and the appropriate collector type.
|
||||||
func (c *LoopCollectCompiler) initializeProjection(ctx fql.ICollectClauseContext, kv *core.KV, counter fql.ICollectCounterContext) *core.CollectorProjection {
|
func (c *LoopCollectCompiler) initializeProjection(kv *core.KV, projection fql.ICollectGroupProjectionContext, counter fql.ICollectCounterContext) *core.CollectorProjection {
|
||||||
// Handle group variable projection
|
// Handle group variable projection
|
||||||
if groupVar := ctx.CollectGroupProjection(); groupVar != nil {
|
if projection != nil {
|
||||||
varName := c.compileGroupVariableProjection(kv, groupVar)
|
varName := c.compileGroupVariableProjection(kv, projection)
|
||||||
|
|
||||||
return core.NewCollectorGroupProjection(varName)
|
return core.NewCollectorGroupProjection(varName)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -28,20 +29,24 @@ func (c *LoopCollectCompiler) initializeProjection(ctx fql.ICollectClauseContext
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LoopCollectCompiler) finalizeProjection(spec *core.CollectorSpec) {
|
func (c *LoopCollectCompiler) finalizeProjection(spec *core.CollectorSpec, aggregator vm.Operand) vm.Operand {
|
||||||
loop := c.ctx.Loops.Current()
|
loop := c.ctx.Loops.Current()
|
||||||
varName := spec.Projection().VariableName()
|
varName := spec.Projection().VariableName()
|
||||||
|
|
||||||
if spec.HasGrouping() || !spec.HasAggregation() {
|
if spec.HasGrouping() || !spec.HasAggregation() {
|
||||||
// Now we need to expand group variables from the dataset
|
// Now we need to expand group variables from the dataset
|
||||||
loop.ValueName = varName
|
loop.ValueName = varName
|
||||||
c.ctx.Symbols.AssignLocal(loop.ValueName, core.TypeUnknown, loop.Value)
|
c.ctx.Symbols.AssignLocal(loop.ValueName, core.TypeUnknown, aggregator)
|
||||||
} else {
|
|
||||||
|
return loop.Value
|
||||||
|
}
|
||||||
|
|
||||||
key := loadConstant(c.ctx, runtime.String(varName))
|
key := loadConstant(c.ctx, runtime.String(varName))
|
||||||
val := c.ctx.Symbols.DeclareLocal(varName, core.TypeUnknown)
|
val := c.ctx.Symbols.DeclareLocal(varName, core.TypeUnknown)
|
||||||
c.ctx.Emitter.EmitABC(vm.OpLoadKey, val, loop.Dst, key)
|
c.ctx.Emitter.EmitABC(vm.OpLoadKey, val, aggregator, key)
|
||||||
c.ctx.Registers.Free(key)
|
c.ctx.Registers.Free(key)
|
||||||
}
|
|
||||||
|
return val
|
||||||
}
|
}
|
||||||
|
|
||||||
// compileGroupVariableProjection processes group variable projections (both default and custom).
|
// compileGroupVariableProjection processes group variable projections (both default and custom).
|
||||||
|
Reference in New Issue
Block a user