Skip to content

Commit

Permalink
Address multi value aggregation keys for reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
tjerman authored and Fajfa committed Aug 29, 2024
1 parent d157579 commit b43da9f
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 97 deletions.
86 changes: 46 additions & 40 deletions server/compose/service/record.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
package service

import (
"context"
"encoding/json"
"fmt"
"regexp"
"sort"
"strconv"
"strings"
"time"

"github.com/cortezaproject/corteza/server/pkg/envoyx"
"github.com/cortezaproject/corteza/server/pkg/filter"
"github.com/cortezaproject/corteza/server/pkg/revisions"
"github.com/spf13/cast"

"github.com/cortezaproject/corteza/server/pkg/dal"
"github.com/cortezaproject/corteza/server/pkg/locale"

"github.com/cortezaproject/corteza/server/compose/dalutils"
"github.com/cortezaproject/corteza/server/compose/service/event"
"github.com/cortezaproject/corteza/server/compose/service/values"
"github.com/cortezaproject/corteza/server/compose/types"
"github.com/cortezaproject/corteza/server/pkg/actionlog"
"github.com/cortezaproject/corteza/server/pkg/auth"
"github.com/cortezaproject/corteza/server/pkg/corredor"
"github.com/cortezaproject/corteza/server/pkg/envoy/resource"
"github.com/cortezaproject/corteza/server/pkg/errors"
"github.com/cortezaproject/corteza/server/pkg/eventbus"
"github.com/cortezaproject/corteza/server/store"
systemTypes "github.com/cortezaproject/corteza/server/system/types"
"context"
"encoding/json"
"fmt"
"regexp"
"sort"
"strconv"
"strings"
"time"

"github.com/cortezaproject/corteza/server/pkg/envoyx"
"github.com/cortezaproject/corteza/server/pkg/filter"
"github.com/cortezaproject/corteza/server/pkg/revisions"
"github.com/spf13/cast"

"github.com/cortezaproject/corteza/server/pkg/dal"
"github.com/cortezaproject/corteza/server/pkg/locale"

"github.com/cortezaproject/corteza/server/compose/dalutils"
"github.com/cortezaproject/corteza/server/compose/service/event"
"github.com/cortezaproject/corteza/server/compose/service/values"
"github.com/cortezaproject/corteza/server/compose/types"
"github.com/cortezaproject/corteza/server/pkg/actionlog"
"github.com/cortezaproject/corteza/server/pkg/auth"
"github.com/cortezaproject/corteza/server/pkg/corredor"
"github.com/cortezaproject/corteza/server/pkg/envoy/resource"
"github.com/cortezaproject/corteza/server/pkg/errors"
"github.com/cortezaproject/corteza/server/pkg/eventbus"
"github.com/cortezaproject/corteza/server/store"
systemTypes "github.com/cortezaproject/corteza/server/system/types"
)

const (
Expand Down Expand Up @@ -648,17 +648,17 @@ func (svc record) Bulk(ctx context.Context, skipFailed bool, oo ...*types.Record
rr[i].DuplicationError = dupErrors

if rve := types.IsRecordValueErrorSet(err); rve != nil {
if valueErrors == nil {
if valueErrors == nil {
valueErrors = &types.RecordValueErrorSet{}
}

// Attach additional meta to each value error for FE identification
for _, re := range rve.Set {
if re.Meta == nil {
continue
}
if re.Meta == nil {
continue
}

if p.ID != "" {
if p.ID != "" {
re.Meta["id"] = p.ID
}

Expand Down Expand Up @@ -2360,13 +2360,19 @@ func loadRecord(ctx context.Context, s store.Storer, namespaceID, moduleID, reco
func recordReportToDalPipeline(m *types.Module, metrics, dimensions, f string) (pp dal.Pipeline, _ *dal.Aggregate, err error) {
// Map dimension to the aggregate group
// @note we only ever used a single dimension so this is ok
dim := []dal.AggregateAttr{
{
Identifier: "dimension_0",
RawExpr: dimensions,
Key: true,
},
auxDim := dal.AggregateAttr{
Identifier: "dimension_0",
RawExpr: dimensions,
Key: true,
}

ff := m.Fields.FindByName(dimensions)
if ff != nil {
auxDim.MultiValue = ff.Multi
auxDim.Label = ff.Label
}

dim := []dal.AggregateAttr{auxDim}
oo := filter.SortExprSet{{Column: dim[0].Identifier}}

// Map metrics to the aggregate attrs
Expand Down
3 changes: 2 additions & 1 deletion server/pkg/dal/def_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ type (

// AggregateAttr is a simple wrapper to outline aggregated attribute definitions
AggregateAttr struct {
Key bool
MultiValue bool
Key bool

// @todo change; temporary for compose service
RawExpr string
Expand Down
47 changes: 27 additions & 20 deletions server/store/adapters/rdbms/dal/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (d *model) Search(f filter.Filter) (i *iterator, err error) {
limit: f.Limit(),
}

i.query = d.searchSql(f)
i.query = d.applyFiltersToQuery(d.selectSql(), f)
if err = i.query.Error(); err != nil {
return
}
Expand Down Expand Up @@ -360,16 +360,14 @@ func (d *model) Lookup(ctx context.Context, pkv dal.ValueGetter, r dal.ValueSett
return rows.Close()
}

// constructs SQL for selecting records from a table,
// converting parts of record filter into conditions
// converting parts of record filter into conditions for selecting records from a table
//
// Does not add any limits, sorting or any cursor conditions!
func (d *model) searchSql(f filter.Filter) *goqu.SelectDataset {
func (d *model) applyFiltersToQuery(base *goqu.SelectDataset, f filter.Filter) *goqu.SelectDataset {
var (
err error
base = d.selectSql()
tmp exp.Expression
cnd []exp.Expression
err error
tmp exp.Expression
cnd []exp.Expression
)

{
Expand Down Expand Up @@ -515,10 +513,10 @@ func (d *model) searchSql(f filter.Filter) *goqu.SelectDataset {
return base.Where(cnd...)
}

func (d *model) aggregateSql(f filter.Filter, groupBy []dal.AggregateAttr, out []dal.AggregateAttr, having *ql.ASTNode) (q *goqu.SelectDataset) {
// get SELECT query based on
// the given filter
q = d.searchSql(f)
func (m *model) aggregateSql(f filter.Filter, groupBy []dal.AggregateAttr, out []dal.AggregateAttr, having *ql.ASTNode) (q *goqu.SelectDataset) {
// Get the base query and apply some filters
q = m.dialect.AggregateBase(m.table, groupBy, out)
q = m.applyFiltersToQuery(q, f)

var (
err error
Expand All @@ -537,18 +535,27 @@ func (d *model) aggregateSql(f filter.Filter, groupBy []dal.AggregateAttr, out [
switch {
case len(c.RawExpr) > 0:
// @todo could probably be removed since RawExpr is only a temporary solution?
return d.parseQuery(c.RawExpr)
return m.parseQuery(c.RawExpr)
case c.Expression != nil:
return d.convertQuery(c.Expression)
return m.convertQuery(c.Expression)
}

return d.table.AttributeExpression(c.Identifier)
return m.table.AttributeExpression(c.Identifier)
}
)

nc := m.dialect.Nuances()
for i, c := range groupBy {
if expr, err = field(c); err != nil {
return q.SetError(err)
if c.MultiValue {
if nc.ExpandedJsonColumnSelector != nil {
expr = nc.ExpandedJsonColumnSelector(c.RawExpr)
} else {
expr = goqu.C(c.RawExpr)
}
} else {
if expr, err = field(c); err != nil {
return q.SetError(err)
}
}

alias = c.Identifier
Expand Down Expand Up @@ -587,7 +594,7 @@ func (d *model) aggregateSql(f filter.Filter, groupBy []dal.AggregateAttr, out [

if having != nil {
var (
symHandler = d.qlConverterGenericSymHandler()
symHandler = m.qlConverterGenericSymHandler()

converter = ql.Converter(
// we need a more specialized symbol handler for the HAVING clause
Expand All @@ -596,7 +603,7 @@ func (d *model) aggregateSql(f filter.Filter, groupBy []dal.AggregateAttr, out [
sym := dal.NormalizeAttrNames(node.Symbol)

if a2expr[sym] != nil {
if d.dialect.Nuances().HavingClauseMustUseAlias {
if m.dialect.Nuances().HavingClauseMustUseAlias {
// is aliased expression?
return a2expr[sym], nil
} else {
Expand All @@ -607,7 +614,7 @@ func (d *model) aggregateSql(f filter.Filter, groupBy []dal.AggregateAttr, out [
// if not, use the default handler
return symHandler(node)
}),
ql.RefHandler(d.qlConverterGenericRefHandler()),
ql.RefHandler(m.qlConverterGenericRefHandler()),
)
)

Expand Down
9 changes: 7 additions & 2 deletions server/store/adapters/rdbms/drivers/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type (
// TwoStepUpsert uses the context from the update statement to figure out
// if it needs to do an insert.
TwoStepUpsert bool

// @todo change this around; temporary fix as not sure how I'd rewrite
ExpandedJsonColumnSelector func(ident string) exp.Expression
}

Dialect interface {
Expand Down Expand Up @@ -63,16 +66,18 @@ type (
// comparison or soring expression
AttributeCast(*dal.Attribute, exp.Expression) (exp.Expression, error)

AttributeExpression(attr *dal.Attribute, modelIdent string, ident string) (expr exp.Expression, err error)
AttributeExpression(attr *dal.Attribute, modelIdent string, ident string) (expr exp.Expression, err error)

// TableCodec returns table codec (encodes & decodes data to/from db table)
// TableCodec returns table codec (encodes & decodes data to/from db table)
TableCodec(*dal.Model) TableCodec

// TypeWrap returns driver's type implementation for a particular attribute type
TypeWrap(dal.Type) Type

QuoteIdent(string) string

AggregateBase(t TableCodec, gb []dal.AggregateAttr, out []dal.AggregateAttr) *goqu.SelectDataset

// AttributeToColumn converts attribute to column defunition
AttributeToColumn(*dal.Attribute) (*ddl.Column, error)
ColumnFits(base, assert *ddl.Column) bool
Expand Down
8 changes: 7 additions & 1 deletion server/store/adapters/rdbms/drivers/mssql/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func (mssqlDialect) Nuances() drivers.Nuances {
return nuances
}

func (d mssqlDialect) AggregateBase(t drivers.TableCodec, groupBy []dal.AggregateAttr, out []dal.AggregateAttr) (slct *goqu.SelectDataset) {
// @todo as is, aggregation is not offloaded to mssql
panic("not implemented")
return nil
}

func (mssqlDialect) GOQU() goqu.DialectWrapper { return goquDialectWrapper }
func (mssqlDialect) DialectOptions() *sqlgen.SQLDialectOptions { return goquDialectOptions }
func (mssqlDialect) QuoteIdent(i string) string { return quoteIdent + i + quoteIdent }
Expand Down Expand Up @@ -163,7 +169,7 @@ func (mssqlDialect) AttributeCast(attr *dal.Attribute, val exp.Expression) (expr
}

func (mssqlDialect) AttributeExpression(attr *dal.Attribute, modelIdent string, ident string) (expr exp.Expression, err error) {
return exp.NewLiteralExpression("?", exp.NewIdentifierExpression("", modelIdent, ident)), nil
return exp.NewLiteralExpression("?", exp.NewIdentifierExpression("", modelIdent, ident)), nil
}

func (mssqlDialect) AttributeToColumn(attr *dal.Attribute) (col *ddl.Column, err error) {
Expand Down
47 changes: 45 additions & 2 deletions server/store/adapters/rdbms/drivers/mysql/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,50 @@ func (mysqlDialect) Nuances() drivers.Nuances {
return nuances
}

func (d mysqlDialect) AggregateBase(t drivers.TableCodec, groupBy []dal.AggregateAttr, out []dal.AggregateAttr) (slct *goqu.SelectDataset) {
var (
cols = t.Columns()

// working around a bug inside goqu lib that adds
// * to the list of columns to be selected
// even if we clear the columns first
q = d.GOQU().
From(t.Ident())
)

for _, g := range groupBy {
// Special handling for multi value fields
if g.MultiValue {
// Only straight up columns can be multi value so we can freely use RawExpr
colName := g.RawExpr
ax, err := t.AttributeExpressionQuoted(colName)
if err != nil {
q = q.SetError(err)
return q
}

q = q.From(
t.Ident(),
goqu.Func("JSON_TABLE",
ax,
exp.NewLiteralExpression(fmt.Sprintf(`'$[*]' COLUMNS (%s TEXT PATH '$')`, colName)),
).As(colName),
)
}
}

if len(cols) == 0 {
return q.SetError(fmt.Errorf("can not create SELECT without columns"))
}

q = q.Select(t.Ident().Col(cols[0].Name()))
for _, col := range cols[1:] {
q = q.SelectAppend(t.Ident().Col(col.Name()))
}

return q
}

func (mysqlDialect) GOQU() goqu.DialectWrapper { return goquDialectWrapper }
func (mysqlDialect) DialectOptions() *sqlgen.SQLDialectOptions { return goquDialectOptions }
func (mysqlDialect) QuoteIdent(i string) string { return quoteIdent + i + quoteIdent }
Expand All @@ -62,7 +106,6 @@ func (d mysqlDialect) JsonExtract(jsonDoc exp.Expression, pp ...any) (path exp.E
if path, err = jsonPathExpr(pp...); err != nil {
return
} else {
path = exp.NewCastExpression(path, "CHAR")
return exp.NewSQLFunctionExpression("JSON_EXTRACT", jsonDoc, path), nil
}
}
Expand Down Expand Up @@ -163,7 +206,7 @@ func (mysqlDialect) AttributeCast(attr *dal.Attribute, val exp.Expression) (exp.
}

func (mysqlDialect) AttributeExpression(attr *dal.Attribute, modelIdent string, ident string) (expr exp.Expression, err error) {
return exp.NewLiteralExpression("?", exp.NewIdentifierExpression("", modelIdent, ident)), nil
return exp.NewLiteralExpression("?", exp.NewIdentifierExpression("", modelIdent, ident)), nil
}

func (mysqlDialect) AttributeToColumn(attr *dal.Attribute) (col *ddl.Column, err error) {
Expand Down
Loading

0 comments on commit b43da9f

Please sign in to comment.