From 77c86a78423bda3346bc92b2414fa3aa71fb5ba3 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Fri, 17 May 2024 14:54:46 -0500 Subject: [PATCH 1/4] Rewrite contactql query conversion to ES to use simple maps --- contactql/es/query.go | 230 +++++++++++++++---------------------- contactql/es/query_test.go | 5 +- contactql/es/utils.go | 82 +++++++++++++ 3 files changed, 177 insertions(+), 140 deletions(-) create mode 100644 contactql/es/utils.go diff --git a/contactql/es/query.go b/contactql/es/query.go index bc0097e69..c1e0ded14 100644 --- a/contactql/es/query.go +++ b/contactql/es/query.go @@ -8,8 +8,6 @@ import ( "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/contactql" "github.com/nyaruka/goflow/envs" - - "github.com/olivere/elastic/v7" ) // AssetMapper is used to map engine assets to however ES identifies them @@ -27,7 +25,7 @@ var contactStatusCodes = map[string]string{ } // ToElasticQuery converts a contactql query to an Elastic query -func ToElasticQuery(env envs.Environment, mapper AssetMapper, query *contactql.ContactQuery) elastic.Query { +func ToElasticQuery(env envs.Environment, mapper AssetMapper, query *contactql.ContactQuery) map[string]any { if query.Resolver() == nil { panic("can only convert queries parsed with a resolver") } @@ -35,60 +33,55 @@ func ToElasticQuery(env envs.Environment, mapper AssetMapper, query *contactql.C return nodeToElastic(env, query.Resolver(), mapper, query.Root()) } -func nodeToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, node contactql.QueryNode) elastic.Query { +func nodeToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, node contactql.QueryNode) map[string]any { switch n := node.(type) { case *contactql.BoolCombination: - return boolCombinationToElastic(env, resolver, mapper, n) + return boolCombination(env, resolver, mapper, n) case *contactql.Condition: - return conditionToElastic(env, resolver, mapper, n) + return condition(env, resolver, mapper, n) default: panic(fmt.Sprintf("unsupported node type: %T", n)) } } -func boolCombinationToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, combination *contactql.BoolCombination) elastic.Query { - queries := make([]elastic.Query, len(combination.Children())) +func boolCombination(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, combination *contactql.BoolCombination) map[string]any { + queries := make([]map[string]any, len(combination.Children())) for i, child := range combination.Children() { queries[i] = nodeToElastic(env, resolver, mapper, child) } if combination.Operator() == contactql.BoolOperatorAnd { - return elastic.NewBoolQuery().Must(queries...) + return All(queries...) } - return elastic.NewBoolQuery().Should(queries...) + return Any(queries...) } -func conditionToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) elastic.Query { +func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) map[string]any { switch c.PropertyType() { case contactql.PropertyTypeField: - return fieldConditionToElastic(env, resolver, c) + return fieldCondition(env, resolver, c) case contactql.PropertyTypeAttribute: - return attributeConditionToElastic(env, resolver, mapper, c) + return attributeCondition(env, resolver, mapper, c) case contactql.PropertyTypeURN: - return schemeConditionToElastic(env, c) + return schemeCondition(c) default: panic(fmt.Sprintf("unsupported property type: %s", c.PropertyType())) } } -func fieldConditionToElastic(env envs.Environment, resolver contactql.Resolver, c *contactql.Condition) elastic.Query { - var query elastic.Query - +func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contactql.Condition) map[string]any { field := resolver.ResolveField(c.PropertyKey()) fieldType := field.Type() - fieldQuery := elastic.NewTermQuery("fields.field", field.UUID()) + fieldQuery := Term("fields.field", field.UUID()) // special cases for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && c.Value() == "" { - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - fieldQuery, - elastic.NewExistsQuery("fields."+string(fieldType)), - )) + query := Nested("fields", All(fieldQuery, Exists("fields."+string(fieldType)))) // if we are looking for unset, inverse our query if c.Operator() == contactql.OpEqual { - query = not(query) + query = Not(query) } return query } @@ -98,77 +91,68 @@ func fieldConditionToElastic(env envs.Environment, resolver contactql.Resolver, switch c.Operator() { case contactql.OpEqual: - query = elastic.NewTermQuery("fields.text", value) - return elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must(fieldQuery, query)) + return Nested("fields", All(fieldQuery, Term("fields.text", value))) case contactql.OpNotEqual: - query = elastic.NewBoolQuery().Must( - fieldQuery, - elastic.NewTermQuery("fields.text", value), - elastic.NewExistsQuery("fields.text"), - ) - return not(elastic.NewNestedQuery("fields", query)) + query := All(fieldQuery, Term("fields.text", value), Exists("fields.text")) + return Not(Nested("fields", query)) default: panic(fmt.Sprintf("unsupported text field operator: %s", c.Operator())) } } else if fieldType == assets.FieldTypeNumber { value, _ := c.ValueAsNumber() + var query map[string]any switch c.Operator() { case contactql.OpEqual: - query = elastic.NewMatchQuery("fields.number", value) + query = Match("fields.number", value) case contactql.OpNotEqual: - return not( - elastic.NewNestedQuery("fields", - elastic.NewBoolQuery().Must( - fieldQuery, - elastic.NewMatchQuery("fields.number", value), - ), + return Not( + Nested("fields", + All(fieldQuery, Match("fields.number", value)), ), ) case contactql.OpGreaterThan: - query = elastic.NewRangeQuery("fields.number").Gt(value) + query = GreaterThan("fields.number", value) case contactql.OpGreaterThanOrEqual: - query = elastic.NewRangeQuery("fields.number").Gte(value) + query = GreaterThanOrEqual("fields.number", value) case contactql.OpLessThan: - query = elastic.NewRangeQuery("fields.number").Lt(value) + query = LessThan("fields.number", value) case contactql.OpLessThanOrEqual: - query = elastic.NewRangeQuery("fields.number").Lte(value) + query = LessThanOrEqual("fields.number", value) default: panic(fmt.Sprintf("unsupported number field operator: %s", c.Operator())) } - return elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must(fieldQuery, query)) + return Nested("fields", All(fieldQuery, query)) } else if fieldType == assets.FieldTypeDatetime { value, _ := c.ValueAsDate(env) start, end := dates.DayToUTCRange(value, value.Location()) + var query map[string]any switch c.Operator() { case contactql.OpEqual: - query = elastic.NewRangeQuery("fields.datetime").Gte(start).Lt(end) + query = Between("fields.datetime", start, end) case contactql.OpNotEqual: - return not( - elastic.NewNestedQuery("fields", - elastic.NewBoolQuery().Must( - fieldQuery, - elastic.NewRangeQuery("fields.datetime").Gte(start).Lt(end), - ), + return Not( + Nested("fields", + All(fieldQuery, Between("fields.datetime", start, end)), ), ) case contactql.OpGreaterThan: - query = elastic.NewRangeQuery("fields.datetime").Gte(end) + query = GreaterThanOrEqual("fields.datetime", end) case contactql.OpGreaterThanOrEqual: - query = elastic.NewRangeQuery("fields.datetime").Gte(start) + query = GreaterThanOrEqual("fields.datetime", start) case contactql.OpLessThan: - query = elastic.NewRangeQuery("fields.datetime").Lt(start) + query = LessThan("fields.datetime", start) case contactql.OpLessThanOrEqual: - query = elastic.NewRangeQuery("fields.datetime").Lt(end) + query = LessThan("fields.datetime", end) default: panic(fmt.Sprintf("unsupported datetime field operator: %s", c.Operator())) } - return elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must(fieldQuery, query)) + return Nested("fields", All(fieldQuery, query)) } else if fieldType == assets.FieldTypeState || fieldType == assets.FieldTypeDistrict || fieldType == assets.FieldTypeWard { value := strings.ToLower(c.Value()) @@ -176,15 +160,11 @@ func fieldConditionToElastic(env envs.Environment, resolver contactql.Resolver, switch c.Operator() { case contactql.OpEqual: - query = elastic.NewTermQuery(name, value) - return elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must(fieldQuery, query)) + return Nested("fields", All(fieldQuery, Term(name, value))) case contactql.OpNotEqual: - return not( - elastic.NewNestedQuery("fields", - elastic.NewBoolQuery().Must( - elastic.NewTermQuery(name, value), - elastic.NewExistsQuery(name), - ), + return Not( + Nested("fields", + All(Term(name, value), Exists(name)), ), ) default: @@ -195,22 +175,18 @@ func fieldConditionToElastic(env envs.Environment, resolver contactql.Resolver, panic(fmt.Sprintf("unsupported field type: %s", fieldType)) } -func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) elastic.Query { +func attributeCondition(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) map[string]any { key := c.PropertyKey() value := strings.ToLower(c.Value()) - var query elastic.Query // special case for set/unset for name and language if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" && (key == contactql.AttributeName || key == contactql.AttributeLanguage) { - query = elastic.NewBoolQuery().Must( - elastic.NewExistsQuery(key), - not(elastic.NewTermQuery(fmt.Sprintf("%s.keyword", key), "")), - ) + query := All(Exists(key), Not(Term(fmt.Sprintf("%s.keyword", key), ""))) if c.Operator() == contactql.OpEqual { - query = not(query) + query = Not(query) } return query @@ -222,20 +198,20 @@ func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolv case contactql.AttributeID: switch c.Operator() { case contactql.OpEqual: - return elastic.NewIdsQuery().Ids(value) + return Ids(value) case contactql.OpNotEqual: - return not(elastic.NewIdsQuery().Ids(value)) + return Not(Ids(value)) default: panic(fmt.Sprintf("unsupported ID attribute operator: %s", c.Operator())) } case contactql.AttributeName: switch c.Operator() { case contactql.OpEqual: - return elastic.NewTermQuery("name.keyword", c.Value()) + return Term("name.keyword", c.Value()) case contactql.OpNotEqual: - return not(elastic.NewTermQuery("name.keyword", c.Value())) + return Not(Term("name.keyword", c.Value())) case contactql.OpContains: - return elastic.NewMatchQuery("name", value) + return Match("name", value) default: panic(fmt.Sprintf("unsupported name attribute operator: %s", c.Operator())) } @@ -251,26 +227,26 @@ func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolv switch c.Operator() { case contactql.OpEqual: - return elastic.NewRangeQuery("created_on").Gte(start).Lt(end) + return Between("created_on", start, end) case contactql.OpNotEqual: - return not(elastic.NewRangeQuery("created_on").Gte(start).Lt(end)) + return Not(Between("created_on", start, end)) case contactql.OpGreaterThan: - return elastic.NewRangeQuery("created_on").Gte(end) + return GreaterThanOrEqual("created_on", end) case contactql.OpGreaterThanOrEqual: - return elastic.NewRangeQuery("created_on").Gte(start) + return GreaterThanOrEqual("created_on", start) case contactql.OpLessThan: - return elastic.NewRangeQuery("created_on").Lt(start) + return LessThan("created_on", start) case contactql.OpLessThanOrEqual: - return elastic.NewRangeQuery("created_on").Lt(end) + return LessThan("created_on", end) default: panic(fmt.Sprintf("unsupported created_on attribute operator: %s", c.Operator())) } case contactql.AttributeLastSeenOn: // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query = elastic.NewExistsQuery("last_seen_on") + query := Exists("last_seen_on") if c.Operator() == contactql.OpEqual { - query = not(query) + query = Not(query) } return query } @@ -280,17 +256,17 @@ func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolv switch c.Operator() { case contactql.OpEqual: - return elastic.NewRangeQuery("last_seen_on").Gte(start).Lt(end) + return Between("last_seen_on", start, end) case contactql.OpNotEqual: - return not(elastic.NewRangeQuery("last_seen_on").Gte(start).Lt(end)) + return Not(Between("last_seen_on", start, end)) case contactql.OpGreaterThan: - return elastic.NewRangeQuery("last_seen_on").Gte(end) + return GreaterThanOrEqual("last_seen_on", end) case contactql.OpGreaterThanOrEqual: - return elastic.NewRangeQuery("last_seen_on").Gte(start) + return GreaterThanOrEqual("last_seen_on", start) case contactql.OpLessThan: - return elastic.NewRangeQuery("last_seen_on").Lt(start) + return LessThan("last_seen_on", start) case contactql.OpLessThanOrEqual: - return elastic.NewRangeQuery("last_seen_on").Lt(end) + return LessThan("last_seen_on", end) default: panic(fmt.Sprintf("unsupported last_seen_on attribute operator: %s", c.Operator())) } @@ -299,29 +275,29 @@ func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolv // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query = elastic.NewNestedQuery("urns", elastic.NewExistsQuery("urns.path")) + query := Nested("urns", Exists("urns.path")) if c.Operator() == contactql.OpEqual { - query = not(query) + query = Not(query) } return query } switch c.Operator() { case contactql.OpEqual: - return elastic.NewNestedQuery("urns", elastic.NewTermQuery("urns.path.keyword", value)) + return Nested("urns", Term("urns.path.keyword", value)) case contactql.OpNotEqual: - return not(elastic.NewNestedQuery("urns", elastic.NewTermQuery("urns.path.keyword", value))) + return Not(Nested("urns", Term("urns.path.keyword", value))) case contactql.OpContains: - return elastic.NewNestedQuery("urns", elastic.NewMatchPhraseQuery("urns.path", value)) + return Nested("urns", MatchPhrase("urns.path", value)) default: panic(fmt.Sprintf("unsupported URN attribute operator: %s", c.Operator())) } case contactql.AttributeGroup: // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query = elastic.NewExistsQuery("group_ids") + query := Exists("group_ids") if c.Operator() == contactql.OpEqual { - query = not(query) + query = Not(query) } return query } @@ -330,9 +306,9 @@ func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolv switch c.Operator() { case contactql.OpEqual: - return elastic.NewTermQuery("group_ids", mapper.Group(group)) + return Term("group_ids", mapper.Group(group)) case contactql.OpNotEqual: - return not(elastic.NewTermQuery("group_ids", mapper.Group(group))) + return Not(Term("group_ids", mapper.Group(group))) default: panic(fmt.Sprintf("unsupported group attribute operator: %s", c.Operator())) } @@ -344,9 +320,9 @@ func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolv // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query = elastic.NewExistsQuery(fieldName) + query := Exists(fieldName) if c.Operator() == contactql.OpEqual { - query = not(query) + query = Not(query) } return query } @@ -355,9 +331,9 @@ func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolv switch c.Operator() { case contactql.OpEqual: - return elastic.NewTermQuery(fieldName, mapper.Flow(flow)) + return Term(fieldName, mapper.Flow(flow)) case contactql.OpNotEqual: - return not(elastic.NewTermQuery(fieldName, mapper.Flow(flow))) + return Not(Term(fieldName, mapper.Flow(flow))) default: panic(fmt.Sprintf("unsupported flow attribute operator: %s", c.Operator())) } @@ -368,79 +344,61 @@ func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolv } } -func schemeConditionToElastic(env envs.Environment, c *contactql.Condition) elastic.Query { +func schemeCondition(c *contactql.Condition) map[string]any { key := c.PropertyKey() value := strings.ToLower(c.Value()) // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - var query elastic.Query - query = elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewTermQuery("urns.scheme", key), - elastic.NewExistsQuery("urns.path"), - )) + query := Nested("urns", All(Term("urns.scheme", key), Exists("urns.path"))) if c.Operator() == contactql.OpEqual { - query = not(query) + query = Not(query) } return query } switch c.Operator() { case contactql.OpEqual: - return elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewTermQuery("urns.path.keyword", value), - elastic.NewTermQuery("urns.scheme", key)), - ) + return Nested("urns", All(Term("urns.path.keyword", value), Term("urns.scheme", key))) case contactql.OpNotEqual: - return not(elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewTermQuery("urns.path.keyword", value), - elastic.NewTermQuery("urns.scheme", key)), - )) + return Not(Nested("urns", All(Term("urns.path.keyword", value), Term("urns.scheme", key)))) case contactql.OpContains: - return elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewMatchPhraseQuery("urns.path", value), - elastic.NewTermQuery("urns.scheme", key)), - ) + return Nested("urns", All(MatchPhrase("urns.path", value), Term("urns.scheme", key))) default: panic(fmt.Sprintf("unsupported scheme operator: %s", c.Operator())) } } -func textAttributeQuery(c *contactql.Condition, name string, tx func(string) string) elastic.Query { +func textAttributeQuery(c *contactql.Condition, name string, tx func(string) string) map[string]any { value := tx(c.Value()) switch c.Operator() { case contactql.OpEqual: - return elastic.NewTermQuery(name, value) + return Term(name, value) case contactql.OpNotEqual: - return not(elastic.NewTermQuery(name, value)) + return Not(Term(name, value)) default: panic(fmt.Sprintf("unsupported %s attribute operator: %s", name, c.Operator())) } } -func numericalAttributeQuery(c *contactql.Condition, name string) elastic.Query { +func numericalAttributeQuery(c *contactql.Condition, name string) map[string]any { value, _ := c.ValueAsNumber() switch c.Operator() { case contactql.OpEqual: - return elastic.NewMatchQuery(name, value) + return Match(name, value) case contactql.OpNotEqual: - return not(elastic.NewMatchQuery(name, value)) + return Not(Match(name, value)) case contactql.OpGreaterThan: - return elastic.NewRangeQuery(name).Gt(value) + return GreaterThan(name, value) case contactql.OpGreaterThanOrEqual: - return elastic.NewRangeQuery(name).Gte(value) + return GreaterThanOrEqual(name, value) case contactql.OpLessThan: - return elastic.NewRangeQuery(name).Lt(value) + return LessThan(name, value) case contactql.OpLessThanOrEqual: - return elastic.NewRangeQuery(name).Lte(value) + return LessThanOrEqual(name, value) default: panic(fmt.Sprintf("unsupported %s attribute operator: %s", name, c.Operator())) } } - -// convenience utility to create a not boolean query -func not(queries ...elastic.Query) *elastic.BoolQuery { - return elastic.NewBoolQuery().MustNot(queries...) -} diff --git a/contactql/es/query_test.go b/contactql/es/query_test.go index 3e8bbc408..eec2b9d88 100644 --- a/contactql/es/query_test.go +++ b/contactql/es/query_test.go @@ -96,10 +96,7 @@ func TestElasticQuery(t *testing.T) { query := es.ToElasticQuery(env, mapper, parsed) assert.NotNil(t, query, tc.Description) - source, err := query.Source() - require.NoError(t, err, "error requesting source for elastic query in ", testName) - - asJSON, err := jsonx.Marshal(source) + asJSON, err := jsonx.Marshal(query) require.NoError(t, err) test.AssertEqualJSON(t, tc.Elastic, asJSON, "elastic mismatch in %s", testName) diff --git a/contactql/es/utils.go b/contactql/es/utils.go new file mode 100644 index 000000000..6ffcce120 --- /dev/null +++ b/contactql/es/utils.go @@ -0,0 +1,82 @@ +package es + +func Any(queries ...map[string]any) map[string]any { + return map[string]any{"bool": map[string]any{"should": queries}} +} + +func All(queries ...map[string]any) map[string]any { + return map[string]any{"bool": map[string]any{"must": queries}} +} + +func Not(query map[string]any) map[string]any { + return map[string]any{"bool": map[string]any{"must_not": query}} +} + +func Ids(values ...string) map[string]any { + return map[string]any{"ids": map[string]any{"values": values}} +} + +func Term(field string, value any) map[string]any { + return map[string]any{"term": map[string]any{field: value}} +} + +func Exists(field string) map[string]any { + return map[string]any{"exists": map[string]any{"field": field}} +} + +func Nested(path string, query map[string]any) map[string]any { + return map[string]any{"nested": map[string]any{"path": path, "query": query}} +} + +func Match(field string, value any) map[string]any { + return map[string]any{"match": map[string]any{field: map[string]any{"query": value}}} +} + +func MatchPhrase(field, value string) map[string]any { + return map[string]any{"match_phrase": map[string]any{field: map[string]any{"query": value}}} +} + +func GreaterThan(field string, value any) map[string]any { + return map[string]any{"range": map[string]any{field: map[string]any{ + "from": value, + "include_lower": false, + "include_upper": true, + "to": nil, + }}} +} + +func GreaterThanOrEqual(field string, value any) map[string]any { + return map[string]any{"range": map[string]any{field: map[string]any{ + "from": value, + "include_lower": true, + "include_upper": true, + "to": nil, + }}} +} + +func LessThan(field string, value any) map[string]any { + return map[string]any{"range": map[string]any{field: map[string]any{ + "from": nil, + "include_lower": true, + "include_upper": false, + "to": value, + }}} +} + +func LessThanOrEqual(field string, value any) map[string]any { + return map[string]any{"range": map[string]any{field: map[string]any{ + "from": nil, + "include_lower": true, + "include_upper": true, + "to": value, + }}} +} + +func Between(field string, from, to any) map[string]any { + return map[string]any{"range": map[string]any{field: map[string]any{ + "from": from, + "include_lower": true, + "include_upper": false, + "to": to, + }}} +} From e53fd146f1bef205e21bab8095212270dd7858b2 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Fri, 17 May 2024 15:31:26 -0500 Subject: [PATCH 2/4] Rewrite ES sorting to not use deprecated library either --- contactql/es/query.go | 155 +++++++++--------- contactql/es/sort.go | 16 +- contactql/es/sort_test.go | 3 +- go.mod | 3 - go.sum | 8 - .../es/utils.go => utils/elastic/query.go | 5 +- utils/elastic/sort.go | 19 +++ 7 files changed, 108 insertions(+), 101 deletions(-) rename contactql/es/utils.go => utils/elastic/query.go (92%) create mode 100644 utils/elastic/sort.go diff --git a/contactql/es/query.go b/contactql/es/query.go index c1e0ded14..dcc6c280d 100644 --- a/contactql/es/query.go +++ b/contactql/es/query.go @@ -8,6 +8,7 @@ import ( "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/contactql" "github.com/nyaruka/goflow/envs" + esq "github.com/nyaruka/goflow/utils/elastic" ) // AssetMapper is used to map engine assets to however ES identifies them @@ -51,10 +52,10 @@ func boolCombination(env envs.Environment, resolver contactql.Resolver, mapper A } if combination.Operator() == contactql.BoolOperatorAnd { - return All(queries...) + return esq.All(queries...) } - return Any(queries...) + return esq.Any(queries...) } func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) map[string]any { @@ -73,15 +74,15 @@ func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMa func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contactql.Condition) map[string]any { field := resolver.ResolveField(c.PropertyKey()) fieldType := field.Type() - fieldQuery := Term("fields.field", field.UUID()) + fieldQuery := esq.Term("fields.field", field.UUID()) // special cases for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && c.Value() == "" { - query := Nested("fields", All(fieldQuery, Exists("fields."+string(fieldType)))) + query := esq.Nested("fields", esq.All(fieldQuery, esq.Exists("fields."+string(fieldType)))) // if we are looking for unset, inverse our query if c.Operator() == contactql.OpEqual { - query = Not(query) + query = esq.Not(query) } return query } @@ -91,10 +92,10 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac switch c.Operator() { case contactql.OpEqual: - return Nested("fields", All(fieldQuery, Term("fields.text", value))) + return esq.Nested("fields", esq.All(fieldQuery, esq.Term("fields.text", value))) case contactql.OpNotEqual: - query := All(fieldQuery, Term("fields.text", value), Exists("fields.text")) - return Not(Nested("fields", query)) + query := esq.All(fieldQuery, esq.Term("fields.text", value), esq.Exists("fields.text")) + return esq.Not(esq.Nested("fields", query)) default: panic(fmt.Sprintf("unsupported text field operator: %s", c.Operator())) } @@ -105,26 +106,26 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac switch c.Operator() { case contactql.OpEqual: - query = Match("fields.number", value) + query = esq.Match("fields.number", value) case contactql.OpNotEqual: - return Not( - Nested("fields", - All(fieldQuery, Match("fields.number", value)), + return esq.Not( + esq.Nested("fields", + esq.All(fieldQuery, esq.Match("fields.number", value)), ), ) case contactql.OpGreaterThan: - query = GreaterThan("fields.number", value) + query = esq.GreaterThan("fields.number", value) case contactql.OpGreaterThanOrEqual: - query = GreaterThanOrEqual("fields.number", value) + query = esq.GreaterThanOrEqual("fields.number", value) case contactql.OpLessThan: - query = LessThan("fields.number", value) + query = esq.LessThan("fields.number", value) case contactql.OpLessThanOrEqual: - query = LessThanOrEqual("fields.number", value) + query = esq.LessThanOrEqual("fields.number", value) default: panic(fmt.Sprintf("unsupported number field operator: %s", c.Operator())) } - return Nested("fields", All(fieldQuery, query)) + return esq.Nested("fields", esq.All(fieldQuery, query)) } else if fieldType == assets.FieldTypeDatetime { value, _ := c.ValueAsDate(env) @@ -133,26 +134,26 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac switch c.Operator() { case contactql.OpEqual: - query = Between("fields.datetime", start, end) + query = esq.Between("fields.datetime", start, end) case contactql.OpNotEqual: - return Not( - Nested("fields", - All(fieldQuery, Between("fields.datetime", start, end)), + return esq.Not( + esq.Nested("fields", + esq.All(fieldQuery, esq.Between("fields.datetime", start, end)), ), ) case contactql.OpGreaterThan: - query = GreaterThanOrEqual("fields.datetime", end) + query = esq.GreaterThanOrEqual("fields.datetime", end) case contactql.OpGreaterThanOrEqual: - query = GreaterThanOrEqual("fields.datetime", start) + query = esq.GreaterThanOrEqual("fields.datetime", start) case contactql.OpLessThan: - query = LessThan("fields.datetime", start) + query = esq.LessThan("fields.datetime", start) case contactql.OpLessThanOrEqual: - query = LessThan("fields.datetime", end) + query = esq.LessThan("fields.datetime", end) default: panic(fmt.Sprintf("unsupported datetime field operator: %s", c.Operator())) } - return Nested("fields", All(fieldQuery, query)) + return esq.Nested("fields", esq.All(fieldQuery, query)) } else if fieldType == assets.FieldTypeState || fieldType == assets.FieldTypeDistrict || fieldType == assets.FieldTypeWard { value := strings.ToLower(c.Value()) @@ -160,11 +161,11 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac switch c.Operator() { case contactql.OpEqual: - return Nested("fields", All(fieldQuery, Term(name, value))) + return esq.Nested("fields", esq.All(fieldQuery, esq.Term(name, value))) case contactql.OpNotEqual: - return Not( - Nested("fields", - All(Term(name, value), Exists(name)), + return esq.Not( + esq.Nested("fields", + esq.All(esq.Term(name, value), esq.Exists(name)), ), ) default: @@ -183,10 +184,10 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" && (key == contactql.AttributeName || key == contactql.AttributeLanguage) { - query := All(Exists(key), Not(Term(fmt.Sprintf("%s.keyword", key), ""))) + query := esq.All(esq.Exists(key), esq.Not(esq.Term(fmt.Sprintf("%s.keyword", key), ""))) if c.Operator() == contactql.OpEqual { - query = Not(query) + query = esq.Not(query) } return query @@ -198,20 +199,20 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe case contactql.AttributeID: switch c.Operator() { case contactql.OpEqual: - return Ids(value) + return esq.Ids(value) case contactql.OpNotEqual: - return Not(Ids(value)) + return esq.Not(esq.Ids(value)) default: panic(fmt.Sprintf("unsupported ID attribute operator: %s", c.Operator())) } case contactql.AttributeName: switch c.Operator() { case contactql.OpEqual: - return Term("name.keyword", c.Value()) + return esq.Term("name.keyword", c.Value()) case contactql.OpNotEqual: - return Not(Term("name.keyword", c.Value())) + return esq.Not(esq.Term("name.keyword", c.Value())) case contactql.OpContains: - return Match("name", value) + return esq.Match("name", value) default: panic(fmt.Sprintf("unsupported name attribute operator: %s", c.Operator())) } @@ -227,26 +228,26 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe switch c.Operator() { case contactql.OpEqual: - return Between("created_on", start, end) + return esq.Between("created_on", start, end) case contactql.OpNotEqual: - return Not(Between("created_on", start, end)) + return esq.Not(esq.Between("created_on", start, end)) case contactql.OpGreaterThan: - return GreaterThanOrEqual("created_on", end) + return esq.GreaterThanOrEqual("created_on", end) case contactql.OpGreaterThanOrEqual: - return GreaterThanOrEqual("created_on", start) + return esq.GreaterThanOrEqual("created_on", start) case contactql.OpLessThan: - return LessThan("created_on", start) + return esq.LessThan("created_on", start) case contactql.OpLessThanOrEqual: - return LessThan("created_on", end) + return esq.LessThan("created_on", end) default: panic(fmt.Sprintf("unsupported created_on attribute operator: %s", c.Operator())) } case contactql.AttributeLastSeenOn: // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := Exists("last_seen_on") + query := esq.Exists("last_seen_on") if c.Operator() == contactql.OpEqual { - query = Not(query) + query = esq.Not(query) } return query } @@ -256,17 +257,17 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe switch c.Operator() { case contactql.OpEqual: - return Between("last_seen_on", start, end) + return esq.Between("last_seen_on", start, end) case contactql.OpNotEqual: - return Not(Between("last_seen_on", start, end)) + return esq.Not(esq.Between("last_seen_on", start, end)) case contactql.OpGreaterThan: - return GreaterThanOrEqual("last_seen_on", end) + return esq.GreaterThanOrEqual("last_seen_on", end) case contactql.OpGreaterThanOrEqual: - return GreaterThanOrEqual("last_seen_on", start) + return esq.GreaterThanOrEqual("last_seen_on", start) case contactql.OpLessThan: - return LessThan("last_seen_on", start) + return esq.LessThan("last_seen_on", start) case contactql.OpLessThanOrEqual: - return LessThan("last_seen_on", end) + return esq.LessThan("last_seen_on", end) default: panic(fmt.Sprintf("unsupported last_seen_on attribute operator: %s", c.Operator())) } @@ -275,29 +276,29 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := Nested("urns", Exists("urns.path")) + query := esq.Nested("urns", esq.Exists("urns.path")) if c.Operator() == contactql.OpEqual { - query = Not(query) + query = esq.Not(query) } return query } switch c.Operator() { case contactql.OpEqual: - return Nested("urns", Term("urns.path.keyword", value)) + return esq.Nested("urns", esq.Term("urns.path.keyword", value)) case contactql.OpNotEqual: - return Not(Nested("urns", Term("urns.path.keyword", value))) + return esq.Not(esq.Nested("urns", esq.Term("urns.path.keyword", value))) case contactql.OpContains: - return Nested("urns", MatchPhrase("urns.path", value)) + return esq.Nested("urns", esq.MatchPhrase("urns.path", value)) default: panic(fmt.Sprintf("unsupported URN attribute operator: %s", c.Operator())) } case contactql.AttributeGroup: // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := Exists("group_ids") + query := esq.Exists("group_ids") if c.Operator() == contactql.OpEqual { - query = Not(query) + query = esq.Not(query) } return query } @@ -306,9 +307,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe switch c.Operator() { case contactql.OpEqual: - return Term("group_ids", mapper.Group(group)) + return esq.Term("group_ids", mapper.Group(group)) case contactql.OpNotEqual: - return Not(Term("group_ids", mapper.Group(group))) + return esq.Not(esq.Term("group_ids", mapper.Group(group))) default: panic(fmt.Sprintf("unsupported group attribute operator: %s", c.Operator())) } @@ -320,9 +321,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := Exists(fieldName) + query := esq.Exists(fieldName) if c.Operator() == contactql.OpEqual { - query = Not(query) + query = esq.Not(query) } return query } @@ -331,9 +332,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe switch c.Operator() { case contactql.OpEqual: - return Term(fieldName, mapper.Flow(flow)) + return esq.Term(fieldName, mapper.Flow(flow)) case contactql.OpNotEqual: - return Not(Term(fieldName, mapper.Flow(flow))) + return esq.Not(esq.Term(fieldName, mapper.Flow(flow))) default: panic(fmt.Sprintf("unsupported flow attribute operator: %s", c.Operator())) } @@ -350,20 +351,20 @@ func schemeCondition(c *contactql.Condition) map[string]any { // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := Nested("urns", All(Term("urns.scheme", key), Exists("urns.path"))) + query := esq.Nested("urns", esq.All(esq.Term("urns.scheme", key), esq.Exists("urns.path"))) if c.Operator() == contactql.OpEqual { - query = Not(query) + query = esq.Not(query) } return query } switch c.Operator() { case contactql.OpEqual: - return Nested("urns", All(Term("urns.path.keyword", value), Term("urns.scheme", key))) + return esq.Nested("urns", esq.All(esq.Term("urns.path.keyword", value), esq.Term("urns.scheme", key))) case contactql.OpNotEqual: - return Not(Nested("urns", All(Term("urns.path.keyword", value), Term("urns.scheme", key)))) + return esq.Not(esq.Nested("urns", esq.All(esq.Term("urns.path.keyword", value), esq.Term("urns.scheme", key)))) case contactql.OpContains: - return Nested("urns", All(MatchPhrase("urns.path", value), Term("urns.scheme", key))) + return esq.Nested("urns", esq.All(esq.MatchPhrase("urns.path", value), esq.Term("urns.scheme", key))) default: panic(fmt.Sprintf("unsupported scheme operator: %s", c.Operator())) } @@ -374,9 +375,9 @@ func textAttributeQuery(c *contactql.Condition, name string, tx func(string) str switch c.Operator() { case contactql.OpEqual: - return Term(name, value) + return esq.Term(name, value) case contactql.OpNotEqual: - return Not(Term(name, value)) + return esq.Not(esq.Term(name, value)) default: panic(fmt.Sprintf("unsupported %s attribute operator: %s", name, c.Operator())) } @@ -387,17 +388,17 @@ func numericalAttributeQuery(c *contactql.Condition, name string) map[string]any switch c.Operator() { case contactql.OpEqual: - return Match(name, value) + return esq.Match(name, value) case contactql.OpNotEqual: - return Not(Match(name, value)) + return esq.Not(esq.Match(name, value)) case contactql.OpGreaterThan: - return GreaterThan(name, value) + return esq.GreaterThan(name, value) case contactql.OpGreaterThanOrEqual: - return GreaterThanOrEqual(name, value) + return esq.GreaterThanOrEqual(name, value) case contactql.OpLessThan: - return LessThan(name, value) + return esq.LessThan(name, value) case contactql.OpLessThanOrEqual: - return LessThanOrEqual(name, value) + return esq.LessThanOrEqual(name, value) default: panic(fmt.Sprintf("unsupported %s attribute operator: %s", name, c.Operator())) } diff --git a/contactql/es/sort.go b/contactql/es/sort.go index 33e24c581..31b42b6fa 100644 --- a/contactql/es/sort.go +++ b/contactql/es/sort.go @@ -6,16 +6,15 @@ import ( "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/contactql" - - "github.com/olivere/elastic/v7" + esq "github.com/nyaruka/goflow/utils/elastic" "github.com/pkg/errors" ) // ToElasticFieldSort returns the elastic FieldSort for the passed in sort by string -func ToElasticFieldSort(sortBy string, resolver contactql.Resolver) (*elastic.FieldSort, error) { +func ToElasticFieldSort(sortBy string, resolver contactql.Resolver) (map[string]any, error) { // default to most recent first by id if sortBy == "" { - return elastic.NewFieldSort("id").Desc(), nil + return esq.Sort("id", false), nil } // figure out if we are ascending or descending (default is ascending, can be changed with leading -) @@ -30,12 +29,12 @@ func ToElasticFieldSort(sortBy string, resolver contactql.Resolver) (*elastic.Fi // name needs to be sorted by keyword field if property == contactql.AttributeName { - return elastic.NewFieldSort("name.keyword").Order(ascending), nil + return esq.Sort("name.keyword", ascending), nil } // other attributes are straight sorts if property == contactql.AttributeID || property == contactql.AttributeCreatedOn || property == contactql.AttributeLastSeenOn || property == contactql.AttributeLanguage { - return elastic.NewFieldSort(property).Order(ascending), nil + return esq.Sort(property, ascending), nil } // we are sorting by a custom field @@ -52,8 +51,5 @@ func ToElasticFieldSort(sortBy string, resolver contactql.Resolver) (*elastic.Fi key = fmt.Sprintf("fields.%s", field.Type()) } - sort := elastic.NewFieldSort(key) - sort = sort.Nested(elastic.NewNestedSort("fields").Filter(elastic.NewTermQuery("fields.field", field.UUID()))) - sort = sort.Order(ascending) - return sort, nil + return esq.SortNested(key, esq.Term("fields.field", field.UUID()), "fields", ascending), nil } diff --git a/contactql/es/sort_test.go b/contactql/es/sort_test.go index 98e0fc152..58de6b1da 100644 --- a/contactql/es/sort_test.go +++ b/contactql/es/sort_test.go @@ -35,8 +35,7 @@ func TestElasticSort(t *testing.T) { if tc.Error != "" { assert.EqualError(t, err, tc.Error) } else { - src, _ := sort.Source() - encoded := jsonx.MustMarshal(src) + encoded := jsonx.MustMarshal(sort) test.AssertEqualJSON(t, []byte(tc.Elastic), encoded, "field sort mismatch for %s", tc.Description) } } diff --git a/go.mod b/go.mod index 6ca43b76f..58d4ca26b 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,6 @@ require ( github.com/buger/jsonparser v1.1.1 github.com/go-playground/validator/v10 v10.20.0 github.com/nyaruka/gocommon v1.54.9 - github.com/olivere/elastic/v7 v7.0.32 github.com/pkg/errors v0.9.1 github.com/sergi/go-diff v1.3.1 github.com/shopspring/decimal v1.4.0 @@ -28,9 +27,7 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/gofrs/uuid v4.4.0+incompatible // indirect github.com/gorilla/websocket v1.5.1 // indirect - github.com/josharian/intern v1.0.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect - github.com/mailru/easyjson v0.7.7 // indirect github.com/nyaruka/null/v2 v2.0.3 // indirect github.com/nyaruka/phonenumbers v1.3.5 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 879a554d6..714627e5d 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,6 @@ github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx2 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= -github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= github.com/go-chi/chi/v5 v5.0.12 h1:9euLV5sTrTNTRUU9POmDUvfxyj6LAABLUcEWO+JJb4s= @@ -33,8 +31,6 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/ github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= -github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= -github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -44,16 +40,12 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= -github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/nyaruka/gocommon v1.54.9 h1:BvWgXQc7h9Qlhj347N56wPZiik64SGmv+qSLmar4pVo= github.com/nyaruka/gocommon v1.54.9/go.mod h1:rWkEIpYIK98zL9Qm6PeMXJ+84WcWlArf01RfuWWCYvQ= github.com/nyaruka/null/v2 v2.0.3 h1:rdmMRQyVzrOF3Jff/gpU/7BDR9mQX0lcLl4yImsA3kw= github.com/nyaruka/null/v2 v2.0.3/go.mod h1:OCVeCkCXwrg5/qE6RU0c1oUVZBy+ZDrT+xYg1XSaIWA= github.com/nyaruka/phonenumbers v1.3.5 h1:WZLbQn61j2E1OFnvpUTYbK/6hViUgl6tppJ55/E2iQM= github.com/nyaruka/phonenumbers v1.3.5/go.mod h1:Ut+eFwikULbmCenH6InMKL9csUNLyxHuBLyfkpum11s= -github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E= -github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/contactql/es/utils.go b/utils/elastic/query.go similarity index 92% rename from contactql/es/utils.go rename to utils/elastic/query.go index 6ffcce120..638c5702a 100644 --- a/contactql/es/utils.go +++ b/utils/elastic/query.go @@ -1,13 +1,16 @@ -package es +package esq +// Any is a shortcut for a bool query with a should clause func Any(queries ...map[string]any) map[string]any { return map[string]any{"bool": map[string]any{"should": queries}} } +// All is a shortcut for a bool query with a must clause func All(queries ...map[string]any) map[string]any { return map[string]any{"bool": map[string]any{"must": queries}} } +// Not is a shortcut for a bool query with a must_not clause func Not(query map[string]any) map[string]any { return map[string]any{"bool": map[string]any{"must_not": query}} } diff --git a/utils/elastic/sort.go b/utils/elastic/sort.go new file mode 100644 index 000000000..7bf4f8ff9 --- /dev/null +++ b/utils/elastic/sort.go @@ -0,0 +1,19 @@ +package esq + +func Sort(field string, ascending bool) map[string]any { + return map[string]any{field: map[string]any{"order": order(ascending)}} +} + +func SortNested(field string, filter map[string]any, path string, ascending bool) map[string]any { + return map[string]any{field: map[string]any{ + "nested": map[string]any{"filter": filter, "path": path}, + "order": order(ascending), + }} +} + +func order(asc bool) string { + if asc { + return "asc" + } + return "desc" +} From 7114ff358dc3126b565d04285bf1f67e8f4a06b5 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Fri, 17 May 2024 15:43:35 -0500 Subject: [PATCH 3/4] Add docstrings and import as elastic --- contactql/es/query.go | 156 ++++++++++++++++++++--------------------- contactql/es/sort.go | 10 +-- utils/elastic/query.go | 13 +++- utils/elastic/sort.go | 4 +- 4 files changed, 98 insertions(+), 85 deletions(-) diff --git a/contactql/es/query.go b/contactql/es/query.go index dcc6c280d..a5d4d1456 100644 --- a/contactql/es/query.go +++ b/contactql/es/query.go @@ -8,7 +8,7 @@ import ( "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/contactql" "github.com/nyaruka/goflow/envs" - esq "github.com/nyaruka/goflow/utils/elastic" + "github.com/nyaruka/goflow/utils/elastic" ) // AssetMapper is used to map engine assets to however ES identifies them @@ -52,10 +52,10 @@ func boolCombination(env envs.Environment, resolver contactql.Resolver, mapper A } if combination.Operator() == contactql.BoolOperatorAnd { - return esq.All(queries...) + return elastic.All(queries...) } - return esq.Any(queries...) + return elastic.Any(queries...) } func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) map[string]any { @@ -74,15 +74,15 @@ func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMa func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contactql.Condition) map[string]any { field := resolver.ResolveField(c.PropertyKey()) fieldType := field.Type() - fieldQuery := esq.Term("fields.field", field.UUID()) + fieldQuery := elastic.Term("fields.field", field.UUID()) // special cases for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && c.Value() == "" { - query := esq.Nested("fields", esq.All(fieldQuery, esq.Exists("fields."+string(fieldType)))) + query := elastic.Nested("fields", elastic.All(fieldQuery, elastic.Exists("fields."+string(fieldType)))) // if we are looking for unset, inverse our query if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query } @@ -92,10 +92,10 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac switch c.Operator() { case contactql.OpEqual: - return esq.Nested("fields", esq.All(fieldQuery, esq.Term("fields.text", value))) + return elastic.Nested("fields", elastic.All(fieldQuery, elastic.Term("fields.text", value))) case contactql.OpNotEqual: - query := esq.All(fieldQuery, esq.Term("fields.text", value), esq.Exists("fields.text")) - return esq.Not(esq.Nested("fields", query)) + query := elastic.All(fieldQuery, elastic.Term("fields.text", value), elastic.Exists("fields.text")) + return elastic.Not(elastic.Nested("fields", query)) default: panic(fmt.Sprintf("unsupported text field operator: %s", c.Operator())) } @@ -106,26 +106,26 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac switch c.Operator() { case contactql.OpEqual: - query = esq.Match("fields.number", value) + query = elastic.Match("fields.number", value) case contactql.OpNotEqual: - return esq.Not( - esq.Nested("fields", - esq.All(fieldQuery, esq.Match("fields.number", value)), + return elastic.Not( + elastic.Nested("fields", + elastic.All(fieldQuery, elastic.Match("fields.number", value)), ), ) case contactql.OpGreaterThan: - query = esq.GreaterThan("fields.number", value) + query = elastic.GreaterThan("fields.number", value) case contactql.OpGreaterThanOrEqual: - query = esq.GreaterThanOrEqual("fields.number", value) + query = elastic.GreaterThanOrEqual("fields.number", value) case contactql.OpLessThan: - query = esq.LessThan("fields.number", value) + query = elastic.LessThan("fields.number", value) case contactql.OpLessThanOrEqual: - query = esq.LessThanOrEqual("fields.number", value) + query = elastic.LessThanOrEqual("fields.number", value) default: panic(fmt.Sprintf("unsupported number field operator: %s", c.Operator())) } - return esq.Nested("fields", esq.All(fieldQuery, query)) + return elastic.Nested("fields", elastic.All(fieldQuery, query)) } else if fieldType == assets.FieldTypeDatetime { value, _ := c.ValueAsDate(env) @@ -134,26 +134,26 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac switch c.Operator() { case contactql.OpEqual: - query = esq.Between("fields.datetime", start, end) + query = elastic.Between("fields.datetime", start, end) case contactql.OpNotEqual: - return esq.Not( - esq.Nested("fields", - esq.All(fieldQuery, esq.Between("fields.datetime", start, end)), + return elastic.Not( + elastic.Nested("fields", + elastic.All(fieldQuery, elastic.Between("fields.datetime", start, end)), ), ) case contactql.OpGreaterThan: - query = esq.GreaterThanOrEqual("fields.datetime", end) + query = elastic.GreaterThanOrEqual("fields.datetime", end) case contactql.OpGreaterThanOrEqual: - query = esq.GreaterThanOrEqual("fields.datetime", start) + query = elastic.GreaterThanOrEqual("fields.datetime", start) case contactql.OpLessThan: - query = esq.LessThan("fields.datetime", start) + query = elastic.LessThan("fields.datetime", start) case contactql.OpLessThanOrEqual: - query = esq.LessThan("fields.datetime", end) + query = elastic.LessThan("fields.datetime", end) default: panic(fmt.Sprintf("unsupported datetime field operator: %s", c.Operator())) } - return esq.Nested("fields", esq.All(fieldQuery, query)) + return elastic.Nested("fields", elastic.All(fieldQuery, query)) } else if fieldType == assets.FieldTypeState || fieldType == assets.FieldTypeDistrict || fieldType == assets.FieldTypeWard { value := strings.ToLower(c.Value()) @@ -161,11 +161,11 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac switch c.Operator() { case contactql.OpEqual: - return esq.Nested("fields", esq.All(fieldQuery, esq.Term(name, value))) + return elastic.Nested("fields", elastic.All(fieldQuery, elastic.Term(name, value))) case contactql.OpNotEqual: - return esq.Not( - esq.Nested("fields", - esq.All(esq.Term(name, value), esq.Exists(name)), + return elastic.Not( + elastic.Nested("fields", + elastic.All(elastic.Term(name, value), elastic.Exists(name)), ), ) default: @@ -184,10 +184,10 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" && (key == contactql.AttributeName || key == contactql.AttributeLanguage) { - query := esq.All(esq.Exists(key), esq.Not(esq.Term(fmt.Sprintf("%s.keyword", key), ""))) + query := elastic.All(elastic.Exists(key), elastic.Not(elastic.Term(fmt.Sprintf("%s.keyword", key), ""))) if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query @@ -199,20 +199,20 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe case contactql.AttributeID: switch c.Operator() { case contactql.OpEqual: - return esq.Ids(value) + return elastic.Ids(value) case contactql.OpNotEqual: - return esq.Not(esq.Ids(value)) + return elastic.Not(elastic.Ids(value)) default: panic(fmt.Sprintf("unsupported ID attribute operator: %s", c.Operator())) } case contactql.AttributeName: switch c.Operator() { case contactql.OpEqual: - return esq.Term("name.keyword", c.Value()) + return elastic.Term("name.keyword", c.Value()) case contactql.OpNotEqual: - return esq.Not(esq.Term("name.keyword", c.Value())) + return elastic.Not(elastic.Term("name.keyword", c.Value())) case contactql.OpContains: - return esq.Match("name", value) + return elastic.Match("name", value) default: panic(fmt.Sprintf("unsupported name attribute operator: %s", c.Operator())) } @@ -228,26 +228,26 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe switch c.Operator() { case contactql.OpEqual: - return esq.Between("created_on", start, end) + return elastic.Between("created_on", start, end) case contactql.OpNotEqual: - return esq.Not(esq.Between("created_on", start, end)) + return elastic.Not(elastic.Between("created_on", start, end)) case contactql.OpGreaterThan: - return esq.GreaterThanOrEqual("created_on", end) + return elastic.GreaterThanOrEqual("created_on", end) case contactql.OpGreaterThanOrEqual: - return esq.GreaterThanOrEqual("created_on", start) + return elastic.GreaterThanOrEqual("created_on", start) case contactql.OpLessThan: - return esq.LessThan("created_on", start) + return elastic.LessThan("created_on", start) case contactql.OpLessThanOrEqual: - return esq.LessThan("created_on", end) + return elastic.LessThan("created_on", end) default: panic(fmt.Sprintf("unsupported created_on attribute operator: %s", c.Operator())) } case contactql.AttributeLastSeenOn: // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := esq.Exists("last_seen_on") + query := elastic.Exists("last_seen_on") if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query } @@ -257,17 +257,17 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe switch c.Operator() { case contactql.OpEqual: - return esq.Between("last_seen_on", start, end) + return elastic.Between("last_seen_on", start, end) case contactql.OpNotEqual: - return esq.Not(esq.Between("last_seen_on", start, end)) + return elastic.Not(elastic.Between("last_seen_on", start, end)) case contactql.OpGreaterThan: - return esq.GreaterThanOrEqual("last_seen_on", end) + return elastic.GreaterThanOrEqual("last_seen_on", end) case contactql.OpGreaterThanOrEqual: - return esq.GreaterThanOrEqual("last_seen_on", start) + return elastic.GreaterThanOrEqual("last_seen_on", start) case contactql.OpLessThan: - return esq.LessThan("last_seen_on", start) + return elastic.LessThan("last_seen_on", start) case contactql.OpLessThanOrEqual: - return esq.LessThan("last_seen_on", end) + return elastic.LessThan("last_seen_on", end) default: panic(fmt.Sprintf("unsupported last_seen_on attribute operator: %s", c.Operator())) } @@ -276,29 +276,29 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := esq.Nested("urns", esq.Exists("urns.path")) + query := elastic.Nested("urns", elastic.Exists("urns.path")) if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query } switch c.Operator() { case contactql.OpEqual: - return esq.Nested("urns", esq.Term("urns.path.keyword", value)) + return elastic.Nested("urns", elastic.Term("urns.path.keyword", value)) case contactql.OpNotEqual: - return esq.Not(esq.Nested("urns", esq.Term("urns.path.keyword", value))) + return elastic.Not(elastic.Nested("urns", elastic.Term("urns.path.keyword", value))) case contactql.OpContains: - return esq.Nested("urns", esq.MatchPhrase("urns.path", value)) + return elastic.Nested("urns", elastic.MatchPhrase("urns.path", value)) default: panic(fmt.Sprintf("unsupported URN attribute operator: %s", c.Operator())) } case contactql.AttributeGroup: // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := esq.Exists("group_ids") + query := elastic.Exists("group_ids") if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query } @@ -307,9 +307,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe switch c.Operator() { case contactql.OpEqual: - return esq.Term("group_ids", mapper.Group(group)) + return elastic.Term("group_ids", mapper.Group(group)) case contactql.OpNotEqual: - return esq.Not(esq.Term("group_ids", mapper.Group(group))) + return elastic.Not(elastic.Term("group_ids", mapper.Group(group))) default: panic(fmt.Sprintf("unsupported group attribute operator: %s", c.Operator())) } @@ -321,9 +321,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := esq.Exists(fieldName) + query := elastic.Exists(fieldName) if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query } @@ -332,9 +332,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe switch c.Operator() { case contactql.OpEqual: - return esq.Term(fieldName, mapper.Flow(flow)) + return elastic.Term(fieldName, mapper.Flow(flow)) case contactql.OpNotEqual: - return esq.Not(esq.Term(fieldName, mapper.Flow(flow))) + return elastic.Not(elastic.Term(fieldName, mapper.Flow(flow))) default: panic(fmt.Sprintf("unsupported flow attribute operator: %s", c.Operator())) } @@ -351,20 +351,20 @@ func schemeCondition(c *contactql.Condition) map[string]any { // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := esq.Nested("urns", esq.All(esq.Term("urns.scheme", key), esq.Exists("urns.path"))) + query := elastic.Nested("urns", elastic.All(elastic.Term("urns.scheme", key), elastic.Exists("urns.path"))) if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query } switch c.Operator() { case contactql.OpEqual: - return esq.Nested("urns", esq.All(esq.Term("urns.path.keyword", value), esq.Term("urns.scheme", key))) + return elastic.Nested("urns", elastic.All(elastic.Term("urns.path.keyword", value), elastic.Term("urns.scheme", key))) case contactql.OpNotEqual: - return esq.Not(esq.Nested("urns", esq.All(esq.Term("urns.path.keyword", value), esq.Term("urns.scheme", key)))) + return elastic.Not(elastic.Nested("urns", elastic.All(elastic.Term("urns.path.keyword", value), elastic.Term("urns.scheme", key)))) case contactql.OpContains: - return esq.Nested("urns", esq.All(esq.MatchPhrase("urns.path", value), esq.Term("urns.scheme", key))) + return elastic.Nested("urns", elastic.All(elastic.MatchPhrase("urns.path", value), elastic.Term("urns.scheme", key))) default: panic(fmt.Sprintf("unsupported scheme operator: %s", c.Operator())) } @@ -375,9 +375,9 @@ func textAttributeQuery(c *contactql.Condition, name string, tx func(string) str switch c.Operator() { case contactql.OpEqual: - return esq.Term(name, value) + return elastic.Term(name, value) case contactql.OpNotEqual: - return esq.Not(esq.Term(name, value)) + return elastic.Not(elastic.Term(name, value)) default: panic(fmt.Sprintf("unsupported %s attribute operator: %s", name, c.Operator())) } @@ -388,17 +388,17 @@ func numericalAttributeQuery(c *contactql.Condition, name string) map[string]any switch c.Operator() { case contactql.OpEqual: - return esq.Match(name, value) + return elastic.Match(name, value) case contactql.OpNotEqual: - return esq.Not(esq.Match(name, value)) + return elastic.Not(elastic.Match(name, value)) case contactql.OpGreaterThan: - return esq.GreaterThan(name, value) + return elastic.GreaterThan(name, value) case contactql.OpGreaterThanOrEqual: - return esq.GreaterThanOrEqual(name, value) + return elastic.GreaterThanOrEqual(name, value) case contactql.OpLessThan: - return esq.LessThan(name, value) + return elastic.LessThan(name, value) case contactql.OpLessThanOrEqual: - return esq.LessThanOrEqual(name, value) + return elastic.LessThanOrEqual(name, value) default: panic(fmt.Sprintf("unsupported %s attribute operator: %s", name, c.Operator())) } diff --git a/contactql/es/sort.go b/contactql/es/sort.go index 31b42b6fa..dbd881a52 100644 --- a/contactql/es/sort.go +++ b/contactql/es/sort.go @@ -6,7 +6,7 @@ import ( "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/contactql" - esq "github.com/nyaruka/goflow/utils/elastic" + "github.com/nyaruka/goflow/utils/elastic" "github.com/pkg/errors" ) @@ -14,7 +14,7 @@ import ( func ToElasticFieldSort(sortBy string, resolver contactql.Resolver) (map[string]any, error) { // default to most recent first by id if sortBy == "" { - return esq.Sort("id", false), nil + return elastic.Sort("id", false), nil } // figure out if we are ascending or descending (default is ascending, can be changed with leading -) @@ -29,12 +29,12 @@ func ToElasticFieldSort(sortBy string, resolver contactql.Resolver) (map[string] // name needs to be sorted by keyword field if property == contactql.AttributeName { - return esq.Sort("name.keyword", ascending), nil + return elastic.Sort("name.keyword", ascending), nil } // other attributes are straight sorts if property == contactql.AttributeID || property == contactql.AttributeCreatedOn || property == contactql.AttributeLastSeenOn || property == contactql.AttributeLanguage { - return esq.Sort(property, ascending), nil + return elastic.Sort(property, ascending), nil } // we are sorting by a custom field @@ -51,5 +51,5 @@ func ToElasticFieldSort(sortBy string, resolver contactql.Resolver) (map[string] key = fmt.Sprintf("fields.%s", field.Type()) } - return esq.SortNested(key, esq.Term("fields.field", field.UUID()), "fields", ascending), nil + return elastic.SortNested(key, elastic.Term("fields.field", field.UUID()), "fields", ascending), nil } diff --git a/utils/elastic/query.go b/utils/elastic/query.go index 638c5702a..f72d94d2d 100644 --- a/utils/elastic/query.go +++ b/utils/elastic/query.go @@ -1,4 +1,4 @@ -package esq +package elastic // Any is a shortcut for a bool query with a should clause func Any(queries ...map[string]any) map[string]any { @@ -15,30 +15,37 @@ func Not(query map[string]any) map[string]any { return map[string]any{"bool": map[string]any{"must_not": query}} } +// Not is a shortcut for an ids query func Ids(values ...string) map[string]any { return map[string]any{"ids": map[string]any{"values": values}} } +// Term is a shortcut for a term query func Term(field string, value any) map[string]any { return map[string]any{"term": map[string]any{field: value}} } +// Exists is a shortcut for an exists query func Exists(field string) map[string]any { return map[string]any{"exists": map[string]any{"field": field}} } +// Nested is a shortcut for a nested query func Nested(path string, query map[string]any) map[string]any { return map[string]any{"nested": map[string]any{"path": path, "query": query}} } +// Match is a shortcut for a match query func Match(field string, value any) map[string]any { return map[string]any{"match": map[string]any{field: map[string]any{"query": value}}} } +// MatchPhrase is a shortcut for a match_phrase query func MatchPhrase(field, value string) map[string]any { return map[string]any{"match_phrase": map[string]any{field: map[string]any{"query": value}}} } +// GreaterThan is a shortcut for a range query where x > value func GreaterThan(field string, value any) map[string]any { return map[string]any{"range": map[string]any{field: map[string]any{ "from": value, @@ -48,6 +55,7 @@ func GreaterThan(field string, value any) map[string]any { }}} } +// GreaterThanOrEqual is a shortcut for a range query where x >= value func GreaterThanOrEqual(field string, value any) map[string]any { return map[string]any{"range": map[string]any{field: map[string]any{ "from": value, @@ -57,6 +65,7 @@ func GreaterThanOrEqual(field string, value any) map[string]any { }}} } +// LessThan is a shortcut for a range query where x < value func LessThan(field string, value any) map[string]any { return map[string]any{"range": map[string]any{field: map[string]any{ "from": nil, @@ -66,6 +75,7 @@ func LessThan(field string, value any) map[string]any { }}} } +// LessThanOrEqual is a shortcut for a range query where x <= value func LessThanOrEqual(field string, value any) map[string]any { return map[string]any{"range": map[string]any{field: map[string]any{ "from": nil, @@ -75,6 +85,7 @@ func LessThanOrEqual(field string, value any) map[string]any { }}} } +// Between is a shortcut for a range query where from <= x < to func Between(field string, from, to any) map[string]any { return map[string]any{"range": map[string]any{field: map[string]any{ "from": from, diff --git a/utils/elastic/sort.go b/utils/elastic/sort.go index 7bf4f8ff9..8d40581eb 100644 --- a/utils/elastic/sort.go +++ b/utils/elastic/sort.go @@ -1,9 +1,11 @@ -package esq +package elastic +// Sort is a shortcut for a simple field sort func Sort(field string, ascending bool) map[string]any { return map[string]any{field: map[string]any{"order": order(ascending)}} } +// SortNested is a shortcut for a nested field sort func SortNested(field string, filter map[string]any, path string, ascending bool) map[string]any { return map[string]any{field: map[string]any{ "nested": map[string]any{"filter": filter, "path": path}, From 0c8234fc3f248c80027906a3c5639ac13f5c8622 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Fri, 17 May 2024 15:56:30 -0500 Subject: [PATCH 4/4] Add Query and Sort map types --- contactql/es/query.go | 24 ++++++++--------- contactql/es/sort.go | 8 +++--- utils/elastic/query.go | 58 ++++++++++++++++++++++-------------------- utils/elastic/sort.go | 12 +++++---- 4 files changed, 53 insertions(+), 49 deletions(-) diff --git a/contactql/es/query.go b/contactql/es/query.go index a5d4d1456..d8ad36beb 100644 --- a/contactql/es/query.go +++ b/contactql/es/query.go @@ -26,7 +26,7 @@ var contactStatusCodes = map[string]string{ } // ToElasticQuery converts a contactql query to an Elastic query -func ToElasticQuery(env envs.Environment, mapper AssetMapper, query *contactql.ContactQuery) map[string]any { +func ToElasticQuery(env envs.Environment, mapper AssetMapper, query *contactql.ContactQuery) elastic.Query { if query.Resolver() == nil { panic("can only convert queries parsed with a resolver") } @@ -34,7 +34,7 @@ func ToElasticQuery(env envs.Environment, mapper AssetMapper, query *contactql.C return nodeToElastic(env, query.Resolver(), mapper, query.Root()) } -func nodeToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, node contactql.QueryNode) map[string]any { +func nodeToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, node contactql.QueryNode) elastic.Query { switch n := node.(type) { case *contactql.BoolCombination: return boolCombination(env, resolver, mapper, n) @@ -45,8 +45,8 @@ func nodeToElastic(env envs.Environment, resolver contactql.Resolver, mapper Ass } } -func boolCombination(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, combination *contactql.BoolCombination) map[string]any { - queries := make([]map[string]any, len(combination.Children())) +func boolCombination(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, combination *contactql.BoolCombination) elastic.Query { + queries := make([]elastic.Query, len(combination.Children())) for i, child := range combination.Children() { queries[i] = nodeToElastic(env, resolver, mapper, child) } @@ -58,7 +58,7 @@ func boolCombination(env envs.Environment, resolver contactql.Resolver, mapper A return elastic.Any(queries...) } -func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) map[string]any { +func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) elastic.Query { switch c.PropertyType() { case contactql.PropertyTypeField: return fieldCondition(env, resolver, c) @@ -71,7 +71,7 @@ func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMa } } -func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contactql.Condition) map[string]any { +func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contactql.Condition) elastic.Query { field := resolver.ResolveField(c.PropertyKey()) fieldType := field.Type() fieldQuery := elastic.Term("fields.field", field.UUID()) @@ -102,7 +102,7 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac } else if fieldType == assets.FieldTypeNumber { value, _ := c.ValueAsNumber() - var query map[string]any + var query elastic.Query switch c.Operator() { case contactql.OpEqual: @@ -130,7 +130,7 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac } else if fieldType == assets.FieldTypeDatetime { value, _ := c.ValueAsDate(env) start, end := dates.DayToUTCRange(value, value.Location()) - var query map[string]any + var query elastic.Query switch c.Operator() { case contactql.OpEqual: @@ -176,7 +176,7 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac panic(fmt.Sprintf("unsupported field type: %s", fieldType)) } -func attributeCondition(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) map[string]any { +func attributeCondition(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) elastic.Query { key := c.PropertyKey() value := strings.ToLower(c.Value()) @@ -345,7 +345,7 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe } } -func schemeCondition(c *contactql.Condition) map[string]any { +func schemeCondition(c *contactql.Condition) elastic.Query { key := c.PropertyKey() value := strings.ToLower(c.Value()) @@ -370,7 +370,7 @@ func schemeCondition(c *contactql.Condition) map[string]any { } } -func textAttributeQuery(c *contactql.Condition, name string, tx func(string) string) map[string]any { +func textAttributeQuery(c *contactql.Condition, name string, tx func(string) string) elastic.Query { value := tx(c.Value()) switch c.Operator() { @@ -383,7 +383,7 @@ func textAttributeQuery(c *contactql.Condition, name string, tx func(string) str } } -func numericalAttributeQuery(c *contactql.Condition, name string) map[string]any { +func numericalAttributeQuery(c *contactql.Condition, name string) elastic.Query { value, _ := c.ValueAsNumber() switch c.Operator() { diff --git a/contactql/es/sort.go b/contactql/es/sort.go index dbd881a52..02ecaa544 100644 --- a/contactql/es/sort.go +++ b/contactql/es/sort.go @@ -11,10 +11,10 @@ import ( ) // ToElasticFieldSort returns the elastic FieldSort for the passed in sort by string -func ToElasticFieldSort(sortBy string, resolver contactql.Resolver) (map[string]any, error) { +func ToElasticFieldSort(sortBy string, resolver contactql.Resolver) (elastic.Sort, error) { // default to most recent first by id if sortBy == "" { - return elastic.Sort("id", false), nil + return elastic.SortBy("id", false), nil } // figure out if we are ascending or descending (default is ascending, can be changed with leading -) @@ -29,12 +29,12 @@ func ToElasticFieldSort(sortBy string, resolver contactql.Resolver) (map[string] // name needs to be sorted by keyword field if property == contactql.AttributeName { - return elastic.Sort("name.keyword", ascending), nil + return elastic.SortBy("name.keyword", ascending), nil } // other attributes are straight sorts if property == contactql.AttributeID || property == contactql.AttributeCreatedOn || property == contactql.AttributeLastSeenOn || property == contactql.AttributeLanguage { - return elastic.Sort(property, ascending), nil + return elastic.SortBy(property, ascending), nil } // we are sorting by a custom field diff --git a/utils/elastic/query.go b/utils/elastic/query.go index f72d94d2d..dca9048c3 100644 --- a/utils/elastic/query.go +++ b/utils/elastic/query.go @@ -1,53 +1,55 @@ package elastic +type Query map[string]any + // Any is a shortcut for a bool query with a should clause -func Any(queries ...map[string]any) map[string]any { - return map[string]any{"bool": map[string]any{"should": queries}} +func Any(queries ...Query) Query { + return Query{"bool": Query{"should": queries}} } // All is a shortcut for a bool query with a must clause -func All(queries ...map[string]any) map[string]any { - return map[string]any{"bool": map[string]any{"must": queries}} +func All(queries ...Query) Query { + return Query{"bool": Query{"must": queries}} } // Not is a shortcut for a bool query with a must_not clause -func Not(query map[string]any) map[string]any { - return map[string]any{"bool": map[string]any{"must_not": query}} +func Not(query Query) Query { + return Query{"bool": Query{"must_not": query}} } // Not is a shortcut for an ids query -func Ids(values ...string) map[string]any { - return map[string]any{"ids": map[string]any{"values": values}} +func Ids(values ...string) Query { + return Query{"ids": Query{"values": values}} } // Term is a shortcut for a term query -func Term(field string, value any) map[string]any { - return map[string]any{"term": map[string]any{field: value}} +func Term(field string, value any) Query { + return Query{"term": Query{field: value}} } // Exists is a shortcut for an exists query -func Exists(field string) map[string]any { - return map[string]any{"exists": map[string]any{"field": field}} +func Exists(field string) Query { + return Query{"exists": Query{"field": field}} } // Nested is a shortcut for a nested query -func Nested(path string, query map[string]any) map[string]any { - return map[string]any{"nested": map[string]any{"path": path, "query": query}} +func Nested(path string, query Query) Query { + return Query{"nested": Query{"path": path, "query": query}} } // Match is a shortcut for a match query -func Match(field string, value any) map[string]any { - return map[string]any{"match": map[string]any{field: map[string]any{"query": value}}} +func Match(field string, value any) Query { + return Query{"match": Query{field: Query{"query": value}}} } // MatchPhrase is a shortcut for a match_phrase query -func MatchPhrase(field, value string) map[string]any { - return map[string]any{"match_phrase": map[string]any{field: map[string]any{"query": value}}} +func MatchPhrase(field, value string) Query { + return Query{"match_phrase": Query{field: Query{"query": value}}} } // GreaterThan is a shortcut for a range query where x > value -func GreaterThan(field string, value any) map[string]any { - return map[string]any{"range": map[string]any{field: map[string]any{ +func GreaterThan(field string, value any) Query { + return Query{"range": Query{field: Query{ "from": value, "include_lower": false, "include_upper": true, @@ -56,8 +58,8 @@ func GreaterThan(field string, value any) map[string]any { } // GreaterThanOrEqual is a shortcut for a range query where x >= value -func GreaterThanOrEqual(field string, value any) map[string]any { - return map[string]any{"range": map[string]any{field: map[string]any{ +func GreaterThanOrEqual(field string, value any) Query { + return Query{"range": Query{field: Query{ "from": value, "include_lower": true, "include_upper": true, @@ -66,8 +68,8 @@ func GreaterThanOrEqual(field string, value any) map[string]any { } // LessThan is a shortcut for a range query where x < value -func LessThan(field string, value any) map[string]any { - return map[string]any{"range": map[string]any{field: map[string]any{ +func LessThan(field string, value any) Query { + return Query{"range": Query{field: Query{ "from": nil, "include_lower": true, "include_upper": false, @@ -76,8 +78,8 @@ func LessThan(field string, value any) map[string]any { } // LessThanOrEqual is a shortcut for a range query where x <= value -func LessThanOrEqual(field string, value any) map[string]any { - return map[string]any{"range": map[string]any{field: map[string]any{ +func LessThanOrEqual(field string, value any) Query { + return Query{"range": Query{field: Query{ "from": nil, "include_lower": true, "include_upper": true, @@ -86,8 +88,8 @@ func LessThanOrEqual(field string, value any) map[string]any { } // Between is a shortcut for a range query where from <= x < to -func Between(field string, from, to any) map[string]any { - return map[string]any{"range": map[string]any{field: map[string]any{ +func Between(field string, from, to any) Query { + return Query{"range": Query{field: Query{ "from": from, "include_lower": true, "include_upper": false, diff --git a/utils/elastic/sort.go b/utils/elastic/sort.go index 8d40581eb..c4c04f225 100644 --- a/utils/elastic/sort.go +++ b/utils/elastic/sort.go @@ -1,13 +1,15 @@ package elastic -// Sort is a shortcut for a simple field sort -func Sort(field string, ascending bool) map[string]any { - return map[string]any{field: map[string]any{"order": order(ascending)}} +type Sort map[string]any + +// SortBy is a shortcut for a simple field sort +func SortBy(field string, ascending bool) Sort { + return Sort{field: map[string]any{"order": order(ascending)}} } // SortNested is a shortcut for a nested field sort -func SortNested(field string, filter map[string]any, path string, ascending bool) map[string]any { - return map[string]any{field: map[string]any{ +func SortNested(field string, filter Query, path string, ascending bool) Sort { + return Sort{field: map[string]any{ "nested": map[string]any{"filter": filter, "path": path}, "order": order(ascending), }}