diff --git a/server/compose/service/record.go b/server/compose/service/record.go index eff5684c14..76cb69a52e 100644 --- a/server/compose/service/record.go +++ b/server/compose/service/record.go @@ -1,35 +1,35 @@ package service import ( - "context" - "encoding/json" - "fmt" - "regexp" - "sort" - "strconv" - "strings" - "time" - - "github.com/cortezaproject/corteza/server/pkg/envoyx" - "github.com/cortezaproject/corteza/server/pkg/filter" - "github.com/cortezaproject/corteza/server/pkg/revisions" - "github.com/spf13/cast" - - "github.com/cortezaproject/corteza/server/pkg/dal" - "github.com/cortezaproject/corteza/server/pkg/locale" - - "github.com/cortezaproject/corteza/server/compose/dalutils" - "github.com/cortezaproject/corteza/server/compose/service/event" - "github.com/cortezaproject/corteza/server/compose/service/values" - "github.com/cortezaproject/corteza/server/compose/types" - "github.com/cortezaproject/corteza/server/pkg/actionlog" - "github.com/cortezaproject/corteza/server/pkg/auth" - "github.com/cortezaproject/corteza/server/pkg/corredor" - "github.com/cortezaproject/corteza/server/pkg/envoy/resource" - "github.com/cortezaproject/corteza/server/pkg/errors" - "github.com/cortezaproject/corteza/server/pkg/eventbus" - "github.com/cortezaproject/corteza/server/store" - systemTypes "github.com/cortezaproject/corteza/server/system/types" + "context" + "encoding/json" + "fmt" + "regexp" + "sort" + "strconv" + "strings" + "time" + + "github.com/cortezaproject/corteza/server/pkg/envoyx" + "github.com/cortezaproject/corteza/server/pkg/filter" + "github.com/cortezaproject/corteza/server/pkg/revisions" + "github.com/spf13/cast" + + "github.com/cortezaproject/corteza/server/pkg/dal" + "github.com/cortezaproject/corteza/server/pkg/locale" + + "github.com/cortezaproject/corteza/server/compose/dalutils" + "github.com/cortezaproject/corteza/server/compose/service/event" + "github.com/cortezaproject/corteza/server/compose/service/values" + "github.com/cortezaproject/corteza/server/compose/types" + "github.com/cortezaproject/corteza/server/pkg/actionlog" + "github.com/cortezaproject/corteza/server/pkg/auth" + "github.com/cortezaproject/corteza/server/pkg/corredor" + "github.com/cortezaproject/corteza/server/pkg/envoy/resource" + "github.com/cortezaproject/corteza/server/pkg/errors" + "github.com/cortezaproject/corteza/server/pkg/eventbus" + "github.com/cortezaproject/corteza/server/store" + systemTypes "github.com/cortezaproject/corteza/server/system/types" ) const ( @@ -648,17 +648,17 @@ func (svc record) Bulk(ctx context.Context, skipFailed bool, oo ...*types.Record rr[i].DuplicationError = dupErrors if rve := types.IsRecordValueErrorSet(err); rve != nil { - if valueErrors == nil { + if valueErrors == nil { valueErrors = &types.RecordValueErrorSet{} } // Attach additional meta to each value error for FE identification for _, re := range rve.Set { - if re.Meta == nil { - continue - } + if re.Meta == nil { + continue + } - if p.ID != "" { + if p.ID != "" { re.Meta["id"] = p.ID } @@ -2360,13 +2360,19 @@ func loadRecord(ctx context.Context, s store.Storer, namespaceID, moduleID, reco func recordReportToDalPipeline(m *types.Module, metrics, dimensions, f string) (pp dal.Pipeline, _ *dal.Aggregate, err error) { // Map dimension to the aggregate group // @note we only ever used a single dimension so this is ok - dim := []dal.AggregateAttr{ - { - Identifier: "dimension_0", - RawExpr: dimensions, - Key: true, - }, + auxDim := dal.AggregateAttr{ + Identifier: "dimension_0", + RawExpr: dimensions, + Key: true, } + + ff := m.Fields.FindByName(dimensions) + if ff != nil { + auxDim.MultiValue = ff.Multi + auxDim.Label = ff.Label + } + + dim := []dal.AggregateAttr{auxDim} oo := filter.SortExprSet{{Column: dim[0].Identifier}} // Map metrics to the aggregate attrs diff --git a/server/pkg/dal/def_aggregate.go b/server/pkg/dal/def_aggregate.go index 70d75c69e9..4a36776b92 100644 --- a/server/pkg/dal/def_aggregate.go +++ b/server/pkg/dal/def_aggregate.go @@ -44,7 +44,8 @@ type ( // AggregateAttr is a simple wrapper to outline aggregated attribute definitions AggregateAttr struct { - Key bool + MultiValue bool + Key bool // @todo change; temporary for compose service RawExpr string diff --git a/server/store/adapters/rdbms/dal/model.go b/server/store/adapters/rdbms/dal/model.go index 303ac1fec2..7eb46c5992 100644 --- a/server/store/adapters/rdbms/dal/model.go +++ b/server/store/adapters/rdbms/dal/model.go @@ -260,7 +260,7 @@ func (d *model) Search(f filter.Filter) (i *iterator, err error) { limit: f.Limit(), } - i.query = d.searchSql(f) + i.query = d.applyFiltersToQuery(d.selectSql(), f) if err = i.query.Error(); err != nil { return } @@ -360,16 +360,14 @@ func (d *model) Lookup(ctx context.Context, pkv dal.ValueGetter, r dal.ValueSett return rows.Close() } -// constructs SQL for selecting records from a table, -// converting parts of record filter into conditions +// converting parts of record filter into conditions for selecting records from a table // // Does not add any limits, sorting or any cursor conditions! -func (d *model) searchSql(f filter.Filter) *goqu.SelectDataset { +func (d *model) applyFiltersToQuery(base *goqu.SelectDataset, f filter.Filter) *goqu.SelectDataset { var ( - err error - base = d.selectSql() - tmp exp.Expression - cnd []exp.Expression + err error + tmp exp.Expression + cnd []exp.Expression ) { @@ -515,10 +513,10 @@ func (d *model) searchSql(f filter.Filter) *goqu.SelectDataset { return base.Where(cnd...) } -func (d *model) aggregateSql(f filter.Filter, groupBy []dal.AggregateAttr, out []dal.AggregateAttr, having *ql.ASTNode) (q *goqu.SelectDataset) { - // get SELECT query based on - // the given filter - q = d.searchSql(f) +func (m *model) aggregateSql(f filter.Filter, groupBy []dal.AggregateAttr, out []dal.AggregateAttr, having *ql.ASTNode) (q *goqu.SelectDataset) { + // Get the base query and apply some filters + q = m.dialect.AggregateBase(m.table, groupBy, out) + q = m.applyFiltersToQuery(q, f) var ( err error @@ -537,18 +535,27 @@ func (d *model) aggregateSql(f filter.Filter, groupBy []dal.AggregateAttr, out [ switch { case len(c.RawExpr) > 0: // @todo could probably be removed since RawExpr is only a temporary solution? - return d.parseQuery(c.RawExpr) + return m.parseQuery(c.RawExpr) case c.Expression != nil: - return d.convertQuery(c.Expression) + return m.convertQuery(c.Expression) } - return d.table.AttributeExpression(c.Identifier) + return m.table.AttributeExpression(c.Identifier) } ) + nc := m.dialect.Nuances() for i, c := range groupBy { - if expr, err = field(c); err != nil { - return q.SetError(err) + if c.MultiValue { + if nc.ExpandedJsonColumnSelector != nil { + expr = nc.ExpandedJsonColumnSelector(c.RawExpr) + } else { + expr = goqu.C(c.RawExpr) + } + } else { + if expr, err = field(c); err != nil { + return q.SetError(err) + } } alias = c.Identifier @@ -587,7 +594,7 @@ func (d *model) aggregateSql(f filter.Filter, groupBy []dal.AggregateAttr, out [ if having != nil { var ( - symHandler = d.qlConverterGenericSymHandler() + symHandler = m.qlConverterGenericSymHandler() converter = ql.Converter( // we need a more specialized symbol handler for the HAVING clause @@ -596,7 +603,7 @@ func (d *model) aggregateSql(f filter.Filter, groupBy []dal.AggregateAttr, out [ sym := dal.NormalizeAttrNames(node.Symbol) if a2expr[sym] != nil { - if d.dialect.Nuances().HavingClauseMustUseAlias { + if m.dialect.Nuances().HavingClauseMustUseAlias { // is aliased expression? return a2expr[sym], nil } else { @@ -607,7 +614,7 @@ func (d *model) aggregateSql(f filter.Filter, groupBy []dal.AggregateAttr, out [ // if not, use the default handler return symHandler(node) }), - ql.RefHandler(d.qlConverterGenericRefHandler()), + ql.RefHandler(m.qlConverterGenericRefHandler()), ) ) diff --git a/server/store/adapters/rdbms/drivers/dialect.go b/server/store/adapters/rdbms/drivers/dialect.go index 0436f4f720..c53f2e732a 100644 --- a/server/store/adapters/rdbms/drivers/dialect.go +++ b/server/store/adapters/rdbms/drivers/dialect.go @@ -26,6 +26,9 @@ type ( // TwoStepUpsert uses the context from the update statement to figure out // if it needs to do an insert. TwoStepUpsert bool + + // @todo change this around; temporary fix as not sure how I'd rewrite + ExpandedJsonColumnSelector func(ident string) exp.Expression } Dialect interface { @@ -63,9 +66,9 @@ type ( // comparison or soring expression AttributeCast(*dal.Attribute, exp.Expression) (exp.Expression, error) - AttributeExpression(attr *dal.Attribute, modelIdent string, ident string) (expr exp.Expression, err error) + AttributeExpression(attr *dal.Attribute, modelIdent string, ident string) (expr exp.Expression, err error) - // TableCodec returns table codec (encodes & decodes data to/from db table) + // TableCodec returns table codec (encodes & decodes data to/from db table) TableCodec(*dal.Model) TableCodec // TypeWrap returns driver's type implementation for a particular attribute type @@ -73,6 +76,8 @@ type ( QuoteIdent(string) string + AggregateBase(t TableCodec, gb []dal.AggregateAttr, out []dal.AggregateAttr) *goqu.SelectDataset + // AttributeToColumn converts attribute to column defunition AttributeToColumn(*dal.Attribute) (*ddl.Column, error) ColumnFits(base, assert *ddl.Column) bool diff --git a/server/store/adapters/rdbms/drivers/mssql/dialect.go b/server/store/adapters/rdbms/drivers/mssql/dialect.go index 2d24db34a0..529b20f359 100644 --- a/server/store/adapters/rdbms/drivers/mssql/dialect.go +++ b/server/store/adapters/rdbms/drivers/mssql/dialect.go @@ -63,6 +63,12 @@ func (mssqlDialect) Nuances() drivers.Nuances { return nuances } +func (d mssqlDialect) AggregateBase(t drivers.TableCodec, groupBy []dal.AggregateAttr, out []dal.AggregateAttr) (slct *goqu.SelectDataset) { + // @todo as is, aggregation is not offloaded to mssql + panic("not implemented") + return nil +} + func (mssqlDialect) GOQU() goqu.DialectWrapper { return goquDialectWrapper } func (mssqlDialect) DialectOptions() *sqlgen.SQLDialectOptions { return goquDialectOptions } func (mssqlDialect) QuoteIdent(i string) string { return quoteIdent + i + quoteIdent } @@ -163,7 +169,7 @@ func (mssqlDialect) AttributeCast(attr *dal.Attribute, val exp.Expression) (expr } func (mssqlDialect) AttributeExpression(attr *dal.Attribute, modelIdent string, ident string) (expr exp.Expression, err error) { - return exp.NewLiteralExpression("?", exp.NewIdentifierExpression("", modelIdent, ident)), nil + return exp.NewLiteralExpression("?", exp.NewIdentifierExpression("", modelIdent, ident)), nil } func (mssqlDialect) AttributeToColumn(attr *dal.Attribute) (col *ddl.Column, err error) { diff --git a/server/store/adapters/rdbms/drivers/mysql/dialect.go b/server/store/adapters/rdbms/drivers/mysql/dialect.go index 28b995b3e9..f36ac02bf5 100644 --- a/server/store/adapters/rdbms/drivers/mysql/dialect.go +++ b/server/store/adapters/rdbms/drivers/mysql/dialect.go @@ -42,6 +42,50 @@ func (mysqlDialect) Nuances() drivers.Nuances { return nuances } +func (d mysqlDialect) AggregateBase(t drivers.TableCodec, groupBy []dal.AggregateAttr, out []dal.AggregateAttr) (slct *goqu.SelectDataset) { + var ( + cols = t.Columns() + + // working around a bug inside goqu lib that adds + // * to the list of columns to be selected + // even if we clear the columns first + q = d.GOQU(). + From(t.Ident()) + ) + + for _, g := range groupBy { + // Special handling for multi value fields + if g.MultiValue { + // Only straight up columns can be multi value so we can freely use RawExpr + colName := g.RawExpr + ax, err := t.AttributeExpressionQuoted(colName) + if err != nil { + q = q.SetError(err) + return q + } + + q = q.From( + t.Ident(), + goqu.Func("JSON_TABLE", + ax, + exp.NewLiteralExpression(fmt.Sprintf(`'$[*]' COLUMNS (%s TEXT PATH '$')`, colName)), + ).As(colName), + ) + } + } + + if len(cols) == 0 { + return q.SetError(fmt.Errorf("can not create SELECT without columns")) + } + + q = q.Select(t.Ident().Col(cols[0].Name())) + for _, col := range cols[1:] { + q = q.SelectAppend(t.Ident().Col(col.Name())) + } + + return q +} + func (mysqlDialect) GOQU() goqu.DialectWrapper { return goquDialectWrapper } func (mysqlDialect) DialectOptions() *sqlgen.SQLDialectOptions { return goquDialectOptions } func (mysqlDialect) QuoteIdent(i string) string { return quoteIdent + i + quoteIdent } @@ -62,7 +106,6 @@ func (d mysqlDialect) JsonExtract(jsonDoc exp.Expression, pp ...any) (path exp.E if path, err = jsonPathExpr(pp...); err != nil { return } else { - path = exp.NewCastExpression(path, "CHAR") return exp.NewSQLFunctionExpression("JSON_EXTRACT", jsonDoc, path), nil } } @@ -163,7 +206,7 @@ func (mysqlDialect) AttributeCast(attr *dal.Attribute, val exp.Expression) (exp. } func (mysqlDialect) AttributeExpression(attr *dal.Attribute, modelIdent string, ident string) (expr exp.Expression, err error) { - return exp.NewLiteralExpression("?", exp.NewIdentifierExpression("", modelIdent, ident)), nil + return exp.NewLiteralExpression("?", exp.NewIdentifierExpression("", modelIdent, ident)), nil } func (mysqlDialect) AttributeToColumn(attr *dal.Attribute) (col *ddl.Column, err error) { diff --git a/server/store/adapters/rdbms/drivers/postgres/dialect.go b/server/store/adapters/rdbms/drivers/postgres/dialect.go index 1ff760dbea..176e40e81b 100644 --- a/server/store/adapters/rdbms/drivers/postgres/dialect.go +++ b/server/store/adapters/rdbms/drivers/postgres/dialect.go @@ -1,19 +1,19 @@ package postgres import ( - "fmt" - "strings" - - "github.com/cortezaproject/corteza/server/pkg/dal" - "github.com/cortezaproject/corteza/server/pkg/expr" - "github.com/cortezaproject/corteza/server/store/adapters/rdbms/ddl" - "github.com/cortezaproject/corteza/server/store/adapters/rdbms/drivers" - "github.com/cortezaproject/corteza/server/store/adapters/rdbms/ql" - "github.com/doug-martin/goqu/v9" - "github.com/doug-martin/goqu/v9/dialect/postgres" - "github.com/doug-martin/goqu/v9/exp" - "github.com/doug-martin/goqu/v9/sqlgen" - "github.com/spf13/cast" + "fmt" + "strings" + + "github.com/cortezaproject/corteza/server/pkg/dal" + "github.com/cortezaproject/corteza/server/pkg/expr" + "github.com/cortezaproject/corteza/server/store/adapters/rdbms/ddl" + "github.com/cortezaproject/corteza/server/store/adapters/rdbms/drivers" + "github.com/cortezaproject/corteza/server/store/adapters/rdbms/ql" + "github.com/doug-martin/goqu/v9" + "github.com/doug-martin/goqu/v9/dialect/postgres" + "github.com/doug-martin/goqu/v9/exp" + "github.com/doug-martin/goqu/v9/sqlgen" + "github.com/spf13/cast" ) type ( @@ -61,6 +61,49 @@ func (d postgresDialect) JsonExtractUnquote(ident exp.Expression, pp ...any) (ex return DeepIdentJSON(false, ident, pp...), nil } +func (d postgresDialect) AggregateBase(t drivers.TableCodec, groupBy []dal.AggregateAttr, out []dal.AggregateAttr) (slct *goqu.SelectDataset) { + var ( + cols = t.Columns() + + // working around a bug inside goqu lib that adds + // * to the list of columns to be selected + // even if we clear the columns first + q = d.GOQU(). + From(t.Ident()) + ) + + // When dealing with multi values, we need to make a cross join with every element in the array + // to achieve the same functionality as we had before + for _, g := range groupBy { + // Special handling for multi value fields + if g.MultiValue { + // Only straight up columns can be multi value so we can freely use RawExpr + colName := g.RawExpr + xpr, err := t.AttributeExpressionQuoted(colName) + if err != nil { + q = q.SetError(err) + return q + } + + q = q.From( + t.Ident(), + goqu.Func("JSONB_ARRAY_ELEMENTS_TEXT", xpr).As(colName), + ) + } + } + + if len(cols) == 0 { + return q.SetError(fmt.Errorf("can not create SELECT without columns")) + } + + q = q.Select(t.Ident().Col(cols[0].Name())) + for _, col := range cols[1:] { + q = q.SelectAppend(t.Ident().Col(col.Name())) + } + + return q +} + // JsonArrayContains prepares postgresql compatible comparison of value and JSON array // // literal value = multi-value field / plain @@ -108,15 +151,15 @@ func (postgresDialect) AttributeCast(attr *dal.Attribute, val exp.Expression) (e } func (postgresDialect) AttributeExpression(attr *dal.Attribute, modelIdent string, ident string) (expr exp.Expression, err error) { - identExpr := exp.NewIdentifierExpression("", modelIdent, ident) + identExpr := exp.NewIdentifierExpression("", modelIdent, ident) - // truncate timestamp data type to second mark precision - if attr.Type.Type() == dal.AttributeTypeTimestamp { - return exp.NewLiteralExpression("date_trunc(?, ?)", "second", identExpr), nil - } + // truncate timestamp data type to second mark precision + if attr.Type.Type() == dal.AttributeTypeTimestamp { + return exp.NewLiteralExpression("date_trunc(?, ?)", "second", identExpr), nil + } - // using column directly - return exp.NewLiteralExpression("?", identExpr), nil + // using column directly + return exp.NewLiteralExpression("?", identExpr), nil } func (postgresDialect) AttributeToColumn(attr *dal.Attribute) (col *ddl.Column, err error) { diff --git a/server/store/adapters/rdbms/drivers/sqlite/dialect.go b/server/store/adapters/rdbms/drivers/sqlite/dialect.go index 3732c7fbb1..23b8d3a62a 100644 --- a/server/store/adapters/rdbms/drivers/sqlite/dialect.go +++ b/server/store/adapters/rdbms/drivers/sqlite/dialect.go @@ -29,6 +29,10 @@ var ( nuances = drivers.Nuances{ HavingClauseMustUseAlias: true, + + ExpandedJsonColumnSelector: func(ident string) exp.Expression { + return exp.NewLiteralExpression(fmt.Sprintf(`%s.value`, ident)) + }, } ) @@ -50,6 +54,44 @@ func (sqliteDialect) Nuances() drivers.Nuances { return nuances } +func (d sqliteDialect) AggregateBase(t drivers.TableCodec, groupBy []dal.AggregateAttr, out []dal.AggregateAttr) (slct *goqu.SelectDataset) { + var ( + cols = t.Columns() + + // working around a bug inside goqu lib that adds + // * to the list of columns to be selected + // even if we clear the columns first + q = d.GOQU(). + From(t.Ident()) + ) + + for _, g := range groupBy { + // Special handling for multi value fields + if g.MultiValue { + // Only straight up columns can be multi value so we can freely use RawExpr + colName := g.RawExpr + q = q.From( + t.Ident(), + goqu.Func("json_each", + goqu.C("values").Table("compose_record"), + exp.NewLiteralExpression(fmt.Sprintf(`'$.%s'`, colName)), + ).As(colName), + ) + } + } + + if len(cols) == 0 { + return q.SetError(fmt.Errorf("can not create SELECT without columns")) + } + + q = q.Select(t.Ident().Col(cols[0].Name())) + for _, col := range cols[1:] { + q = q.SelectAppend(t.Ident().Col(col.Name())) + } + + return q +} + func (sqliteDialect) GOQU() goqu.DialectWrapper { return goquDialectWrapper } func (sqliteDialect) DialectOptions() *sqlgen.SQLDialectOptions { return goquDialectOptions } func (sqliteDialect) QuoteIdent(i string) string { return quoteIdent + i + quoteIdent } @@ -148,7 +190,7 @@ func (sqliteDialect) AttributeCast(attr *dal.Attribute, val exp.Expression) (exp } func (sqliteDialect) AttributeExpression(attr *dal.Attribute, modelIdent string, ident string) (expr exp.Expression, err error) { - return exp.NewLiteralExpression("?", exp.NewIdentifierExpression("", modelIdent, ident)), nil + return exp.NewLiteralExpression("?", exp.NewIdentifierExpression("", modelIdent, ident)), nil } func (sqliteDialect) AttributeToColumn(attr *dal.Attribute) (col *ddl.Column, err error) { diff --git a/server/store/adapters/rdbms/drivers/table.go b/server/store/adapters/rdbms/drivers/table.go index 529837b8b5..5404dab8d1 100644 --- a/server/store/adapters/rdbms/drivers/table.go +++ b/server/store/adapters/rdbms/drivers/table.go @@ -1,9 +1,10 @@ package drivers import ( - "fmt" - "github.com/cortezaproject/corteza/server/pkg/dal" - "github.com/doug-martin/goqu/v9/exp" + "fmt" + + "github.com/cortezaproject/corteza/server/pkg/dal" + "github.com/doug-martin/goqu/v9/exp" ) type ( @@ -15,6 +16,7 @@ type ( Encode(r dal.ValueGetter) (_ []any, err error) Decode(buf []any, r dal.ValueSetter) (err error) AttributeExpression(string) (exp.Expression, error) + AttributeExpressionQuoted(string) (exp.Expression, error) } // GenericTableCodec is a generic implementation of TableCodec @@ -122,7 +124,15 @@ func (t *GenericTableCodec) Decode(buf []any, r dal.ValueSetter) (err error) { return } +func (t *GenericTableCodec) AttributeExpressionQuoted(ident string) (exp.Expression, error) { + return t.attributeExpression(true, ident) +} + func (t *GenericTableCodec) AttributeExpression(ident string) (exp.Expression, error) { + return t.attributeExpression(false, ident) +} + +func (t *GenericTableCodec) attributeExpression(quoted bool, ident string) (exp.Expression, error) { attr := t.model.Attributes.FindByIdent(ident) if attr == nil { @@ -131,16 +141,28 @@ func (t *GenericTableCodec) AttributeExpression(ident string) (exp.Expression, e switch s := attr.Store.(type) { case *dal.CodecAlias: - return t.dialect.AttributeExpression(attr, t.model.Ident, s.Ident) + return t.dialect.AttributeExpression(attr, t.model.Ident, s.Ident) case *dal.CodecRecordValueSetJSON: - // using JSON to handle embedded values - lit, err := t.dialect.JsonExtractUnquote(exp.NewIdentifierExpression("", t.model.Ident, s.Ident), attr.Ident, 0) - if err != nil { - return nil, err - } + idfExpr := exp.NewIdentifierExpression("", t.model.Ident, s.Ident) + + var lit exp.Expression + var err error + if quoted { + lit, err = t.dialect.JsonExtract(idfExpr, attr.Ident) + if err != nil { + return nil, err + } - return t.dialect.AttributeCast(attr, lit) + return lit, nil + } else { + // using JSON to handle embedded values + lit, err = t.dialect.JsonExtractUnquote(idfExpr, attr.Ident, 0) + if err != nil { + return nil, err + } + return t.dialect.AttributeCast(attr, lit) + } } return exp.NewLiteralExpression("?", exp.NewIdentifierExpression("", t.model.Ident, ident)), nil