mirror of
https://github.com/MontFerret/ferret.git
synced 2025-08-13 19:52:52 +02:00
Add support for grouping with counting in collect clauses
Introduced new opcode `OpCollectKc` for grouping with counting and updated the VM, compiler, and dataset logic to support this functionality. Enhanced the `CollectClause` to handle `WITH COUNT INTO` syntax, added `emitCollectCountProjection`, and implemented relevant test cases.
This commit is contained in:
@@ -2332,6 +2332,55 @@ LET users = [
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}, "Should create default projection with custom KEEP with multiple custom names"),
|
}, "Should create default projection with custom KEEP with multiple custom names"),
|
||||||
|
CaseArray(
|
||||||
|
`LET users = [
|
||||||
|
{
|
||||||
|
active: true,
|
||||||
|
age: 31,
|
||||||
|
gender: "m",
|
||||||
|
married: true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
active: true,
|
||||||
|
age: 25,
|
||||||
|
gender: "f",
|
||||||
|
married: false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
active: true,
|
||||||
|
age: 36,
|
||||||
|
gender: "m",
|
||||||
|
married: false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
active: false,
|
||||||
|
age: 69,
|
||||||
|
gender: "m",
|
||||||
|
married: true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
active: true,
|
||||||
|
age: 45,
|
||||||
|
gender: "f",
|
||||||
|
married: true
|
||||||
|
}
|
||||||
|
]
|
||||||
|
FOR i IN users
|
||||||
|
COLLECT gender = i.gender WITH COUNT INTO numberOfUsers
|
||||||
|
RETURN {
|
||||||
|
gender,
|
||||||
|
values: numberOfUsers
|
||||||
|
}
|
||||||
|
`, []any{
|
||||||
|
map[string]any{
|
||||||
|
"gender": "f",
|
||||||
|
"values": 2,
|
||||||
|
},
|
||||||
|
map[string]any{
|
||||||
|
"gender": "m",
|
||||||
|
"values": 3,
|
||||||
|
},
|
||||||
|
}, "Should group and count result by a single key"),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -401,136 +401,130 @@ func (v *visitor) VisitLimitClause(ctx *fql.LimitClauseContext) interface{} {
|
|||||||
|
|
||||||
func (v *visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{} {
|
func (v *visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{} {
|
||||||
if c := ctx.CollectGrouping(); c != nil {
|
if c := ctx.CollectGrouping(); c != nil {
|
||||||
|
// TODO: Undefine original loop variables
|
||||||
|
loop := v.loops.Loop()
|
||||||
|
|
||||||
if cvar := ctx.CollectGroupVariable(); cvar != nil {
|
// We collect the aggregation keys
|
||||||
return v.visitCollectGrouping(c.(*fql.CollectGroupingContext), cvar.(*fql.CollectGroupVariableContext))
|
// And wrap each loop element by a KeyValuePair
|
||||||
}
|
// Where a key is either a single value or a list of values
|
||||||
// Collect by grouping
|
// These KeyValuePairs are then added to the dataset
|
||||||
return v.visitCollectGrouping(c.(*fql.CollectGroupingContext), nil)
|
var kvKeyReg vm.Operand
|
||||||
}
|
selectors := c.AllCollectSelector()
|
||||||
|
isMultiSelector := len(selectors) > 1
|
||||||
|
|
||||||
return nil
|
if isMultiSelector {
|
||||||
}
|
// We create a sequence of registers for the clauses
|
||||||
|
// To pack them into an array
|
||||||
|
selectorRegs := v.registers.AllocateSequence(len(selectors))
|
||||||
|
|
||||||
func (v *visitor) visitCollectGrouping(ctx *fql.CollectGroupingContext, cvar *fql.CollectGroupVariableContext) interface{} {
|
for i, selector := range selectors {
|
||||||
// TODO: Undefine original loop variables
|
reg := selector.Accept(v).(vm.Operand)
|
||||||
loop := v.loops.Loop()
|
v.emitter.EmitAB(vm.OpMove, selectorRegs.Registers[i], reg)
|
||||||
|
// Free the register after moving its value to the sequence register
|
||||||
// We collect the aggregation keys
|
v.registers.Free(reg)
|
||||||
// And wrap each loop element by a KeyValuePair
|
|
||||||
// Where a key is either a single value or a list of values
|
|
||||||
// These KeyValuePairs are then added to the dataset
|
|
||||||
var kvKeyReg vm.Operand
|
|
||||||
selectors := ctx.AllCollectSelector()
|
|
||||||
isMultiSelector := len(selectors) > 1
|
|
||||||
|
|
||||||
if isMultiSelector {
|
|
||||||
// We create a sequence of registers for the clauses
|
|
||||||
// To pack them into an array
|
|
||||||
selectorRegs := v.registers.AllocateSequence(len(selectors))
|
|
||||||
|
|
||||||
for i, selector := range selectors {
|
|
||||||
reg := selector.Accept(v).(vm.Operand)
|
|
||||||
v.emitter.EmitAB(vm.OpMove, selectorRegs.Registers[i], reg)
|
|
||||||
// Free the register after moving its value to the sequence register
|
|
||||||
v.registers.Free(reg)
|
|
||||||
}
|
|
||||||
|
|
||||||
kvKeyReg = v.registers.Allocate(Temp)
|
|
||||||
v.emitter.EmitAs(vm.OpLoadList, kvKeyReg, selectorRegs)
|
|
||||||
v.registers.FreeSequence(selectorRegs)
|
|
||||||
} else {
|
|
||||||
kvKeyReg = selectors[0].Accept(v).(vm.Operand)
|
|
||||||
}
|
|
||||||
|
|
||||||
kvValReg := v.registers.Allocate(Temp)
|
|
||||||
|
|
||||||
if loop.Kind == ForLoop {
|
|
||||||
v.emitter.EmitAB(vm.OpIterValue, kvValReg, loop.Iterator)
|
|
||||||
} else {
|
|
||||||
v.emitter.EmitAB(vm.OpWhileLoopValue, kvValReg, loop.Iterator)
|
|
||||||
}
|
|
||||||
|
|
||||||
var projectionVariableName string
|
|
||||||
|
|
||||||
if cvar != nil {
|
|
||||||
if identifier := cvar.Identifier(); identifier != nil {
|
|
||||||
projectionVariableName = v.emitDefaultCollectProjection(loop, kvValReg, identifier, cvar.CollectGroupVariableKeeper())
|
|
||||||
} else if selector := cvar.CollectSelector(); selector != nil {
|
|
||||||
projectionVariableName = v.emitCustomCollectProjection(loop, kvValReg, selector)
|
|
||||||
}
|
|
||||||
|
|
||||||
v.emitter.EmitABC(vm.OpCollectKV, loop.Result, kvKeyReg, kvValReg)
|
|
||||||
} else {
|
|
||||||
v.emitter.EmitABC(vm.OpCollectK, loop.Result, kvKeyReg, kvValReg)
|
|
||||||
}
|
|
||||||
|
|
||||||
v.emitter.EmitJump(vm.OpJump, loop.Jump-loop.JumpOffset)
|
|
||||||
v.emitter.EmitA(vm.OpClose, loop.Iterator)
|
|
||||||
|
|
||||||
if loop.Kind == ForLoop {
|
|
||||||
v.emitter.PatchJump(loop.Jump)
|
|
||||||
} else {
|
|
||||||
v.emitter.PatchJumpAB(loop.Jump)
|
|
||||||
}
|
|
||||||
|
|
||||||
v.emitter.EmitAB(vm.OpSort, loop.Result, v.loadConstant(runtime.ZeroInt))
|
|
||||||
|
|
||||||
// Replace source with sorted array
|
|
||||||
v.emitter.EmitAB(vm.OpMove, loop.Src, loop.Result)
|
|
||||||
|
|
||||||
v.symbols.ExitScope()
|
|
||||||
v.symbols.EnterScope()
|
|
||||||
|
|
||||||
// Create new for loop
|
|
||||||
v.emitLoopBegin(loop)
|
|
||||||
|
|
||||||
// Now we need to expand group variables from the dataset
|
|
||||||
v.emitter.EmitAB(vm.OpIterKey, kvValReg, loop.Iterator)
|
|
||||||
|
|
||||||
// If the projection is used, we allocate a new register for the variable and put the iterator's value into it
|
|
||||||
if projectionVariableName != "" {
|
|
||||||
v.emitter.EmitAB(vm.OpIterValue, v.symbols.DefineVariable(projectionVariableName), loop.Iterator)
|
|
||||||
}
|
|
||||||
|
|
||||||
//loop.ValueName = ""
|
|
||||||
//loop.KeyName = ""
|
|
||||||
// TODO: Reuse the registers
|
|
||||||
v.registers.Free(loop.Value)
|
|
||||||
v.registers.Free(loop.Key)
|
|
||||||
loop.Value = vm.NoopOperand
|
|
||||||
loop.Key = vm.NoopOperand
|
|
||||||
|
|
||||||
if isMultiSelector {
|
|
||||||
variables := make([]vm.Operand, len(selectors))
|
|
||||||
|
|
||||||
for i, selector := range selectors {
|
|
||||||
name := selector.Identifier().GetText()
|
|
||||||
|
|
||||||
if variables[i] == vm.NoopOperand {
|
|
||||||
variables[i] = v.symbols.DefineVariable(name)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
v.emitter.EmitABC(vm.OpLoadIndex, variables[i], kvValReg, v.loadConstant(runtime.Int(i)))
|
kvKeyReg = v.registers.Allocate(Temp)
|
||||||
|
v.emitter.EmitAs(vm.OpLoadList, kvKeyReg, selectorRegs)
|
||||||
|
v.registers.FreeSequence(selectorRegs)
|
||||||
|
} else {
|
||||||
|
kvKeyReg = selectors[0].Accept(v).(vm.Operand)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Free the register after moving its value to the variable
|
kvValReg := v.registers.Allocate(Temp)
|
||||||
for _, reg := range variables {
|
|
||||||
v.registers.Free(reg)
|
if loop.Kind == ForLoop {
|
||||||
|
v.emitter.EmitAB(vm.OpIterValue, kvValReg, loop.Iterator)
|
||||||
|
} else {
|
||||||
|
v.emitter.EmitAB(vm.OpWhileLoopValue, kvValReg, loop.Iterator)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
// Get the variable name
|
var projectionVariableName string
|
||||||
name := selectors[0].Identifier().GetText()
|
|
||||||
// Define a variable for each selector
|
if groupVar := ctx.CollectGroupVariable(); groupVar != nil {
|
||||||
varReg := v.symbols.DefineVariable(name)
|
if identifier := groupVar.Identifier(); identifier != nil {
|
||||||
// If we have a single selector, we can just move the value
|
projectionVariableName = v.emitDefaultCollectGroupProjection(loop, kvValReg, identifier, groupVar.CollectGroupVariableKeeper())
|
||||||
v.emitter.EmitAB(vm.OpMove, varReg, kvValReg)
|
} else if selector := groupVar.CollectSelector(); selector != nil {
|
||||||
|
projectionVariableName = v.emitCustomCollectGroupProjection(loop, kvValReg, selector)
|
||||||
|
}
|
||||||
|
|
||||||
|
v.emitter.EmitABC(vm.OpCollectKV, loop.Result, kvKeyReg, kvValReg)
|
||||||
|
} else if countVar := ctx.CollectCounter(); countVar != nil {
|
||||||
|
projectionVariableName = v.emitCollectCountProjection(loop, kvValReg, countVar)
|
||||||
|
v.emitter.EmitABC(vm.OpCollectKc, loop.Result, kvKeyReg, kvValReg)
|
||||||
|
} else {
|
||||||
|
v.emitter.EmitABC(vm.OpCollectK, loop.Result, kvKeyReg, kvValReg)
|
||||||
|
}
|
||||||
|
|
||||||
|
v.emitter.EmitJump(vm.OpJump, loop.Jump-loop.JumpOffset)
|
||||||
|
v.emitter.EmitA(vm.OpClose, loop.Iterator)
|
||||||
|
|
||||||
|
if loop.Kind == ForLoop {
|
||||||
|
v.emitter.PatchJump(loop.Jump)
|
||||||
|
} else {
|
||||||
|
v.emitter.PatchJumpAB(loop.Jump)
|
||||||
|
}
|
||||||
|
|
||||||
|
v.emitter.EmitAB(vm.OpSort, loop.Result, v.loadConstant(runtime.ZeroInt))
|
||||||
|
|
||||||
|
// Replace source with sorted array
|
||||||
|
v.emitter.EmitAB(vm.OpMove, loop.Src, loop.Result)
|
||||||
|
|
||||||
|
v.symbols.ExitScope()
|
||||||
|
v.symbols.EnterScope()
|
||||||
|
|
||||||
|
// Create new for loop
|
||||||
|
v.emitLoopBegin(loop)
|
||||||
|
|
||||||
|
// Now we need to expand group variables from the dataset
|
||||||
|
v.emitter.EmitAB(vm.OpIterKey, kvValReg, loop.Iterator)
|
||||||
|
|
||||||
|
// If the projection is used, we allocate a new register for the variable and put the iterator's value into it
|
||||||
|
if projectionVariableName != "" {
|
||||||
|
v.emitter.EmitAB(vm.OpIterValue, v.symbols.DefineVariable(projectionVariableName), loop.Iterator)
|
||||||
|
}
|
||||||
|
|
||||||
|
//loop.ValueName = ""
|
||||||
|
//loop.KeyName = ""
|
||||||
|
// TODO: Reuse the registers
|
||||||
|
v.registers.Free(loop.Value)
|
||||||
|
v.registers.Free(loop.Key)
|
||||||
|
loop.Value = vm.NoopOperand
|
||||||
|
loop.Key = vm.NoopOperand
|
||||||
|
|
||||||
|
if isMultiSelector {
|
||||||
|
variables := make([]vm.Operand, len(selectors))
|
||||||
|
|
||||||
|
for i, selector := range selectors {
|
||||||
|
name := selector.Identifier().GetText()
|
||||||
|
|
||||||
|
if variables[i] == vm.NoopOperand {
|
||||||
|
variables[i] = v.symbols.DefineVariable(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
v.emitter.EmitABC(vm.OpLoadIndex, variables[i], kvValReg, v.loadConstant(runtime.Int(i)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Free the register after moving its value to the variable
|
||||||
|
for _, reg := range variables {
|
||||||
|
v.registers.Free(reg)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Get the variable name
|
||||||
|
name := selectors[0].Identifier().GetText()
|
||||||
|
// Define a variable for each selector
|
||||||
|
varReg := v.symbols.DefineVariable(name)
|
||||||
|
// If we have a single selector, we can just move the value
|
||||||
|
v.emitter.EmitAB(vm.OpMove, varReg, kvValReg)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *visitor) emitDefaultCollectProjection(loop *Loop, kvValReg vm.Operand, identifier antlr.TerminalNode, keeper fql.ICollectGroupVariableKeeperContext) string {
|
func (v *visitor) emitDefaultCollectGroupProjection(loop *Loop, kvValReg vm.Operand, identifier antlr.TerminalNode, keeper fql.ICollectGroupVariableKeeperContext) string {
|
||||||
if keeper == nil {
|
if keeper == nil {
|
||||||
seq := v.registers.AllocateSequence(2) // Key and Value for Map
|
seq := v.registers.AllocateSequence(2) // Key and Value for Map
|
||||||
|
|
||||||
@@ -558,8 +552,7 @@ func (v *visitor) emitDefaultCollectProjection(loop *Loop, kvValReg vm.Operand,
|
|||||||
return identifier.GetText()
|
return identifier.GetText()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *visitor) emitCustomCollectProjection(_ *Loop, kvValReg vm.Operand, selector fql.ICollectSelectorContext) string {
|
func (v *visitor) emitCustomCollectGroupProjection(_ *Loop, kvValReg vm.Operand, selector fql.ICollectSelectorContext) string {
|
||||||
selector.Identifier().GetText()
|
|
||||||
selectorReg := selector.Expression().Accept(v).(vm.Operand)
|
selectorReg := selector.Expression().Accept(v).(vm.Operand)
|
||||||
v.emitter.EmitAB(vm.OpMove, kvValReg, selectorReg)
|
v.emitter.EmitAB(vm.OpMove, kvValReg, selectorReg)
|
||||||
v.registers.Free(selectorReg)
|
v.registers.Free(selectorReg)
|
||||||
@@ -567,6 +560,14 @@ func (v *visitor) emitCustomCollectProjection(_ *Loop, kvValReg vm.Operand, sele
|
|||||||
return selector.Identifier().GetText()
|
return selector.Identifier().GetText()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// emitCollectCountProjection returns the text of the counter clause's
// identifier (selector.Identifier().GetText()), which the caller uses as the
// name of the projection variable.
//
// In its current form the function emits no instructions: the loop argument
// is discarded via the blank identifier and kvValReg is referenced only by
// the disabled lines below.
func (v *visitor) emitCollectCountProjection(_ *Loop, kvValReg vm.Operand, selector fql.ICollectCounterContext) string {
|
||||||
|
// NOTE(review): the three disabled lines below evaluate an expression and
// move it into kvValReg — presumably left over from the custom group
// projection; confirm whether they can be deleted.
//selectorReg := selector.Expression().Accept(v).(vm.Operand)
|
||||||
|
//v.emitter.EmitAB(vm.OpMove, kvValReg, selectorReg)
|
||||||
|
//v.registers.Free(selectorReg)
|
||||||
|
|
||||||
|
return selector.Identifier().GetText()
|
||||||
|
}
|
||||||
|
|
||||||
func (v *visitor) VisitCollectSelector(ctx *fql.CollectSelectorContext) interface{} {
|
func (v *visitor) VisitCollectSelector(ctx *fql.CollectSelectorContext) interface{} {
|
||||||
if c := ctx.Expression(); c != nil {
|
if c := ctx.Expression(); c != nil {
|
||||||
return c.Accept(v)
|
return c.Accept(v)
|
||||||
|
@@ -103,7 +103,7 @@ func (ds *DataSet) AddKV(ctx context.Context, key, value runtime.Value) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *DataSet) CollectKey(ctx context.Context, key runtime.Value) error {
|
func (ds *DataSet) CollectK(ctx context.Context, key runtime.Value) error {
|
||||||
k, err := Stringify(ctx, key)
|
k, err := Stringify(ctx, key)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -126,6 +126,38 @@ func (ds *DataSet) CollectKey(ctx context.Context, key runtime.Value) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CollectKc records one occurrence of key in the dataset's grouping and
// increments the counter stored for that key.
//
// The key is passed through Stringify to obtain the string used to index
// ds.grouping. The first time a key is seen, a KV built by
// NewKV(key, runtime.ZeroInt) is stored in ds.grouping and added to
// ds.values. The KV's Value is then incremented by one, and ds.keyed is set
// to true.
//
// The only error returned is the one produced by Stringify.
func (ds *DataSet) CollectKc(ctx context.Context, key runtime.Value) error {
|
||||||
|
k, err := Stringify(ctx, key)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// The grouping map is created lazily on first use.
if ds.grouping == nil {
|
||||||
|
ds.grouping = make(map[string]runtime.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
group, exists := ds.grouping[k]
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
// First occurrence of this key: start from a zero counter so the
// increment below brings it to one.
group = NewKV(key, runtime.ZeroInt)
|
||||||
|
ds.grouping[k] = group
|
||||||
|
// NOTE(review): the result of Add is discarded here — confirm that
// ignoring it is intentional.
_ = ds.values.Add(ctx, group)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unchecked type assertion: this panics if the stored group is not a *KV.
kv := group.(*KV)
|
||||||
|
// Increment the counter; if the stored Value is not a runtime.Int, the
// else branch replaces it with 1.
if count, ok := kv.Value.(runtime.Int); ok {
|
||||||
|
sum := count + 1
|
||||||
|
kv.Value = sum
|
||||||
|
} else {
|
||||||
|
kv.Value = runtime.NewInt(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
ds.keyed = true
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (ds *DataSet) CollectKV(ctx context.Context, key, value runtime.Value) error {
|
func (ds *DataSet) CollectKV(ctx context.Context, key, value runtime.Value) error {
|
||||||
k, err := Stringify(ctx, key)
|
k, err := Stringify(ctx, key)
|
||||||
|
|
||||||
|
@@ -79,6 +79,7 @@ const (
|
|||||||
OpPush // Adds a value to a dataset
|
OpPush // Adds a value to a dataset
|
||||||
OpPushKV // Adds a key-value pair to a dataset
|
OpPushKV // Adds a key-value pair to a dataset
|
||||||
OpCollectK // Adds a key to a group
|
OpCollectK // Adds a key to a group
|
||||||
|
OpCollectKc // Adds a key to a group and counts it
|
||||||
OpCollectKV // Adds a value to a group using key
|
OpCollectKV // Adds a value to a group using key
|
||||||
OpLimit
|
OpLimit
|
||||||
OpSkip
|
OpSkip
|
||||||
|
13
pkg/vm/vm.go
13
pkg/vm/vm.go
@@ -387,7 +387,18 @@ loop:
|
|||||||
ds := reg[dst].(*internal.DataSet)
|
ds := reg[dst].(*internal.DataSet)
|
||||||
key := reg[src1]
|
key := reg[src1]
|
||||||
|
|
||||||
if err := ds.CollectKey(ctx, key); err != nil {
|
if err := ds.CollectK(ctx, key); err != nil {
|
||||||
|
if _, catch := tryCatch(vm.pc); catch {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
case OpCollectKc:
|
||||||
|
ds := reg[dst].(*internal.DataSet)
|
||||||
|
key := reg[src1]
|
||||||
|
|
||||||
|
if err := ds.CollectKc(ctx, key); err != nil {
|
||||||
if _, catch := tryCatch(vm.pc); catch {
|
if _, catch := tryCatch(vm.pc); catch {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user