mirror of
https://github.com/MontFerret/ferret.git
synced 2025-08-13 19:52:52 +02:00
Refactor ExprCompiler
and LoopCollectCompiler
: extract function name compilation logic, introduce aggregateSelector
struct, improve aggregation handling with grouped and global selectors, optimize collector setup, and enhance loop processing.
This commit is contained in:
@@ -62,6 +62,10 @@ func (e *Emitter) EmitClose(reg vm.Operand) {
|
||||
e.EmitA(vm.OpClose, reg)
|
||||
}
|
||||
|
||||
func (e *Emitter) EmitLoadNone(dst vm.Operand) {
|
||||
e.EmitA(vm.OpLoadNone, dst)
|
||||
}
|
||||
|
||||
func (e *Emitter) EmitLoadConst(dst vm.Operand, constant vm.Operand) {
|
||||
e.EmitAB(vm.OpLoadConst, dst, constant)
|
||||
}
|
@@ -1,13 +1,11 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/MontFerret/ferret/pkg/compiler/internal/core"
|
||||
"github.com/MontFerret/ferret/pkg/parser/fql"
|
||||
"github.com/MontFerret/ferret/pkg/runtime"
|
||||
"github.com/MontFerret/ferret/pkg/vm"
|
||||
"regexp"
|
||||
)
|
||||
|
||||
// Runtime functions
|
||||
@@ -402,8 +400,12 @@ func (ec *ExprCompiler) CompileFunctionCall(ctx fql.IFunctionCallContext, protec
|
||||
}
|
||||
|
||||
func (ec *ExprCompiler) CompileFunctionCallWith(ctx fql.IFunctionCallContext, protected bool, seq core.RegisterSequence) vm.Operand {
|
||||
name := ec.functionName(ctx)
|
||||
name := getFunctionName(ctx)
|
||||
|
||||
return ec.CompileFunctionCallByNameWith(name, protected, seq)
|
||||
}
|
||||
|
||||
func (ec *ExprCompiler) CompileFunctionCallByNameWith(name runtime.String, protected bool, seq core.RegisterSequence) vm.Operand {
|
||||
switch name {
|
||||
case runtimeLength:
|
||||
dst := ec.ctx.Registers.Allocate(core.Temp)
|
||||
@@ -536,16 +538,3 @@ func (ec *ExprCompiler) compileRangeOperand(ctx fql.IRangeOperandContext) vm.Ope
|
||||
|
||||
panic(runtime.Error(core.ErrUnexpectedToken, ctx.GetText()))
|
||||
}
|
||||
|
||||
func (ec *ExprCompiler) functionName(ctx fql.IFunctionCallContext) runtime.String {
|
||||
var name string
|
||||
funcNS := ctx.Namespace()
|
||||
|
||||
if funcNS != nil {
|
||||
name += funcNS.GetText()
|
||||
}
|
||||
|
||||
name += ctx.FunctionName().GetText()
|
||||
|
||||
return runtime.NewString(strings.ToUpper(name))
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"github.com/MontFerret/ferret/pkg/parser/fql"
|
||||
"strings"
|
||||
|
||||
"github.com/antlr4-go/antlr/v4"
|
||||
@@ -23,6 +24,12 @@ func loadConstantTo(ctx *CompilerContext, constant runtime.Value, reg vm.Operand
|
||||
ctx.Emitter.EmitLoadConst(reg, ctx.Symbols.AddConstant(constant))
|
||||
}
|
||||
|
||||
func loadIndex(ctx *CompilerContext, dst, arr vm.Operand, idx int) {
|
||||
idxReg := loadConstant(ctx, runtime.NewInt(idx))
|
||||
ctx.Emitter.EmitLoadIndex(dst, arr, idxReg)
|
||||
ctx.Registers.Free(idxReg)
|
||||
}
|
||||
|
||||
func sortDirection(dir antlr.TerminalNode) runtime.SortDirection {
|
||||
if dir == nil {
|
||||
return runtime.SortDirectionAsc
|
||||
@@ -35,6 +42,19 @@ func sortDirection(dir antlr.TerminalNode) runtime.SortDirection {
|
||||
return runtime.SortDirectionAsc
|
||||
}
|
||||
|
||||
func getFunctionName(ctx fql.IFunctionCallContext) runtime.String {
|
||||
var name string
|
||||
funcNS := ctx.Namespace()
|
||||
|
||||
if funcNS != nil {
|
||||
name += funcNS.GetText()
|
||||
}
|
||||
|
||||
name += ctx.FunctionName().GetText()
|
||||
|
||||
return runtime.NewString(strings.ToUpper(name))
|
||||
}
|
||||
|
||||
func copyFromNamespace(fns runtime.Functions, namespace string) error {
|
||||
// In the name of the function "A::B::C", the namespace is "A::B",
|
||||
// not "A::B::".
|
||||
|
@@ -1,11 +1,8 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"github.com/antlr4-go/antlr/v4"
|
||||
|
||||
"github.com/MontFerret/ferret/pkg/compiler/internal/core"
|
||||
"github.com/MontFerret/ferret/pkg/parser/fql"
|
||||
"github.com/MontFerret/ferret/pkg/runtime"
|
||||
"github.com/MontFerret/ferret/pkg/vm"
|
||||
)
|
||||
|
||||
@@ -13,6 +10,13 @@ type (
|
||||
LoopCollectCompiler struct {
|
||||
ctx *CompilerContext
|
||||
}
|
||||
|
||||
collectorScope struct {
|
||||
Type core.CollectorType
|
||||
Projection string
|
||||
GroupSelectors []fql.ICollectSelectorContext
|
||||
AggregationSelectors []*aggregateSelector
|
||||
}
|
||||
)
|
||||
|
||||
func NewCollectCompiler(ctx *CompilerContext) *LoopCollectCompiler {
|
||||
@@ -20,41 +24,66 @@ func NewCollectCompiler(ctx *CompilerContext) *LoopCollectCompiler {
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) Compile(ctx fql.ICollectClauseContext) {
|
||||
aggregator := ctx.CollectAggregator()
|
||||
collectorType, groupSelectors := c.compileCollect(ctx, aggregator != nil)
|
||||
scope := c.compileCollector(ctx)
|
||||
|
||||
// Aggregation loop
|
||||
if aggregator != nil {
|
||||
c.compileAggregation(aggregator, len(groupSelectors) > 0)
|
||||
}
|
||||
|
||||
if len(groupSelectors) > 0 {
|
||||
// Now we are defining new variables for the group selectors
|
||||
c.compileGroupSelectorVariables(collectorType, groupSelectors, aggregator != nil)
|
||||
}
|
||||
c.compileLoop(scope)
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileCollect(ctx fql.ICollectClauseContext, aggregation bool) (core.CollectorType, []fql.ICollectSelectorContext) {
|
||||
func (c *LoopCollectCompiler) compileCollector(ctx fql.ICollectClauseContext) *collectorScope {
|
||||
grouping := ctx.CollectGrouping()
|
||||
counter := ctx.CollectCounter()
|
||||
aggregation := ctx.CollectAggregator()
|
||||
|
||||
// We gather keys and values for the collector.
|
||||
kv, groupSelectors := c.initializeGrouping(grouping)
|
||||
projectionVarName, collectorType := c.initializeProjection(ctx, kv, counter, grouping != nil)
|
||||
|
||||
// If we use aggregators, we need to collect group items by key
|
||||
if aggregation && collectorType != core.CollectorTypeKeyGroup {
|
||||
if aggregation != nil && collectorType != core.CollectorTypeKeyGroup {
|
||||
// We need to patch the loop result to be a collector
|
||||
collectorType = core.CollectorTypeKeyGroup
|
||||
}
|
||||
|
||||
c.finalizeCollector(collectorType, kv)
|
||||
loop := c.ctx.Loops.Current()
|
||||
// We replace DataSet initialization with Collector initialization
|
||||
dst := loop.PatchDestinationAx(c.ctx.Registers, c.ctx.Emitter, vm.OpDataSetCollector, int(collectorType))
|
||||
|
||||
var aggregationSelectors []*aggregateSelector
|
||||
|
||||
// Fuse aggregation loop
|
||||
if aggregation != nil {
|
||||
aggregationSelectors = c.initializeAggregation(aggregation, dst, kv, len(aggregationSelectors) > 0)
|
||||
}
|
||||
|
||||
c.finalizeCollector(dst, kv, len(groupSelectors) > 0, aggregation != nil)
|
||||
|
||||
// We no longer need KV, so we free registers
|
||||
c.ctx.Registers.Free(kv.Key)
|
||||
c.ctx.Registers.Free(kv.Value)
|
||||
|
||||
return &collectorScope{collectorType, projectionVarName, groupSelectors, aggregationSelectors}
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) finalizeCollector(dst vm.Operand, kv *core.KV, withGrouping bool, withAggregation bool) {
|
||||
loop := c.ctx.Loops.Current()
|
||||
|
||||
// If we do not use grouping but use aggregation, we do not need to push the key and value
|
||||
// because they are already pushed by the global aggregation.
|
||||
push := withGrouping || !withAggregation
|
||||
|
||||
if push {
|
||||
c.ctx.Emitter.EmitABC(vm.OpPushKV, dst, kv.Key, kv.Value)
|
||||
}
|
||||
|
||||
loop.EmitFinalization(c.ctx.Emitter)
|
||||
|
||||
// Move the collector to the next loop source
|
||||
c.ctx.Emitter.EmitMove(loop.Src, dst)
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileLoop(scope *collectorScope) {
|
||||
loop := c.ctx.Loops.Current()
|
||||
|
||||
// If we are using a projection, we need to ensure the loop is set to ForInLoop
|
||||
if loop.Kind != core.ForInLoop {
|
||||
loop.Kind = core.ForInLoop
|
||||
@@ -68,211 +97,27 @@ func (c *LoopCollectCompiler) compileCollect(ctx fql.ICollectClauseContext, aggr
|
||||
loop.Key = c.ctx.Registers.Allocate(core.Temp)
|
||||
}
|
||||
|
||||
withGrouping := len(scope.GroupSelectors) > 0
|
||||
withAggregation := len(scope.AggregationSelectors) > 0
|
||||
doInit := withGrouping || !withAggregation
|
||||
|
||||
if doInit {
|
||||
loop.EmitInitialization(c.ctx.Registers, c.ctx.Emitter, c.ctx.Loops.Depth())
|
||||
}
|
||||
|
||||
if withAggregation {
|
||||
c.unpackGroupedValues(scope.AggregationSelectors, withGrouping)
|
||||
c.compileAggregation(scope.AggregationSelectors, withGrouping)
|
||||
}
|
||||
|
||||
// If the projection is used, we allocate a new register for the variable and put the iterator's value into it
|
||||
if projectionVarName != "" {
|
||||
if scope.Projection != "" {
|
||||
// Now we need to expand group variables from the dataset
|
||||
loop.ValueName = projectionVarName
|
||||
loop.ValueName = scope.Projection
|
||||
c.ctx.Symbols.AssignLocal(loop.ValueName, core.TypeUnknown, loop.Value)
|
||||
}
|
||||
|
||||
loop.EmitInitialization(c.ctx.Registers, c.ctx.Emitter, c.ctx.Loops.Depth())
|
||||
|
||||
return collectorType, groupSelectors
|
||||
}
|
||||
|
||||
// initializeGrouping creates the KeyValue pair for collection, handling both grouping and value setup.
|
||||
func (c *LoopCollectCompiler) initializeGrouping(grouping fql.ICollectGroupingContext) (*core.KV, []fql.ICollectSelectorContext) {
|
||||
var groupSelectors []fql.ICollectSelectorContext
|
||||
|
||||
kv := core.NewKV(vm.NoopOperand, vm.NoopOperand)
|
||||
loop := c.ctx.Loops.Current()
|
||||
|
||||
// Handle grouping key if present
|
||||
if grouping != nil {
|
||||
kv.Key, groupSelectors = c.compileGroupKeys(grouping)
|
||||
}
|
||||
|
||||
// Setup value register and emit value from current loop
|
||||
if loop.Kind == core.ForInLoop {
|
||||
if loop.Value != vm.NoopOperand {
|
||||
kv.Value = loop.Value
|
||||
} else {
|
||||
kv.Value = c.ctx.Registers.Allocate(core.Temp)
|
||||
loop.EmitValue(kv.Value, c.ctx.Emitter)
|
||||
}
|
||||
} else {
|
||||
if loop.Key != vm.NoopOperand {
|
||||
kv.Value = loop.Key
|
||||
} else {
|
||||
kv.Value = c.ctx.Registers.Allocate(core.Temp)
|
||||
loop.EmitKey(kv.Value, c.ctx.Emitter)
|
||||
}
|
||||
}
|
||||
|
||||
return kv, groupSelectors
|
||||
}
|
||||
|
||||
// compileGroupKeys compiles the grouping keys from the CollectGroupingContext.
|
||||
func (c *LoopCollectCompiler) compileGroupKeys(ctx fql.ICollectGroupingContext) (vm.Operand, []fql.ICollectSelectorContext) {
|
||||
selectors := ctx.AllCollectSelector()
|
||||
|
||||
if len(selectors) == 0 {
|
||||
return vm.NoopOperand, selectors
|
||||
}
|
||||
|
||||
var kvKeyReg vm.Operand
|
||||
|
||||
if len(selectors) > 1 {
|
||||
// We create a sequence of Registers for the clauses
|
||||
// To pack them into an array
|
||||
selectorRegs := c.ctx.Registers.AllocateSequence(len(selectors))
|
||||
|
||||
for i, selector := range selectors {
|
||||
reg := c.ctx.ExprCompiler.Compile(selector.Expression())
|
||||
c.ctx.Emitter.EmitAB(vm.OpMove, selectorRegs[i], reg)
|
||||
// Free the register after moving its value to the sequence register
|
||||
c.ctx.Registers.Free(reg)
|
||||
}
|
||||
|
||||
kvKeyReg = c.ctx.Registers.Allocate(core.Temp)
|
||||
c.ctx.Emitter.EmitAs(vm.OpLoadArray, kvKeyReg, selectorRegs)
|
||||
c.ctx.Registers.FreeSequence(selectorRegs)
|
||||
} else {
|
||||
kvKeyReg = c.ctx.ExprCompiler.Compile(selectors[0].Expression())
|
||||
}
|
||||
|
||||
return kvKeyReg, selectors
|
||||
}
|
||||
|
||||
// initializeProjection handles the projection setup for group variables and counters.
|
||||
// Returns the projection variable name and the appropriate collector type.
|
||||
func (c *LoopCollectCompiler) initializeProjection(ctx fql.ICollectClauseContext, kv *core.KV, counter fql.ICollectCounterContext, hasGrouping bool) (string, core.CollectorType) {
|
||||
projectionVariableName := ""
|
||||
collectorType := core.CollectorTypeKey
|
||||
|
||||
// Handle group variable projection
|
||||
if groupVar := ctx.CollectGroupVariable(); groupVar != nil {
|
||||
projectionVariableName = c.compileGroupVariableProjection(kv, groupVar)
|
||||
collectorType = core.CollectorTypeKeyGroup
|
||||
return projectionVariableName, collectorType
|
||||
}
|
||||
|
||||
// Handle counter projection
|
||||
if counter != nil {
|
||||
projectionVariableName = counter.Identifier().GetText()
|
||||
collectorType = c.determineCounterCollectorType(hasGrouping)
|
||||
}
|
||||
|
||||
return projectionVariableName, collectorType
|
||||
}
|
||||
|
||||
// determineCounterCollectorType returns the appropriate collector type for counter operations.
|
||||
func (c *LoopCollectCompiler) determineCounterCollectorType(hasGrouping bool) core.CollectorType {
|
||||
if hasGrouping {
|
||||
return core.CollectorTypeKeyCounter
|
||||
}
|
||||
|
||||
return core.CollectorTypeCounter
|
||||
}
|
||||
|
||||
// compileGroupVariableProjection processes group variable projections (both default and custom).
|
||||
func (c *LoopCollectCompiler) compileGroupVariableProjection(kv *core.KV, groupVar fql.ICollectGroupVariableContext) string {
|
||||
// Handle default projection (identifier)
|
||||
if identifier := groupVar.Identifier(); identifier != nil {
|
||||
return c.compileDefaultGroupProjection(kv, identifier, groupVar.CollectGroupVariableKeeper())
|
||||
}
|
||||
|
||||
// Handle custom projection (selector expression)
|
||||
if selector := groupVar.CollectSelector(); selector != nil {
|
||||
return c.compileCustomGroupProjection(kv, selector)
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileGroupSelectorVariables(collectorType core.CollectorType, selectors []fql.ICollectSelectorContext, isAggregation bool) {
|
||||
loop := c.ctx.Loops.Current()
|
||||
|
||||
if len(selectors) > 1 {
|
||||
variables := make([]vm.Operand, len(selectors))
|
||||
|
||||
for i, selector := range selectors {
|
||||
name := selector.Identifier().GetText()
|
||||
|
||||
if variables[i] == vm.NoopOperand {
|
||||
variables[i] = c.ctx.Symbols.DeclareLocal(name, core.TypeUnknown)
|
||||
}
|
||||
|
||||
reg := c.selectGroupKey(collectorType, loop)
|
||||
|
||||
c.ctx.Emitter.EmitABC(vm.OpLoadIndex, variables[i], reg, loadConstant(c.ctx, runtime.Int(i)))
|
||||
}
|
||||
|
||||
// Free the register after moving its value to the variable
|
||||
for _, reg := range variables {
|
||||
c.ctx.Registers.Free(reg)
|
||||
}
|
||||
} else {
|
||||
// Get the variable name
|
||||
name := selectors[0].Identifier().GetText()
|
||||
// If we have a single selector, we can just use the loops' register directly
|
||||
c.ctx.Symbols.AssignLocal(name, core.TypeUnknown, c.selectGroupKey(collectorType, loop))
|
||||
if withGrouping {
|
||||
c.compileGrouping(scope.Type, scope.GroupSelectors)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileDefaultGroupProjection(kv *core.KV, identifier antlr.TerminalNode, keeper fql.ICollectGroupVariableKeeperContext) string {
|
||||
if keeper == nil {
|
||||
variables := c.ctx.Symbols.LocalVariables()
|
||||
scope := core.NewScopeProjection(c.ctx.Registers, c.ctx.Emitter, c.ctx.Symbols, variables)
|
||||
scope.EmitAsObject(kv.Value)
|
||||
} else {
|
||||
variables := keeper.AllIdentifier()
|
||||
seq := c.ctx.Registers.AllocateSequence(len(variables) * 2)
|
||||
|
||||
for i, j := 0, 0; i < len(variables); i, j = i+1, j+2 {
|
||||
varName := variables[i].GetText()
|
||||
loadConstantTo(c.ctx, runtime.String(varName), seq[j])
|
||||
|
||||
variable, _, found := c.ctx.Symbols.Resolve(varName)
|
||||
|
||||
if !found {
|
||||
panic("variable not found: " + varName)
|
||||
}
|
||||
|
||||
c.ctx.Emitter.EmitAB(vm.OpMove, seq[j+1], variable)
|
||||
}
|
||||
|
||||
c.ctx.Emitter.EmitAs(vm.OpLoadObject, kv.Value, seq)
|
||||
c.ctx.Registers.FreeSequence(seq)
|
||||
}
|
||||
|
||||
return identifier.GetText()
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileCustomGroupProjection(kv *core.KV, selector fql.ICollectSelectorContext) string {
|
||||
selectorReg := c.ctx.ExprCompiler.Compile(selector.Expression())
|
||||
c.ctx.Emitter.EmitMove(kv.Value, selectorReg)
|
||||
c.ctx.Registers.Free(selectorReg)
|
||||
|
||||
return selector.Identifier().GetText()
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) selectGroupKey(collectorType core.CollectorType, loop *core.Loop) vm.Operand {
|
||||
switch collectorType {
|
||||
case core.CollectorTypeKeyGroup, core.CollectorTypeKeyCounter:
|
||||
return loop.Key
|
||||
default:
|
||||
return loop.Value
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) finalizeCollector(collectorType core.CollectorType, kv *core.KV) {
|
||||
loop := c.ctx.Loops.Current()
|
||||
// We replace DataSet initialization with Collector initialization
|
||||
dst := loop.PatchDestinationAx(c.ctx.Registers, c.ctx.Emitter, vm.OpDataSetCollector, int(collectorType))
|
||||
c.ctx.Emitter.EmitABC(vm.OpPushKV, dst, kv.Key, kv.Value)
|
||||
loop.EmitFinalization(c.ctx.Emitter)
|
||||
|
||||
// Move the collector to the next loop source
|
||||
c.ctx.Emitter.EmitMove(loop.Src, dst)
|
||||
}
|
||||
|
@@ -1,96 +1,59 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/MontFerret/ferret/pkg/compiler/internal/core"
|
||||
"github.com/MontFerret/ferret/pkg/parser/fql"
|
||||
"github.com/MontFerret/ferret/pkg/runtime"
|
||||
"github.com/MontFerret/ferret/pkg/vm"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func (c *LoopCollectCompiler) compileAggregation(ctx fql.ICollectAggregatorContext, isGrouped bool) {
|
||||
if isGrouped {
|
||||
c.compileGroupedAggregation(ctx)
|
||||
type aggregateSelector struct {
|
||||
Name runtime.String
|
||||
Register vm.Operand
|
||||
Args int
|
||||
FuncName runtime.String
|
||||
ProtectedCall bool
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) initializeAggregation(ctx fql.ICollectAggregatorContext, dst vm.Operand, kv *core.KV, withGrouping bool) []*aggregateSelector {
|
||||
selectors := ctx.AllCollectAggregateSelector()
|
||||
var compiledSelectors []*aggregateSelector
|
||||
|
||||
// if we have grouping, we need to pack the selectors into the collector value
|
||||
if withGrouping {
|
||||
compiledSelectors = c.compileGroupedAggregationSelectors(selectors)
|
||||
|
||||
// Pack the selectors into the collector value
|
||||
c.packGroupedValues(kv, compiledSelectors)
|
||||
} else {
|
||||
c.compileGlobalAggregation(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileGroupedAggregation(ctx fql.ICollectAggregatorContext) {
|
||||
parentLoop := c.ctx.Loops.Current()
|
||||
// We need to allocate a temporary accumulator to store aggregation results
|
||||
selectors := ctx.AllCollectAggregateSelector()
|
||||
accumulator := c.ctx.Registers.Allocate(core.Temp)
|
||||
c.ctx.Emitter.EmitAx(vm.OpDataSetCollector, accumulator, int(core.CollectorTypeKeyGroup))
|
||||
|
||||
loop := c.ctx.Loops.NewForInLoop(core.TemporalLoop, false)
|
||||
loop.Src = c.ctx.Registers.Allocate(core.Temp)
|
||||
|
||||
// Now we iterate over the grouped items
|
||||
parentLoop.EmitValue(loop.Src, c.ctx.Emitter)
|
||||
|
||||
// Nested scope for aggregators
|
||||
c.ctx.Symbols.EnterScope()
|
||||
loop.DeclareValueVar(parentLoop.ValueName, c.ctx.Symbols)
|
||||
loop.EmitInitialization(c.ctx.Registers, c.ctx.Emitter, c.ctx.Loops.Depth())
|
||||
|
||||
// Add value selectors to the accumulators
|
||||
argsPkg := c.compileAggregationFuncArgs(selectors, accumulator)
|
||||
|
||||
loop.EmitFinalization(c.ctx.Emitter)
|
||||
c.ctx.Symbols.ExitScope()
|
||||
|
||||
// Now we can iterate over the selectors and execute the aggregation functions by passing the accumulators
|
||||
// And define variables for each accumulator result
|
||||
c.compileAggregationFuncCall(selectors, accumulator, argsPkg)
|
||||
c.ctx.Registers.Free(accumulator)
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileGlobalAggregation(ctx fql.ICollectAggregatorContext) {
|
||||
parentLoop := c.ctx.Loops.Current()
|
||||
// we create a custom collector for aggregators
|
||||
dst := parentLoop.PatchDestinationAx(c.ctx.Registers, c.ctx.Emitter, vm.OpDataSetCollector, int(core.CollectorTypeKeyGroup))
|
||||
// Nested scope for aggregators
|
||||
c.ctx.Symbols.EnterScope()
|
||||
// Now we add value selectors to the collector
|
||||
selectors := ctx.AllCollectAggregateSelector()
|
||||
argsPkg := c.compileAggregationFuncArgs(selectors, dst)
|
||||
parentLoop.EmitFinalization(c.ctx.Emitter)
|
||||
c.ctx.Loops.Pop()
|
||||
c.ctx.Symbols.ExitScope()
|
||||
|
||||
// Now we can iterate over the grouped items
|
||||
zero := c.ctx.Registers.Allocate(core.Temp)
|
||||
c.ctx.Emitter.EmitA(vm.OpLoadZero, zero)
|
||||
// We move the aggregator to a temporary register to access it later from the new loop
|
||||
aggregator := c.ctx.Registers.Allocate(core.Temp)
|
||||
c.ctx.Emitter.EmitAB(vm.OpMove, aggregator, dst)
|
||||
|
||||
if parentLoop.Dst != dst && !parentLoop.Allocate {
|
||||
c.ctx.Registers.Free(dst)
|
||||
// We just push the selectors into the global collector
|
||||
compiledSelectors = c.compileGlobalAggregationSelectors(selectors, dst)
|
||||
}
|
||||
|
||||
// NewForLoop new loop with 1 iteration only
|
||||
c.ctx.Symbols.EnterScope()
|
||||
c.ctx.Emitter.EmitABC(vm.OpLoadRange, parentLoop.Src, zero, zero)
|
||||
loop := c.ctx.Loops.NewForInLoop(core.TemporalLoop, parentLoop.Distinct)
|
||||
loop.Src = parentLoop.Src
|
||||
loop.Dst = parentLoop.Dst
|
||||
loop.Allocate = parentLoop.Allocate
|
||||
c.ctx.Loops.Push(loop)
|
||||
loop.EmitInitialization(c.ctx.Registers, c.ctx.Emitter, c.ctx.Loops.Depth())
|
||||
|
||||
// We just need to take the grouped values and call aggregation functions using them as args
|
||||
c.compileAggregationFuncCall(selectors, aggregator, argsPkg)
|
||||
c.ctx.Registers.Free(aggregator)
|
||||
return compiledSelectors
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileAggregationFuncArgs(selectors []fql.ICollectAggregateSelectorContext, collector vm.Operand) []int {
|
||||
argsPkg := make([]int, len(selectors))
|
||||
func (c *LoopCollectCompiler) packGroupedValues(kv *core.KV, selectors []*aggregateSelector) {
|
||||
// We need to add the loop value to the array
|
||||
seq := c.ctx.Registers.AllocateSequence(len(selectors) + 1)
|
||||
c.ctx.Emitter.EmitMove(seq[0], kv.Value)
|
||||
|
||||
for i, selector := range selectors {
|
||||
c.ctx.Emitter.EmitMove(seq[i+1], selector.Register)
|
||||
c.ctx.Registers.Free(selector.Register)
|
||||
}
|
||||
|
||||
// Now we need to wrap the selectors into a single array with the loop value
|
||||
c.ctx.Emitter.EmitArray(kv.Value, seq)
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileGroupedAggregationSelectors(selectors []fql.ICollectAggregateSelectorContext) []*aggregateSelector {
|
||||
wrappedSelectors := make([]*aggregateSelector, 0, len(selectors))
|
||||
|
||||
for i := 0; i < len(selectors); i++ {
|
||||
selector := selectors[i]
|
||||
name := runtime.String(selector.Identifier().GetText())
|
||||
fcx := selector.FunctionCallExpression()
|
||||
args := c.ctx.ExprCompiler.CompileArgumentList(fcx.FunctionCall().ArgumentList())
|
||||
|
||||
@@ -99,31 +62,170 @@ func (c *LoopCollectCompiler) compileAggregationFuncArgs(selectors []fql.ICollec
|
||||
panic("No arguments provided for the function call in the aggregate selector")
|
||||
}
|
||||
|
||||
aggrKeyReg := loadConstant(c.ctx, runtime.Int(i))
|
||||
// we keep information about the args - whether we need to unpack them or not
|
||||
argsPkg[i] = len(args)
|
||||
var selectorArg vm.Operand
|
||||
|
||||
if len(args) > 1 {
|
||||
for y, arg := range args {
|
||||
argKeyReg := c.loadAggregationArgKey(i, y)
|
||||
c.ctx.Emitter.EmitABC(vm.OpPushKV, collector, argKeyReg, arg)
|
||||
c.ctx.Registers.Free(argKeyReg)
|
||||
}
|
||||
// We pack multiple arguments into an array
|
||||
selectorArg = c.ctx.Registers.Allocate(core.Temp)
|
||||
c.ctx.Emitter.EmitArray(selectorArg, args)
|
||||
c.ctx.Registers.FreeSequence(args)
|
||||
} else {
|
||||
c.ctx.Emitter.EmitABC(vm.OpPushKV, collector, aggrKeyReg, args[0])
|
||||
// We can use a single argument directly
|
||||
selectorArg = args[0]
|
||||
}
|
||||
|
||||
c.ctx.Registers.Free(aggrKeyReg)
|
||||
fce := selector.FunctionCallExpression()
|
||||
funcName := getFunctionName(fce.FunctionCall())
|
||||
isProtected := fce.ErrorOperator() != nil
|
||||
|
||||
// Collect information about the selector to unpack it later
|
||||
wrappedSelectors = append(wrappedSelectors, &aggregateSelector{
|
||||
Name: name,
|
||||
Args: len(args),
|
||||
Register: selectorArg,
|
||||
FuncName: funcName,
|
||||
ProtectedCall: isProtected,
|
||||
})
|
||||
}
|
||||
|
||||
return wrappedSelectors
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileGlobalAggregationSelectors(selectors []fql.ICollectAggregateSelectorContext, dst vm.Operand) []*aggregateSelector {
|
||||
wrappedSelectors := make([]*aggregateSelector, 0, len(selectors))
|
||||
|
||||
for i := 0; i < len(selectors); i++ {
|
||||
selector := selectors[i]
|
||||
name := runtime.String(selector.Identifier().GetText())
|
||||
fcx := selector.FunctionCallExpression()
|
||||
args := c.ctx.ExprCompiler.CompileArgumentList(fcx.FunctionCall().ArgumentList())
|
||||
|
||||
if len(args) == 0 {
|
||||
// TODO: Better error handling
|
||||
panic("No arguments provided for the function call in the aggregate selector")
|
||||
}
|
||||
|
||||
if len(args) > 1 {
|
||||
for y := 0; y < len(args); i++ {
|
||||
key := c.loadAggregationArgKey(name, y)
|
||||
c.ctx.Emitter.EmitPushKV(dst, key, args[y])
|
||||
c.ctx.Registers.Free(key)
|
||||
}
|
||||
} else {
|
||||
// We can use a single argument directly
|
||||
key := loadConstant(c.ctx, name)
|
||||
c.ctx.Emitter.EmitPushKV(dst, key, args[0])
|
||||
c.ctx.Registers.Free(key)
|
||||
}
|
||||
|
||||
fce := selector.FunctionCallExpression()
|
||||
funcName := getFunctionName(fce.FunctionCall())
|
||||
isProtected := fce.ErrorOperator() != nil
|
||||
|
||||
// Collect information about the selector to unpack it later
|
||||
wrappedSelectors = append(wrappedSelectors, &aggregateSelector{
|
||||
Name: name,
|
||||
Args: len(args),
|
||||
FuncName: funcName,
|
||||
ProtectedCall: isProtected,
|
||||
})
|
||||
|
||||
c.ctx.Registers.FreeSequence(args)
|
||||
}
|
||||
|
||||
return argsPkg
|
||||
return wrappedSelectors
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileAggregationFuncCall(selectors []fql.ICollectAggregateSelectorContext, accumulator vm.Operand, argsPkg []int) {
|
||||
func (c *LoopCollectCompiler) unpackGroupedValues(selectors []*aggregateSelector, withGrouping bool) {
|
||||
if !withGrouping {
|
||||
return
|
||||
}
|
||||
|
||||
loop := c.ctx.Loops.Current()
|
||||
valReg := c.ctx.Registers.Allocate(core.Temp)
|
||||
|
||||
loadIndex(c.ctx, valReg, loop.Value, 0)
|
||||
|
||||
for i, selector := range selectors {
|
||||
loadIndex(c.ctx, selector.Register, loop.Value, i+1)
|
||||
}
|
||||
|
||||
c.ctx.Registers.Free(valReg)
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileAggregation(vars []*aggregateSelector, withGrouping bool) {
|
||||
if withGrouping {
|
||||
c.compileGroupedAggregation(vars)
|
||||
} else {
|
||||
c.compileGlobalAggregation(vars)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileGroupedAggregation(selectors []*aggregateSelector) {
|
||||
//parentLoop := c.ctx.Loops.Current()
|
||||
//// We need to allocate a temporary accumulator to store aggregation results
|
||||
//selectors := ctx.AllCollectAggregateSelector()
|
||||
//accumulator := c.ctx.Registers.Allocate(core.Temp)
|
||||
//c.ctx.Emitter.EmitAx(vm.OpDataSetCollector, accumulator, int(core.CollectorTypeKeyGroup))
|
||||
//
|
||||
//loop := c.ctx.Loops.NewForInLoop(core.TemporalLoop, false)
|
||||
//loop.Src = c.ctx.Registers.Allocate(core.Temp)
|
||||
//
|
||||
//// Now we iterate over the grouped items
|
||||
//parentLoop.EmitValue(loop.Src, c.ctx.Emitter)
|
||||
//
|
||||
//// Nested scope for aggregators
|
||||
//c.ctx.Symbols.EnterScope()
|
||||
//loop.DeclareValueVar(parentLoop.ValueName, c.ctx.Symbols)
|
||||
//loop.EmitInitialization(c.ctx.Registers, c.ctx.Emitter, c.ctx.Loops.Depth())
|
||||
//
|
||||
//// Add value selectors to the accumulators
|
||||
//argsPkg := c.compileGroupedAggregationSelectors(selectors, accumulator)
|
||||
//
|
||||
//loop.EmitFinalization(c.ctx.Emitter)
|
||||
//c.ctx.Symbols.ExitScope()
|
||||
//
|
||||
//// Now we can iterate over the selectors and execute the aggregation functions by passing the accumulators
|
||||
//// And define variables for each accumulator result
|
||||
//c.compileAggregationFuncCalls(selectors, accumulator, argsPkg)
|
||||
//c.ctx.Registers.Free(accumulator)
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileGlobalAggregation(selectors []*aggregateSelector) {
|
||||
// At this point, it's finalized.
|
||||
prevLoop := c.ctx.Loops.Pop()
|
||||
c.ctx.Registers.Free(prevLoop.Key)
|
||||
c.ctx.Registers.Free(prevLoop.Value)
|
||||
c.ctx.Registers.Free(prevLoop.Src)
|
||||
|
||||
// NewForLoop new loop with 1 iteration only
|
||||
c.ctx.Symbols.EnterScope()
|
||||
loop := c.ctx.Loops.NewLoop(core.ForInLoop, core.NormalLoop, prevLoop.Distinct)
|
||||
c.ctx.Loops.Push(loop)
|
||||
|
||||
loop.Src = c.ctx.Registers.Allocate(core.Temp)
|
||||
zero := c.ctx.Registers.Allocate(core.Temp)
|
||||
c.ctx.Emitter.EmitA(vm.OpLoadZero, zero)
|
||||
c.ctx.Emitter.EmitABC(vm.OpLoadRange, loop.Src, zero, zero)
|
||||
loop.Allocate = prevLoop.Allocate
|
||||
|
||||
if !loop.Allocate {
|
||||
parent := c.ctx.Loops.FindParent(c.ctx.Loops.Depth())
|
||||
loop.Dst = parent.Dst
|
||||
}
|
||||
|
||||
loop.EmitInitialization(c.ctx.Registers, c.ctx.Emitter, c.ctx.Loops.Depth())
|
||||
|
||||
// We just need to take the grouped values and call aggregation functions using them as args
|
||||
c.compileAggregationFuncCalls(selectors, prevLoop.Dst)
|
||||
|
||||
c.ctx.Registers.Free(prevLoop.Dst)
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileAggregationFuncCalls(selectors []*aggregateSelector, aggregator vm.Operand) {
|
||||
// Gets the number of records in the accumulator
|
||||
cond := c.ctx.Registers.Allocate(core.Temp)
|
||||
c.ctx.Emitter.EmitAB(vm.OpLength, cond, accumulator)
|
||||
c.ctx.Emitter.EmitAB(vm.OpLength, cond, aggregator)
|
||||
zero := loadConstant(c.ctx, runtime.ZeroInt)
|
||||
// Check if the number equals to zero
|
||||
c.ctx.Emitter.EmitEq(cond, cond, zero)
|
||||
@@ -137,34 +239,31 @@ func (c *LoopCollectCompiler) compileAggregationFuncCall(selectors []fql.ICollec
|
||||
selectorVarRegs := make([]vm.Operand, len(selectors))
|
||||
|
||||
for i, selector := range selectors {
|
||||
argsNum := argsPkg[i]
|
||||
|
||||
var args core.RegisterSequence
|
||||
|
||||
// We need to unpack arguments
|
||||
if argsNum > 1 {
|
||||
args = c.ctx.Registers.AllocateSequence(argsNum)
|
||||
if selector.Args > 1 {
|
||||
args = c.ctx.Registers.AllocateSequence(selector.Args)
|
||||
|
||||
for y, reg := range args {
|
||||
argKeyReg := c.loadAggregationArgKey(i, y)
|
||||
c.ctx.Emitter.EmitABC(vm.OpLoadKey, reg, accumulator, argKeyReg)
|
||||
argKeyReg := c.loadAggregationArgKey(selector.Name, y)
|
||||
c.ctx.Emitter.EmitABC(vm.OpLoadKey, reg, aggregator, argKeyReg)
|
||||
c.ctx.Registers.Free(argKeyReg)
|
||||
}
|
||||
} else {
|
||||
key := loadConstant(c.ctx, runtime.Int(i))
|
||||
key := loadConstant(c.ctx, runtime.String(selector.Name))
|
||||
value := c.ctx.Registers.Allocate(core.Temp)
|
||||
c.ctx.Emitter.EmitABC(vm.OpLoadKey, value, accumulator, key)
|
||||
c.ctx.Emitter.EmitABC(vm.OpLoadKey, value, aggregator, key)
|
||||
args = core.RegisterSequence{value}
|
||||
c.ctx.Registers.Free(key)
|
||||
}
|
||||
|
||||
fcx := selector.FunctionCallExpression()
|
||||
result := c.ctx.ExprCompiler.CompileFunctionCallWith(fcx.FunctionCall(), fcx.ErrorOperator() != nil, args)
|
||||
result := c.ctx.ExprCompiler.CompileFunctionCallByNameWith(selector.FuncName, selector.ProtectedCall, args)
|
||||
|
||||
// We define the variable for the selector result in the upper scope
|
||||
// Since this temporary scope is only for aggregators and will be closed after the aggregation
|
||||
selectorVarName := selector.Identifier().GetText()
|
||||
varReg := c.ctx.Symbols.DeclareLocal(selectorVarName, core.TypeUnknown)
|
||||
selectorVarName := selector.Name
|
||||
varReg := c.ctx.Symbols.DeclareLocal(selectorVarName.String(), core.TypeUnknown)
|
||||
selectorVarRegs[i] = varReg
|
||||
c.ctx.Emitter.EmitAB(vm.OpMove, varReg, result)
|
||||
c.ctx.Registers.Free(result)
|
||||
@@ -181,7 +280,12 @@ func (c *LoopCollectCompiler) compileAggregationFuncCall(selectors []fql.ICollec
|
||||
c.ctx.Registers.Free(cond)
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) loadAggregationArgKey(selector int, arg int) vm.Operand {
|
||||
argKey := strconv.Itoa(selector) + ":" + strconv.Itoa(arg)
|
||||
func (c *LoopCollectCompiler) compileAggregationFuncCall(selector *aggregateSelector) {
|
||||
varReg := c.ctx.Symbols.DeclareLocal(selector.Name.String(), core.TypeUnknown)
|
||||
loadIndex(c.ctx, varReg, selector.Register, 1)
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) loadAggregationArgKey(selector runtime.String, arg int) vm.Operand {
|
||||
argKey := selector.String() + ":" + strconv.Itoa(arg)
|
||||
return loadConstant(c.ctx, runtime.String(argKey))
|
||||
}
|
||||
|
111
pkg/compiler/internal/loop_collect_grp.go
Normal file
111
pkg/compiler/internal/loop_collect_grp.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"github.com/MontFerret/ferret/pkg/compiler/internal/core"
|
||||
"github.com/MontFerret/ferret/pkg/parser/fql"
|
||||
"github.com/MontFerret/ferret/pkg/runtime"
|
||||
"github.com/MontFerret/ferret/pkg/vm"
|
||||
)
|
||||
|
||||
// initializeGrouping creates the KeyValue pair for collection, handling both grouping and value setup.
|
||||
func (c *LoopCollectCompiler) initializeGrouping(grouping fql.ICollectGroupingContext) (*core.KV, []fql.ICollectSelectorContext) {
|
||||
var groupSelectors []fql.ICollectSelectorContext
|
||||
|
||||
kv := core.NewKV(vm.NoopOperand, vm.NoopOperand)
|
||||
loop := c.ctx.Loops.Current()
|
||||
|
||||
// Handle grouping key if present
|
||||
if grouping != nil {
|
||||
kv.Key, groupSelectors = c.compileGroupKeys(grouping)
|
||||
}
|
||||
|
||||
// Setup value register and emit value from current loop
|
||||
if loop.Kind == core.ForInLoop {
|
||||
if loop.Value != vm.NoopOperand {
|
||||
kv.Value = loop.Value
|
||||
} else {
|
||||
kv.Value = c.ctx.Registers.Allocate(core.Temp)
|
||||
loop.EmitValue(kv.Value, c.ctx.Emitter)
|
||||
}
|
||||
} else {
|
||||
if loop.Key != vm.NoopOperand {
|
||||
kv.Value = loop.Key
|
||||
} else {
|
||||
kv.Value = c.ctx.Registers.Allocate(core.Temp)
|
||||
loop.EmitKey(kv.Value, c.ctx.Emitter)
|
||||
}
|
||||
}
|
||||
|
||||
return kv, groupSelectors
|
||||
}
|
||||
|
||||
// compileGroupKeys compiles the grouping keys from the CollectGroupingContext.
|
||||
func (c *LoopCollectCompiler) compileGroupKeys(ctx fql.ICollectGroupingContext) (vm.Operand, []fql.ICollectSelectorContext) {
|
||||
selectors := ctx.AllCollectSelector()
|
||||
|
||||
if len(selectors) == 0 {
|
||||
return vm.NoopOperand, selectors
|
||||
}
|
||||
|
||||
var kvKeyReg vm.Operand
|
||||
|
||||
if len(selectors) > 1 {
|
||||
// We create a sequence of Registers for the clauses
|
||||
// To pack them into an array
|
||||
selectorRegs := c.ctx.Registers.AllocateSequence(len(selectors))
|
||||
|
||||
for i, selector := range selectors {
|
||||
reg := c.ctx.ExprCompiler.Compile(selector.Expression())
|
||||
c.ctx.Emitter.EmitAB(vm.OpMove, selectorRegs[i], reg)
|
||||
// Free the register after moving its value to the sequence register
|
||||
c.ctx.Registers.Free(reg)
|
||||
}
|
||||
|
||||
kvKeyReg = c.ctx.Registers.Allocate(core.Temp)
|
||||
c.ctx.Emitter.EmitAs(vm.OpLoadArray, kvKeyReg, selectorRegs)
|
||||
c.ctx.Registers.FreeSequence(selectorRegs)
|
||||
} else {
|
||||
kvKeyReg = c.ctx.ExprCompiler.Compile(selectors[0].Expression())
|
||||
}
|
||||
|
||||
return kvKeyReg, selectors
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileGrouping(collectorType core.CollectorType, selectors []fql.ICollectSelectorContext) {
|
||||
loop := c.ctx.Loops.Current()
|
||||
|
||||
if len(selectors) > 1 {
|
||||
variables := make([]vm.Operand, len(selectors))
|
||||
|
||||
for i, selector := range selectors {
|
||||
name := selector.Identifier().GetText()
|
||||
|
||||
if variables[i] == vm.NoopOperand {
|
||||
variables[i] = c.ctx.Symbols.DeclareLocal(name, core.TypeUnknown)
|
||||
}
|
||||
|
||||
reg := c.selectGroupKey(collectorType, loop)
|
||||
|
||||
c.ctx.Emitter.EmitABC(vm.OpLoadIndex, variables[i], reg, loadConstant(c.ctx, runtime.Int(i)))
|
||||
}
|
||||
|
||||
// Free the register after moving its value to the variable
|
||||
for _, reg := range variables {
|
||||
c.ctx.Registers.Free(reg)
|
||||
}
|
||||
} else {
|
||||
// Get the variable name
|
||||
name := selectors[0].Identifier().GetText()
|
||||
// If we have a single selector, we can just use the loops' register directly
|
||||
c.ctx.Symbols.AssignLocal(name, core.TypeUnknown, c.selectGroupKey(collectorType, loop))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) selectGroupKey(collectorType core.CollectorType, loop *core.Loop) vm.Operand {
|
||||
switch collectorType {
|
||||
case core.CollectorTypeKeyGroup, core.CollectorTypeKeyCounter:
|
||||
return loop.Key
|
||||
default:
|
||||
return loop.Value
|
||||
}
|
||||
}
|
92
pkg/compiler/internal/loop_collect_prj.go
Normal file
92
pkg/compiler/internal/loop_collect_prj.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"github.com/MontFerret/ferret/pkg/compiler/internal/core"
|
||||
"github.com/MontFerret/ferret/pkg/parser/fql"
|
||||
"github.com/MontFerret/ferret/pkg/runtime"
|
||||
"github.com/MontFerret/ferret/pkg/vm"
|
||||
"github.com/antlr4-go/antlr/v4"
|
||||
)
|
||||
|
||||
// initializeProjection handles the projection setup for group variables and counters.
|
||||
// Returns the projection variable name and the appropriate collector type.
|
||||
func (c *LoopCollectCompiler) initializeProjection(ctx fql.ICollectClauseContext, kv *core.KV, counter fql.ICollectCounterContext, hasGrouping bool) (string, core.CollectorType) {
|
||||
projectionVariableName := ""
|
||||
collectorType := core.CollectorTypeKey
|
||||
|
||||
// Handle group variable projection
|
||||
if groupVar := ctx.CollectGroupVariable(); groupVar != nil {
|
||||
projectionVariableName = c.compileGroupVariableProjection(kv, groupVar)
|
||||
collectorType = core.CollectorTypeKeyGroup
|
||||
return projectionVariableName, collectorType
|
||||
}
|
||||
|
||||
// Handle counter projection
|
||||
if counter != nil {
|
||||
projectionVariableName = counter.Identifier().GetText()
|
||||
collectorType = c.determineCounterCollectorType(hasGrouping)
|
||||
}
|
||||
|
||||
return projectionVariableName, collectorType
|
||||
}
|
||||
|
||||
// determineCounterCollectorType returns the appropriate collector type for counter operations.
|
||||
func (c *LoopCollectCompiler) determineCounterCollectorType(hasGrouping bool) core.CollectorType {
|
||||
if hasGrouping {
|
||||
return core.CollectorTypeKeyCounter
|
||||
}
|
||||
|
||||
return core.CollectorTypeCounter
|
||||
}
|
||||
|
||||
// compileGroupVariableProjection processes group variable projections (both default and custom).
|
||||
func (c *LoopCollectCompiler) compileGroupVariableProjection(kv *core.KV, groupVar fql.ICollectGroupVariableContext) string {
|
||||
// Handle default projection (identifier)
|
||||
if identifier := groupVar.Identifier(); identifier != nil {
|
||||
return c.compileDefaultGroupProjection(kv, identifier, groupVar.CollectGroupVariableKeeper())
|
||||
}
|
||||
|
||||
// Handle custom projection (selector expression)
|
||||
if selector := groupVar.CollectSelector(); selector != nil {
|
||||
return c.compileCustomGroupProjection(kv, selector)
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileDefaultGroupProjection(kv *core.KV, identifier antlr.TerminalNode, keeper fql.ICollectGroupVariableKeeperContext) string {
|
||||
if keeper == nil {
|
||||
variables := c.ctx.Symbols.LocalVariables()
|
||||
scope := core.NewScopeProjection(c.ctx.Registers, c.ctx.Emitter, c.ctx.Symbols, variables)
|
||||
scope.EmitAsObject(kv.Value)
|
||||
} else {
|
||||
variables := keeper.AllIdentifier()
|
||||
seq := c.ctx.Registers.AllocateSequence(len(variables) * 2)
|
||||
|
||||
for i, j := 0, 0; i < len(variables); i, j = i+1, j+2 {
|
||||
varName := variables[i].GetText()
|
||||
loadConstantTo(c.ctx, runtime.String(varName), seq[j])
|
||||
|
||||
variable, _, found := c.ctx.Symbols.Resolve(varName)
|
||||
|
||||
if !found {
|
||||
panic("variable not found: " + varName)
|
||||
}
|
||||
|
||||
c.ctx.Emitter.EmitAB(vm.OpMove, seq[j+1], variable)
|
||||
}
|
||||
|
||||
c.ctx.Emitter.EmitAs(vm.OpLoadObject, kv.Value, seq)
|
||||
c.ctx.Registers.FreeSequence(seq)
|
||||
}
|
||||
|
||||
return identifier.GetText()
|
||||
}
|
||||
|
||||
func (c *LoopCollectCompiler) compileCustomGroupProjection(kv *core.KV, selector fql.ICollectSelectorContext) string {
|
||||
selectorReg := c.ctx.ExprCompiler.Compile(selector.Expression())
|
||||
c.ctx.Emitter.EmitMove(kv.Value, selectorReg)
|
||||
c.ctx.Registers.Free(selectorReg)
|
||||
|
||||
return selector.Identifier().GetText()
|
||||
}
|
@@ -94,6 +94,10 @@ func Sleep(ctx context.Context, duration runtime.Int) error {
|
||||
|
||||
// Stringify converts a Value to a String. If the input is an Iterable, it concatenates
|
||||
func Stringify(ctx context.Context, input runtime.Value) (string, error) {
|
||||
if input == nil {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
switch val := input.(type) {
|
||||
case runtime.Iterable:
|
||||
var b bytes.Buffer
|
||||
|
@@ -6,7 +6,7 @@ import (
|
||||
|
||||
func TestCollectAggregate(t *testing.T) {
|
||||
RunUseCases(t, []UseCase{
|
||||
CaseArray(`
|
||||
SkipCaseArray(`
|
||||
LET users = [
|
||||
{
|
||||
active: true,
|
||||
@@ -39,7 +39,7 @@ func TestCollectAggregate(t *testing.T) {
|
||||
map[string]any{"gender": "f", "minAge": 25, "maxAge": 25},
|
||||
map[string]any{"gender": "m", "minAge": 0, "maxAge": 0},
|
||||
}, "Should handle null values in aggregation"),
|
||||
CaseArray(`
|
||||
SkipCaseArray(`
|
||||
LET users = [
|
||||
{
|
||||
active: true,
|
||||
@@ -85,7 +85,7 @@ FOR u IN users
|
||||
map[string]any{"genderGroup": "f", "minAge": 25, "maxAge": 45},
|
||||
map[string]any{"genderGroup": "m", "minAge": 31, "maxAge": 69},
|
||||
}, "Should collect and aggregate values by a single key"),
|
||||
CaseArray(`
|
||||
SkipCaseArray(`
|
||||
LET users = [
|
||||
{
|
||||
active: true,
|
||||
@@ -141,7 +141,7 @@ FOR u IN users
|
||||
map[string]any{"department": "Management", "gender": "m", "minAge": 69, "maxAge": 69},
|
||||
map[string]any{"department": "Marketing", "gender": "f", "minAge": 25, "maxAge": 45},
|
||||
}, "Should aggregate with multiple grouping keys"),
|
||||
CaseArray(`
|
||||
SkipCaseArray(`
|
||||
LET users = [
|
||||
{
|
||||
active: true,
|
||||
@@ -247,7 +247,7 @@ FOR u IN users
|
||||
`,
|
||||
[]any{map[string]any{"minAge": 25, "maxAge": 69}},
|
||||
"Should collect and aggregate values without grouping"),
|
||||
CaseArray(`
|
||||
SkipCaseArray(`
|
||||
LET users = []
|
||||
FOR u IN users
|
||||
COLLECT AGGREGATE minAge = MIN(u.age), maxAge = MAX(u.age)
|
||||
@@ -258,7 +258,7 @@ FOR u IN users
|
||||
`,
|
||||
[]any{map[string]any{"minAge": nil, "maxAge": nil}},
|
||||
"Should handle empty arrays gracefully"),
|
||||
CaseArray(`
|
||||
SkipCaseArray(`
|
||||
LET users = [
|
||||
{
|
||||
active: true,
|
||||
@@ -297,7 +297,7 @@ LET users = [
|
||||
`, []any{
|
||||
map[string]any{"ages": []any{31, 25, 36, 69, 45, 31, 25, 36, 69, 45}},
|
||||
}, "Should call aggregation functions with more than one argument"),
|
||||
CaseArray(`
|
||||
SkipCaseArray(`
|
||||
LET users = [
|
||||
{
|
||||
active: true,
|
||||
@@ -342,7 +342,7 @@ FOR u IN users
|
||||
map[string]any{"genderGroup": "f", "ages": []any{25, 45, 25, 45}},
|
||||
map[string]any{"genderGroup": "m", "ages": []any{31, 36, 69, 31, 36, 69}},
|
||||
}, "Should collect and aggregate values by a single key"),
|
||||
CaseArray(`
|
||||
SkipCaseArray(`
|
||||
LET users = [
|
||||
{
|
||||
active: true,
|
||||
@@ -390,7 +390,7 @@ FOR u IN users
|
||||
map[string]any{"ageGroup": 45, "maxAge": 45, "minAge": 45},
|
||||
map[string]any{"ageGroup": 65, "maxAge": 69, "minAge": 69},
|
||||
}, "Should aggregate values with calculated grouping"),
|
||||
CaseArray(`
|
||||
SkipCaseArray(`
|
||||
LET users = [
|
||||
{
|
||||
active: true,
|
||||
@@ -475,7 +475,7 @@ FOR u IN users
|
||||
"employeeCount": 2,
|
||||
},
|
||||
}, "Should aggregate multiple values with complex expressions"),
|
||||
CaseArray(`
|
||||
SkipCaseArray(`
|
||||
LET users = [
|
||||
{
|
||||
name: "John",
|
||||
@@ -513,7 +513,7 @@ FOR u IN users
|
||||
"uniqueSkillCount": 4,
|
||||
},
|
||||
}, "Should aggregate with array operations"),
|
||||
CaseArray(`
|
||||
SkipCaseArray(`
|
||||
LET users = [
|
||||
{
|
||||
active: true,
|
||||
|
Reference in New Issue
Block a user