diff --git a/contactql/es/query.go b/contactql/es/query.go index dcc6c280d..a5d4d1456 100644 --- a/contactql/es/query.go +++ b/contactql/es/query.go @@ -8,7 +8,7 @@ import ( "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/contactql" "github.com/nyaruka/goflow/envs" - esq "github.com/nyaruka/goflow/utils/elastic" + "github.com/nyaruka/goflow/utils/elastic" ) // AssetMapper is used to map engine assets to however ES identifies them @@ -52,10 +52,10 @@ func boolCombination(env envs.Environment, resolver contactql.Resolver, mapper A } if combination.Operator() == contactql.BoolOperatorAnd { - return esq.All(queries...) + return elastic.All(queries...) } - return esq.Any(queries...) + return elastic.Any(queries...) } func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) map[string]any { @@ -74,15 +74,15 @@ func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMa func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contactql.Condition) map[string]any { field := resolver.ResolveField(c.PropertyKey()) fieldType := field.Type() - fieldQuery := esq.Term("fields.field", field.UUID()) + fieldQuery := elastic.Term("fields.field", field.UUID()) // special cases for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && c.Value() == "" { - query := esq.Nested("fields", esq.All(fieldQuery, esq.Exists("fields."+string(fieldType)))) + query := elastic.Nested("fields", elastic.All(fieldQuery, elastic.Exists("fields."+string(fieldType)))) // if we are looking for unset, inverse our query if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query } @@ -92,10 +92,10 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac switch c.Operator() { case contactql.OpEqual: - return esq.Nested("fields", esq.All(fieldQuery, esq.Term("fields.text", value))) + return elastic.Nested("fields", elastic.All(fieldQuery, elastic.Term("fields.text", value))) case contactql.OpNotEqual: - query := esq.All(fieldQuery, esq.Term("fields.text", value), esq.Exists("fields.text")) - return esq.Not(esq.Nested("fields", query)) + query := elastic.All(fieldQuery, elastic.Term("fields.text", value), elastic.Exists("fields.text")) + return elastic.Not(elastic.Nested("fields", query)) default: panic(fmt.Sprintf("unsupported text field operator: %s", c.Operator())) } @@ -106,26 +106,26 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac switch c.Operator() { case contactql.OpEqual: - query = esq.Match("fields.number", value) + query = elastic.Match("fields.number", value) case contactql.OpNotEqual: - return esq.Not( - esq.Nested("fields", - esq.All(fieldQuery, esq.Match("fields.number", value)), + return elastic.Not( + elastic.Nested("fields", + elastic.All(fieldQuery, elastic.Match("fields.number", value)), ), ) case contactql.OpGreaterThan: - query = esq.GreaterThan("fields.number", value) + query = elastic.GreaterThan("fields.number", value) case contactql.OpGreaterThanOrEqual: - query = esq.GreaterThanOrEqual("fields.number", value) + query = elastic.GreaterThanOrEqual("fields.number", value) case contactql.OpLessThan: - query = esq.LessThan("fields.number", value) + query = elastic.LessThan("fields.number", value) case contactql.OpLessThanOrEqual: - query = esq.LessThanOrEqual("fields.number", value) + query = elastic.LessThanOrEqual("fields.number", value) default: panic(fmt.Sprintf("unsupported number field operator: %s", c.Operator())) } - return esq.Nested("fields", esq.All(fieldQuery, query)) + return elastic.Nested("fields", elastic.All(fieldQuery, query)) } else if fieldType == assets.FieldTypeDatetime { value, _ := c.ValueAsDate(env) @@ -134,26 +134,26 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac switch c.Operator() { case contactql.OpEqual: - query = esq.Between("fields.datetime", start, end) + query = elastic.Between("fields.datetime", start, end) case contactql.OpNotEqual: - return esq.Not( - esq.Nested("fields", - esq.All(fieldQuery, esq.Between("fields.datetime", start, end)), + return elastic.Not( + elastic.Nested("fields", + elastic.All(fieldQuery, elastic.Between("fields.datetime", start, end)), ), ) case contactql.OpGreaterThan: - query = esq.GreaterThanOrEqual("fields.datetime", end) + query = elastic.GreaterThanOrEqual("fields.datetime", end) case contactql.OpGreaterThanOrEqual: - query = esq.GreaterThanOrEqual("fields.datetime", start) + query = elastic.GreaterThanOrEqual("fields.datetime", start) case contactql.OpLessThan: - query = esq.LessThan("fields.datetime", start) + query = elastic.LessThan("fields.datetime", start) case contactql.OpLessThanOrEqual: - query = esq.LessThan("fields.datetime", end) + query = elastic.LessThan("fields.datetime", end) default: panic(fmt.Sprintf("unsupported datetime field operator: %s", c.Operator())) } - return esq.Nested("fields", esq.All(fieldQuery, query)) + return elastic.Nested("fields", elastic.All(fieldQuery, query)) } else if fieldType == assets.FieldTypeState || fieldType == assets.FieldTypeDistrict || fieldType == assets.FieldTypeWard { value := strings.ToLower(c.Value()) @@ -161,11 +161,11 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac switch c.Operator() { case contactql.OpEqual: - return esq.Nested("fields", esq.All(fieldQuery, esq.Term(name, value))) + return elastic.Nested("fields", elastic.All(fieldQuery, elastic.Term(name, value))) case contactql.OpNotEqual: - return esq.Not( - esq.Nested("fields", - esq.All(esq.Term(name, value), esq.Exists(name)), + return elastic.Not( + elastic.Nested("fields", + elastic.All(elastic.Term(name, value), elastic.Exists(name)), ), ) default: @@ -184,10 +184,10 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" && (key == contactql.AttributeName || key == contactql.AttributeLanguage) { - query := esq.All(esq.Exists(key), esq.Not(esq.Term(fmt.Sprintf("%s.keyword", key), ""))) + query := elastic.All(elastic.Exists(key), elastic.Not(elastic.Term(fmt.Sprintf("%s.keyword", key), ""))) if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query @@ -199,20 +199,20 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe case contactql.AttributeID: switch c.Operator() { case contactql.OpEqual: - return esq.Ids(value) + return elastic.Ids(value) case contactql.OpNotEqual: - return esq.Not(esq.Ids(value)) + return elastic.Not(elastic.Ids(value)) default: panic(fmt.Sprintf("unsupported ID attribute operator: %s", c.Operator())) } case contactql.AttributeName: switch c.Operator() { case contactql.OpEqual: - return esq.Term("name.keyword", c.Value()) + return elastic.Term("name.keyword", c.Value()) case contactql.OpNotEqual: - return esq.Not(esq.Term("name.keyword", c.Value())) + return elastic.Not(elastic.Term("name.keyword", c.Value())) case contactql.OpContains: - return esq.Match("name", value) + return elastic.Match("name", value) default: panic(fmt.Sprintf("unsupported name attribute operator: %s", c.Operator())) } @@ -228,26 +228,26 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe switch c.Operator() { case contactql.OpEqual: - return esq.Between("created_on", start, end) + return elastic.Between("created_on", start, end) case contactql.OpNotEqual: - return esq.Not(esq.Between("created_on", start, end)) + return elastic.Not(elastic.Between("created_on", start, end)) case contactql.OpGreaterThan: - return esq.GreaterThanOrEqual("created_on", end) + return elastic.GreaterThanOrEqual("created_on", end) case contactql.OpGreaterThanOrEqual: - return esq.GreaterThanOrEqual("created_on", start) + return elastic.GreaterThanOrEqual("created_on", start) case contactql.OpLessThan: - return esq.LessThan("created_on", start) + return elastic.LessThan("created_on", start) case contactql.OpLessThanOrEqual: - return esq.LessThan("created_on", end) + return elastic.LessThan("created_on", end) default: panic(fmt.Sprintf("unsupported created_on attribute operator: %s", c.Operator())) } case contactql.AttributeLastSeenOn: // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := esq.Exists("last_seen_on") + query := elastic.Exists("last_seen_on") if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query } @@ -257,17 +257,17 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe switch c.Operator() { case contactql.OpEqual: - return esq.Between("last_seen_on", start, end) + return elastic.Between("last_seen_on", start, end) case contactql.OpNotEqual: - return esq.Not(esq.Between("last_seen_on", start, end)) + return elastic.Not(elastic.Between("last_seen_on", start, end)) case contactql.OpGreaterThan: - return esq.GreaterThanOrEqual("last_seen_on", end) + return elastic.GreaterThanOrEqual("last_seen_on", end) case contactql.OpGreaterThanOrEqual: - return esq.GreaterThanOrEqual("last_seen_on", start) + return elastic.GreaterThanOrEqual("last_seen_on", start) case contactql.OpLessThan: - return esq.LessThan("last_seen_on", start) + return elastic.LessThan("last_seen_on", start) case contactql.OpLessThanOrEqual: - return esq.LessThan("last_seen_on", end) + return elastic.LessThan("last_seen_on", end) default: panic(fmt.Sprintf("unsupported last_seen_on attribute operator: %s", c.Operator())) } @@ -276,29 +276,29 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := esq.Nested("urns", esq.Exists("urns.path")) + query := elastic.Nested("urns", elastic.Exists("urns.path")) if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query } switch c.Operator() { case contactql.OpEqual: - return esq.Nested("urns", esq.Term("urns.path.keyword", value)) + return elastic.Nested("urns", elastic.Term("urns.path.keyword", value)) case contactql.OpNotEqual: - return esq.Not(esq.Nested("urns", esq.Term("urns.path.keyword", value))) + return elastic.Not(elastic.Nested("urns", elastic.Term("urns.path.keyword", value))) case contactql.OpContains: - return esq.Nested("urns", esq.MatchPhrase("urns.path", value)) + return elastic.Nested("urns", elastic.MatchPhrase("urns.path", value)) default: panic(fmt.Sprintf("unsupported URN attribute operator: %s", c.Operator())) } case contactql.AttributeGroup: // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := esq.Exists("group_ids") + query := elastic.Exists("group_ids") if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query } @@ -307,9 +307,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe switch c.Operator() { case contactql.OpEqual: - return esq.Term("group_ids", mapper.Group(group)) + return elastic.Term("group_ids", mapper.Group(group)) case contactql.OpNotEqual: - return esq.Not(esq.Term("group_ids", mapper.Group(group))) + return elastic.Not(elastic.Term("group_ids", mapper.Group(group))) default: panic(fmt.Sprintf("unsupported group attribute operator: %s", c.Operator())) } @@ -321,9 +321,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := esq.Exists(fieldName) + query := elastic.Exists(fieldName) if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query } @@ -332,9 +332,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe switch c.Operator() { case contactql.OpEqual: - return esq.Term(fieldName, mapper.Flow(flow)) + return elastic.Term(fieldName, mapper.Flow(flow)) case contactql.OpNotEqual: - return esq.Not(esq.Term(fieldName, mapper.Flow(flow))) + return elastic.Not(elastic.Term(fieldName, mapper.Flow(flow))) default: panic(fmt.Sprintf("unsupported flow attribute operator: %s", c.Operator())) } @@ -351,20 +351,20 @@ func schemeCondition(c *contactql.Condition) map[string]any { // special case for set/unset if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" { - query := esq.Nested("urns", esq.All(esq.Term("urns.scheme", key), esq.Exists("urns.path"))) + query := elastic.Nested("urns", elastic.All(elastic.Term("urns.scheme", key), elastic.Exists("urns.path"))) if c.Operator() == contactql.OpEqual { - query = esq.Not(query) + query = elastic.Not(query) } return query } switch c.Operator() { case contactql.OpEqual: - return esq.Nested("urns", esq.All(esq.Term("urns.path.keyword", value), esq.Term("urns.scheme", key))) + return elastic.Nested("urns", elastic.All(elastic.Term("urns.path.keyword", value), elastic.Term("urns.scheme", key))) case contactql.OpNotEqual: - return esq.Not(esq.Nested("urns", esq.All(esq.Term("urns.path.keyword", value), esq.Term("urns.scheme", key)))) + return elastic.Not(elastic.Nested("urns", elastic.All(elastic.Term("urns.path.keyword", value), elastic.Term("urns.scheme", key)))) case contactql.OpContains: - return esq.Nested("urns", esq.All(esq.MatchPhrase("urns.path", value), esq.Term("urns.scheme", key))) + return elastic.Nested("urns", elastic.All(elastic.MatchPhrase("urns.path", value), elastic.Term("urns.scheme", key))) default: panic(fmt.Sprintf("unsupported scheme operator: %s", c.Operator())) } @@ -375,9 +375,9 @@ func textAttributeQuery(c *contactql.Condition, name string, tx func(string) str switch c.Operator() { case contactql.OpEqual: - return esq.Term(name, value) + return elastic.Term(name, value) case contactql.OpNotEqual: - return esq.Not(esq.Term(name, value)) + return elastic.Not(elastic.Term(name, value)) default: panic(fmt.Sprintf("unsupported %s attribute operator: %s", name, c.Operator())) } @@ -388,17 +388,17 @@ func numericalAttributeQuery(c *contactql.Condition, name string) map[string]any switch c.Operator() { case contactql.OpEqual: - return esq.Match(name, value) + return elastic.Match(name, value) case contactql.OpNotEqual: - return esq.Not(esq.Match(name, value)) + return elastic.Not(elastic.Match(name, value)) case contactql.OpGreaterThan: - return esq.GreaterThan(name, value) + return elastic.GreaterThan(name, value) case contactql.OpGreaterThanOrEqual: - return esq.GreaterThanOrEqual(name, value) + return elastic.GreaterThanOrEqual(name, value) case contactql.OpLessThan: - return esq.LessThan(name, value) + return elastic.LessThan(name, value) case contactql.OpLessThanOrEqual: - return esq.LessThanOrEqual(name, value) + return elastic.LessThanOrEqual(name, value) default: panic(fmt.Sprintf("unsupported %s attribute operator: %s", name, c.Operator())) } diff --git a/contactql/es/sort.go b/contactql/es/sort.go index 31b42b6fa..dbd881a52 100644 --- a/contactql/es/sort.go +++ b/contactql/es/sort.go @@ -6,7 +6,7 @@ import ( "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/contactql" - esq "github.com/nyaruka/goflow/utils/elastic" + "github.com/nyaruka/goflow/utils/elastic" "github.com/pkg/errors" ) @@ -14,7 +14,7 @@ import ( func ToElasticFieldSort(sortBy string, resolver contactql.Resolver) (map[string]any, error) { // default to most recent first by id if sortBy == "" { - return esq.Sort("id", false), nil + return elastic.Sort("id", false), nil } // figure out if we are ascending or descending (default is ascending, can be changed with leading -) @@ -29,12 +29,12 @@ func ToElasticFieldSort(sortBy string, resolver contactql.Resolver) (map[string] // name needs to be sorted by keyword field if property == contactql.AttributeName { - return esq.Sort("name.keyword", ascending), nil + return elastic.Sort("name.keyword", ascending), nil } // other attributes are straight sorts if property == contactql.AttributeID || property == contactql.AttributeCreatedOn || property == contactql.AttributeLastSeenOn || property == contactql.AttributeLanguage { - return esq.Sort(property, ascending), nil + return elastic.Sort(property, ascending), nil } // we are sorting by a custom field @@ -51,5 +51,5 @@ func ToElasticFieldSort(sortBy string, resolver contactql.Resolver) (map[string] key = fmt.Sprintf("fields.%s", field.Type()) } - return esq.SortNested(key, esq.Term("fields.field", field.UUID()), "fields", ascending), nil + return elastic.SortNested(key, elastic.Term("fields.field", field.UUID()), "fields", ascending), nil } diff --git a/utils/elastic/query.go b/utils/elastic/query.go index 638c5702a..f72d94d2d 100644 --- a/utils/elastic/query.go +++ b/utils/elastic/query.go @@ -1,4 +1,4 @@ -package esq +package elastic // Any is a shortcut for a bool query with a should clause func Any(queries ...map[string]any) map[string]any { @@ -15,30 +15,37 @@ func Not(query map[string]any) map[string]any { return map[string]any{"bool": map[string]any{"must_not": query}} } +// Not is a shortcut for an ids query func Ids(values ...string) map[string]any { return map[string]any{"ids": map[string]any{"values": values}} } +// Term is a shortcut for a term query func Term(field string, value any) map[string]any { return map[string]any{"term": map[string]any{field: value}} } +// Exists is a shortcut for an exists query func Exists(field string) map[string]any { return map[string]any{"exists": map[string]any{"field": field}} } +// Nested is a shortcut for a nested query func Nested(path string, query map[string]any) map[string]any { return map[string]any{"nested": map[string]any{"path": path, "query": query}} } +// Match is a shortcut for a match query func Match(field string, value any) map[string]any { return map[string]any{"match": map[string]any{field: map[string]any{"query": value}}} } +// MatchPhrase is a shortcut for a match_phrase query func MatchPhrase(field, value string) map[string]any { return map[string]any{"match_phrase": map[string]any{field: map[string]any{"query": value}}} } +// GreaterThan is a shortcut for a range query where x > value func GreaterThan(field string, value any) map[string]any { return map[string]any{"range": map[string]any{field: map[string]any{ "from": value, @@ -48,6 +55,7 @@ func GreaterThan(field string, value any) map[string]any { }}} } +// GreaterThanOrEqual is a shortcut for a range query where x >= value func GreaterThanOrEqual(field string, value any) map[string]any { return map[string]any{"range": map[string]any{field: map[string]any{ "from": value, @@ -57,6 +65,7 @@ func GreaterThanOrEqual(field string, value any) map[string]any { }}} } +// LessThan is a shortcut for a range query where x < value func LessThan(field string, value any) map[string]any { return map[string]any{"range": map[string]any{field: map[string]any{ "from": nil, @@ -66,6 +75,7 @@ func LessThan(field string, value any) map[string]any { }}} } +// LessThanOrEqual is a shortcut for a range query where x <= value func LessThanOrEqual(field string, value any) map[string]any { return map[string]any{"range": map[string]any{field: map[string]any{ "from": nil, @@ -75,6 +85,7 @@ func LessThanOrEqual(field string, value any) map[string]any { }}} } +// Between is a shortcut for a range query where from <= x < to func Between(field string, from, to any) map[string]any { return map[string]any{"range": map[string]any{field: map[string]any{ "from": from, diff --git a/utils/elastic/sort.go b/utils/elastic/sort.go index 7bf4f8ff9..8d40581eb 100644 --- a/utils/elastic/sort.go +++ b/utils/elastic/sort.go @@ -1,9 +1,11 @@ -package esq +package elastic +// Sort is a shortcut for a simple field sort func Sort(field string, ascending bool) map[string]any { return map[string]any{field: map[string]any{"order": order(ascending)}} } +// SortNested is a shortcut for a nested field sort func SortNested(field string, filter map[string]any, path string, ascending bool) map[string]any { return map[string]any{field: map[string]any{ "nested": map[string]any{"filter": filter, "path": path},