1
0
mirror of https://github.com/MontFerret/ferret.git synced 2025-08-15 20:02:56 +02:00

Implement collectors for grouping and counting in collect clauses

This commit is contained in:
Tim Voronov
2025-06-06 11:09:45 -04:00
parent 31cf71d8a8
commit dc5ce9a5cc
12 changed files with 694 additions and 165 deletions

View File

@@ -1568,11 +1568,52 @@ LET users = [
map[string]any{"active": true, "age": 31, "gender": "m"},
map[string]any{"active": true, "age": 36, "gender": "m"},
}, "Should compile query with SORT and FILTER statements."),
CaseObject(`
LET users = [
{
active: true,
age: 31,
gender: "m"
},
{
active: true,
age: 29,
gender: "f"
},
{
active: true,
age: 36,
gender: "m"
}
]
LET sorted = (FOR u IN users
SORT u.age
FILTER u.gender == "m"
RETURN u)
RETURN sorted[0]
`, map[string]any{"active": true, "age": 31, "gender": "m"}, "Should return correct value from a sorted DataSet."),
}, vm.WithFunction("TEST", func(ctx context.Context, args ...runtime.Value) (runtime.Value, error) {
return runtime.None, nil
}))
}
// COLLECT vs. RETURN DISTINCT
//
// In order to make a result set unique, one can either use COLLECT or RETURN DISTINCT.
//
// FOR u IN users
// RETURN DISTINCT u.age
// FOR u IN users
// COLLECT age = u.age
// RETURN age
// Behind the scenes, both variants create a CollectNode. However, they use different implementations of COLLECT that have different properties:
//
// RETURN DISTINCT maintains the order of results, but it is limited to a single value.
//
// COLLECT changes the order of results (sorted or undefined), but it supports multiple values and is more flexible than RETURN DISTINCT.
//
// Aside from COLLECT's sophisticated grouping and aggregation capabilities, it allows you to place a LIMIT operation before RETURN to potentially stop the COLLECT operation early.
func TestCollect(t *testing.T) {
RunUseCases(t, []UseCase{
SkipCaseCompilationError(`
@@ -1690,6 +1731,44 @@ LET users = [
COLLECT gender = i.gender
RETURN gender
`, []any{"f", "m"}, "Should group result by a single key"),
Case(`
LET users = [
{
active: true,
married: true,
age: 31,
gender: "m"
},
{
active: true,
married: false,
age: 25,
gender: "f"
},
{
active: true,
married: false,
age: 36,
gender: "m"
},
{
active: false,
married: true,
age: 69,
gender: "m"
},
{
active: true,
married: true,
age: 45,
gender: "f"
}
]
LET grouped = (FOR i IN users
COLLECT gender = i.gender
RETURN gender)
RETURN grouped[0]
`, "f", "Should return correct group key by an index"),
CaseArray(
`LET users = [
{
@@ -1824,6 +1903,15 @@ LET users = [
},
},
}, "Should create default projection"),
CaseArray(`
LET users = []
FOR i IN users
COLLECT gender = i.gender INTO genders
RETURN {
gender,
values: genders
}
`, []any{}, "COLLECT gender = i.gender INTO genders: should return an empty array when source is empty"),
CaseArray(
`LET users = [
{
@@ -2015,6 +2103,16 @@ LET users = [
},
}, "Should create default projection with default KEEP"),
CaseArray(`
LET users = []
FOR i IN users
LET married = i.married
COLLECT gender = i.gender INTO genders KEEP married
RETURN {
gender,
values: genders
}
`, []any{}, "COLLECT gender = i.gender INTO genders KEEP married: Should return an empty array when source is empty"),
CaseArray(`
LET users = [
{
active: true,
@@ -2381,6 +2479,64 @@ LET users = [
"values": 3,
},
}, "Should group and count result by a single key"),
CaseArray(
`
LET users = []
FOR i IN users
COLLECT gender = i.gender WITH COUNT INTO numberOfUsers
RETURN {
gender,
values: numberOfUsers
}
`, []any{}, "COLLECT gender = i.gender WITH COUNT INTO numberOfUsers: Should return empty array when source is empty"),
CaseArray(
`LET users = [
{
active: true,
age: 31,
gender: "m",
married: true
},
{
active: true,
age: 25,
gender: "f",
married: false
},
{
active: true,
age: 36,
gender: "m",
married: false
},
{
active: false,
age: 69,
gender: "m",
married: true
},
{
active: true,
age: 45,
gender: "f",
married: true
}
]
FOR i IN users
COLLECT WITH COUNT INTO numberOfUsers
RETURN numberOfUsers
`, []any{
5,
}, "Should just count the number of items in the source"),
CaseArray(
`LET users = []
FOR i IN users
COLLECT WITH COUNT INTO numberOfUsers
RETURN numberOfUsers
`, []any{
0,
}, "Should return 0 when there are no items in the source"),
})
}

View File

@@ -39,6 +39,13 @@ func (e *Emitter) EmitJumpc(op vm.Opcode, pos int, reg vm.Operand) int {
return len(e.instructions) - 1
}
// PatchSwapAx overwrites the already-emitted instruction at index pos with a
// new instruction: opcode op, first operand dst, second operand arg (converted
// to vm.Operand) and vm.NoopOperand as the third operand.
//
// pos must be a valid index into the emitted instructions; an out-of-range
// value panics on the slice access.
func (e *Emitter) PatchSwapAx(pos int, op vm.Opcode, dst vm.Operand, arg int) {
	operands := [3]vm.Operand{dst, vm.Operand(arg), vm.NoopOperand}

	e.instructions[pos] = vm.Instruction{Opcode: op, Operands: operands}
}
// PatchJump patches a jump opcode.
func (e *Emitter) PatchJump(instr int) {
e.instructions[instr].Operands[0] = vm.Operand(len(e.instructions) - 1)

View File

@@ -23,6 +23,7 @@ type (
KeyName string
Key vm.Operand
Result vm.Operand
ResultPos int
}
LoopTable struct {

View File

@@ -400,7 +400,6 @@ func (v *visitor) VisitLimitClause(ctx *fql.LimitClauseContext) interface{} {
}
func (v *visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{} {
if c := ctx.CollectGrouping(); c != nil {
// TODO: Undefine original loop variables
loop := v.loops.Loop()
@@ -409,10 +408,85 @@ func (v *visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{}
// Where a key is either a single value or a list of values
// These KeyValuePairs are then added to the dataset
var kvKeyReg vm.Operand
selectors := c.AllCollectSelector()
isMultiSelector := len(selectors) > 1
kvValReg := v.registers.Allocate(Temp)
var groupSelectors []fql.ICollectSelectorContext
var isGrouping bool
var isCounting bool
grouping := ctx.CollectGrouping()
if isMultiSelector {
if grouping != nil {
isGrouping = true
groupSelectors = grouping.AllCollectSelector()
kvKeyReg = v.emitGroupingKeySelectors(groupSelectors)
}
v.emitIterValue(loop, kvValReg)
var projectionVariableName string
// TODO: Create enum for better readability
collectorType := 1
// If we have a collect group variable, we need to project it
if groupVar := ctx.CollectGroupVariable(); groupVar != nil {
// Projection can be either a default projection (identifier) or a custom projection (selector expression)
if identifier := groupVar.Identifier(); identifier != nil {
projectionVariableName = v.emitDefaultCollectGroupProjection(loop, kvValReg, identifier, groupVar.CollectGroupVariableKeeper())
} else if selector := groupVar.CollectSelector(); selector != nil {
projectionVariableName = v.emitCustomCollectGroupProjection(loop, kvValReg, selector)
}
collectorType = 3
} else if countVar := ctx.CollectCounter(); countVar != nil {
projectionVariableName = v.emitCollectCountProjection(loop, kvValReg, countVar)
isCounting = true
if isGrouping {
collectorType = 2
} else {
collectorType = 0
}
}
// We replace DataSet initialization with Collector initialization
v.emitter.PatchSwapAx(loop.ResultPos, vm.OpCollector, loop.Result, collectorType)
v.emitter.EmitABC(vm.OpPushKV, loop.Result, kvKeyReg, kvValReg)
v.emitIterJumpOrClose(loop)
// Replace source with sorted array
v.patchLoop(loop)
// If the projection is used, we allocate a new register for the variable and put the iterator's value into it
if projectionVariableName != "" {
// Now we need to expand group variables from the dataset
v.emitIterKey(loop, kvValReg)
v.emitIterValue(loop, v.symbols.DefineVariable(projectionVariableName))
} else {
v.emitIterValue(loop, kvValReg)
}
if isCounting {
}
//loop.ValueName = ""
//loop.KeyName = ""
// TODO: Reuse the registers
v.registers.Free(loop.Value)
v.registers.Free(loop.Key)
loop.Value = vm.NoopOperand
loop.Key = vm.NoopOperand
if isGrouping {
v.emitGroupingKeySelectorVariables(groupSelectors, kvValReg)
}
return nil
}
func (v *visitor) emitGroupingKeySelectors(selectors []fql.ICollectSelectorContext) vm.Operand {
var kvKeyReg vm.Operand
if len(selectors) > 1 {
// We create a sequence of registers for the clauses
// To pack them into an array
selectorRegs := v.registers.AllocateSequence(len(selectors))
@@ -425,74 +499,17 @@ func (v *visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{}
}
kvKeyReg = v.registers.Allocate(Temp)
v.emitter.EmitAs(vm.OpLoadList, kvKeyReg, selectorRegs)
v.emitter.EmitAs(vm.OpList, kvKeyReg, selectorRegs)
v.registers.FreeSequence(selectorRegs)
} else {
kvKeyReg = selectors[0].Accept(v).(vm.Operand)
}
kvValReg := v.registers.Allocate(Temp)
if loop.Kind == ForLoop {
v.emitter.EmitAB(vm.OpIterValue, kvValReg, loop.Iterator)
} else {
v.emitter.EmitAB(vm.OpWhileLoopValue, kvValReg, loop.Iterator)
return kvKeyReg
}
var projectionVariableName string
if groupVar := ctx.CollectGroupVariable(); groupVar != nil {
if identifier := groupVar.Identifier(); identifier != nil {
projectionVariableName = v.emitDefaultCollectGroupProjection(loop, kvValReg, identifier, groupVar.CollectGroupVariableKeeper())
} else if selector := groupVar.CollectSelector(); selector != nil {
projectionVariableName = v.emitCustomCollectGroupProjection(loop, kvValReg, selector)
}
v.emitter.EmitABC(vm.OpCollectKV, loop.Result, kvKeyReg, kvValReg)
} else if countVar := ctx.CollectCounter(); countVar != nil {
projectionVariableName = v.emitCollectCountProjection(loop, kvValReg, countVar)
v.emitter.EmitABC(vm.OpCollectKc, loop.Result, kvKeyReg, kvValReg)
} else {
v.emitter.EmitABC(vm.OpCollectK, loop.Result, kvKeyReg, kvValReg)
}
v.emitter.EmitJump(vm.OpJump, loop.Jump-loop.JumpOffset)
v.emitter.EmitA(vm.OpClose, loop.Iterator)
if loop.Kind == ForLoop {
v.emitter.PatchJump(loop.Jump)
} else {
v.emitter.PatchJumpAB(loop.Jump)
}
v.emitter.EmitAB(vm.OpSort, loop.Result, v.loadConstant(runtime.ZeroInt))
// Replace source with sorted array
v.emitter.EmitAB(vm.OpMove, loop.Src, loop.Result)
v.symbols.ExitScope()
v.symbols.EnterScope()
// Create new for loop
v.emitLoopBegin(loop)
// Now we need to expand group variables from the dataset
v.emitter.EmitAB(vm.OpIterKey, kvValReg, loop.Iterator)
// If the projection is used, we allocate a new register for the variable and put the iterator's value into it
if projectionVariableName != "" {
v.emitter.EmitAB(vm.OpIterValue, v.symbols.DefineVariable(projectionVariableName), loop.Iterator)
}
//loop.ValueName = ""
//loop.KeyName = ""
// TODO: Reuse the registers
v.registers.Free(loop.Value)
v.registers.Free(loop.Key)
loop.Value = vm.NoopOperand
loop.Key = vm.NoopOperand
if isMultiSelector {
func (v *visitor) emitGroupingKeySelectorVariables(selectors []fql.ICollectSelectorContext, kvValReg vm.Operand) {
if len(selectors) > 1 {
variables := make([]vm.Operand, len(selectors))
for i, selector := range selectors {
@@ -517,11 +534,6 @@ func (v *visitor) VisitCollectClause(ctx *fql.CollectClauseContext) interface{}
// If we have a single selector, we can just move the value
v.emitter.EmitAB(vm.OpMove, varReg, kvValReg)
}
return nil
}
return nil
}
func (v *visitor) emitDefaultCollectGroupProjection(loop *Loop, kvValReg vm.Operand, identifier antlr.TerminalNode, keeper fql.ICollectGroupVariableKeeperContext) string {
@@ -532,7 +544,7 @@ func (v *visitor) emitDefaultCollectGroupProjection(loop *Loop, kvValReg vm.Oper
// We will keep it for now for backward compatibility.
v.loadConstantTo(runtime.String(loop.ValueName), seq.Registers[0]) // Map key
v.emitter.EmitAB(vm.OpMove, seq.Registers[1], kvValReg) // Map value
v.emitter.EmitAs(vm.OpLoadMap, kvValReg, seq)
v.emitter.EmitAs(vm.OpMap, kvValReg, seq)
v.registers.FreeSequence(seq)
} else {
@@ -545,7 +557,7 @@ func (v *visitor) emitDefaultCollectGroupProjection(loop *Loop, kvValReg vm.Oper
v.emitter.EmitAB(vm.OpMove, seq.Registers[j+1], v.symbols.Variable(varName))
}
v.emitter.EmitAs(vm.OpLoadMap, kvValReg, seq)
v.emitter.EmitAs(vm.OpMap, kvValReg, seq)
v.registers.FreeSequence(seq)
}
@@ -560,11 +572,7 @@ func (v *visitor) emitCustomCollectGroupProjection(_ *Loop, kvValReg vm.Operand,
return selector.Identifier().GetText()
}
func (v *visitor) emitCollectCountProjection(_ *Loop, kvValReg vm.Operand, selector fql.ICollectCounterContext) string {
//selectorReg := selector.Expression().Accept(v).(vm.Operand)
//v.emitter.EmitAB(vm.OpMove, kvValReg, selectorReg)
//v.registers.Free(selectorReg)
func (v *visitor) emitCollectCountProjection(_ *Loop, _ vm.Operand, selector fql.ICollectCounterContext) string {
return selector.Identifier().GetText()
}
@@ -609,7 +617,7 @@ func (v *visitor) VisitSortClause(ctx *fql.SortClauseContext) interface{} {
}
arrReg := v.registers.Allocate(Temp)
v.emitter.EmitAs(vm.OpLoadList, arrReg, keyRegs)
v.emitter.EmitAs(vm.OpList, arrReg, keyRegs)
v.emitter.EmitAB(vm.OpMove, kvKeyReg, arrReg) // TODO: Free registers
} else {
clausesReg := clauses[0].Accept(v).(vm.Operand)
@@ -624,23 +632,11 @@ func (v *visitor) VisitSortClause(ctx *fql.SortClauseContext) interface{} {
} else {
// If so, we need to load it from the iterator
kvValReg = v.registers.Allocate(Temp)
if loop.Kind == ForLoop {
v.emitter.EmitAB(vm.OpIterValue, kvValReg, loop.Iterator)
} else {
v.emitter.EmitAB(vm.OpWhileLoopValue, kvValReg, loop.Iterator)
}
v.emitIterValue(loop, kvValReg)
}
v.emitter.EmitABC(vm.OpPushKV, loop.Result, kvKeyReg, kvValReg)
v.emitter.EmitJump(vm.OpJump, loop.Jump-loop.JumpOffset)
v.emitter.EmitA(vm.OpClose, loop.Iterator)
if loop.Kind == ForLoop {
v.emitter.PatchJump(loop.Jump)
} else {
v.emitter.PatchJumpAB(loop.Jump)
}
v.emitIterJumpOrClose(loop)
if isSortMany {
v.emitter.EmitAs(vm.OpSortMany, loop.Result, directionRegs)
@@ -788,7 +784,7 @@ func (v *visitor) VisitRangeOperator(ctx *fql.RangeOperatorContext) interface{}
start := ctx.GetLeft().Accept(v).(vm.Operand)
end := ctx.GetRight().Accept(v).(vm.Operand)
v.emitter.EmitABC(vm.OpLoadRange, dst, start, end)
v.emitter.EmitABC(vm.OpRange, dst, start, end)
return dst
}
@@ -889,7 +885,7 @@ func (v *visitor) VisitArrayLiteral(ctx *fql.ArrayLiteralContext) interface{} {
}
// Initialize an array
v.emitter.EmitAs(vm.OpLoadList, destReg, seq)
v.emitter.EmitAs(vm.OpList, destReg, seq)
// Free seq registers
//v.registers.FreeSequence(seq)
@@ -899,7 +895,7 @@ func (v *visitor) VisitArrayLiteral(ctx *fql.ArrayLiteralContext) interface{} {
}
// Empty array
v.emitter.EmitA(vm.OpLoadList, destReg)
v.emitter.EmitA(vm.OpList, destReg)
return destReg
}
@@ -910,7 +906,7 @@ func (v *visitor) VisitObjectLiteral(ctx *fql.ObjectLiteralContext) interface{}
size := len(assignments)
if size == 0 {
v.emitter.EmitA(vm.OpLoadMap, dst)
v.emitter.EmitA(vm.OpMap, dst)
return dst
}
@@ -944,7 +940,7 @@ func (v *visitor) VisitObjectLiteral(ctx *fql.ObjectLiteralContext) interface{}
}
}
v.emitter.EmitAs(vm.OpLoadMap, dst, seq)
v.emitter.EmitAs(vm.OpMap, dst, seq)
return dst
}
@@ -1448,9 +1444,11 @@ func (v *visitor) functionName(ctx *fql.FunctionCallContext) runtime.String {
return runtime.NewString(strings.ToUpper(name))
}
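// emitLoopBegin emits the loop prologue. When the loop allocates its own result, it emits the
// OpDataSet instruction and records that instruction's position in loop.ResultPos.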
func (v *visitor) emitLoopBegin(loop *Loop) {
if loop.Allocate {
v.emitter.EmitAb(vm.OpLoadDataSet, loop.Result, loop.Distinct)
v.emitter.EmitAb(vm.OpDataSet, loop.Result, loop.Distinct)
loop.ResultPos = v.emitter.Size() - 1
}
loop.Iterator = v.registers.Allocate(State)
@@ -1473,6 +1471,40 @@ func (v *visitor) emitLoopBegin(loop *Loop) {
}
}
// emitIterValue emits an OpIterValue instruction that reads the current value
// of the loop's iterator (loop.Iterator) into the register reg.
func (v *visitor) emitIterValue(loop *Loop, reg vm.Operand) {
v.emitter.EmitAB(vm.OpIterValue, reg, loop.Iterator)
}
// emitIterKey emits an OpIterKey instruction that reads the current key of the
// loop's iterator (loop.Iterator) into the register reg.
func (v *visitor) emitIterKey(loop *Loop, reg vm.Operand) {
v.emitter.EmitAB(vm.OpIterKey, reg, loop.Iterator)
}
// emitIterJumpOrClose emits the tail of a loop body: an unconditional jump to
// loop.Jump-loop.JumpOffset, followed by an OpClose on the loop's iterator.
// The loop's pending jump at loop.Jump is then patched: PatchJump for a
// ForLoop, PatchJumpAB for any other loop kind.
func (v *visitor) emitIterJumpOrClose(loop *Loop) {
	backEdge := loop.Jump - loop.JumpOffset

	v.emitter.EmitJump(vm.OpJump, backEdge)
	v.emitter.EmitA(vm.OpClose, loop.Iterator)

	switch loop.Kind {
	case ForLoop:
		v.emitter.PatchJump(loop.Jump)
	default:
		v.emitter.PatchJumpAB(loop.Jump)
	}
}
// patchLoop makes the loop iterate over its own accumulated result: it moves
// loop.Result into loop.Src, replaces the current symbol scope with a fresh
// one, and emits a new loop prologue via emitLoopBegin.
//
// The statement order matters: the move must be emitted before the new loop
// prologue.
func (v *visitor) patchLoop(loop *Loop) {
// Replace the loop source with the accumulated result (e.g. a sorted dataset or a collector)
v.emitter.EmitAB(vm.OpMove, loop.Src, loop.Result)
// Exit the scope of the original loop body and enter a fresh one
v.symbols.ExitScope()
v.symbols.EnterScope()
// Create new for loop
v.emitLoopBegin(loop)
}
func (v *visitor) emitLoopEnd(loop *Loop) vm.Operand {
v.emitter.EmitJump(vm.OpJump, loop.Jump-loop.JumpOffset)

View File

@@ -4,9 +4,10 @@ package fql
import (
"fmt"
"github.com/antlr4-go/antlr/v4"
"sync"
"unicode"
"github.com/antlr4-go/antlr/v4"
)
// Suppress unused import error

View File

@@ -1,3 +1,62 @@
package internal
type Collector interface{}
import (
"context"
"github.com/MontFerret/ferret/pkg/runtime"
)
type (
// CollectorType selects which Collector implementation NewCollector builds.
CollectorType int
// Collector is a runtime value that accumulates key/value pairs through
// Collect and exposes what it accumulated through runtime.Iterable.
Collector interface {
runtime.Value
runtime.Iterable
// Collect records a single key/value pair. How key and value are used
// depends on the concrete collector.
Collect(ctx context.Context, key, value runtime.Value) error
}
// BaseCollector provides shared MarshalJSON, String, Unwrap, Hash and Copy
// methods for the concrete collectors that embed it. All of them except
// String panic with "not supported".
BaseCollector struct{}
)
// Collector kinds accepted by NewCollector.
//
// The numeric values are part of the bytecode contract: the compiler emits
// them as the operand of OpCollector and the VM converts that operand back
// into a CollectorType. They are therefore spelled out explicitly and must
// not be renumbered.
const (
	// CollectorTypeCounter counts every collected item (no grouping key).
	CollectorTypeCounter CollectorType = 0
	// CollectorTypeKey collects distinct grouping keys.
	CollectorTypeKey CollectorType = 1
	// CollectorTypeKeyCounter counts collected items per grouping key.
	CollectorTypeKeyCounter CollectorType = 2
	// CollectorTypeKeyGroup gathers collected values per grouping key.
	CollectorTypeKeyGroup CollectorType = 3
)
// NewCollector builds the Collector implementation that corresponds to typ.
//
// It panics with "unknown collector type" when typ is not one of the
// CollectorType constants.
func NewCollector(typ CollectorType) Collector {
	if typ == CollectorTypeCounter {
		return NewCounterCollector()
	}

	if typ == CollectorTypeKey {
		return NewKeyCollector()
	}

	if typ == CollectorTypeKeyCounter {
		return NewKeyCounterCollector()
	}

	if typ == CollectorTypeKeyGroup {
		return NewKeyGroupCollector()
	}

	panic("unknown collector type")
}
// MarshalJSON is not supported for collectors: it always panics with
// "not supported" and never returns.
func (*BaseCollector) MarshalJSON() ([]byte, error) {
panic("not supported")
}
// String returns the fixed placeholder "[Collector]"; it does not describe the
// collected contents.
func (*BaseCollector) String() string {
return "[Collector]"
}
// Unwrap is not supported for collectors: it always panics with
// "not supported" and never returns.
func (*BaseCollector) Unwrap() interface{} {
panic("not supported")
}
// Hash is not supported for collectors: it always panics with
// "not supported" and never returns.
func (*BaseCollector) Hash() uint64 {
panic("not supported")
}
// Copy is not supported for collectors: it always panics with
// "not supported" and never returns.
func (*BaseCollector) Copy() runtime.Value {
panic("not supported")
}

View File

@@ -0,0 +1,30 @@
package internal
import (
"context"
"github.com/MontFerret/ferret/pkg/runtime"
)
// CounterCollector is a Collector that only counts how many times Collect was
// called; keys and values are ignored.
type CounterCollector struct {
*BaseCollector
// counter is the number of Collect calls made so far.
counter runtime.Int
}
// NewCounterCollector returns a CounterCollector whose count starts at zero.
func NewCounterCollector() Collector {
	collector := new(CounterCollector)
	collector.BaseCollector = &BaseCollector{}

	return collector
}
// Iterate returns an iterator over a one-element array holding the current
// count, so an empty source still yields a single result (zero).
func (c *CounterCollector) Iterate(ctx context.Context) (runtime.Iterator, error) {
	total := runtime.NewArrayWith(c.counter)

	return total.Iterate(ctx)
}
// Collect increments the counter by one. The context, key and value are not
// used, and the returned error is always nil.
func (c *CounterCollector) Collect(ctx context.Context, key, value runtime.Value) error {
c.counter++
return nil
}

View File

@@ -0,0 +1,52 @@
package internal
import (
"context"
"github.com/MontFerret/ferret/pkg/runtime"
)
// KeyCollector is a Collector that keeps each distinct key once and ignores
// the collected values.
type KeyCollector struct {
*BaseCollector
// values holds the distinct keys; Iterate sorts it in ascending order.
values runtime.List
// grouping is used as a set of the stringified keys already stored in values.
grouping map[string]runtime.Value
// sorted is set by Iterate once values has been sorted.
sorted bool
}
// NewKeyCollector returns an empty KeyCollector with room pre-allocated for
// 16 keys.
func NewKeyCollector() Collector {
	collector := &KeyCollector{BaseCollector: &BaseCollector{}}
	collector.values = runtime.NewArray(16)
	collector.grouping = make(map[string]runtime.Value)

	return collector
}
// Iterate returns an iterator over the collected keys in ascending order.
// The keys are sorted in place the first time this is needed; the sorted flag
// avoids sorting again on later calls.
func (c *KeyCollector) Iterate(ctx context.Context) (runtime.Iterator, error) {
	if c.sorted {
		return c.values.Iterate(ctx)
	}

	if err := runtime.SortAsc(ctx, c.values); err != nil {
		return nil, err
	}

	c.sorted = true

	return c.values.Iterate(ctx)
}
// Collect records key if it has not been seen before; the value argument is
// ignored. Keys are compared by their Stringify representation.
//
// It returns any error from Stringify or from appending to the key list. A key
// is only marked as seen after it has been appended successfully, so a failed
// append does not cause the key to be silently dropped on a later call.
func (c *KeyCollector) Collect(ctx context.Context, key, _ runtime.Value) error {
	k, err := Stringify(ctx, key)
	if err != nil {
		return err
	}

	if _, seen := c.grouping[k]; seen {
		return nil
	}

	if err := c.values.Add(ctx, key); err != nil {
		return err
	}

	c.grouping[k] = runtime.None
	// A new key may break the ordering established by a previous Iterate.
	c.sorted = false

	return nil
}

View File

@@ -0,0 +1,103 @@
package internal
import (
"context"
"github.com/MontFerret/ferret/pkg/runtime"
)
// KeyCounterCollector is a Collector that counts how many times each distinct
// key has been collected. Collected values are ignored.
type KeyCounterCollector struct {
	*BaseCollector
	// values holds one *KV per distinct key; KV.Value is the running count.
	values runtime.List
	// grouping maps a stringified key to its entry in values. It stores the
	// entry itself rather than a list index, because Iterate reorders values
	// and would invalidate indexes.
	grouping map[string]*KV
	// sorted reports whether values is currently ordered by key.
	sorted bool
}

// NewKeyCounterCollector returns an empty KeyCounterCollector.
func NewKeyCounterCollector() Collector {
	return &KeyCounterCollector{
		BaseCollector: &BaseCollector{},
		values:        runtime.NewArray(8),
		grouping:      make(map[string]*KV),
	}
}

// Iterate returns a key/value iterator over the per-key counts, ordered by
// key. The entries are sorted in place when they are not already sorted.
func (c *KeyCounterCollector) Iterate(ctx context.Context) (runtime.Iterator, error) {
	if !c.sorted {
		if err := c.sort(ctx); err != nil {
			return nil, err
		}

		c.sorted = true
	}

	iter, err := c.values.Iterate(ctx)
	if err != nil {
		return nil, err
	}

	return NewKVIterator(iter), nil
}

// sort orders values by KV key. Elements that are not *KV are compared as
// plain values.
func (c *KeyCounterCollector) sort(ctx context.Context) error {
	return runtime.SortListWith(ctx, c.values, func(first, second runtime.Value) int64 {
		firstKV, firstOk := first.(*KV)
		secondKV, secondOk := second.(*KV)

		if firstOk && secondOk {
			return runtime.CompareValues(firstKV.Key, secondKV.Key)
		}

		return runtime.CompareValues(first, second)
	})
}

// Collect increments the count for key, creating the entry on first sight.
// Keys are compared by their Stringify representation; the value argument is
// ignored.
//
// It returns any error from Stringify or from appending a new entry.
func (c *KeyCounterCollector) Collect(ctx context.Context, key, _ runtime.Value) error {
	k, err := Stringify(ctx, key)
	if err != nil {
		return err
	}

	kv, exists := c.grouping[k]
	if !exists {
		kv = NewKV(key, runtime.ZeroInt)

		if err := c.values.Add(ctx, kv); err != nil {
			return err
		}

		// Register the entry only after it was stored successfully.
		c.grouping[k] = kv
		// A new key may break the ordering established by a previous Iterate.
		c.sorted = false
	}

	if count, ok := kv.Value.(runtime.Int); ok {
		kv.Value = count + 1
	} else {
		kv.Value = runtime.NewInt(1)
	}

	return nil
}

View File

@@ -0,0 +1,81 @@
package internal
import (
"context"
"github.com/MontFerret/ferret/pkg/runtime"
)
// KeyGroupCollector is a Collector that gathers all collected values into one
// list per distinct key.
type KeyGroupCollector struct {
*BaseCollector
// values holds one KV per distinct key; the KV value is that key's group list.
values runtime.List
// grouping maps a stringified key to the group list stored in values.
grouping map[string]runtime.List
// sorted is set by Iterate once values has been sorted by key.
sorted bool
}
// NewKeyGroupCollector returns an empty KeyGroupCollector with room
// pre-allocated for 8 groups.
func NewKeyGroupCollector() Collector {
	collector := &KeyGroupCollector{BaseCollector: &BaseCollector{}}
	collector.values = runtime.NewArray(8)
	collector.grouping = make(map[string]runtime.List)

	return collector
}
// Iterate returns a key/value iterator over the groups, ordered by key.
// The groups are sorted in place when the sorted flag is not yet set.
func (c *KeyGroupCollector) Iterate(ctx context.Context) (runtime.Iterator, error) {
	if !c.sorted {
		err := c.sort(ctx)
		if err != nil {
			return nil, err
		}

		c.sorted = true
	}

	groups, err := c.values.Iterate(ctx)
	if err != nil {
		return nil, err
	}

	return NewKVIterator(groups), nil
}
// sort orders values by KV key. When either element is not a *KV, the two
// elements are compared as plain values instead.
func (c *KeyGroupCollector) sort(ctx context.Context) error {
	byKey := func(first, second runtime.Value) int64 {
		left, leftIsKV := first.(*KV)
		right, rightIsKV := second.(*KV)

		if !leftIsKV || !rightIsKV {
			return runtime.CompareValues(first, second)
		}

		return runtime.CompareValues(left.Key, right.Key)
	}

	return runtime.SortListWith(ctx, c.values, byKey)
}
// Collect appends value to the group that belongs to key, creating the group
// on first sight. Keys are compared by their Stringify representation.
//
// It returns any error from Stringify, from storing a new group, or from
// appending to the group. A new group is only registered after it has been
// stored successfully, so a failed store cannot leave behind a group that
// accepts values but is never iterated.
func (c *KeyGroupCollector) Collect(ctx context.Context, key, value runtime.Value) error {
	k, err := Stringify(ctx, key)
	if err != nil {
		return err
	}

	group, exists := c.grouping[k]
	if !exists {
		group = runtime.NewArray(4)

		if err := c.values.Add(ctx, NewKV(key, group)); err != nil {
			return err
		}

		c.grouping[k] = group
		// A new key may break the ordering established by a previous Iterate.
		c.sorted = false
	}

	return group.Add(ctx, value)
}

View File

@@ -12,16 +12,12 @@ const (
OpLoadGlobal // Load a global variable to a register A
OpStoreGlobal // Store a value from register A to a global variable
OpLoadParam // Load a parameter to a register A
OpLoadList // Load an array from a list of registers (ARR R2, R3 R5 - creates an array in R2 with elements from R3 to R5)
OpLoadMap // Load an object from a list of registers (OBJ R2, R3 R5 - creates an object in R2 with elements from R3 to R5)
OpLoadRange // Load a range from a list of registers (RNG R2, R3, R4 - creates a range in R2 with start from R3 and end at R4)
OpLoadIndex // Load a value from a list to a register (INDEX R1, R2, R3 - loads a value from a list in R2 to R1)
OpLoadIndexOptional // Load a value from a list to a register, if it exists
OpLoadKey // Load a value from a map to a register (KEY R1, R2, R3 - loads a value from a map in R2 to R1)
OpLoadKeyOptional // Load a value from a map to a register, if it exists
OpLoadProperty // Load a property (key or index) from an object (map or list) to a register
OpLoadPropertyOptional // Load a property (key or index) from an object (map or list) to a register, if it exists
OpLoadDataSet // Load a dataset to a register A
OpJump
OpJumpIfFalse
@@ -56,6 +52,11 @@ const (
OpRegexpPositive
OpRegexpNegative
OpList // Load an array from a list of registers (ARR R2, R3 R5 - creates an array in R2 with elements from R3 to R5)
OpMap // Load an object from a list of registers (OBJ R2, R3 R5 - creates an object in R2 with elements from R3 to R5)
OpRange // Load a range from a list of registers (RNG R2, R3, R4 - creates a range in R2 with start from R3 and end at R4)
OpDataSet // Load a dataset to a register A
OpLength
OpType
OpClose
@@ -72,15 +73,14 @@ const (
OpIterValue // Returns the current value from the iterator (ITER R2, R3 - returns the current value from the iterator in R2 with a collection from R3)
OpIterKey // Returns the current key from the iterator (ITER R2, R3 - returns the current key from the iterator in R2 with a collection from R3)
OpWhileLoopPrep
OpWhileLoopNext
OpWhileLoopValue
OpPush // Adds a value to a dataset
OpPushKV // Adds a key-value pair to a dataset
OpCollector
OpCollectK // Adds a key to a group
OpCollectKc // Adds a key to a group and counts it
OpCollectKV // Adds a value to a group using key
OpLimit
OpSkip

View File

@@ -181,7 +181,7 @@ loop:
} else {
return nil, err
}
case OpLoadList:
case OpList:
var size int
if src1 > 0 {
@@ -198,7 +198,7 @@ loop:
}
reg[dst] = arr
case OpLoadMap:
case OpMap:
obj := runtime.NewObject()
var args int
@@ -351,7 +351,7 @@ loop:
}
}
}
case OpLoadRange:
case OpRange:
res, err := internal.ToRange(ctx, reg[src1], reg[src2])
if err == nil {
@@ -359,8 +359,10 @@ loop:
} else {
return nil, err
}
case OpLoadDataSet:
case OpDataSet:
reg[dst] = internal.NewDataSet(src1 == 1)
case OpCollector:
reg[dst] = internal.NewCollector(internal.CollectorType(src1))
case OpPush:
ds := reg[dst].(*internal.DataSet)
@@ -372,11 +374,18 @@ loop:
}
}
case OpPushKV:
ds := reg[dst].(*internal.DataSet)
key := reg[src1]
value := reg[src2]
var err error
if err := ds.AddKV(ctx, key, value); err != nil {
switch target := reg[dst].(type) {
case *internal.DataSet:
err = target.AddKV(ctx, reg[src1], reg[src2])
case internal.Collector:
err = target.Collect(ctx, reg[src1], reg[src2])
default:
return nil, runtime.TypeError(target, "vm.Collector")
}
if err != nil {
if _, catch := tryCatch(vm.pc); catch {
continue
}
@@ -458,18 +467,6 @@ loop:
case OpIterKey:
iterator := reg[src1].(*internal.Iterator)
reg[dst] = iterator.Key()
case OpWhileLoopPrep:
reg[dst] = runtime.Int(-1)
case OpWhileLoopNext:
cond := runtime.ToBoolean(reg[src1])
if cond {
reg[dst] = internal.Increment(ctx, reg[dst])
} else {
vm.pc = int(src2)
}
case OpWhileLoopValue:
reg[dst] = reg[src1]
case OpSkip:
state := runtime.ToIntSafe(ctx, reg[dst])
threshold := runtime.ToIntSafe(ctx, reg[src1])
@@ -492,11 +489,21 @@ loop:
vm.pc = jump
}
case OpSort:
// TODO: Handle more than just DataSet
ds := reg[dst].(*internal.DataSet)
var err error
dir := runtime.ToIntSafe(ctx, reg[src1])
if err := ds.Sort(ctx, dir); err != nil {
switch target := reg[dst].(type) {
case *internal.DataSet:
err = target.Sort(ctx, dir)
case runtime.Sortable:
if dir == internal.SortAsc {
err = target.SortAsc(ctx)
} else {
err = target.SortDesc(ctx)
}
}
if err != nil {
if _, catch := tryCatch(vm.pc); catch {
continue
} else {