From 9ac223fd978da4b9abab4cef4eee0999158af2dd Mon Sep 17 00:00:00 2001 From: Daniel Thorn Date: Mon, 30 Sep 2024 10:18:09 -0700 Subject: [PATCH] support elasticsearch 8 --- bin/test.sh | 12 +- docker-compose.yml | 4 +- requirements.in | 6 +- requirements.txt | 18 + socorro/external/es/connection_context.py | 33 +- socorro/external/es/crashstorage.py | 143 +++++--- socorro/external/es/query.py | 19 +- socorro/external/es/super_search_fields.py | 333 +++++++----------- socorro/external/es/supersearch.py | 117 ++++-- socorro/external/legacy_es/crashstorage.py | 98 ++++-- socorro/tests/conftest.py | 143 +++++++- .../tests/external/es/test_crashstorage.py | 54 ++- socorro/tests/external/es/test_query.py | 8 +- .../external/es/test_super_search_fields.py | 82 +---- socorro/tests/external/es/test_supersearch.py | 241 +++++++------ .../external/legacy_es/test_analyzers.py | 8 +- .../legacy_es/test_connection_context.py | 20 +- .../external/legacy_es/test_crashstorage.py | 93 +++-- .../tests/external/legacy_es/test_query.py | 12 +- .../legacy_es/test_super_search_fields.py | 4 +- .../external/legacy_es/test_supersearch.py | 210 +++++------ .../management/commands/updatesignatures.py | 6 +- .../crashstats/crashstats/tests/conftest.py | 6 +- .../crashstats/tests/test_updatemissing.py | 8 +- .../crashstats/tests/test_verifyprocessed.py | 18 +- .../crashstats/signature/tests/test_views.py | 56 +-- .../crashstats/supersearch/libsupersearch.py | 46 +-- webapp/crashstats/supersearch/models.py | 25 +- .../supersearch/tests/test_esclean.py | 19 +- .../supersearch/tests/test_forms.py | 6 +- .../supersearch/tests/test_libsupersearch.py | 37 -- .../supersearch/tests/test_views.py | 58 +-- .../topcrashers/tests/test_views.py | 48 +-- 33 files changed, 1097 insertions(+), 894 deletions(-) delete mode 100644 webapp/crashstats/supersearch/tests/test_libsupersearch.py diff --git a/bin/test.sh b/bin/test.sh index 5b900634f7..42e5255353 100755 --- a/bin/test.sh +++ b/bin/test.sh @@ -51,16 +51,14 @@ popd echo ">>> run tests" # Run socorro tests -ELASTICSEARCH_MODE=LEGACY_ONLY "${PYTEST}" -# override ELASTICSEARCH_URL to use legacy elasticsearch so we can test PREFER_NEW without -# implementing es8 support. Override will be removed when socorrro/external/es expects es8 -ELASTICSEARCH_MODE=PREFER_NEW ELASTICSEARCH_URL="${LEGACY_ELASTICSEARCH_URL}" "${PYTEST}" +# ignore tests because elasticsearch 8 tests cannot be run with LEGACY_ONLY +ELASTICSEARCH_MODE=LEGACY_ONLY "${PYTEST}" --ignore=socorro/tests/external/es/ +# ignore tests because elasticsearch 8 supersearch is not implemented yet +ELASTICSEARCH_MODE=PREFER_NEW "${PYTEST}" # Collect static and then run pytest in the webapp pushd webapp ${PYTHON} manage.py collectstatic --noinput ELASTICSEARCH_MODE=LEGACY_ONLY "${PYTEST}" -# override ELASTICSEARCH_URL to use legacy elasticsearch so we can test PREFER_NEW without -# implementing es8 support. 
Override will be removed when socorrro/external/es expects es8 -ELASTICSEARCH_MODE=PREFER_NEW ELASTICSEARCH_URL="${LEGACY_ELASTICSEARCH_URL}" "${PYTEST}" +ELASTICSEARCH_MODE=PREFER_NEW "${PYTEST}" popd diff --git a/docker-compose.yml b/docker-compose.yml index 95d68eb3d3..12b7498797 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -225,9 +225,9 @@ services: ports: - "9200:9200" - # https://www.elastic.co/guide/en/elasticsearch/reference/8.14/docker.html + # https://www.elastic.co/guide/en/elasticsearch/reference/8.15/docker.html elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:8.14.3 + image: docker.elastic.co/elasticsearch/elasticsearch:8.15.2 mem_limit: 1g command: - bin/elasticsearch diff --git a/requirements.in b/requirements.in index b6fb758a7e..eec1acbb5f 100644 --- a/requirements.in +++ b/requirements.in @@ -64,8 +64,10 @@ django-waffle==2.3.0 # NOTE(willkg): We stick with LTS releases and the next one is 5.2 (2025). django==4.2.16 -# NOTE(willkg): Need to keep elasticsearch and elasticsearch-dsl at these versions -# because they work with the cluster we're using +# NOTE(relud): Need to keep elasticsearch and elasticsearch-dsl at versions compatible with the +# cluster we're using +elasticsearch==8.15.1 +elasticsearch-dsl==8.15.3 # via docker/set_up_legacy_es_requirements.sh and legacy-es-requirements.txt # NOTE(relud): these dependencies are installed separately via legacy-es-requirements.txt and # docker/set_up_legacy_es_requirements.sh, relocated, and patched to reference the new location (aka diff --git a/requirements.txt b/requirements.txt index db55cd5158..6907cb3426 100644 --- a/requirements.txt +++ b/requirements.txt @@ -46,6 +46,7 @@ certifi==2024.7.4 \ --hash=sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b \ --hash=sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90 # via + # elastic-transport # requests # sentry-sdk cffi==1.16.0 \ @@ -304,6 +305,20 @@ docutils==0.20.1 \ # via # sphinx # sphinx-rtd-theme +elastic-transport==8.15.0 \ + --hash=sha256:85d62558f9baafb0868c801233a59b235e61d7b4804c28c2fadaa866b6766233 \ + --hash=sha256:d7080d1dada2b4eee69e7574f9c17a76b42f2895eff428e562f94b0360e158c0 + # via elasticsearch +elasticsearch==8.15.1 \ + --hash=sha256:02a0476e98768a30d7926335fc0d305c04fdb928eea1354c6e6040d8c2814569 \ + --hash=sha256:40c0d312f8adf8bdc81795bc16a0b546ddf544cb1f90e829a244e4780c4dbfd8 + # via + # -r requirements.in + # elasticsearch-dsl +elasticsearch-dsl==8.15.3 \ + --hash=sha256:4005fa1abef5c0adc5266896c3177d0e98ff5ac49603c36dda9058d616f79257 \ + --hash=sha256:9541a669e5ea996a447289ba3aefb3540dd9983ca0670c0f90ad4a711753422d + # via -r requirements.in enforce-typing==1.0.0.post1 \ --hash=sha256:90347a61d08e7f7578d9714b4f0fd8abd9b6bc48c8ac8d46d7f290d413afabb7 \ --hash=sha256:d3184dfdbfd7f9520c884986561751a6106c57cdd65d730470645d2d40c47e18 @@ -1018,6 +1033,7 @@ python-dateutil==2.9.0.post0 \ --hash=sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427 # via # -r requirements.in + # elasticsearch-dsl # freezegun python-decouple==3.8 \ --hash=sha256:ba6e2657d4f376ecc46f77a3a615e058d93ba5e465c01bbe57289bfb7cce680f \ @@ -1307,12 +1323,14 @@ typing-extensions==4.9.0 \ --hash=sha256:af72aea155e91adfc61c3ae9e0e342dbc0cba726d6cba4b6c72c1f34e47291cd # via # dj-database-url + # elasticsearch-dsl # opentelemetry-sdk urllib3==1.26.20 \ --hash=sha256:0ed14ccfbf1c30a9072c7ca157e4319b70d65f623e91e7b32fadb2853431016e \ 
--hash=sha256:40c2dc0c681e47eb8f90e7e27bf6ff7df2e677421fd46756da1161c39ca70d32 # via # -r requirements.in + # elastic-transport # requests # sentry-sdk urlwait==1.0 \ diff --git a/socorro/external/es/connection_context.py b/socorro/external/es/connection_context.py index aed29789dd..962fef8848 100644 --- a/socorro/external/es/connection_context.py +++ b/socorro/external/es/connection_context.py @@ -2,9 +2,9 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at https://mozilla.org/MPL/2.0/. -import contextlib +from contextlib import contextmanager -import elasticsearch_1_9_0 as elasticsearch +from elasticsearch import Elasticsearch, RequestError class ConnectionContext: @@ -37,10 +37,9 @@ def connection(self, name=None, timeout=None): if timeout is None: timeout = self.timeout - return elasticsearch.Elasticsearch( + return Elasticsearch( hosts=self.url, - timeout=timeout, - connection_class=elasticsearch.connection.RequestsHttpConnection, + request_timeout=timeout, verify_certs=True, ) @@ -51,9 +50,9 @@ def indices_client(self, name=None): http://elasticsearch-py.readthedocs.org/en/latest/api.html#indices """ - return elasticsearch.client.IndicesClient(self.connection()) + return self.connection().indices - @contextlib.contextmanager + @contextmanager def __call__(self, name=None, timeout=None): conn = self.connection(name, timeout) yield conn @@ -72,10 +71,9 @@ def create_index(self, index_name, index_settings): client.create(index=index_name, body=index_settings) return True - except elasticsearch.exceptions.RequestError as exc: + except RequestError as exc: # If this index already exists, swallow the error. - # NOTE! This is NOT how the error looks like in ES 2.x - if "IndexAlreadyExistsException" not in str(exc): + if "resource_already_exists_exception" not in str(exc): raise return False @@ -85,23 +83,20 @@ def get_indices(self): :returns: list of str """ - indices_client = self.indices_client() - status = indices_client.status() - indices = status["indices"].keys() - return indices + return self.indices_client().get_alias().keys() def delete_index(self, index_name): """Delete an index.""" - self.indices_client().delete(index_name) + self.indices_client().delete(index=index_name) - def get_mapping(self, index_name, doc_type): - """Return the mapping for the specified index and doc_type.""" + def get_mapping(self, index_name): + """Return the mapping for the specified index.""" resp = self.indices_client().get_mapping(index=index_name) - return resp[index_name]["mappings"][doc_type]["properties"] + return resp[index_name]["mappings"]["properties"] def refresh(self, index_name=None): self.indices_client().refresh(index=index_name or "_all") def health_check(self): with self() as conn: - conn.cluster.health(wait_for_status="yellow", request_timeout=5) + conn.options(request_timeout=5).cluster.health(wait_for_status="yellow") diff --git a/socorro/external/es/crashstorage.py b/socorro/external/es/crashstorage.py index 66695e8d03..eadebc786c 100644 --- a/socorro/external/es/crashstorage.py +++ b/socorro/external/es/crashstorage.py @@ -8,14 +8,16 @@ import re import time -import elasticsearch_1_9_0 as elasticsearch -from elasticsearch_1_9_0.exceptions import NotFoundError -from elasticsearch_dsl_0_0_11 import Search +import elasticsearch +from elasticsearch.exceptions import NotFoundError +from elasticsearch_dsl import Search import glom import markus from socorro.external.crashstorage_base import CrashStorageBase from 
socorro.external.es.connection_context import ConnectionContext +from socorro.external.es.query import Query +from socorro.external.es.supersearch import SuperSearch from socorro.external.es.super_search_fields import ( build_mapping, FIELDS, @@ -28,6 +30,12 @@ from socorro.lib.libdatetime import JsonDTEncoder, string_to_datetime, utc_now +# Map of processed crash schema permissions to webapp permissions +PROCESSED_CRASH_TO_WEBAPP_PERMISSIONS = { + "public": "", + "protected": "crashstats.view_pii", +} + # Additional custom analyzers for crash report data ES_CUSTOM_ANALYZERS = { "analyzer": {"semicolon_keywords": {"type": "pattern", "pattern": ";"}} @@ -236,12 +244,11 @@ def build_document(src, crash_document, fields, all_keys): # Fix values so they index correctly storage_type = field.get("type", field["storage_mapping"].get("type")) - if storage_type == "string": - analyzer = field.get("analyzer", field["storage_mapping"].get("analyzer")) - if analyzer == "keyword": - value = fix_keyword(value, max_size=MAX_KEYWORD_FIELD_VALUE_SIZE) - else: - value = fix_string(value, max_size=MAX_STRING_FIELD_VALUE_SIZE) + if storage_type == "keyword": + value = fix_keyword(value, max_size=MAX_KEYWORD_FIELD_VALUE_SIZE) + + elif storage_type == "text": + value = fix_string(value, max_size=MAX_STRING_FIELD_VALUE_SIZE) elif storage_type == "integer": value = fix_integer(value) @@ -258,14 +265,46 @@ def build_document(src, crash_document, fields, all_keys): if value is None: continue + elif storage_type == "boolean": + value = fix_boolean(value) + if value is None: + continue + for dest_key in get_destination_keys(field): if dest_key in all_keys: glom.assign(crash_document, dest_key, value, missing=dict) +def convert_permissions(fields): + """Converts processed crash schema / super search permissions to webapp permissions + + :arg fields: super search fields structure to convert permissions of + + :returns: fields with permissions converted + + """ + + def _convert(permissions): + if not permissions: + return permissions + + new_permissions = [ + PROCESSED_CRASH_TO_WEBAPP_PERMISSIONS[perm] for perm in permissions + ] + return [perm for perm in new_permissions if perm] + + for val in fields.values(): + val["permissions_needed"] = _convert(val["permissions_needed"]) + + return fields + + class ESCrashStorage(CrashStorageBase): """Indexes documents based on the processed crash to Elasticsearch.""" + # For use in webapp + SUPERSEARCH_FIELDS = convert_permissions(FIELDS) + # These regex will catch field names from Elasticsearch exceptions. They # have been tested with Elasticsearch 1.4. 
field_name_string_error_re = re.compile(r"field=\"([\w\-.]+)\"") @@ -278,7 +317,6 @@ def __init__( index="socorro%Y%W", index_regex=r"^socorro[0-9]{6}$", retention_policy=26, - doctype="crash_reports", metrics_prefix="processor.es", timeout=30, shards_per_index=10, @@ -297,7 +335,6 @@ def __init__( self.index = index self.index_regex = index_regex self.retention_policy = retention_policy - self.doctype = doctype self.shards_per_index = shards_per_index # Cached answers for things that don't change @@ -309,6 +346,24 @@ def __init__( def build_client(cls, url, timeout): return ConnectionContext(url=url, timeout=timeout) + def build_query(self): + """Return new instance of Query.""" + return Query(crashstorage=self) + + def build_supersearch(self): + """Return new instance of SuperSearch.""" + return SuperSearch(crashstorage=self) + + def build_search(self, **kwargs): + """Return new instance of elasticsearch_dsl's Search.""" + with self.client() as conn: + return Search(using=conn, doc_type=self.get_doctype(), **kwargs) + + @staticmethod + def get_source_key(field): + """Return source key for the field.""" + return get_source_key(field) + def get_index_template(self): """Return template for index names.""" return self.index @@ -327,10 +382,6 @@ def get_index_for_date(self, date): template = self.get_index_template() return date.strftime(template) - def get_doctype(self): - """Return doctype.""" - return self.doctype - def get_retention_policy(self): """Return retention policy in weeks.""" return self.retention_policy @@ -348,28 +399,24 @@ def get_socorro_index_settings(self, mappings): "mappings": mappings, } - def get_mapping(self, index_name, es_doctype, reraise=False): - """Retrieves the mapping for a given index and doctype + def get_mapping(self, index_name, reraise=False): + """Retrieves the mapping for a given index NOTE(willkg): Mappings are cached on the ESCrashStorage instance. If you change the indices (like in tests), you should get a new ESCrashStorage instance. :arg str index_name: the index to retrieve the mapping for - :arg str es_doctype: the doctype to retrieve the mapping for :arg bool reraise: True if you want this to reraise a NotFoundError; False otherwise :returns: mapping as a dict or None """ - cache_key = f"{index_name}::{es_doctype}" - mapping = self._mapping_cache.get(cache_key) + mapping = self._mapping_cache.get(index_name) if mapping is None: try: - mapping = self.client.get_mapping( - index_name=index_name, doc_type=es_doctype - ) - self._mapping_cache[cache_key] = mapping + mapping = self.client.get_mapping(index_name=index_name) + self._mapping_cache[index_name] = mapping except elasticsearch.exceptions.NotFoundError: if reraise: raise @@ -379,13 +426,13 @@ def create_index(self, index_name, mappings=None): """Create an index that will receive crash reports. :arg index_name: the name of the index to create - :arg mappings: dict of doctype->ES mapping + :arg mappings: ES mapping :returns: True if the index was created, False if it already existed """ if mappings is None: - mappings = build_mapping(doctype=self.get_doctype()) + mappings = build_mapping() index_settings = self.get_socorro_index_settings(mappings) @@ -451,31 +498,29 @@ def get_keys_for_indexable_fields(self): return keys - def get_keys_for_mapping(self, index_name, es_doctype): + def get_keys_for_mapping(self, index_name): """Get the keys in "namespace.key" format for a given mapping NOTE(willkg): Results are cached on this ESCrashStorage instance. 
:arg str index_name: the name of the index - :arg str es_doctype: the doctype for the index :returns: set of "namespace.key" fields :raise elasticsearch.exceptions.NotFoundError: if the index doesn't exist """ - cache_key = f"{index_name}::{es_doctype}" - keys = self._keys_for_mapping_cache.get(cache_key) + keys = self._keys_for_mapping_cache.get(index_name) if keys is None: - mapping = self.get_mapping(index_name, es_doctype, reraise=True) + mapping = self.get_mapping(index_name, reraise=True) keys = parse_mapping(mapping, None) - self._keys_for_mapping_cache[cache_key] = keys + self._keys_for_mapping_cache[index_name] = keys return keys - def get_keys(self, index_name, es_doctype): + def get_keys(self, index_name): supersearch_fields_keys = self.get_keys_for_indexable_fields() try: - mapping_keys = self.get_keys_for_mapping(index_name, es_doctype) + mapping_keys = self.get_keys_for_mapping(index_name) except NotFoundError: mapping_keys = None all_valid_keys = supersearch_fields_keys @@ -494,8 +539,7 @@ def save_processed_crash(self, raw_crash, processed_crash): index_name = self.get_index_for_date( string_to_datetime(processed_crash["date_processed"]) ) - es_doctype = self.get_doctype() - all_valid_keys = self.get_keys(index_name, es_doctype) + all_valid_keys = self.get_keys(index_name) src = {"processed_crash": copy.deepcopy(processed_crash)} @@ -510,7 +554,6 @@ def save_processed_crash(self, raw_crash, processed_crash): self._submit_crash_to_elasticsearch( crash_id=crash_id, - es_doctype=es_doctype, index_name=index_name, crash_document=crash_document, ) @@ -530,12 +573,10 @@ def _capture(key, data): _capture("crash_document_size", crash_document) - def _index_crash(self, connection, es_index, es_doctype, crash_document, crash_id): + def _index_crash(self, connection, es_index, crash_document, crash_id): try: start_time = time.time() - connection.index( - index=es_index, doc_type=es_doctype, body=crash_document, id=crash_id - ) + connection.index(index=es_index, body=crash_document, id=crash_id) index_outcome = "successful" except Exception: index_outcome = "failed" @@ -546,9 +587,7 @@ def _index_crash(self, connection, es_index, es_doctype, crash_document, crash_i "index", value=elapsed_time * 1000.0, tags=["outcome:" + index_outcome] ) - def _submit_crash_to_elasticsearch( - self, crash_id, es_doctype, index_name, crash_document - ): + def _submit_crash_to_elasticsearch(self, crash_id, index_name, crash_document): """Submit a crash report to elasticsearch""" # Attempt to create the index; it's OK if it already exists. @@ -562,9 +601,7 @@ def _submit_crash_to_elasticsearch( for _ in range(5): try: with self.client() as conn: - return self._index_crash( - conn, index_name, es_doctype, crash_document, crash_id - ) + return self._index_crash(conn, index_name, crash_document, crash_id) except elasticsearch.exceptions.ConnectionError: # If this is a connection error, sleep a second and then try again @@ -575,6 +612,8 @@ def _submit_crash_to_elasticsearch( # is and fix the document and try again field_name = None + # TODO(relud) clean this up + if "MaxBytesLengthExceededException" in e.error: # This is caused by a string that is way too long for # Elasticsearch. 
@@ -641,7 +680,7 @@ def _submit_crash_to_elasticsearch( else: crash_document["removed_fields"] = field_name - except elasticsearch.exceptions.ElasticsearchException as exc: + except elasticsearch.ApiError as exc: self.logger.critical( "Submission to Elasticsearch failed for %s (%s)", crash_id, @@ -655,7 +694,7 @@ def catalog_crash(self, crash_id): contents = [] with self.client() as conn: try: - search = Search(using=conn, doc_type=self.get_doctype()) + search = Search(using=conn) search = search.filter("term", **{"processed_crash.uuid": crash_id}) results = search.execute().to_dict() hits = results["hits"]["hits"] @@ -668,15 +707,13 @@ def catalog_crash(self, crash_id): def delete_crash(self, crash_id): with self.client() as conn: try: - search = Search(using=conn, doc_type=self.get_doctype()) + search = Search(using=conn) search = search.filter("term", **{"processed_crash.uuid": crash_id}) results = search.execute().to_dict() hits = results["hits"]["hits"] if hits: hit = hits[0] - conn.delete( - index=hit["_index"], doc_type=hit["_type"], id=hit["_id"] - ) + conn.delete(index=hit["_index"], id=hit["_id"]) self.client.refresh() except Exception: self.logger.exception(f"ERROR: es: when deleting {crash_id}") diff --git a/socorro/external/es/query.py b/socorro/external/es/query.py index c01eeb0cb8..7acbaab8e5 100644 --- a/socorro/external/es/query.py +++ b/socorro/external/es/query.py @@ -3,13 +3,12 @@ # file, You can obtain one at https://mozilla.org/MPL/2.0/. import datetime -import elasticsearch_1_9_0 as elasticsearch import json -import re + +from elasticsearch import NotFoundError, TransportError, BadRequestError from socorro.lib import DatabaseError, MissingArgumentError, ResourceNotFound from socorro.external.es.base import generate_list_of_indexes -from socorro.external.es.supersearch import BAD_INDEX_REGEX from socorro.lib import libdatetime, external_common @@ -54,18 +53,22 @@ def get(self, **kwargs): search_args = {} if indices: search_args["index"] = indices - search_args["doc_type"] = self.crashstorage.get_doctype() connection = self.get_connection() try: results = connection.search(body=json.dumps(params["query"]), **search_args) - except elasticsearch.exceptions.NotFoundError as exc: - missing_index = re.findall(BAD_INDEX_REGEX, exc.error)[0] + results_body = results.body + except NotFoundError as exc: + missing_index = exc.body["error"]["resource.id"] raise ResourceNotFound( f"elasticsearch index {missing_index!r} does not exist" ) from exc - except elasticsearch.exceptions.TransportError as exc: + except (TransportError, BadRequestError) as exc: raise DatabaseError(exc) from exc - return results + # unnest results.hits.total.value to results.hits.total to match legacy behavior + if "hits" in results_body: + results_body["hits"]["total"] = results_body["hits"]["total"]["value"] + + return results_body diff --git a/socorro/external/es/super_search_fields.py b/socorro/external/es/super_search_fields.py index df44a25767..6690a99cc7 100644 --- a/socorro/external/es/super_search_fields.py +++ b/socorro/external/es/super_search_fields.py @@ -153,13 +153,12 @@ def get_destination_keys(field): return None -def build_mapping(doctype, fields=None): +def build_mapping(fields=None): """Generates Elasticsearch mapping from the super search fields schema - :arg str doctype: the doctype to use :arg any fields: map of field name -> field value; defaults to FIELDS - :returns: dict of doctype -> Elasticsearch mapping + :returns: Elasticsearch mapping """ fields = fields or FIELDS @@ -169,81 
+168,17 @@ def build_mapping(doctype, fields=None): if not field.get("storage_mapping"): continue - add_doc_values(field["storage_mapping"]) - destination_keys = get_destination_keys(field) for destination_key in destination_keys: key_parts = destination_key.split(".") add_field_to_properties(properties, key_parts, field) mapping = { - doctype: { - "_all": {"enabled": False}, - "_source": {"compress": True}, - "properties": properties, - } + "properties": properties, } return mapping -def is_doc_values_friendly(storage_value): - """Predicate denoting whether this storage should have doc_values added - - ``doc_values=True`` is a thing we can add to certain storages to reduce the - memory they use in Elasticsearch. - - This predicate determines whether we should add it or not for a given - storage. - - :arg storage_value: a storage value from super search storages - - :returns: True if ``doc_values=True` should be added; False otherwise - - """ - storage_type = storage_value.get("type") - - # No clue what type this is--probably false - if not storage_type: - return False - - # object storages don't work with doc_values=True - if storage_type == "object": - return False - - # analyzed string storages don't work with doc_values=True - if storage_type == "string" and storage_value.get("index") != "not_analyzed": - return False - - # Everything is fine! Yay! - return True - - -def add_doc_values(value): - """Add "doc_values": True to storage mapping of field value - - NOTE(willkg): Elasticsearch 2.0+ does this automatically, so we - can nix this when we upgrade. - - Note: This makes changes in-place and recurses on the structure - of value. - - :arg value: the storage mapping of a field value - - """ - if is_doc_values_friendly(value): - value["doc_values"] = True - - # Handle subfields - if value.get("fields"): - for field in value.get("fields", {}).values(): - add_doc_values(field) - - # Handle objects with nested properties - if value.get("properties"): - for field in value["properties"].values(): - add_doc_values(field) - - # Cache of hashed_args -> list of fields values _FIELDS_CACHE = {} @@ -354,7 +289,7 @@ def keyword_field( "is_exposed": True, "is_returned": True, "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword", "ignore_above": 10_000}, } @@ -490,7 +425,7 @@ def apply_schema_properties(fields, schema): "name": "phc_kind", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "phc_base_address": { "data_validation_type": "str", @@ -502,7 +437,7 @@ def apply_schema_properties(fields, schema): "name": "phc_base_address", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "phc_usable_size": { "data_validation_type": "int", @@ -526,7 +461,7 @@ def apply_schema_properties(fields, schema): "name": "phc_alloc_stack", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "phc_free_stack": { "data_validation_type": "str", @@ -538,7 +473,7 @@ def apply_schema_properties(fields, schema): "name": "phc_free_stack", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "abort_message": { "data_validation_type": "str", @@ -551,9 +486,8 @@ def 
apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "accessibility": { @@ -578,7 +512,7 @@ def apply_schema_properties(fields, schema): "name": "accessibility_client", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "accessibility_in_proc_client": { "data_validation_type": "str", @@ -590,7 +524,7 @@ def apply_schema_properties(fields, schema): "name": "accessibility_in_proc_client", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "adapter_device_id": { "data_validation_type": "str", @@ -602,7 +536,7 @@ def apply_schema_properties(fields, schema): "name": "adapter_device_id", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "adapter_driver_version": { "data_validation_type": "str", @@ -614,7 +548,7 @@ def apply_schema_properties(fields, schema): "name": "adapter_driver_version", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "adapter_subsys_id": { "data_validation_type": "str", @@ -626,7 +560,7 @@ def apply_schema_properties(fields, schema): "name": "adapter_subsys_id", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "adapter_vendor_id": { "data_validation_type": "str", @@ -638,7 +572,7 @@ def apply_schema_properties(fields, schema): "name": "adapter_vendor_id", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "addons": { "data_validation_type": "str", @@ -650,7 +584,7 @@ def apply_schema_properties(fields, schema): "name": "addons", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "addons_checked": { "data_validation_type": "bool", @@ -674,7 +608,7 @@ def apply_schema_properties(fields, schema): "name": "address", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "android_board": { "data_validation_type": "enum", @@ -686,7 +620,7 @@ def apply_schema_properties(fields, schema): "name": "android_board", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "android_brand": { "data_validation_type": "enum", @@ -698,7 +632,7 @@ def apply_schema_properties(fields, schema): "name": "android_brand", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "android_cpu_abi": { "data_validation_type": "enum", @@ -710,7 +644,7 @@ def apply_schema_properties(fields, schema): "name": "android_cpu_abi", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": 
{"type": "keyword"}, }, "android_cpu_abi2": { "data_validation_type": "enum", @@ -722,7 +656,7 @@ def apply_schema_properties(fields, schema): "name": "android_cpu_abi2", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "android_device": { "data_validation_type": "enum", @@ -734,7 +668,7 @@ def apply_schema_properties(fields, schema): "name": "android_device", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "android_display": { "data_validation_type": "enum", @@ -746,7 +680,7 @@ def apply_schema_properties(fields, schema): "name": "android_display", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "android_fingerprint": { "data_validation_type": "enum", @@ -758,7 +692,7 @@ def apply_schema_properties(fields, schema): "name": "android_fingerprint", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "android_hardware": { "data_validation_type": "enum", @@ -770,7 +704,7 @@ def apply_schema_properties(fields, schema): "name": "android_hardware", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "android_manufacturer": { "data_validation_type": "enum", @@ -782,7 +716,7 @@ def apply_schema_properties(fields, schema): "name": "android_manufacturer", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "android_model": { "data_validation_type": "str", @@ -796,9 +730,9 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "type": "string", + "type": "text", }, }, "android_packagename": keyword_field(name="android_packagename"), @@ -812,7 +746,7 @@ def apply_schema_properties(fields, schema): "name": "android_version", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "app_init_dlls": { "data_validation_type": "str", @@ -824,7 +758,11 @@ def apply_schema_properties(fields, schema): "name": "app_init_dlls", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "semicolon_keywords", "type": "string"}, + "storage_mapping": { + "analyzer": "semicolon_keywords", + "type": "text", + "fielddata": True, + }, }, "app_notes": { "data_validation_type": "str", @@ -837,9 +775,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "async_shutdown_timeout": { @@ -854,11 +791,9 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "analyzer": "standard", - "index": "analyzed", - "type": "string", + "type": "text", }, }, "available_page_file": { @@ -920,7 
+855,7 @@ def apply_schema_properties(fields, schema): "name": "co_marshal_interface_failure", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "collector_notes": { "data_validation_type": "str", @@ -934,9 +869,8 @@ def apply_schema_properties(fields, schema): "query_type": "string", "source_key": "processed_crash.collector_metadata.collector_notes", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, # FIXME(willkg): We have this indexed as an integer, but the annotation is listed as @@ -1002,7 +936,7 @@ def apply_schema_properties(fields, schema): "name": "cpu_arch", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "cpu_count": { "data_validation_type": "int", @@ -1034,11 +968,9 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "analyzer": "standard", - "index": "analyzed", - "type": "string", + "type": "text", }, }, "cpu_microcode_version": { @@ -1051,7 +983,7 @@ def apply_schema_properties(fields, schema): "name": "cpu_microcode_version", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "crashing_thread": integer_field(name="crashing_thread"), "crashing_thread_name": keyword_field( @@ -1069,7 +1001,6 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "date", "storage_mapping": { - "format": "yyyy-MM-dd'T'HH:mm:ssZZ||yyyy-MM-dd'T'HH:mm:ss.SSSSSSZZ", "type": "date", }, }, @@ -1112,11 +1043,9 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "type": "string", - "index": "analyzed", - "analyzer": "standard", + "type": "text", }, }, "em_check_compatibility": { @@ -1141,7 +1070,7 @@ def apply_schema_properties(fields, schema): "name": "gmp_library_path", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "gmp_plugin": { "data_validation_type": "bool", @@ -1166,9 +1095,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "graphics_startup_test": { @@ -1232,7 +1160,7 @@ def apply_schema_properties(fields, schema): "name": "ipc_channel_error", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "ipc_fatal_error_msg": { "data_validation_type": "str", @@ -1245,9 +1173,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "ipc_fatal_error_protocol": { 
@@ -1260,7 +1187,7 @@ def apply_schema_properties(fields, schema): "name": "ipc_fatal_error_protocol", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "ipc_message_name": { "data_validation_type": "str", @@ -1273,9 +1200,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "ipc_message_size": { @@ -1300,7 +1226,7 @@ def apply_schema_properties(fields, schema): "name": "ipc_shutdown_state", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "ipc_system_error": { "data_validation_type": "int", @@ -1337,9 +1263,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "java_stack_trace_raw": { @@ -1353,9 +1278,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "js_large_allocation_failure": keyword_field(name="js_large_allocation_failure"), @@ -1382,9 +1306,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "mac_crash_info": { @@ -1398,9 +1321,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "mac_available_memory_sysctl": integer_field(name="mac_available_memory_sysctl"), @@ -1623,7 +1545,7 @@ def apply_schema_properties(fields, schema): "name": "minidump_sha256_hash", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "missing_symbols": { "data_validation_type": "str", @@ -1635,7 +1557,11 @@ def apply_schema_properties(fields, schema): "name": "missing_symbols", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "semicolon_keywords", "type": "string"}, + "storage_mapping": { + "analyzer": "semicolon_keywords", + "type": "text", + "fielddata": True, + }, }, "modules_in_stack": { "data_validation_type": "str", @@ -1647,7 +1573,11 @@ def apply_schema_properties(fields, schema): "name": "modules_in_stack", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "semicolon_keywords", "type": "string"}, + "storage_mapping": { + "analyzer": "semicolon_keywords", + "type": "text", + "fielddata": True, + }, }, "moz_crash_reason": { "data_validation_type": "str", @@ -1660,9 +1590,8 @@ def 
apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "moz_crash_reason_raw": { @@ -1676,9 +1605,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "oom_allocation_size": { @@ -1705,9 +1633,9 @@ def apply_schema_properties(fields, schema): "query_type": "enum", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "type": "string", + "type": "text", }, }, "platform_pretty_version": { @@ -1721,9 +1649,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "platform_version": { @@ -1737,9 +1664,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "plugin_filename": { @@ -1754,10 +1680,9 @@ def apply_schema_properties(fields, schema): "query_type": "enum", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "index": "analyzed", - "type": "string", + "type": "text", }, }, "plugin_name": { @@ -1772,10 +1697,9 @@ def apply_schema_properties(fields, schema): "query_type": "enum", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "index": "analyzed", - "type": "string", + "type": "text", }, }, "plugin_version": { @@ -1790,10 +1714,9 @@ def apply_schema_properties(fields, schema): "query_type": "enum", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "index": "analyzed", - "type": "string", + "type": "text", }, }, "possible_bit_flips_max_confidence": integer_field( @@ -1816,7 +1739,10 @@ def apply_schema_properties(fields, schema): "name": "process_type", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": { + "fielddata": True, # FIXME(relud): this may be required in more fields? 
+ "type": "text", + }, }, "processor_notes": { "data_validation_type": "str", @@ -1829,9 +1755,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "product": { @@ -1846,10 +1771,9 @@ def apply_schema_properties(fields, schema): "query_type": "enum", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "index": "analyzed", - "type": "string", + "type": "text", }, }, "productid": { @@ -1862,7 +1786,7 @@ def apply_schema_properties(fields, schema): "name": "productid", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "proto_signature": { "data_validation_type": "str", @@ -1875,9 +1799,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "quota_manager_shutdown_timeout": { @@ -1891,9 +1814,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "reason": { @@ -1908,10 +1830,9 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "analyzer": "standard", - "type": "string", + "type": "text", }, }, "release_channel": { @@ -1924,7 +1845,7 @@ def apply_schema_properties(fields, schema): "name": "release_channel", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "remote_type": { "data_validation_type": "enum", @@ -1936,7 +1857,7 @@ def apply_schema_properties(fields, schema): "name": "remote_type", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "report_type": keyword_field(name="report_type"), "safe_mode": { @@ -1961,7 +1882,7 @@ def apply_schema_properties(fields, schema): "name": "shutdown_progress", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "shutdown_reason": keyword_field(name="shutdown_reason"), "signature": { @@ -1976,9 +1897,11 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": { + "type": "keyword", + }, }, - "type": "string", + "type": "text", }, }, "stackwalk_version": keyword_field( @@ -1997,7 +1920,7 @@ def apply_schema_properties(fields, schema): "query_type": "bool", # NOTE(willkg): startup_crash is used in signature report in some interesting # ways so I think we need to have both T and F values in ES - "storage_mapping": {"null_value": "False", "type": "boolean"}, + "storage_mapping": 
{"null_value": False, "type": "boolean"}, }, "startup_time": { "data_validation_type": "int", @@ -2046,7 +1969,11 @@ def apply_schema_properties(fields, schema): "name": "topmost_filenames", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "semicolon_keywords", "type": "string"}, + "storage_mapping": { + "analyzer": "semicolon_keywords", + "type": "text", + "fielddata": True, + }, }, "total_page_file": { "data_validation_type": "int", @@ -2107,7 +2034,7 @@ def apply_schema_properties(fields, schema): "name": "url", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "user_comments": { "data_validation_type": "str", @@ -2121,9 +2048,9 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "type": "string", + "type": "text", }, }, "useragent_locale": { @@ -2136,7 +2063,11 @@ def apply_schema_properties(fields, schema): "name": "useragent_locale", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "semicolon_keywords", "type": "string"}, + "storage_mapping": { + "analyzer": "semicolon_keywords", + "type": "text", + "fielddata": True, + }, }, "utility_actors_name": keyword_field(name="utility_actors_name"), "utility_process_sandboxing_kind": integer_field( @@ -2152,7 +2083,7 @@ def apply_schema_properties(fields, schema): "name": "uuid", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "vendor": { "data_validation_type": "enum", @@ -2164,7 +2095,7 @@ def apply_schema_properties(fields, schema): "name": "vendor", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "version": { "data_validation_type": "enum", @@ -2176,7 +2107,7 @@ def apply_schema_properties(fields, schema): "name": "version", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "windows_error_reporting": boolean_field( name="windows_error_reporting", diff --git a/socorro/external/es/supersearch.py b/socorro/external/es/supersearch.py index 0fcaf1da65..ed642a2702 100644 --- a/socorro/external/es/supersearch.py +++ b/socorro/external/es/supersearch.py @@ -7,8 +7,9 @@ import datetime import re -from elasticsearch_1_9_0.exceptions import NotFoundError, RequestError -from elasticsearch_dsl_0_0_11 import A, F, Q, Search +from elasticsearch.exceptions import NotFoundError, BadRequestError +from elasticsearch_dsl import A, Q, Search +from more_itertools import one from socorro.external.es.base import generate_list_of_indexes from socorro.external.es.search_common import SearchBase @@ -17,9 +18,7 @@ BAD_INDEX_REGEX = re.compile(r"\[\[(.*)\] missing\]") -ELASTICSEARCH_PARSE_EXCEPTION_REGEX = re.compile( - r"ElasticsearchParseException\[Failed to parse \[([^\]]+)\]\]" -) +ELASTICSEARCH_PARSE_EXCEPTION_REGEX = re.compile(r"\[([^\]]+)\] could not be parsed") def prune_invalid_indices(indices, policy, template): @@ -76,6 +75,13 @@ def get_indices(self, dates): def format_field_names(self, hit): """Return hit with field's search_key replaced with name""" + if not hit.keys(): + return {} + + # unnest hit one level 
and update keys + namespace = one(hit) + hit = {f"{namespace}.{key}": item for key, item in hit[namespace].items()} + new_hit = {} for field_name in self.request_columns: field = self.all_fields[field_name] @@ -137,8 +143,17 @@ def _format(aggregation): return aggregation for i, bucket in enumerate(aggregation["buckets"]): + term = bucket.get("key_as_string") + if "key_as_string" in bucket: + term = bucket["key_as_string"] + if term in ("true", "false"): + # Restore es 1.4 format for boolean terms as string + term = term[:1].upper() + else: + term = bucket["key"] + new_bucket = { - "term": bucket.get("key_as_string", bucket["key"]), + "term": term, "count": bucket["doc_count"], } facets = {} @@ -161,6 +176,10 @@ def _format(aggregation): for agg in aggs: aggs[agg] = _format(aggs[agg]) + # NOTE(krzepka) ES 8.X responds with additional "__orig_class__" key, which would be + # treated as an additional aggregation facet, so it gets deleted below. + # aggs.pop("__orig_class__", None) + return aggs def get(self, **kwargs): @@ -192,7 +211,6 @@ def get(self, **kwargs): search = Search( using=self.get_connection(), index=indices, - doc_type=self.crashstorage.get_doctype(), ) # Create filters. @@ -267,12 +285,14 @@ def get(self, **kwargs): # to match for analyzed and non-analyzed supersearch fields. if len(param.value) == 1: # Only one value, so we only do a single match - filter_type = "query" - args = Q({"match": {search_key: param.value[0]}}).to_dict() + filter_type = "match" + args = Q({"match": {search_key: param.value[0]}}).to_dict()[ + "match" + ] else: # Multiple values, so we do multiple matches wrapped in a bool # query where at least one of them should match - filter_type = "query" + filter_type = "bool" args = Q( { "bool": { @@ -283,7 +303,7 @@ def get(self, **kwargs): "minimum_should_match": 1, } } - ).to_dict() + ).to_dict()["bool"] elif param.operator == "=": # is exactly if field_data["has_full_version"]: @@ -293,8 +313,8 @@ def get(self, **kwargs): filter_type = "range" filter_value = {operator_range[param.operator]: param.value} elif param.operator == "__null__": - filter_type = "missing" - args["field"] = search_key + filter_type = "bool" + args["must_not"] = [Q("exists", field=search_key)] elif param.operator == "__true__": filter_type = "term" filter_value = True @@ -304,7 +324,7 @@ def get(self, **kwargs): search_key = f"{search_key}.full" filter_value = param.value elif param.operator in operator_wildcards: - filter_type = "query" + filter_type = "wildcard" # Wildcard operations are better applied to a non-analyzed # field (called "full") if there is one. @@ -316,13 +336,13 @@ def get(self, **kwargs): operator_wildcards[param.operator] % param.value ) query = Q("wildcard", **q_args) - args = query.to_dict() + args = query.to_dict()["wildcard"] if filter_value is not None: args[search_key] = filter_value if args: - new_filter = F(filter_type, **args) + new_filter = Q(filter_type, **args) if param.operator_not: new_filter = ~new_filter @@ -338,7 +358,7 @@ def get(self, **kwargs): if sub_filters is not None: filters.append(sub_filters) - search = search.filter(F("bool", must=filters)) + search = search.filter(Q("bool", must=filters)) # Restricting returned fields. fields = [] @@ -355,7 +375,7 @@ def get(self, **kwargs): field_name = self.get_field_name(value, full=False) fields.append(field_name) - search = search.fields(fields) + search = search.source(fields) # Sorting. sort_fields = [] @@ -424,7 +444,10 @@ def get(self, **kwargs): break # Yay! Results! 
except NotFoundError as e: - missing_index = re.findall(BAD_INDEX_REGEX, e.error)[0] + if e.body["error"]["type"] != "index_not_found_exception": + # Could be an alias not found exception, which we don't handle here + raise + missing_index = e.body["error"]["index"] if missing_index in indices: del indices[indices.index(missing_index)] else: @@ -450,24 +473,25 @@ def get(self, **kwargs): shards = None break - except RequestError as exc: + except BadRequestError as exc: # Try to handle it gracefully if we can find out what # input was bad and caused the exception. # Not an ElasticsearchParseException exception with suppress(IndexError): - bad_input = ELASTICSEARCH_PARSE_EXCEPTION_REGEX.findall(exc.error)[ - -1 - ] - # Loop over the original parameters to try to figure - # out which *key* had the bad input. - for key, value in kwargs.items(): - if value == bad_input: - raise BadArgumentError(key) from exc - - # If it's a search parse exception, but we don't know what key is the - # problem, raise a general BadArgumentError - if "Failed to parse source" in str(exc): - raise BadArgumentError("Malformed supersearch query.") from exc + if ( + exc.body["error"]["type"] == "x_content_parse_exception" + and exc.body["error"]["caused_by"]["type"] + == "illegal_argument_exception" + ): + bad_input = ELASTICSEARCH_PARSE_EXCEPTION_REGEX.findall( + exc.body["error"]["caused_by"]["reason"] + )[-1] + + # Loop over the original parameters to try to figure + # out which *key* had the bad input. + for key, value in kwargs.items(): + if value == bad_input: + raise BadArgumentError(key) from exc # Re-raise the original exception raise @@ -477,6 +501,13 @@ def get(self, **kwargs): # results, so the client can decide what to do. failed_indices = defaultdict(int) for failure in shards.failures: + # If it's a search parse exception we don't know what key is the + # problem, so raise a general BadArgumentError + if ( + failure.reason.type == "query_shard_exception" + and failure.reason.caused_by.type == "illegal_argument_exception" + ): + raise BadArgumentError(f"Malformed supersearch query: {failure}") failed_indices[failure.index] += 1 for index, shards_count in failed_indices.items(): @@ -540,15 +571,27 @@ def _get_histogram_agg(self, field, intervals): and "date_histogram" or "histogram" ) - return A( - histogram_type, field=self.get_field_name(field), interval=intervals[field] - ) + # NOTE(krzepka) By default ES 8.15 will return empty buckets, which creates issues in tests. + # Setting min_doc_count makes ES return only non-empty buckets. + # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-histogram-aggregation.html + kwargs = {"min_doc_count": 1} + # NOTE(krzepka) "The date_histogram aggregation’s interval parameter is no longer valid." 
+ # https://www.elastic.co/guide/en/elasticsearch/reference/master/migrating-8.0.html + if histogram_type == "date_histogram": + kwargs["calendar_interval"] = intervals[field] + else: + kwargs["interval"] = intervals[field] + return A(histogram_type, field=self.get_field_name(field), **kwargs) def _get_cardinality_agg(self, field): return A("cardinality", field=self.get_field_name(field)) def _get_fields_agg(self, field, facets_size): - return A("terms", field=self.get_field_name(field), size=facets_size) + return A( + "terms", + field=self.get_field_name(field), + size=facets_size, + ) def _add_second_level_aggs( self, param, recipient, facets_size, histogram_intervals diff --git a/socorro/external/legacy_es/crashstorage.py b/socorro/external/legacy_es/crashstorage.py index ce880a65b5..3d64482789 100644 --- a/socorro/external/legacy_es/crashstorage.py +++ b/socorro/external/legacy_es/crashstorage.py @@ -16,6 +16,8 @@ from socorro.external.crashstorage_base import CrashStorageBase from socorro.external.legacy_es.connection_context import LegacyConnectionContext +from socorro.external.legacy_es.query import LegacyQuery +from socorro.external.legacy_es.supersearch import LegacySuperSearch from socorro.external.legacy_es.super_search_fields import ( build_mapping, FIELDS, @@ -28,6 +30,12 @@ from socorro.lib.libdatetime import JsonDTEncoder, string_to_datetime, utc_now +# Map of processed crash schema permissions to webapp permissions +PROCESSED_CRASH_TO_WEBAPP_PERMISSIONS = { + "public": "", + "protected": "crashstats.view_pii", +} + # Additional custom analyzers for crash report data ES_CUSTOM_ANALYZERS = { "analyzer": {"semicolon_keywords": {"type": "pattern", "pattern": ";"}} @@ -270,9 +278,36 @@ def build_document(src, crash_document, fields, all_keys): glom.assign(crash_document, dest_key, value, missing=dict) +def convert_permissions(fields): + """Converts processed crash schema / super search permissions to webapp permissions + + :arg fields: super search fields structure to convert permissions of + + :returns: fields with permissions converted + + """ + + def _convert(permissions): + if not permissions: + return permissions + + new_permissions = [ + PROCESSED_CRASH_TO_WEBAPP_PERMISSIONS[perm] for perm in permissions + ] + return [perm for perm in new_permissions if perm] + + for val in fields.values(): + val["permissions_needed"] = _convert(val["permissions_needed"]) + + return fields + + class LegacyESCrashStorage(CrashStorageBase): """Indexes documents based on the processed crash to Elasticsearch.""" + # For use in webapp + SUPERSEARCH_FIELDS = convert_permissions(FIELDS) + # These regex will catch field names from Elasticsearch exceptions. They # have been tested with Elasticsearch 1.4. 
field_name_string_error_re = re.compile(r"field=\"([\w\-.]+)\"") @@ -316,6 +351,24 @@ def __init__( def build_client(cls, url, timeout): return LegacyConnectionContext(url=url, timeout=timeout) + def build_query(self): + """Return new instance of LegacyQuery.""" + return LegacyQuery(crashstorage=self) + + def build_supersearch(self): + """Return new instance of LegacySuperSearch.""" + return LegacySuperSearch(crashstorage=self) + + def build_search(self, **kwargs): + """Return new instance of elasticsearch_dsl's Search.""" + with self.client() as conn: + return Search(using=conn, doc_type=self.get_doctype(), **kwargs) + + @staticmethod + def get_source_key(field): + """Return source key for the field.""" + return get_source_key(field) + def get_index_template(self): """Return template for index names.""" return self.index @@ -355,28 +408,26 @@ def get_socorro_index_settings(self, mappings): "mappings": mappings, } - def get_mapping(self, index_name, es_doctype, reraise=False): - """Retrieves the mapping for a given index and doctype + def get_mapping(self, index_name, reraise=False): + """Retrieves the mapping for a given index and the configured doctype NOTE(willkg): Mappings are cached on the LegacyESCrashStorage instance. If you change the indices (like in tests), you should get a new LegacyESCrashStorage instance. :arg str index_name: the index to retrieve the mapping for - :arg str es_doctype: the doctype to retrieve the mapping for :arg bool reraise: True if you want this to reraise a NotFoundError; False otherwise :returns: mapping as a dict or None """ - cache_key = f"{index_name}::{es_doctype}" - mapping = self._mapping_cache.get(cache_key) + mapping = self._mapping_cache.get(index_name) if mapping is None: try: mapping = self.client.get_mapping( - index_name=index_name, doc_type=es_doctype + index_name=index_name, doc_type=self.get_doctype() ) - self._mapping_cache[cache_key] = mapping + self._mapping_cache[index_name] = mapping except elasticsearch.exceptions.NotFoundError: if reraise: raise @@ -458,31 +509,29 @@ def get_keys_for_indexable_fields(self): return keys - def get_keys_for_mapping(self, index_name, es_doctype): + def get_keys_for_mapping(self, index_name): """Get the keys in "namespace.key" format for a given mapping NOTE(willkg): Results are cached on this LegacyESCrashStorage instance. 
:arg str index_name: the name of the index - :arg str es_doctype: the doctype for the index :returns: set of "namespace.key" fields :raise elasticsearch.exceptions.NotFoundError: if the index doesn't exist """ - cache_key = f"{index_name}::{es_doctype}" - keys = self._keys_for_mapping_cache.get(cache_key) + keys = self._keys_for_mapping_cache.get(index_name) if keys is None: - mapping = self.get_mapping(index_name, es_doctype, reraise=True) + mapping = self.get_mapping(index_name, reraise=True) keys = parse_mapping(mapping, None) - self._keys_for_mapping_cache[cache_key] = keys + self._keys_for_mapping_cache[index_name] = keys return keys - def get_keys(self, index_name, es_doctype): + def get_keys(self, index_name): supersearch_fields_keys = self.get_keys_for_indexable_fields() try: - mapping_keys = self.get_keys_for_mapping(index_name, es_doctype) + mapping_keys = self.get_keys_for_mapping(index_name) except NotFoundError: mapping_keys = None all_valid_keys = supersearch_fields_keys @@ -501,8 +550,7 @@ def save_processed_crash(self, raw_crash, processed_crash): index_name = self.get_index_for_date( string_to_datetime(processed_crash["date_processed"]) ) - es_doctype = self.get_doctype() - all_valid_keys = self.get_keys(index_name, es_doctype) + all_valid_keys = self.get_keys(index_name) src = {"processed_crash": copy.deepcopy(processed_crash)} @@ -517,7 +565,6 @@ def save_processed_crash(self, raw_crash, processed_crash): self._submit_crash_to_elasticsearch( crash_id=crash_id, - es_doctype=es_doctype, index_name=index_name, crash_document=crash_document, ) @@ -537,11 +584,14 @@ def _capture(key, data): _capture("crash_document_size", crash_document) - def _index_crash(self, connection, es_index, es_doctype, crash_document, crash_id): + def _index_crash(self, connection, es_index, crash_document, crash_id): try: start_time = time.time() connection.index( - index=es_index, doc_type=es_doctype, body=crash_document, id=crash_id + index=es_index, + doc_type=self.get_doctype(), + body=crash_document, + id=crash_id, ) index_outcome = "successful" except Exception: @@ -553,9 +603,7 @@ def _index_crash(self, connection, es_index, es_doctype, crash_document, crash_i "index", value=elapsed_time * 1000.0, tags=["outcome:" + index_outcome] ) - def _submit_crash_to_elasticsearch( - self, crash_id, es_doctype, index_name, crash_document - ): + def _submit_crash_to_elasticsearch(self, crash_id, index_name, crash_document): """Submit a crash report to elasticsearch""" # Attempt to create the index; it's OK if it already exists. 
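# Illustrative sketch, not part of the patch: the hunk below keeps the existing
# retry-on-ConnectionError behaviour and simply drops the es_doctype parameter
# (the doctype is now looked up via self.get_doctype() inside _index_crash).
# With hypothetical names, the retry pattern is roughly:
import time
import elasticsearch

def _index_with_retries(conn, index_name, crash_document, crash_id, attempts=5):
    for attempt in range(attempts):
        try:
            # Mirrors the legacy client call used above (body=, id=).
            return conn.index(index=index_name, body=crash_document, id=crash_id)
        except elasticsearch.exceptions.ConnectionError:
            # Treat connection errors as transient: wait a second and retry.
            if attempt == attempts - 1:
                raise
            time.sleep(1.0)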
@@ -569,9 +617,7 @@ def _submit_crash_to_elasticsearch( for _ in range(5): try: with self.client() as conn: - return self._index_crash( - conn, index_name, es_doctype, crash_document, crash_id - ) + return self._index_crash(conn, index_name, crash_document, crash_id) except elasticsearch.exceptions.ConnectionError: # If this is a connection error, sleep a second and then try again diff --git a/socorro/tests/conftest.py b/socorro/tests/conftest.py index 1e06e8d8c5..47493ae79c 100644 --- a/socorro/tests/conftest.py +++ b/socorro/tests/conftest.py @@ -13,7 +13,8 @@ import uuid import sys -from elasticsearch_dsl_0_0_11 import Search +from elasticsearch_dsl import Search +from elasticsearch_dsl_0_0_11 import Search as LegacySearch from google.api_core.exceptions import AlreadyExists, NotFound from google.auth.credentials import AnonymousCredentials from google.cloud import storage @@ -187,6 +188,107 @@ def gcs_helper(): yield gcs_helper +class ElasticsearchHelper: + """Elasticsearch helper class. + + When used in a context, this will clean up any indexes created. + + """ + + def __init__(self): + if settings.ELASTICSEARCH_MODE == "LEGACY_ONLY": + raise ValueError("cannot test elasticearch 8 in LEGACY_ONLY mode") + + self._crashstorage = build_instance_from_settings(settings.ES_STORAGE) + self.conn = self._crashstorage.client + + def get_index_template(self): + return self._crashstorage.get_index_template() + + def create_index(self, index_name): + print(f"ElasticsearchHelper: creating index: {index_name}") + self._crashstorage.create_index(index_name) + + def create_indices(self): + # Create all the indexes for the last couple of weeks; we have to do it this way + # to handle split indexes over the new year + template = self._crashstorage.index + to_create = set() + + for i in range(14): + index_name = (utc_now() - datetime.timedelta(days=i)).strftime(template) + to_create.add(index_name) + + for index_name in to_create: + print(f"ElasticsearchHelper: creating index: {index_name}") + self._crashstorage.create_index(index_name) + + self.health_check() + + def delete_indices(self): + for index in self._crashstorage.get_indices(): + self._crashstorage.delete_index(index) + + def get_indices(self): + return self._crashstorage.get_indices() + + def health_check(self): + with self.conn() as conn: + conn.options(request_timeout=5).cluster.health(wait_for_status="yellow") + + def get_url(self): + """Returns the Elasticsearch url.""" + return settings.ES_STORAGE["options"]["url"] + + def refresh(self): + self.conn.refresh() + + def index_crash(self, processed_crash, refresh=True): + """Index a single crash and refresh""" + self._crashstorage.save_processed_crash( + raw_crash={}, + processed_crash=processed_crash, + ) + + if refresh: + self.refresh() + + def index_many_crashes(self, number, processed_crash=None, loop_field=None): + """Index multiple crashes and refresh at the end""" + processed_crash = processed_crash or {} + + crash_ids = [] + for i in range(number): + processed_copy = processed_crash.copy() + processed_copy["uuid"] = create_new_ooid() + processed_copy["date_processed"] = date_from_ooid(processed_copy["uuid"]) + if loop_field is not None: + processed_copy[loop_field] = processed_crash[loop_field] % i + + self.index_crash(processed_crash=processed_copy, refresh=False) + + self.refresh() + return crash_ids + + def get_crash_data(self, crash_id): + """Get source in index for given crash_id + + :arg crash_id: the crash id to fetch the source for + + :returns: source as a Python dict or None 
if it doesn't exist + + """ + index = self._crashstorage.get_index_for_date(date_from_ooid(crash_id)) + + with self.conn() as conn: + search = Search(using=conn, index=index) + search = search.filter("term", **{"processed_crash.uuid": crash_id}) + results = search.execute().to_dict() + + if results["hits"]["hits"]: + return results["hits"]["hits"][0]["_source"] + + class LegacyElasticsearchHelper: """Legacy Elasticsearch helper class. @@ -281,7 +383,7 @@ def get_crash_data(self, crash_id): doc_type = self._crashstorage.get_doctype() with self.conn() as conn: - search = Search(using=conn, index=index, doc_type=doc_type) + search = LegacySearch(using=conn, index=index, doc_type=doc_type) search = search.filter("term", **{"processed_crash.uuid": crash_id}) results = search.execute().to_dict() @@ -312,14 +414,45 @@ def legacy_es_helper(): legacy_es_helper.delete_indices() +def _generate_es_helper(): + es_helper = ElasticsearchHelper() + es_helper.create_indices() + yield es_helper + es_helper.delete_indices() + + +@pytest.fixture +def es_helper(): + """Returns an Elasticsearch helper for tests. + + Provides: + + * ``get_url()`` + * ``create_indices()`` + * ``delete_indices()`` + * ``get_indices()`` + * ``index_crash()`` + * ``index_many_crashes()`` + * ``refresh()`` + * ``get_crash_data()`` + + """ + yield from _generate_es_helper() + + @pytest.fixture -def es_helper(legacy_es_helper): +def preferred_es_helper(legacy_es_helper): """Returns an Elasticsearch helper for tests. This returns a legacy or non-legacy helper depending on how the webapp is - configured. For now that means it always returns a legacy helper. + configured. """ - yield legacy_es_helper + if settings.ELASTICSEARCH_MODE == "LEGACY_ONLY": + yield legacy_es_helper + else: + # NOTE(relud): this fixture cannot reuse the es_helper fixture because + # ElasticsearchHelper cannot be instantiated in LEGACY_ONLY mode + yield from _generate_es_helper() class PubSubHelper: diff --git a/socorro/tests/external/es/test_crashstorage.py b/socorro/tests/external/es/test_crashstorage.py index feb45a388c..915f8b36db 100644 --- a/socorro/tests/external/es/test_crashstorage.py +++ b/socorro/tests/external/es/test_crashstorage.py @@ -12,6 +12,7 @@ from socorro import settings from socorro.external.es.crashstorage import ( + convert_permissions, fix_boolean, fix_integer, fix_keyword, @@ -21,7 +22,7 @@ ) from socorro.external.es.super_search_fields import build_mapping -from socorro.libclass import build_instance +from socorro.libclass import build_instance_from_settings from socorro.lib.libdatetime import date_to_string, utc_now from socorro.lib.libooid import create_new_ooid, date_from_ooid @@ -94,10 +95,10 @@ def test_invalid_key(self, key): class TestESCrashStorage: def build_crashstorage(self): - return build_instance( - class_path="socorro.external.es.crashstorage.ESCrashStorage", - kwargs=settings.ES_STORAGE["options"], - ) + if settings.ELASTICSEARCH_MODE == "LEGACY_ONLY": + raise ValueError("cannot test elasticearch 8 in LEGACY_ONLY mode") + + return build_instance_from_settings(settings.ES_STORAGE) def test_index_crash(self, es_helper): """Test indexing a crash document.""" @@ -122,7 +123,7 @@ def test_index_crash_indexable_keys(self, es_helper): "another_invalid_key": "alpha", "date_processed": date_to_string(utc_now()), "uuid": "936ce666-ff3b-4c7a-9674-367fe2120408", - "dom_fission_enabled": "1", + "dom_fission_enabled": False, } crashstorage = self.build_crashstorage() @@ -167,9 +168,8 @@ def test_index_crash_mapping_keys(self, 
es_helper): # We're going to use a mapping from super search fields, bug remove the # user_comments field. - mappings = build_mapping(crashstorage.get_doctype()) - doctype = crashstorage.get_doctype() - del mappings[doctype]["properties"]["processed_crash"]["properties"][field] + mappings = build_mapping() + del mappings["properties"]["processed_crash"]["properties"][field] # Create the index for 4 weeks ago crashstorage.create_index( @@ -297,7 +297,6 @@ def test_index_data_capture(self, es_helper): crashstorage._index_crash( connection=mock_connection, es_index=None, - es_doctype=None, crash_document=None, crash_id=None, ) @@ -307,7 +306,6 @@ def test_index_data_capture(self, es_helper): crashstorage._index_crash( connection=mock_connection, es_index=None, - es_doctype=None, crash_document=None, crash_id=None, ) @@ -382,8 +380,7 @@ def test_delete_expired_indices(self, es_helper): REMOVED_VALUE, ), # Booleans are converted - # FIXME(willkg): fix_boolean is never called--that's wrong - # ("processed_crash.accessibility", "true", True), + ("processed_crash.accessibility", "true", True), ], ) def test_indexing_bad_data(self, key, value, expected_value, es_helper): @@ -508,3 +505,34 @@ def test_fix_integer(value, expected): def test_fix_long(value, expected): new_value = fix_long(value) assert new_value == expected + + +def test_convert_permissions(): + fields = { + "build": { + "permissions_needed": [], + }, + "product": { + "permissions_needed": ["public"], + }, + "version": { + "permissions_needed": ["public", "protected"], + }, + } + + expected = { + "build": { + # No permission -> no required permissions + "permissions_needed": [], + }, + "product": { + # "public" -> no required permissions + "permissions_needed": [], + }, + "version": { + # "protected" -> "crashstats.view_pii" + "permissions_needed": ["crashstats.view_pii"], + }, + } + + assert convert_permissions(fields) == expected diff --git a/socorro/tests/external/es/test_query.py b/socorro/tests/external/es/test_query.py index f6005aff0b..9ec4fc2b07 100644 --- a/socorro/tests/external/es/test_query.py +++ b/socorro/tests/external/es/test_query.py @@ -52,9 +52,8 @@ def test_get(self, es_helper): query = { "query": { - "filtered": { - "query": {"match_all": {}}, - "filter": {"term": {"product": "earthraccoon"}}, + "constant_score": { + "filter": {"term": {"processed_crash.product": "earthraccoon"}}, } } } @@ -80,7 +79,6 @@ def test_get_with_indices(self, es_helper, monkeypatch): """Verify that .get() uses the correct indices.""" crashstorage = self.build_crashstorage() api = Query(crashstorage=crashstorage) - doc_type = api.crashstorage.get_doctype() # Mock the connection so we can see the list of indexes it's building mocked_connection = mock.MagicMock() @@ -96,7 +94,6 @@ def test_get_with_indices(self, es_helper, monkeypatch): mocked_connection.return_value.search.assert_called_with( body='{"query": {}}', index=indices, - doc_type=doc_type, ) # Test all indices. 
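# Illustrative sketch, not part of the patch: this test swaps the crash storage
# client for a MagicMock so it can assert which index list is handed to
# Elasticsearch without running a real search. Reduced to the bare technique
# (names are hypothetical):
from unittest import mock

mocked_connection = mock.MagicMock()
conn = mocked_connection()  # what the code under test gets back from client()
conn.search(body='{"query": {}}', index=["socorro_201801"])
mocked_connection.return_value.search.assert_called_with(
    body='{"query": {}}',
    index=["socorro_201801"],
)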
@@ -111,5 +108,4 @@ def test_get_with_indices(self, es_helper, monkeypatch): mocked_connection.return_value.search.assert_called_with( body='{"query": {}}', index=["socorro_201801", "socorro_200047", "not_an_index"], - doc_type=doc_type, ) diff --git a/socorro/tests/external/es/test_super_search_fields.py b/socorro/tests/external/es/test_super_search_fields.py index 797c3f13cc..7132ef65de 100644 --- a/socorro/tests/external/es/test_super_search_fields.py +++ b/socorro/tests/external/es/test_super_search_fields.py @@ -8,10 +8,8 @@ import pytest from socorro.external.es.super_search_fields import ( - add_doc_values, build_mapping, FIELDS, - is_doc_values_friendly, get_fields_by_item, ) @@ -29,49 +27,49 @@ class Test_get_fields_by_item: # No storage_mapping {"key": {"in_database_name": "key"}}, # Wrong or missing analyzer - {"key": {"in_database_name": "key", "storage_mapping": {"type": "string"}}}, + {"key": {"in_database_name": "key", "storage_mapping": {"type": "text"}}}, { "key": { "in_database_name": "key", "storage_mapping": { "analyzer": "semicolon_keywords", - "type": "string", + "type": "text", }, } }, ], ) def test_no_match(self, fields): - assert get_fields_by_item(fields, "analyzer", "keyword") == [] + assert get_fields_by_item(fields, "type", "keyword") == [] def test_match(self): fields = { "key": { "in_database_name": "key", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, } } - assert get_fields_by_item(fields, "analyzer", "keyword") == [fields["key"]] + assert get_fields_by_item(fields, "type", "keyword") == [fields["key"]] def test_match_by_type(self): fields = { "key": { "in_database_name": "key", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, } } - assert get_fields_by_item(fields, "type", "string") == [fields["key"]] + assert get_fields_by_item(fields, "type", "keyword") == [fields["key"]] def test_caching(self): # Verify caching works fields = { "key": { "in_database_name": "key", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, } } - result = get_fields_by_item(fields, "analyzer", "keyword") - second_result = get_fields_by_item(fields, "analyzer", "keyword") + result = get_fields_by_item(fields, "type", "keyword") + second_result = get_fields_by_item(fields, "type", "keyword") assert id(result) == id(second_result) # This is the same data as fields, but a different dict, so it has a @@ -79,22 +77,20 @@ def test_caching(self): second_fields = { "key": { "in_database_name": "key", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, } } - third_result = get_fields_by_item(second_fields, "analyzer", "keyword") + third_result = get_fields_by_item(second_fields, "type", "keyword") assert id(result) != id(third_result) class Test_build_mapping: """Test build_mapping with an elasticsearch database containing fake data""" - def test_get_mapping(self, es_helper): - doctype = es_helper.get_doctype() - mapping = build_mapping(doctype=doctype, fields=get_fields()) + def test_get_mapping(self): + mapping = build_mapping(fields=get_fields()) - assert doctype in mapping - properties = mapping[doctype]["properties"] + properties = mapping["properties"] print(json.dumps(properties, indent=4, sort_keys=True)) assert "processed_crash" in properties @@ -107,8 +103,7 @@ def test_get_mapping(self, es_helper): # Those fields have a `storage_mapping`. 
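# Context for the assertions below (editorial note, not part of the patch):
# what Elasticsearch 1.x expressed as {"type": "string", "analyzer": "keyword"}
# becomes the dedicated {"type": "keyword"} field type in Elasticsearch 8,
# which is why the expected storage mappings in this module changed.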
assert processed_crash["release_channel"] == { - "analyzer": "keyword", - "type": "string", + "type": "keyword", } # Test nested objects. @@ -116,7 +111,6 @@ def test_get_mapping(self, es_helper): "cpu_count" ] == { "type": "short", - "doc_values": True, } @@ -223,47 +217,3 @@ def test_validate_super_search_fields(name, properties): f"processed_crash.{properties['name']}", ] assert key in possible_keys - - -@pytest.mark.parametrize( - "value, expected", - [ - # No type -> False - ({}, False), - # object -> False - ({"type": "object"}, False), - # Analyzed string -> False - ({"type": "string"}, False), - ({"type": "string", "analyzer": "keyword"}, False), - # Unanalyzed string -> True - ({"type": "string", "index": "not_analyzed"}, True), - # Anything else -> True - ({"type": "long"}, True), - ], -) -def test_is_doc_values_friendly(value, expected): - assert is_doc_values_friendly(value) == expected - - -def test_add_doc_values(): - storage_mapping = {"type": "short"} - add_doc_values(storage_mapping) - assert storage_mapping == {"type": "short", "doc_values": True} - - storage_mapping = { - "fields": { - "full": {"index": "not_analyzed", "type": "string"}, - }, - "analyzer": "standard", - "index": "analyzed", - "type": "string", - } - add_doc_values(storage_mapping) - assert storage_mapping == { - "fields": { - "full": {"index": "not_analyzed", "type": "string", "doc_values": True}, - }, - "analyzer": "standard", - "index": "analyzed", - "type": "string", - } diff --git a/socorro/tests/external/es/test_supersearch.py b/socorro/tests/external/es/test_supersearch.py index 824afb9193..10056854c5 100644 --- a/socorro/tests/external/es/test_supersearch.py +++ b/socorro/tests/external/es/test_supersearch.py @@ -4,8 +4,10 @@ import datetime import json +import http.server +import threading +from contextlib import contextmanager -import requests_mock import pytest from socorro import settings @@ -19,6 +21,30 @@ from socorro.lib.libooid import create_new_ooid +@contextmanager +def mock_es_server(ip, port, post_response): + class MockHTTPRequestHandler(http.server.BaseHTTPRequestHandler): + """mock request handler""" + + def do_POST(self): # pylint: disable=invalid-name + """Handle GET requests""" + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("X-Elastic-Product", "Elasticsearch") + self.end_headers() + self.wfile.write(json.dumps(post_response).encode("utf-8")) + + server = http.server.ThreadingHTTPServer((ip, port), MockHTTPRequestHandler) + with server: + server_thread = threading.Thread(target=server.serve_forever) + server_thread.daemon = True + server_thread.start() + try: + yield + finally: + server.shutdown() + + class SuperSearchWithFields(SuperSearch): """Adds FIELDS to all .get() calls""" @@ -89,9 +115,14 @@ def test_get(self, es_helper): }, ) es_helper.refresh() - res = api.get( - _columns=["date", "build_id", "platform", "signature", "cpu_count"] + _columns=[ + "date", + "build_id", + "platform", + "signature", + "cpu_count", + ] ) assert "hits" in res @@ -103,10 +134,12 @@ def test_get(self, es_helper): assert res["hits"][0]["signature"] == "js::break_your_browser" assert list(res["facets"].keys()) == ["signature"] - assert res["facets"]["signature"][0] == { - "term": "js::break_your_browser", - "count": 1, - } + assert res["facets"]["signature"] == [ + { + "term": "js::break_your_browser", + "count": 1, + } + ] # Test fields are being renamed. 
assert "date" in res["hits"][0] # date_processed -> date @@ -1195,9 +1228,12 @@ def test_get_with_date_histogram(self, es_helper): def dt_to_midnight(date): return date.replace(hour=0, minute=0, second=0, microsecond=0) - today_str = dt_to_midnight(now).isoformat() - yesterday_str = dt_to_midnight(yesterday).isoformat() - day_before_str = dt_to_midnight(the_day_before).isoformat() + # NOTE(relud) this used to use .isoformat() but with elasticsearch 8 dates are + # returned with millisecond precision and indicate time zone with Z instead of + # +00:00, so we have to carefully format the dates here. + today_str = f"{dt_to_midnight(now):%FT%H:%M:%S.000Z}" + yesterday_str = f"{dt_to_midnight(yesterday):%FT%H:%M:%S.000Z}" + day_before_str = f"{dt_to_midnight(the_day_before):%FT%H:%M:%S.000Z}" expected_terms = [ { @@ -1327,13 +1363,9 @@ def test_get_with_date_histogram_with_bad_interval(self, es_helper): "_histogram_interval.date": "xdays", # Note! It's just wrong } - # Not using assert_raises here so we can do a check on the exception - # object when it does raise. - try: + with pytest.raises(BadArgumentError) as exc_info: api.get(**kwargs) - raise AssertionError("The line above is supposed to error out") - except BadArgumentError as exception: - assert exception.param == "_histogram_interval.date" + assert exc_info._excinfo[1].param == "_histogram_interval.date" def test_get_with_number_histogram(self, es_helper): crashstorage = self.build_crashstorage() @@ -1735,107 +1767,120 @@ def test_get_with_bad_regex(self, es_helper): crashstorage = self.build_crashstorage() api = SuperSearchWithFields(crashstorage=crashstorage) - # A bad regex kicks up a SearchParseException which supersearch converts + # regex is lazy-evaluated, so there has to be a crash matched by everything but + # the regex to produce a failure + now = utc_now() + es_helper.index_crash( + processed_crash={ + "uuid": create_new_ooid(timestamp=now), + "signature": "test.dll", + "date_processed": now, + }, + ) + + # A bad regex kicks up a shard failure which supersearch converts # to a BadArgumentError with pytest.raises(BadArgumentError): - api.get(signature='@"OOM | ".*" | ".*"()&%