diff --git a/bin/setup_services.sh b/bin/setup_services.sh index 8dc46e259e..1e787791e4 100755 --- a/bin/setup_services.sh +++ b/bin/setup_services.sh @@ -13,6 +13,9 @@ set -euo pipefail +# wait for dev services to start up +./bin/waitfor_services.sh + # Drop and re-create the breakpad database with tables, stored procedures, # types, indexes, and keys; also bulk-loads static data for some lookup tables /app/bin/setup_postgres.sh diff --git a/bin/test.sh b/bin/test.sh index 8143e8ede3..cd1c4ee425 100755 --- a/bin/test.sh +++ b/bin/test.sh @@ -27,13 +27,7 @@ PYTHON="$(which python)" echo ">>> wait for services to be ready" -waitfor --verbose --conn-only "${DATABASE_URL}" -waitfor --verbose "${LEGACY_ELASTICSEARCH_URL}" -waitfor --verbose "http://${PUBSUB_EMULATOR_HOST}" -waitfor --verbose "${STORAGE_EMULATOR_HOST}/storage/v1/b" -waitfor --verbose --codes={200,404} "${SENTRY_DSN}" -# wait for this last because it's slow to start -waitfor --verbose --timeout=30 "${ELASTICSEARCH_URL}" +./bin/waitfor_services.sh echo ">>> build queue things and db things" @@ -53,16 +47,14 @@ popd echo ">>> run tests" # Run socorro tests -ELASTICSEARCH_MODE=LEGACY_ONLY "${PYTEST}" -# override ELASTICSEARCH_URL to use legacy elasticsearch so we can test PREFER_NEW without -# implementing es8 support. Override will be removed when socorrro/external/es expects es8 -ELASTICSEARCH_MODE=PREFER_NEW ELASTICSEARCH_URL="${LEGACY_ELASTICSEARCH_URL}" "${PYTEST}" +# ignore tests because elasticsearch 8 tests cannot be run with LEGACY_ONLY +ELASTICSEARCH_MODE=LEGACY_ONLY "${PYTEST}" --ignore=socorro/tests/external/es/ +# ignore tests because elasticsearch 8 supersearch is not implemented yet +ELASTICSEARCH_MODE=PREFER_NEW "${PYTEST}" # Collect static and then run pytest in the webapp pushd webapp ${PYTHON} manage.py collectstatic --noinput ELASTICSEARCH_MODE=LEGACY_ONLY "${PYTEST}" -# override ELASTICSEARCH_URL to use legacy elasticsearch so we can test PREFER_NEW without -# implementing es8 support. Override will be removed when socorrro/external/es expects es8 -ELASTICSEARCH_MODE=PREFER_NEW ELASTICSEARCH_URL="${LEGACY_ELASTICSEARCH_URL}" "${PYTEST}" +ELASTICSEARCH_MODE=PREFER_NEW "${PYTEST}" popd diff --git a/bin/waitfor_services.sh b/bin/waitfor_services.sh new file mode 100755 index 0000000000..58919317c0 --- /dev/null +++ b/bin/waitfor_services.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +# Usage: bin/waitfor_services.sh +# +# Waits for dev services to start up. +# +# Note: This should be called from inside a container. 
+ +set -euo pipefail + +waitfor --verbose --conn-only "${DATABASE_URL}" +waitfor --verbose "${LEGACY_ELASTICSEARCH_URL}" +waitfor --verbose "http://${PUBSUB_EMULATOR_HOST}" +waitfor --verbose "${STORAGE_EMULATOR_HOST}/storage/v1/b" +waitfor --verbose --codes={200,404} "${SENTRY_DSN}" +# wait for this last because it's slow to start +waitfor --verbose --timeout=30 "${ELASTICSEARCH_URL}" diff --git a/docker-compose.yml b/docker-compose.yml index f76bc5a1c2..8a9b2a4a45 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -206,7 +206,7 @@ services: ports: - "9200:9200" - # https://www.elastic.co/guide/en/elasticsearch/reference/8.14/docker.html + # https://www.elastic.co/guide/en/elasticsearch/reference/8.16/docker.html elasticsearch: build: context: docker/images/elasticsearch diff --git a/requirements.in b/requirements.in index 4279243bea..8aa7d5f93c 100644 --- a/requirements.in +++ b/requirements.in @@ -64,8 +64,10 @@ django-waffle==2.3.0 # NOTE(willkg): We stick with LTS releases and the next one is 5.2 (2025). django==4.2.16 -# NOTE(willkg): Need to keep elasticsearch and elasticsearch-dsl at these versions -# because they work with the cluster we're using +# NOTE(relud): Need to keep elasticsearch and elasticsearch-dsl at versions compatible with the +# cluster we're using +elasticsearch==8.15.1 +elasticsearch-dsl==8.15.3 # via docker/set_up_legacy_es_requirements.sh and legacy-es-requirements.txt # NOTE(relud): these dependencies are installed separately via legacy-es-requirements.txt and # docker/set_up_legacy_es_requirements.sh, relocated, and patched to reference the new location (aka diff --git a/requirements.txt b/requirements.txt index 19751f447e..4e15703034 100644 --- a/requirements.txt +++ b/requirements.txt @@ -48,6 +48,7 @@ certifi==2024.7.4 \ --hash=sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b \ --hash=sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90 # via + # elastic-transport # requests # sentry-sdk cffi==1.16.0 \ @@ -311,6 +312,20 @@ docutils==0.20.1 \ # via # sphinx # sphinx-rtd-theme +elastic-transport==8.15.0 \ + --hash=sha256:85d62558f9baafb0868c801233a59b235e61d7b4804c28c2fadaa866b6766233 \ + --hash=sha256:d7080d1dada2b4eee69e7574f9c17a76b42f2895eff428e562f94b0360e158c0 + # via elasticsearch +elasticsearch==8.15.1 \ + --hash=sha256:02a0476e98768a30d7926335fc0d305c04fdb928eea1354c6e6040d8c2814569 \ + --hash=sha256:40c0d312f8adf8bdc81795bc16a0b546ddf544cb1f90e829a244e4780c4dbfd8 + # via + # -r requirements.in + # elasticsearch-dsl +elasticsearch-dsl==8.15.3 \ + --hash=sha256:4005fa1abef5c0adc5266896c3177d0e98ff5ac49603c36dda9058d616f79257 \ + --hash=sha256:9541a669e5ea996a447289ba3aefb3540dd9983ca0670c0f90ad4a711753422d + # via -r requirements.in enforce-typing==1.0.0.post1 \ --hash=sha256:90347a61d08e7f7578d9714b4f0fd8abd9b6bc48c8ac8d46d7f290d413afabb7 \ --hash=sha256:d3184dfdbfd7f9520c884986561751a6106c57cdd65d730470645d2d40c47e18 @@ -1032,6 +1047,7 @@ python-dateutil==2.9.0.post0 \ --hash=sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427 # via # -r requirements.in + # elasticsearch-dsl # freezegun python-decouple==3.8 \ --hash=sha256:ba6e2657d4f376ecc46f77a3a615e058d93ba5e465c01bbe57289bfb7cce680f \ @@ -1320,12 +1336,14 @@ typing-extensions==4.9.0 \ --hash=sha256:af72aea155e91adfc61c3ae9e0e342dbc0cba726d6cba4b6c72c1f34e47291cd # via # dj-database-url + # elasticsearch-dsl # opentelemetry-sdk urllib3==1.26.20 \ --hash=sha256:0ed14ccfbf1c30a9072c7ca157e4319b70d65f623e91e7b32fadb2853431016e \ 
--hash=sha256:40c2dc0c681e47eb8f90e7e27bf6ff7df2e677421fd46756da1161c39ca70d32 # via # -r requirements.in + # elastic-transport # requests # sentry-sdk urlwait==1.0 \ diff --git a/socorro/external/es/connection_context.py b/socorro/external/es/connection_context.py index aed29789dd..962fef8848 100644 --- a/socorro/external/es/connection_context.py +++ b/socorro/external/es/connection_context.py @@ -2,9 +2,9 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at https://mozilla.org/MPL/2.0/. -import contextlib +from contextlib import contextmanager -import elasticsearch_1_9_0 as elasticsearch +from elasticsearch import Elasticsearch, RequestError class ConnectionContext: @@ -37,10 +37,9 @@ def connection(self, name=None, timeout=None): if timeout is None: timeout = self.timeout - return elasticsearch.Elasticsearch( + return Elasticsearch( hosts=self.url, - timeout=timeout, - connection_class=elasticsearch.connection.RequestsHttpConnection, + request_timeout=timeout, verify_certs=True, ) @@ -51,9 +50,9 @@ def indices_client(self, name=None): http://elasticsearch-py.readthedocs.org/en/latest/api.html#indices """ - return elasticsearch.client.IndicesClient(self.connection()) + return self.connection().indices - @contextlib.contextmanager + @contextmanager def __call__(self, name=None, timeout=None): conn = self.connection(name, timeout) yield conn @@ -72,10 +71,9 @@ def create_index(self, index_name, index_settings): client.create(index=index_name, body=index_settings) return True - except elasticsearch.exceptions.RequestError as exc: + except RequestError as exc: # If this index already exists, swallow the error. - # NOTE! This is NOT how the error looks like in ES 2.x - if "IndexAlreadyExistsException" not in str(exc): + if "resource_already_exists_exception" not in str(exc): raise return False @@ -85,23 +83,20 @@ def get_indices(self): :returns: list of str """ - indices_client = self.indices_client() - status = indices_client.status() - indices = status["indices"].keys() - return indices + return self.indices_client().get_alias().keys() def delete_index(self, index_name): """Delete an index.""" - self.indices_client().delete(index_name) + self.indices_client().delete(index=index_name) - def get_mapping(self, index_name, doc_type): - """Return the mapping for the specified index and doc_type.""" + def get_mapping(self, index_name): + """Return the mapping for the specified index.""" resp = self.indices_client().get_mapping(index=index_name) - return resp[index_name]["mappings"][doc_type]["properties"] + return resp[index_name]["mappings"]["properties"] def refresh(self, index_name=None): self.indices_client().refresh(index=index_name or "_all") def health_check(self): with self() as conn: - conn.cluster.health(wait_for_status="yellow", request_timeout=5) + conn.options(request_timeout=5).cluster.health(wait_for_status="yellow") diff --git a/socorro/external/es/crashstorage.py b/socorro/external/es/crashstorage.py index a94d92a89d..a203e78c67 100644 --- a/socorro/external/es/crashstorage.py +++ b/socorro/external/es/crashstorage.py @@ -8,14 +8,16 @@ import re import time -import elasticsearch_1_9_0 as elasticsearch -from elasticsearch_1_9_0.exceptions import NotFoundError -from elasticsearch_dsl_0_0_11 import Search +import elasticsearch +from elasticsearch.exceptions import NotFoundError +from elasticsearch_dsl import Search import glom import markus from socorro.external.crashstorage_base import CrashStorageBase from 
socorro.external.es.connection_context import ConnectionContext +from socorro.external.es.query import Query +from socorro.external.es.supersearch import SuperSearch from socorro.external.es.super_search_fields import ( build_mapping, FIELDS, @@ -225,12 +227,11 @@ def build_document(src, crash_document, fields, all_keys): # Fix values so they index correctly storage_type = field.get("type", field["storage_mapping"].get("type")) - if storage_type == "string": - analyzer = field.get("analyzer", field["storage_mapping"].get("analyzer")) - if analyzer == "keyword": - value = fix_keyword(value, max_size=MAX_KEYWORD_FIELD_VALUE_SIZE) - else: - value = fix_string(value, max_size=MAX_STRING_FIELD_VALUE_SIZE) + if storage_type == "keyword": + value = fix_keyword(value, max_size=MAX_KEYWORD_FIELD_VALUE_SIZE) + + elif storage_type == "text": + value = fix_string(value, max_size=MAX_STRING_FIELD_VALUE_SIZE) elif storage_type == "integer": value = fix_integer(value) @@ -247,6 +248,9 @@ def build_document(src, crash_document, fields, all_keys): if value is None: continue + elif storage_type == "boolean": + value = fix_boolean(value) + for dest_key in get_destination_keys(field): if dest_key in all_keys: glom.assign(crash_document, dest_key, value, missing=dict) @@ -260,7 +264,7 @@ class ESCrashStorage(CrashStorageBase): # These regex will catch field names from Elasticsearch exceptions. They # have been tested with Elasticsearch 1.4. field_name_string_error_re = re.compile(r"field=\"([\w\-.]+)\"") - field_name_number_error_re = re.compile(r"\[failed to parse \[([\w\-.]+)]]") + field_name_number_error_re = re.compile(r"failed to parse field \[([\w\-.]+)]") field_name_unknown_property_error_re = field_name_number_error_re def __init__( @@ -269,7 +273,6 @@ def __init__( index="socorro%Y%W", index_regex=r"^socorro[0-9]{6}$", retention_policy=26, - doctype="crash_reports", metrics_prefix="processor.es", timeout=30, shards_per_index=10, @@ -288,7 +291,6 @@ def __init__( self.index = index self.index_regex = index_regex self.retention_policy = retention_policy - self.doctype = doctype self.shards_per_index = shards_per_index # Cached answers for things that don't change @@ -300,6 +302,24 @@ def __init__( def build_client(cls, url, timeout): return ConnectionContext(url=url, timeout=timeout) + def build_query(self): + """Return new instance of Query.""" + return Query(crashstorage=self) + + def build_supersearch(self): + """Return new instance of SuperSearch.""" + return SuperSearch(crashstorage=self) + + def build_search(self, **kwargs): + """Return new instance of elasticsearch_dsl's Search.""" + with self.client() as conn: + return Search(using=conn, **kwargs) + + @staticmethod + def get_source_key(field): + """Return source key for the field.""" + return get_source_key(field) + def get_index_template(self): """Return template for index names.""" return self.index @@ -318,10 +338,6 @@ def get_index_for_date(self, date): template = self.get_index_template() return date.strftime(template) - def get_doctype(self): - """Return doctype.""" - return self.doctype - def get_retention_policy(self): """Return retention policy in weeks.""" return self.retention_policy @@ -339,28 +355,24 @@ def get_socorro_index_settings(self, mappings): "mappings": mappings, } - def get_mapping(self, index_name, es_doctype, reraise=False): - """Retrieves the mapping for a given index and doctype + def get_mapping(self, index_name, reraise=False): + """Retrieves the mapping for a given index NOTE(willkg): Mappings are cached on the 
ESCrashStorage instance. If you change the indices (like in tests), you should get a new ESCrashStorage instance. :arg str index_name: the index to retrieve the mapping for - :arg str es_doctype: the doctype to retrieve the mapping for :arg bool reraise: True if you want this to reraise a NotFoundError; False otherwise :returns: mapping as a dict or None """ - cache_key = f"{index_name}::{es_doctype}" - mapping = self._mapping_cache.get(cache_key) + mapping = self._mapping_cache.get(index_name) if mapping is None: try: - mapping = self.client.get_mapping( - index_name=index_name, doc_type=es_doctype - ) - self._mapping_cache[cache_key] = mapping + mapping = self.client.get_mapping(index_name=index_name) + self._mapping_cache[index_name] = mapping except elasticsearch.exceptions.NotFoundError: if reraise: raise @@ -370,13 +382,13 @@ def create_index(self, index_name, mappings=None): """Create an index that will receive crash reports. :arg index_name: the name of the index to create - :arg mappings: dict of doctype->ES mapping + :arg mappings: ES mapping :returns: True if the index was created, False if it already existed """ if mappings is None: - mappings = build_mapping(doctype=self.get_doctype()) + mappings = build_mapping() index_settings = self.get_socorro_index_settings(mappings) @@ -442,31 +454,29 @@ def get_keys_for_indexable_fields(self): return keys - def get_keys_for_mapping(self, index_name, es_doctype): + def get_keys_for_mapping(self, index_name): """Get the keys in "namespace.key" format for a given mapping NOTE(willkg): Results are cached on this ESCrashStorage instance. :arg str index_name: the name of the index - :arg str es_doctype: the doctype for the index :returns: set of "namespace.key" fields :raise elasticsearch.exceptions.NotFoundError: if the index doesn't exist """ - cache_key = f"{index_name}::{es_doctype}" - keys = self._keys_for_mapping_cache.get(cache_key) + keys = self._keys_for_mapping_cache.get(index_name) if keys is None: - mapping = self.get_mapping(index_name, es_doctype, reraise=True) + mapping = self.get_mapping(index_name, reraise=True) keys = parse_mapping(mapping, None) - self._keys_for_mapping_cache[cache_key] = keys + self._keys_for_mapping_cache[index_name] = keys return keys - def get_keys(self, index_name, es_doctype): + def get_keys(self, index_name): supersearch_fields_keys = self.get_keys_for_indexable_fields() try: - mapping_keys = self.get_keys_for_mapping(index_name, es_doctype) + mapping_keys = self.get_keys_for_mapping(index_name) except NotFoundError: mapping_keys = None all_valid_keys = supersearch_fields_keys @@ -485,8 +495,7 @@ def save_processed_crash(self, raw_crash, processed_crash): index_name = self.get_index_for_date( string_to_datetime(processed_crash["date_processed"]) ) - es_doctype = self.get_doctype() - all_valid_keys = self.get_keys(index_name, es_doctype) + all_valid_keys = self.get_keys(index_name) src = {"processed_crash": copy.deepcopy(processed_crash)} @@ -503,7 +512,6 @@ def save_processed_crash(self, raw_crash, processed_crash): self._submit_crash_to_elasticsearch( crash_id=crash_id, - es_doctype=es_doctype, index_name=index_name, crash_document=crash_document, ) @@ -523,12 +531,10 @@ def _capture(key, data): _capture("crash_document_size", crash_document) - def _index_crash(self, connection, es_index, es_doctype, crash_document, crash_id): + def _index_crash(self, connection, es_index, crash_document, crash_id): try: start_time = time.time() - connection.index( - index=es_index, doc_type=es_doctype, 
body=crash_document, id=crash_id - ) + connection.index(index=es_index, body=crash_document, id=crash_id) index_outcome = "successful" except Exception: index_outcome = "failed" @@ -539,9 +545,7 @@ def _index_crash(self, connection, es_index, es_doctype, crash_document, crash_i "index", value=elapsed_time * 1000.0, tags=["outcome:" + index_outcome] ) - def _submit_crash_to_elasticsearch( - self, crash_id, es_doctype, index_name, crash_document - ): + def _submit_crash_to_elasticsearch(self, crash_id, index_name, crash_document): """Submit a crash report to elasticsearch""" # Attempt to create the index; it's OK if it already exists. @@ -555,45 +559,59 @@ def _submit_crash_to_elasticsearch( for _ in range(5): try: with self.client() as conn: - return self._index_crash( - conn, index_name, es_doctype, crash_document, crash_id - ) + return self._index_crash(conn, index_name, crash_document, crash_id) - except elasticsearch.exceptions.ConnectionError: + except elasticsearch.ConnectionError: # If this is a connection error, sleep a second and then try again time.sleep(1.0) - except elasticsearch.exceptions.TransportError as e: - # If this is a TransportError, we try to figure out what the error + except elasticsearch.BadRequestError as e: + # If this is a BadRequestError, we try to figure out what the error # is and fix the document and try again field_name = None - if "MaxBytesLengthExceededException" in e.error: + error = e.body["error"] + + if ( + error["type"] == "document_parsing_exception" + and error["caused_by"]["type"] == "illegal_argument_exception" + and error["reason"].startswith( + "Document contains at least one immense term" + ) + ): # This is caused by a string that is way too long for - # Elasticsearch. - matches = self.field_name_string_error_re.findall(e.error) + # Elasticsearch, specifically 32_766 bytes when UTF8 encoded. + matches = self.field_name_string_error_re.findall(error["reason"]) if matches: field_name = matches[0] self.metrics.incr( "indexerror", tags=["error:maxbyteslengthexceeded"] ) - elif "NumberFormatException" in e.error: + elif ( + error["type"] == "document_parsing_exception" + and error["caused_by"]["type"] == "number_format_exception" + ): # This is caused by a number that is either too big for # Elasticsearch or just not a number. - matches = self.field_name_number_error_re.findall(e.error) + matches = self.field_name_number_error_re.findall(error["reason"]) if matches: field_name = matches[0] self.metrics.incr( "indexerror", tags=["error:numberformatexception"] ) - elif "unknown property" in e.error: + elif ( + error["type"] == "document_parsing_exception" + and error["caused_by"]["type"] == "illegal_argument_exception" + ): # This is caused by field values that are nested for a field where a # previously indexed value was a string. For example, the processor # first indexes ModuleSignatureInfo value as a string, then tries to # index ModuleSignatureInfo as a nested dict. 
- matches = self.field_name_unknown_property_error_re.findall(e.error) + matches = self.field_name_unknown_property_error_re.findall( + error["reason"] + ) if matches: field_name = matches[0] self.metrics.incr("indexerror", tags=["error:unknownproperty"]) @@ -634,7 +652,7 @@ def _submit_crash_to_elasticsearch( else: crash_document["removed_fields"] = field_name - except elasticsearch.exceptions.ElasticsearchException as exc: + except elasticsearch.ApiError as exc: self.logger.critical( "Submission to Elasticsearch failed for %s (%s)", crash_id, @@ -648,7 +666,7 @@ def catalog_crash(self, crash_id): contents = [] with self.client() as conn: try: - search = Search(using=conn, doc_type=self.get_doctype()) + search = Search(using=conn) search = search.filter("term", **{"processed_crash.uuid": crash_id}) results = search.execute().to_dict() hits = results["hits"]["hits"] @@ -661,15 +679,13 @@ def catalog_crash(self, crash_id): def delete_crash(self, crash_id): with self.client() as conn: try: - search = Search(using=conn, doc_type=self.get_doctype()) + search = Search(using=conn) search = search.filter("term", **{"processed_crash.uuid": crash_id}) results = search.execute().to_dict() hits = results["hits"]["hits"] if hits: hit = hits[0] - conn.delete( - index=hit["_index"], doc_type=hit["_type"], id=hit["_id"] - ) + conn.delete(index=hit["_index"], id=hit["_id"]) self.client.refresh() except Exception: self.logger.exception(f"ERROR: es: when deleting {crash_id}") diff --git a/socorro/external/es/query.py b/socorro/external/es/query.py index c01eeb0cb8..7acbaab8e5 100644 --- a/socorro/external/es/query.py +++ b/socorro/external/es/query.py @@ -3,13 +3,12 @@ # file, You can obtain one at https://mozilla.org/MPL/2.0/. import datetime -import elasticsearch_1_9_0 as elasticsearch import json -import re + +from elasticsearch import NotFoundError, TransportError, BadRequestError from socorro.lib import DatabaseError, MissingArgumentError, ResourceNotFound from socorro.external.es.base import generate_list_of_indexes -from socorro.external.es.supersearch import BAD_INDEX_REGEX from socorro.lib import libdatetime, external_common @@ -54,18 +53,22 @@ def get(self, **kwargs): search_args = {} if indices: search_args["index"] = indices - search_args["doc_type"] = self.crashstorage.get_doctype() connection = self.get_connection() try: results = connection.search(body=json.dumps(params["query"]), **search_args) - except elasticsearch.exceptions.NotFoundError as exc: - missing_index = re.findall(BAD_INDEX_REGEX, exc.error)[0] + results_body = results.body + except NotFoundError as exc: + missing_index = exc.body["error"]["resource.id"] raise ResourceNotFound( f"elasticsearch index {missing_index!r} does not exist" ) from exc - except elasticsearch.exceptions.TransportError as exc: + except (TransportError, BadRequestError) as exc: raise DatabaseError(exc) from exc - return results + # unnest results.hits.total.value to results.hits.total to match legacy behavior + if "hits" in results_body: + results_body["hits"]["total"] = results_body["hits"]["total"]["value"] + + return results_body diff --git a/socorro/external/es/super_search_fields.py b/socorro/external/es/super_search_fields.py index 7516b99c26..a3c4db3fb5 100644 --- a/socorro/external/es/super_search_fields.py +++ b/socorro/external/es/super_search_fields.py @@ -153,13 +153,12 @@ def get_destination_keys(field): return None -def build_mapping(doctype, fields=None): +def build_mapping(fields=None): """Generates Elasticsearch mapping from the super 
search fields schema - :arg str doctype: the doctype to use :arg any fields: map of field name -> field value; defaults to FIELDS - :returns: dict of doctype -> Elasticsearch mapping + :returns: Elasticsearch mapping """ fields = fields or FIELDS @@ -169,81 +168,17 @@ def build_mapping(doctype, fields=None): if not field.get("storage_mapping"): continue - add_doc_values(field["storage_mapping"]) - destination_keys = get_destination_keys(field) for destination_key in destination_keys: key_parts = destination_key.split(".") add_field_to_properties(properties, key_parts, field) mapping = { - doctype: { - "_all": {"enabled": False}, - "_source": {"compress": True}, - "properties": properties, - } + "properties": properties, } return mapping -def is_doc_values_friendly(storage_value): - """Predicate denoting whether this storage should have doc_values added - - ``doc_values=True`` is a thing we can add to certain storages to reduce the - memory they use in Elasticsearch. - - This predicate determines whether we should add it or not for a given - storage. - - :arg storage_value: a storage value from super search storages - - :returns: True if ``doc_values=True` should be added; False otherwise - - """ - storage_type = storage_value.get("type") - - # No clue what type this is--probably false - if not storage_type: - return False - - # object storages don't work with doc_values=True - if storage_type == "object": - return False - - # analyzed string storages don't work with doc_values=True - if storage_type == "string" and storage_value.get("index") != "not_analyzed": - return False - - # Everything is fine! Yay! - return True - - -def add_doc_values(value): - """Add "doc_values": True to storage mapping of field value - - NOTE(willkg): Elasticsearch 2.0+ does this automatically, so we - can nix this when we upgrade. - - Note: This makes changes in-place and recurses on the structure - of value. 
- - :arg value: the storage mapping of a field value - - """ - if is_doc_values_friendly(value): - value["doc_values"] = True - - # Handle subfields - if value.get("fields"): - for field in value.get("fields", {}).values(): - add_doc_values(field) - - # Handle objects with nested properties - if value.get("properties"): - for field in value["properties"].values(): - add_doc_values(field) - - # Cache of hashed_args -> list of fields values _FIELDS_CACHE = {} @@ -354,7 +289,7 @@ def keyword_field( "is_exposed": True, "is_returned": True, "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, } @@ -490,7 +425,7 @@ def apply_schema_properties(fields, schema): "name": "phc_kind", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "phc_base_address": { "data_validation_type": "str", @@ -502,7 +437,7 @@ def apply_schema_properties(fields, schema): "name": "phc_base_address", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "phc_usable_size": { "data_validation_type": "int", @@ -526,7 +461,7 @@ def apply_schema_properties(fields, schema): "name": "phc_alloc_stack", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "phc_free_stack": { "data_validation_type": "str", @@ -538,7 +473,7 @@ def apply_schema_properties(fields, schema): "name": "phc_free_stack", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "abort_message": { "data_validation_type": "str", @@ -551,9 +486,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "accessibility": { @@ -578,7 +512,7 @@ def apply_schema_properties(fields, schema): "name": "accessibility_client", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "accessibility_in_proc_client": { "data_validation_type": "str", @@ -590,7 +524,7 @@ def apply_schema_properties(fields, schema): "name": "accessibility_in_proc_client", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "adapter_device_id": { "data_validation_type": "str", @@ -602,7 +536,7 @@ def apply_schema_properties(fields, schema): "name": "adapter_device_id", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "adapter_driver_version": { "data_validation_type": "str", @@ -614,7 +548,7 @@ def apply_schema_properties(fields, schema): "name": "adapter_driver_version", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "adapter_subsys_id": { "data_validation_type": "str", @@ -626,7 +560,7 @@ def apply_schema_properties(fields, schema): "name": "adapter_subsys_id", "namespace": "processed_crash", 
"query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "adapter_vendor_id": { "data_validation_type": "str", @@ -638,7 +572,7 @@ def apply_schema_properties(fields, schema): "name": "adapter_vendor_id", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "addons": { "data_validation_type": "str", @@ -650,7 +584,7 @@ def apply_schema_properties(fields, schema): "name": "addons", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "addons_checked": { "data_validation_type": "bool", @@ -674,7 +608,7 @@ def apply_schema_properties(fields, schema): "name": "address", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "android_board": { "data_validation_type": "enum", @@ -686,7 +620,7 @@ def apply_schema_properties(fields, schema): "name": "android_board", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "android_brand": { "data_validation_type": "enum", @@ -698,7 +632,7 @@ def apply_schema_properties(fields, schema): "name": "android_brand", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "android_cpu_abi": { "data_validation_type": "enum", @@ -710,7 +644,7 @@ def apply_schema_properties(fields, schema): "name": "android_cpu_abi", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "android_cpu_abi2": { "data_validation_type": "enum", @@ -722,7 +656,7 @@ def apply_schema_properties(fields, schema): "name": "android_cpu_abi2", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "android_device": { "data_validation_type": "enum", @@ -734,7 +668,7 @@ def apply_schema_properties(fields, schema): "name": "android_device", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "android_display": { "data_validation_type": "enum", @@ -746,7 +680,7 @@ def apply_schema_properties(fields, schema): "name": "android_display", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "android_fingerprint": { "data_validation_type": "enum", @@ -758,7 +692,7 @@ def apply_schema_properties(fields, schema): "name": "android_fingerprint", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "android_hardware": { "data_validation_type": "enum", @@ -770,7 +704,7 @@ def apply_schema_properties(fields, schema): "name": "android_hardware", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "android_manufacturer": { "data_validation_type": "enum", @@ -782,7 +716,7 @@ def apply_schema_properties(fields, schema): "name": "android_manufacturer", "namespace": "processed_crash", "query_type": 
"enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "android_model": { "data_validation_type": "str", @@ -796,9 +730,9 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "type": "string", + "type": "text", }, }, "android_packagename": keyword_field(name="android_packagename"), @@ -812,7 +746,7 @@ def apply_schema_properties(fields, schema): "name": "android_version", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "app_init_dlls": { "data_validation_type": "str", @@ -824,7 +758,11 @@ def apply_schema_properties(fields, schema): "name": "app_init_dlls", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "semicolon_keywords", "type": "string"}, + "storage_mapping": { + "analyzer": "semicolon_keywords", + "type": "text", + "fielddata": True, + }, }, "app_notes": { "data_validation_type": "str", @@ -837,9 +775,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "async_shutdown_timeout": { @@ -854,11 +791,9 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "analyzer": "standard", - "index": "analyzed", - "type": "string", + "type": "text", }, }, "available_page_file": { @@ -920,7 +855,7 @@ def apply_schema_properties(fields, schema): "name": "co_marshal_interface_failure", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "collector_notes": { "data_validation_type": "str", @@ -934,9 +869,8 @@ def apply_schema_properties(fields, schema): "query_type": "string", "source_key": "processed_crash.collector_metadata.collector_notes", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, # FIXME(willkg): We have this indexed as an integer, but the annotation is listed as @@ -1002,7 +936,7 @@ def apply_schema_properties(fields, schema): "name": "cpu_arch", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "cpu_count": { "data_validation_type": "int", @@ -1034,11 +968,9 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "analyzer": "standard", - "index": "analyzed", - "type": "string", + "type": "text", }, }, "cpu_microcode_version": { @@ -1051,7 +983,7 @@ def apply_schema_properties(fields, schema): "name": "cpu_microcode_version", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "crash_inconsistencies": keyword_field( name="crash_inconsistencies", @@ -1072,7 +1004,7 @@ def 
apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "date", "storage_mapping": { - "format": "yyyy-MM-dd'T'HH:mm:ssZZ||yyyy-MM-dd'T'HH:mm:ss.SSSSSSZZ", + "format": "yyyy-MM-dd'T'HH:mm:ssZZZZZ||yyyy-MM-dd'T'HH:mm:ss.SSSSSSZZZZZ", "type": "date", }, }, @@ -1115,11 +1047,9 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "type": "string", - "index": "analyzed", - "analyzer": "standard", + "type": "text", }, }, "em_check_compatibility": { @@ -1144,7 +1074,7 @@ def apply_schema_properties(fields, schema): "name": "gmp_library_path", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "gmp_plugin": { "data_validation_type": "bool", @@ -1169,9 +1099,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "graphics_startup_test": { @@ -1235,7 +1164,7 @@ def apply_schema_properties(fields, schema): "name": "ipc_channel_error", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "ipc_fatal_error_msg": { "data_validation_type": "str", @@ -1248,9 +1177,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "ipc_fatal_error_protocol": { @@ -1263,7 +1191,7 @@ def apply_schema_properties(fields, schema): "name": "ipc_fatal_error_protocol", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "ipc_message_name": { "data_validation_type": "str", @@ -1276,9 +1204,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "ipc_message_size": { @@ -1303,7 +1230,7 @@ def apply_schema_properties(fields, schema): "name": "ipc_shutdown_state", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "ipc_system_error": { "data_validation_type": "int", @@ -1340,9 +1267,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "java_stack_trace_raw": { @@ -1356,9 +1282,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, 
"js_large_allocation_failure": keyword_field(name="js_large_allocation_failure"), @@ -1385,9 +1310,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "mac_crash_info": { @@ -1401,9 +1325,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "mac_available_memory_sysctl": integer_field(name="mac_available_memory_sysctl"), @@ -1626,7 +1549,7 @@ def apply_schema_properties(fields, schema): "name": "minidump_sha256_hash", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "missing_symbols": { "data_validation_type": "str", @@ -1638,7 +1561,11 @@ def apply_schema_properties(fields, schema): "name": "missing_symbols", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "semicolon_keywords", "type": "string"}, + "storage_mapping": { + "analyzer": "semicolon_keywords", + "type": "text", + "fielddata": True, + }, }, "modules_in_stack": { "data_validation_type": "str", @@ -1650,7 +1577,11 @@ def apply_schema_properties(fields, schema): "name": "modules_in_stack", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "semicolon_keywords", "type": "string"}, + "storage_mapping": { + "analyzer": "semicolon_keywords", + "type": "text", + "fielddata": True, + }, }, "moz_crash_reason": { "data_validation_type": "str", @@ -1663,9 +1594,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "moz_crash_reason_raw": { @@ -1679,9 +1609,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "oom_allocation_size": { @@ -1708,9 +1637,9 @@ def apply_schema_properties(fields, schema): "query_type": "enum", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "type": "string", + "type": "text", }, }, "platform_pretty_version": { @@ -1724,9 +1653,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "platform_version": { @@ -1740,9 +1668,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "plugin_filename": { @@ 
-1757,10 +1684,9 @@ def apply_schema_properties(fields, schema): "query_type": "enum", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "index": "analyzed", - "type": "string", + "type": "text", }, }, "plugin_name": { @@ -1775,10 +1701,9 @@ def apply_schema_properties(fields, schema): "query_type": "enum", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "index": "analyzed", - "type": "string", + "type": "text", }, }, "plugin_version": { @@ -1793,10 +1718,9 @@ def apply_schema_properties(fields, schema): "query_type": "enum", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "index": "analyzed", - "type": "string", + "type": "text", }, }, "possible_bit_flips_max_confidence": integer_field( @@ -1819,7 +1743,9 @@ def apply_schema_properties(fields, schema): "name": "process_type", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": { + "type": "keyword", + }, }, "processor_notes": { "data_validation_type": "str", @@ -1832,9 +1758,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "product": { @@ -1849,10 +1774,9 @@ def apply_schema_properties(fields, schema): "query_type": "enum", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "index": "analyzed", - "type": "string", + "type": "text", }, }, "productid": { @@ -1865,7 +1789,7 @@ def apply_schema_properties(fields, schema): "name": "productid", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "proto_signature": { "data_validation_type": "str", @@ -1878,9 +1802,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "quota_manager_shutdown_timeout": { @@ -1894,9 +1817,8 @@ def apply_schema_properties(fields, schema): "namespace": "processed_crash", "query_type": "string", "storage_mapping": { - "fields": {"full": {"index": "not_analyzed", "type": "string"}}, - "index": "analyzed", - "type": "string", + "fields": {"full": {"type": "keyword"}}, + "type": "text", }, }, "reason": { @@ -1911,10 +1833,9 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "analyzer": "standard", - "type": "string", + "type": "text", }, }, "release_channel": { @@ -1927,7 +1848,7 @@ def apply_schema_properties(fields, schema): "name": "release_channel", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "remote_type": { "data_validation_type": "enum", @@ -1939,7 +1860,7 @@ def apply_schema_properties(fields, schema): "name": "remote_type", "namespace": 
"processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "report_type": keyword_field(name="report_type"), "safe_mode": { @@ -1964,7 +1885,7 @@ def apply_schema_properties(fields, schema): "name": "shutdown_progress", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "shutdown_reason": keyword_field(name="shutdown_reason"), "signature": { @@ -1979,9 +1900,11 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": { + "type": "keyword", + }, }, - "type": "string", + "type": "text", }, }, "stackwalk_version": keyword_field( @@ -2000,7 +1923,7 @@ def apply_schema_properties(fields, schema): "query_type": "bool", # NOTE(willkg): startup_crash is used in signature report in some interesting # ways so I think we need to have both T and F values in ES - "storage_mapping": {"null_value": "False", "type": "boolean"}, + "storage_mapping": {"null_value": False, "type": "boolean"}, }, "startup_time": { "data_validation_type": "int", @@ -2049,7 +1972,11 @@ def apply_schema_properties(fields, schema): "name": "topmost_filenames", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "semicolon_keywords", "type": "string"}, + "storage_mapping": { + "analyzer": "semicolon_keywords", + "type": "text", + "fielddata": True, + }, }, "total_page_file": { "data_validation_type": "int", @@ -2110,7 +2037,7 @@ def apply_schema_properties(fields, schema): "name": "url", "namespace": "processed_crash", "query_type": "string", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "user_comments": { "data_validation_type": "str", @@ -2124,9 +2051,9 @@ def apply_schema_properties(fields, schema): "query_type": "string", "storage_mapping": { "fields": { - "full": {"index": "not_analyzed", "type": "string"}, + "full": {"type": "keyword"}, }, - "type": "string", + "type": "text", }, }, "useragent_locale": { @@ -2139,7 +2066,11 @@ def apply_schema_properties(fields, schema): "name": "useragent_locale", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "semicolon_keywords", "type": "string"}, + "storage_mapping": { + "analyzer": "semicolon_keywords", + "type": "text", + "fielddata": True, + }, }, "utility_actors_name": keyword_field(name="utility_actors_name"), "utility_process_sandboxing_kind": integer_field( @@ -2155,7 +2086,7 @@ def apply_schema_properties(fields, schema): "name": "uuid", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "vendor": { "data_validation_type": "enum", @@ -2167,7 +2098,7 @@ def apply_schema_properties(fields, schema): "name": "vendor", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"type": "string"}, + "storage_mapping": {"type": "text"}, }, "version": { "data_validation_type": "enum", @@ -2179,7 +2110,7 @@ def apply_schema_properties(fields, schema): "name": "version", "namespace": "processed_crash", "query_type": "enum", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, }, "windows_error_reporting": boolean_field( name="windows_error_reporting", diff --git 
a/socorro/external/es/supersearch.py b/socorro/external/es/supersearch.py index 0fcaf1da65..dc213ada38 100644 --- a/socorro/external/es/supersearch.py +++ b/socorro/external/es/supersearch.py @@ -7,8 +7,8 @@ import datetime import re -from elasticsearch_1_9_0.exceptions import NotFoundError, RequestError -from elasticsearch_dsl_0_0_11 import A, F, Q, Search +from elasticsearch.exceptions import NotFoundError, BadRequestError +from elasticsearch_dsl import A, Q, Search from socorro.external.es.base import generate_list_of_indexes from socorro.external.es.search_common import SearchBase @@ -17,9 +17,7 @@ BAD_INDEX_REGEX = re.compile(r"\[\[(.*)\] missing\]") -ELASTICSEARCH_PARSE_EXCEPTION_REGEX = re.compile( - r"ElasticsearchParseException\[Failed to parse \[([^\]]+)\]\]" -) +ELASTICSEARCH_PARSE_EXCEPTION_REGEX = re.compile(r"\[([^\]]+)\] could not be parsed") def prune_invalid_indices(indices, policy, template): @@ -76,6 +74,13 @@ def get_indices(self, dates): def format_field_names(self, hit): """Return hit with field's search_key replaced with name""" + # unnest hit one level and update keys + hit = { + f"{namespace}.{key}": item + for namespace in hit + for key, item in hit[namespace].items() + } + new_hit = {} for field_name in self.request_columns: field = self.all_fields[field_name] @@ -137,8 +142,16 @@ def _format(aggregation): return aggregation for i, bucket in enumerate(aggregation["buckets"]): + if "key_as_string" in bucket: + term = bucket["key_as_string"] + if term in ("true", "false") and bucket["key"] in (1, 0): + # Restore es 1.4 format for boolean terms as string + term = term = term[:1].upper() + else: + term = bucket["key"] + new_bucket = { - "term": bucket.get("key_as_string", bucket["key"]), + "term": term, "count": bucket["doc_count"], } facets = {} @@ -192,7 +205,6 @@ def get(self, **kwargs): search = Search( using=self.get_connection(), index=indices, - doc_type=self.crashstorage.get_doctype(), ) # Create filters. @@ -258,7 +270,7 @@ def get(self, **kwargs): operator_range = {">": "gt", "<": "lt", ">=": "gte", "<=": "lte"} args = {} - filter_type = "term" + query_name = "term" filter_value = None if not param.operator: @@ -267,68 +279,61 @@ def get(self, **kwargs): # to match for analyzed and non-analyzed supersearch fields. 
if len(param.value) == 1: # Only one value, so we only do a single match - filter_type = "query" - args = Q({"match": {search_key: param.value[0]}}).to_dict() + query_name = "match" + args = {search_key: param.value[0]} else: # Multiple values, so we do multiple matches wrapped in a bool # query where at least one of them should match - filter_type = "query" - args = Q( - { - "bool": { - "should": [ - {"match": {search_key: param_value}} - for param_value in param.value - ], - "minimum_should_match": 1, - } - } - ).to_dict() + query_name = "bool" + args = { + "should": [ + {"match": {search_key: param_value}} + for param_value in param.value + ], + "minimum_should_match": 1, + } elif param.operator == "=": # is exactly if field_data["has_full_version"]: search_key = f"{search_key}.full" filter_value = param.value elif param.operator in operator_range: - filter_type = "range" + query_name = "range" filter_value = {operator_range[param.operator]: param.value} elif param.operator == "__null__": - filter_type = "missing" - args["field"] = search_key + query_name = "bool" + args["must_not"] = [Q("exists", field=search_key)] elif param.operator == "__true__": - filter_type = "term" + query_name = "term" filter_value = True elif param.operator == "@": - filter_type = "regexp" + query_name = "regexp" if field_data["has_full_version"]: search_key = f"{search_key}.full" filter_value = param.value elif param.operator in operator_wildcards: - filter_type = "query" + query_name = "wildcard" # Wildcard operations are better applied to a non-analyzed # field (called "full") if there is one. if field_data["has_full_version"]: search_key = f"{search_key}.full" - q_args = {} - q_args[search_key] = ( - operator_wildcards[param.operator] % param.value - ) - query = Q("wildcard", **q_args) - args = query.to_dict() + args = { + search_key: (operator_wildcards[param.operator] % param.value) + } if filter_value is not None: args[search_key] = filter_value if args: - new_filter = F(filter_type, **args) + new_filter = Q(query_name, **args) if param.operator_not: new_filter = ~new_filter if sub_filters is None: sub_filters = new_filter - elif filter_type == "range": + elif query_name == "range": sub_filters &= new_filter else: sub_filters |= new_filter @@ -338,7 +343,7 @@ def get(self, **kwargs): if sub_filters is not None: filters.append(sub_filters) - search = search.filter(F("bool", must=filters)) + search = search.filter(Q("bool", must=filters)) # Restricting returned fields. fields = [] @@ -355,7 +360,7 @@ def get(self, **kwargs): field_name = self.get_field_name(value, full=False) fields.append(field_name) - search = search.fields(fields) + search = search.source(fields) # Sorting. sort_fields = [] @@ -424,7 +429,10 @@ def get(self, **kwargs): break # Yay! Results! except NotFoundError as e: - missing_index = re.findall(BAD_INDEX_REGEX, e.error)[0] + if e.body["error"]["type"] != "index_not_found_exception": + # Could be an alias not found exception, which we don't handle here + raise + missing_index = e.body["error"]["index"] if missing_index in indices: del indices[indices.index(missing_index)] else: @@ -450,24 +458,25 @@ def get(self, **kwargs): shards = None break - except RequestError as exc: + except BadRequestError as exc: # Try to handle it gracefully if we can find out what # input was bad and caused the exception. 
# Not an ElasticsearchParseException exception with suppress(IndexError): - bad_input = ELASTICSEARCH_PARSE_EXCEPTION_REGEX.findall(exc.error)[ - -1 - ] - # Loop over the original parameters to try to figure - # out which *key* had the bad input. - for key, value in kwargs.items(): - if value == bad_input: - raise BadArgumentError(key) from exc - - # If it's a search parse exception, but we don't know what key is the - # problem, raise a general BadArgumentError - if "Failed to parse source" in str(exc): - raise BadArgumentError("Malformed supersearch query.") from exc + if ( + exc.body["error"]["type"] == "x_content_parse_exception" + and exc.body["error"]["caused_by"]["type"] + == "illegal_argument_exception" + ): + bad_input = ELASTICSEARCH_PARSE_EXCEPTION_REGEX.findall( + exc.body["error"]["caused_by"]["reason"] + )[-1] + + # Loop over the original parameters to try to figure + # out which *key* had the bad input. + for key, value in kwargs.items(): + if value == bad_input: + raise BadArgumentError(key) from exc # Re-raise the original exception raise @@ -477,6 +486,13 @@ def get(self, **kwargs): # results, so the client can decide what to do. failed_indices = defaultdict(int) for failure in shards.failures: + # If it's a search parse exception we don't know what key is the + # problem, so raise a general BadArgumentError + if ( + failure.reason.type == "query_shard_exception" + and failure.reason.caused_by.type == "illegal_argument_exception" + ): + raise BadArgumentError(f"Malformed supersearch query: {failure}") failed_indices[failure.index] += 1 for index, shards_count in failed_indices.items(): @@ -540,15 +556,27 @@ def _get_histogram_agg(self, field, intervals): and "date_histogram" or "histogram" ) - return A( - histogram_type, field=self.get_field_name(field), interval=intervals[field] - ) + # NOTE(krzepka) By default ES 8.15 will return empty buckets, which creates issues in tests. + # Setting min_doc_count makes ES return only non-empty buckets. + # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-histogram-aggregation.html + kwargs = {"min_doc_count": 1} + # NOTE(relud) We want to use "calendar_interval" for date_histogram and + # "interval" for everything else. 
+ if histogram_type == "date_histogram": + kwargs["calendar_interval"] = intervals[field] + else: + kwargs["interval"] = intervals[field] + return A(histogram_type, field=self.get_field_name(field), **kwargs) def _get_cardinality_agg(self, field): return A("cardinality", field=self.get_field_name(field)) def _get_fields_agg(self, field, facets_size): - return A("terms", field=self.get_field_name(field), size=facets_size) + return A( + "terms", + field=self.get_field_name(field), + size=facets_size, + ) def _add_second_level_aggs( self, param, recipient, facets_size, histogram_intervals diff --git a/socorro/external/legacy_es/crashstorage.py b/socorro/external/legacy_es/crashstorage.py index 49991a98f9..3f158c9dde 100644 --- a/socorro/external/legacy_es/crashstorage.py +++ b/socorro/external/legacy_es/crashstorage.py @@ -16,6 +16,8 @@ from socorro.external.crashstorage_base import CrashStorageBase from socorro.external.legacy_es.connection_context import LegacyConnectionContext +from socorro.external.legacy_es.query import LegacyQuery +from socorro.external.legacy_es.supersearch import LegacySuperSearch from socorro.external.legacy_es.super_search_fields import ( build_mapping, FIELDS, @@ -307,6 +309,24 @@ def __init__( def build_client(cls, url, timeout): return LegacyConnectionContext(url=url, timeout=timeout) + def build_query(self): + """Return new instance of LegacyQuery.""" + return LegacyQuery(crashstorage=self) + + def build_supersearch(self): + """Return new instance of LegacySuperSearch.""" + return LegacySuperSearch(crashstorage=self) + + def build_search(self, **kwargs): + """Return new instance of elasticsearch_dsl's Search.""" + with self.client() as conn: + return Search(using=conn, doc_type=self.get_doctype(), **kwargs) + + @staticmethod + def get_source_key(field): + """Return source key for the field.""" + return get_source_key(field) + def get_index_template(self): """Return template for index names.""" return self.index @@ -346,28 +366,26 @@ def get_socorro_index_settings(self, mappings): "mappings": mappings, } - def get_mapping(self, index_name, es_doctype, reraise=False): - """Retrieves the mapping for a given index and doctype + def get_mapping(self, index_name, reraise=False): + """Retrieves the mapping for a given index and the configured doctype NOTE(willkg): Mappings are cached on the LegacyESCrashStorage instance. If you change the indices (like in tests), you should get a new LegacyESCrashStorage instance. :arg str index_name: the index to retrieve the mapping for - :arg str es_doctype: the doctype to retrieve the mapping for :arg bool reraise: True if you want this to reraise a NotFoundError; False otherwise :returns: mapping as a dict or None """ - cache_key = f"{index_name}::{es_doctype}" - mapping = self._mapping_cache.get(cache_key) + mapping = self._mapping_cache.get(index_name) if mapping is None: try: mapping = self.client.get_mapping( - index_name=index_name, doc_type=es_doctype + index_name=index_name, doc_type=self.get_doctype() ) - self._mapping_cache[cache_key] = mapping + self._mapping_cache[index_name] = mapping except elasticsearch.exceptions.NotFoundError: if reraise: raise @@ -449,31 +467,29 @@ def get_keys_for_indexable_fields(self): return keys - def get_keys_for_mapping(self, index_name, es_doctype): + def get_keys_for_mapping(self, index_name): """Get the keys in "namespace.key" format for a given mapping NOTE(willkg): Results are cached on this LegacyESCrashStorage instance. 
:arg str index_name: the name of the index - :arg str es_doctype: the doctype for the index :returns: set of "namespace.key" fields :raise elasticsearch.exceptions.NotFoundError: if the index doesn't exist """ - cache_key = f"{index_name}::{es_doctype}" - keys = self._keys_for_mapping_cache.get(cache_key) + keys = self._keys_for_mapping_cache.get(index_name) if keys is None: - mapping = self.get_mapping(index_name, es_doctype, reraise=True) + mapping = self.get_mapping(index_name, reraise=True) keys = parse_mapping(mapping, None) - self._keys_for_mapping_cache[cache_key] = keys + self._keys_for_mapping_cache[index_name] = keys return keys - def get_keys(self, index_name, es_doctype): + def get_keys(self, index_name): supersearch_fields_keys = self.get_keys_for_indexable_fields() try: - mapping_keys = self.get_keys_for_mapping(index_name, es_doctype) + mapping_keys = self.get_keys_for_mapping(index_name) except NotFoundError: mapping_keys = None all_valid_keys = supersearch_fields_keys @@ -492,8 +508,7 @@ def save_processed_crash(self, raw_crash, processed_crash): index_name = self.get_index_for_date( string_to_datetime(processed_crash["date_processed"]) ) - es_doctype = self.get_doctype() - all_valid_keys = self.get_keys(index_name, es_doctype) + all_valid_keys = self.get_keys(index_name) src = {"processed_crash": copy.deepcopy(processed_crash)} @@ -510,7 +525,6 @@ def save_processed_crash(self, raw_crash, processed_crash): self._submit_crash_to_elasticsearch( crash_id=crash_id, - es_doctype=es_doctype, index_name=index_name, crash_document=crash_document, ) @@ -530,11 +544,14 @@ def _capture(key, data): _capture("crash_document_size", crash_document) - def _index_crash(self, connection, es_index, es_doctype, crash_document, crash_id): + def _index_crash(self, connection, es_index, crash_document, crash_id): try: start_time = time.time() connection.index( - index=es_index, doc_type=es_doctype, body=crash_document, id=crash_id + index=es_index, + doc_type=self.get_doctype(), + body=crash_document, + id=crash_id, ) index_outcome = "successful" except Exception: @@ -546,9 +563,7 @@ def _index_crash(self, connection, es_index, es_doctype, crash_document, crash_i "index", value=elapsed_time * 1000.0, tags=["outcome:" + index_outcome] ) - def _submit_crash_to_elasticsearch( - self, crash_id, es_doctype, index_name, crash_document - ): + def _submit_crash_to_elasticsearch(self, crash_id, index_name, crash_document): """Submit a crash report to elasticsearch""" # Attempt to create the index; it's OK if it already exists. @@ -562,9 +577,7 @@ def _submit_crash_to_elasticsearch( for _ in range(5): try: with self.client() as conn: - return self._index_crash( - conn, index_name, es_doctype, crash_document, crash_id - ) + return self._index_crash(conn, index_name, crash_document, crash_id) except elasticsearch.exceptions.ConnectionError: # If this is a connection error, sleep a second and then try again diff --git a/socorro/statsd_metrics.yaml b/socorro/statsd_metrics.yaml index 2da71ea327..52510c7313 100644 --- a/socorro/statsd_metrics.yaml +++ b/socorro/statsd_metrics.yaml @@ -125,6 +125,11 @@ socorro.processor.es.save_processed_crash: description: | Timer for how long it takes to save the processed crash to Elasticsearch. +socorro.processor.legacy_es.save_processed_crash: + type: "timing" + description: | + Timer for how long it takes to save the processed crash to Elasticsearch. 
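
The timing metric added just above is fed by the same measurement pattern _index_crash uses in this file: wall-clock time around the indexing call, reported in milliseconds with an outcome tag. A self-contained sketch of that pattern, with a print standing in for the real metrics client, which is out of scope here:

    import time

    def timed_index(index_fn, *args, **kwargs):
        # Measure wall-clock time around the indexing call, report milliseconds,
        # and tag the outcome. The print stands in for the metrics client.
        start_time = time.time()
        outcome = "successful"
        try:
            return index_fn(*args, **kwargs)
        except Exception:
            outcome = "failed"
            raise
        finally:
            elapsed_ms = (time.time() - start_time) * 1000.0
            print(f"index: {elapsed_ms:.1f}ms tags=['outcome:{outcome}']")
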
+ socorro.processor.ingestion_timing: type: "timing" description: | diff --git a/socorro/tests/conftest.py b/socorro/tests/conftest.py index 1e06e8d8c5..c0a62e9da1 100644 --- a/socorro/tests/conftest.py +++ b/socorro/tests/conftest.py @@ -13,7 +13,8 @@ import uuid import sys -from elasticsearch_dsl_0_0_11 import Search +from elasticsearch_dsl import Search +from elasticsearch_dsl_0_0_11 import Search as LegacySearch from google.api_core.exceptions import AlreadyExists, NotFound from google.auth.credentials import AnonymousCredentials from google.cloud import storage @@ -187,6 +188,107 @@ def gcs_helper(): yield gcs_helper +class ElasticsearchHelper: + """Elasticsearch helper class. + + When used in a context, this will clean up any indexes created. + + """ + + def __init__(self): + if settings.ELASTICSEARCH_MODE == "LEGACY_ONLY": + raise ValueError("cannot test elasticearch 8 in LEGACY_ONLY mode") + + self._crashstorage = build_instance_from_settings(settings.ES_STORAGE) + self.conn = self._crashstorage.client + + def get_index_template(self): + return self._crashstorage.get_index_template() + + def create_index(self, index_name): + print(f"ElasticsearchHelper: creating index: {index_name}") + self._crashstorage.create_index(index_name) + + def create_indices(self): + # Create all the indexes for the last couple of weeks; we have to do it this way + # to handle split indexes over the new year + template = self._crashstorage.index + to_create = set() + + for i in range(14): + index_name = (utc_now() - datetime.timedelta(days=i)).strftime(template) + to_create.add(index_name) + + for index_name in to_create: + print(f"ElasticsearchHelper: creating index: {index_name}") + self._crashstorage.create_index(index_name) + + self.health_check() + + def delete_indices(self): + for index in self._crashstorage.get_indices(): + self._crashstorage.delete_index(index) + + def get_indices(self): + return self._crashstorage.get_indices() + + def health_check(self): + with self.conn() as conn: + conn.options(request_timeout=5).cluster.health(wait_for_status="yellow") + + def get_url(self): + """Returns the Elasticsearch url.""" + return settings.ES_STORAGE["options"]["url"] + + def refresh(self): + self.conn.refresh() + + def index_crash(self, processed_crash, refresh=True): + """Index a single crash and refresh""" + self._crashstorage.save_processed_crash( + raw_crash={}, + processed_crash=processed_crash, + ) + + if refresh: + self.refresh() + + def index_many_crashes(self, number, processed_crash=None, loop_field=None): + """Index multiple crashes and refresh at the end""" + processed_crash = processed_crash or {} + + crash_ids = [] + for i in range(number): + processed_copy = processed_crash.copy() + processed_copy["uuid"] = create_new_ooid() + processed_copy["date_processed"] = date_from_ooid(processed_copy["uuid"]) + if loop_field is not None: + processed_copy[loop_field] = processed_crash[loop_field] % i + + self.index_crash(processed_crash=processed_copy, refresh=False) + + self.refresh() + return crash_ids + + def get_crash_data(self, crash_id): + """Get source in index for given crash_id + + :arg crash_id: the crash id to fetch the source for + + :returns: source as a Python dict or None if it doesn't exist + + """ + index = self._crashstorage.get_index_for_date(date_from_ooid(crash_id)) + + with self.conn() as conn: + search = Search(using=conn, index=index) + search = search.filter({"term": {"processed_crash.uuid": crash_id}}) + results = search.execute().to_dict() + + if 
results["hits"]["hits"]: + return results["hits"]["hits"][0]["_source"] + + class LegacyElasticsearchHelper: """Legacy Elasticsearch helper class. @@ -281,7 +383,7 @@ def get_crash_data(self, crash_id): doc_type = self._crashstorage.get_doctype() with self.conn() as conn: - search = Search(using=conn, index=index, doc_type=doc_type) + search = LegacySearch(using=conn, index=index, doc_type=doc_type) search = search.filter("term", **{"processed_crash.uuid": crash_id}) results = search.execute().to_dict() @@ -312,14 +414,45 @@ def legacy_es_helper(): legacy_es_helper.delete_indices() +def _generate_es_helper(): + es_helper = ElasticsearchHelper() + es_helper.create_indices() + yield es_helper + es_helper.delete_indices() + + +@pytest.fixture +def es_helper(): + """Returns an Elasticsearch helper for tests. + + Provides: + + * ``get_url()`` + * ``create_indices()`` + * ``delete_indices()`` + * ``get_indices()`` + * ``index_crash()`` + * ``index_many_crashes()`` + * ``refresh()`` + * ``get_crash_data()`` + + """ + yield from _generate_es_helper() + + @pytest.fixture -def es_helper(legacy_es_helper): +def preferred_es_helper(legacy_es_helper): """Returns an Elasticsearch helper for tests. This returns a legacy or non-legacy helper depending on how the webapp is - configured. For now that means it always returns a legacy helper. + configured. """ - yield legacy_es_helper + if settings.ELASTICSEARCH_MODE == "LEGACY_ONLY": + yield legacy_es_helper + else: + # NOTE(relud): this fixture cannot reuse the es_helper fixture because + # ElasticsearchHelper cannot be instantiated in LEGACY_ONLY mode + yield from _generate_es_helper() class PubSubHelper: diff --git a/socorro/tests/external/es/test_crashstorage.py b/socorro/tests/external/es/test_crashstorage.py index 0258257a49..bfe7f1f711 100644 --- a/socorro/tests/external/es/test_crashstorage.py +++ b/socorro/tests/external/es/test_crashstorage.py @@ -20,8 +20,8 @@ ) from socorro.external.es.super_search_fields import build_mapping -from socorro.libclass import build_instance -from socorro.lib.libdatetime import date_to_string, utc_now +from socorro.libclass import build_instance_from_settings +from socorro.lib.libdatetime import date_to_string, string_to_datetime, utc_now from socorro.lib.libooid import create_new_ooid, date_from_ooid @@ -81,10 +81,10 @@ class FakeException(Exception): class TestESCrashStorage: def build_crashstorage(self): - return build_instance( - class_path="socorro.external.es.crashstorage.ESCrashStorage", - kwargs=settings.ES_STORAGE["options"], - ) + if settings.ELASTICSEARCH_MODE == "LEGACY_ONLY": + raise ValueError("cannot test elasticearch 8 in LEGACY_ONLY mode") + + return build_instance_from_settings(settings.ES_STORAGE) def test_index_crash(self, es_helper): """Test indexing a crash document.""" @@ -154,9 +154,8 @@ def test_index_crash_mapping_keys(self, es_helper): # We're going to use a mapping from super search fields, bug remove the # user_comments field. 
- mappings = build_mapping(crashstorage.get_doctype()) - doctype = crashstorage.get_doctype() - del mappings[doctype]["properties"]["processed_crash"]["properties"][field] + mappings = build_mapping() + del mappings["properties"]["processed_crash"]["properties"][field] # Create the index for 4 weeks ago crashstorage.create_index( @@ -284,7 +283,6 @@ def test_index_data_capture(self, es_helper): crashstorage._index_crash( connection=mock_connection, es_index=None, - es_doctype=None, crash_document=None, crash_id=None, ) @@ -294,7 +292,6 @@ def test_index_data_capture(self, es_helper): crashstorage._index_crash( connection=mock_connection, es_index=None, - es_doctype=None, crash_document=None, crash_id=None, ) @@ -369,8 +366,7 @@ def test_delete_expired_indices(self, es_helper): REMOVED_VALUE, ), # Booleans are converted - # FIXME(willkg): fix_boolean is never called--that's wrong - # ("processed_crash.accessibility", "true", True), + ("processed_crash.accessibility", "true", True), ], ) def test_indexing_bad_data(self, key, value, expected_value, es_helper): @@ -398,9 +394,50 @@ def test_indexing_bad_data(self, key, value, expected_value, es_helper): doc = es_helper.get_crash_data(crash_id) assert glom.glom(doc, key, default=REMOVED_VALUE) == expected_value - # FIXME(willkg): write tests for index error handling; this is tricky because - # when we have index error handling, we write code to fix the error, so it's - # hard to test the error situation + @pytest.mark.parametrize( + "key, value", + [ + pytest.param( + "processed_crash.mac_available_memory_sysctl", + "not a number", + id="number_format_exception", + ), + pytest.param( + "processed_crash.user_comments", + "a" * 32_767, # max string lengthis 32_766 bytes + id="max_bytes_length_exceeded", + ), + pytest.param( + "processed_crash.user_comments", {"foo": "bar"}, id="unknown_property" + ), + ], + ) + def test_invalid_fields_removed(self, key, value, es_helper): + # create crash document + crash_id = create_new_ooid() + doc = { + "crash_id": crash_id, + "processed_crash": { + "date_processed": date_from_ooid(crash_id), + "uuid": crash_id, + }, + } + glom.assign(doc, key, value, missing=dict) + + # Save the crash data and then fetch it and verify the value is removed + crashstorage = self.build_crashstorage() + index_name = crashstorage.get_index_for_date( + string_to_datetime(doc["processed_crash"]["date_processed"]) + ) + crashstorage._submit_crash_to_elasticsearch( + crash_id=crash_id, + index_name=index_name, + crash_document=doc, + ) + es_helper.refresh() + + doc = es_helper.get_crash_data(crash_id) + assert glom.glom(doc, key, default=REMOVED_VALUE) == REMOVED_VALUE @pytest.mark.parametrize( diff --git a/socorro/tests/external/es/test_query.py b/socorro/tests/external/es/test_query.py index f6005aff0b..9ec4fc2b07 100644 --- a/socorro/tests/external/es/test_query.py +++ b/socorro/tests/external/es/test_query.py @@ -52,9 +52,8 @@ def test_get(self, es_helper): query = { "query": { - "filtered": { - "query": {"match_all": {}}, - "filter": {"term": {"product": "earthraccoon"}}, + "constant_score": { + "filter": {"term": {"processed_crash.product": "earthraccoon"}}, } } } @@ -80,7 +79,6 @@ def test_get_with_indices(self, es_helper, monkeypatch): """Verify that .get() uses the correct indices.""" crashstorage = self.build_crashstorage() api = Query(crashstorage=crashstorage) - doc_type = api.crashstorage.get_doctype() # Mock the connection so we can see the list of indexes it's building mocked_connection = mock.MagicMock() @@ -96,7 +94,6 
@@ def test_get_with_indices(self, es_helper, monkeypatch): mocked_connection.return_value.search.assert_called_with( body='{"query": {}}', index=indices, - doc_type=doc_type, ) # Test all indices. @@ -111,5 +108,4 @@ def test_get_with_indices(self, es_helper, monkeypatch): mocked_connection.return_value.search.assert_called_with( body='{"query": {}}', index=["socorro_201801", "socorro_200047", "not_an_index"], - doc_type=doc_type, ) diff --git a/socorro/tests/external/es/test_super_search_fields.py b/socorro/tests/external/es/test_super_search_fields.py index 797c3f13cc..8e25f52779 100644 --- a/socorro/tests/external/es/test_super_search_fields.py +++ b/socorro/tests/external/es/test_super_search_fields.py @@ -8,10 +8,8 @@ import pytest from socorro.external.es.super_search_fields import ( - add_doc_values, build_mapping, FIELDS, - is_doc_values_friendly, get_fields_by_item, ) @@ -29,49 +27,49 @@ class Test_get_fields_by_item: # No storage_mapping {"key": {"in_database_name": "key"}}, # Wrong or missing analyzer - {"key": {"in_database_name": "key", "storage_mapping": {"type": "string"}}}, + {"key": {"in_database_name": "key", "storage_mapping": {"type": "text"}}}, { "key": { "in_database_name": "key", "storage_mapping": { "analyzer": "semicolon_keywords", - "type": "string", + "type": "text", }, } }, ], ) def test_no_match(self, fields): - assert get_fields_by_item(fields, "analyzer", "keyword") == [] + assert get_fields_by_item(fields, "type", "keyword") == [] def test_match(self): fields = { "key": { "in_database_name": "key", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, } } - assert get_fields_by_item(fields, "analyzer", "keyword") == [fields["key"]] + assert get_fields_by_item(fields, "type", "keyword") == [fields["key"]] def test_match_by_type(self): fields = { "key": { "in_database_name": "key", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, } } - assert get_fields_by_item(fields, "type", "string") == [fields["key"]] + assert get_fields_by_item(fields, "type", "keyword") == [fields["key"]] def test_caching(self): # Verify caching works fields = { "key": { "in_database_name": "key", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, } } - result = get_fields_by_item(fields, "analyzer", "keyword") - second_result = get_fields_by_item(fields, "analyzer", "keyword") + result = get_fields_by_item(fields, "type", "keyword") + second_result = get_fields_by_item(fields, "type", "keyword") assert id(result) == id(second_result) # This is the same data as fields, but a different dict, so it has a @@ -79,22 +77,20 @@ def test_caching(self): second_fields = { "key": { "in_database_name": "key", - "storage_mapping": {"analyzer": "keyword", "type": "string"}, + "storage_mapping": {"type": "keyword"}, } } - third_result = get_fields_by_item(second_fields, "analyzer", "keyword") + third_result = get_fields_by_item(second_fields, "type", "keyword") assert id(result) != id(third_result) class Test_build_mapping: """Test build_mapping with an elasticsearch database containing fake data""" - def test_get_mapping(self, es_helper): - doctype = es_helper.get_doctype() - mapping = build_mapping(doctype=doctype, fields=get_fields()) + def test_get_mapping(self): + mapping = build_mapping(fields=get_fields()) - assert doctype in mapping - properties = mapping[doctype]["properties"] + properties = mapping["properties"] 
print(json.dumps(properties, indent=4, sort_keys=True)) assert "processed_crash" in properties @@ -105,10 +101,9 @@ def test_get_mapping(self, es_helper): assert "os_name" in processed_crash assert "platform" not in processed_crash - # Those fields have a `storage_mapping`. + # Test a field that has a storage_mapping. assert processed_crash["release_channel"] == { - "analyzer": "keyword", - "type": "string", + "type": "keyword", } # Test nested objects. @@ -116,7 +111,6 @@ def test_get_mapping(self, es_helper): "cpu_count" ] == { "type": "short", - "doc_values": True, } @@ -223,47 +217,3 @@ def test_validate_super_search_fields(name, properties): f"processed_crash.{properties['name']}", ] assert key in possible_keys - - -@pytest.mark.parametrize( - "value, expected", - [ - # No type -> False - ({}, False), - # object -> False - ({"type": "object"}, False), - # Analyzed string -> False - ({"type": "string"}, False), - ({"type": "string", "analyzer": "keyword"}, False), - # Unanalyzed string -> True - ({"type": "string", "index": "not_analyzed"}, True), - # Anything else -> True - ({"type": "long"}, True), - ], -) -def test_is_doc_values_friendly(value, expected): - assert is_doc_values_friendly(value) == expected - - -def test_add_doc_values(): - storage_mapping = {"type": "short"} - add_doc_values(storage_mapping) - assert storage_mapping == {"type": "short", "doc_values": True} - - storage_mapping = { - "fields": { - "full": {"index": "not_analyzed", "type": "string"}, - }, - "analyzer": "standard", - "index": "analyzed", - "type": "string", - } - add_doc_values(storage_mapping) - assert storage_mapping == { - "fields": { - "full": {"index": "not_analyzed", "type": "string", "doc_values": True}, - }, - "analyzer": "standard", - "index": "analyzed", - "type": "string", - } diff --git a/socorro/tests/external/es/test_supersearch.py b/socorro/tests/external/es/test_supersearch.py index 824afb9193..94a516fd7b 100644 --- a/socorro/tests/external/es/test_supersearch.py +++ b/socorro/tests/external/es/test_supersearch.py @@ -4,8 +4,11 @@ import datetime import json +import http.server +import threading +from contextlib import contextmanager +from copy import deepcopy -import requests_mock import pytest from socorro import settings @@ -19,6 +22,30 @@ from socorro.lib.libooid import create_new_ooid +@contextmanager +def mock_es_server(ip, port, post_response): + class MockHTTPRequestHandler(http.server.BaseHTTPRequestHandler): + """mock request handler""" + + def do_POST(self): # pylint: disable=invalid-name + """Handle GET requests""" + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("X-Elastic-Product", "Elasticsearch") + self.end_headers() + self.wfile.write(json.dumps(post_response).encode("utf-8")) + + server = http.server.ThreadingHTTPServer((ip, port), MockHTTPRequestHandler) + with server: + server_thread = threading.Thread(target=server.serve_forever) + server_thread.daemon = True + server_thread.start() + try: + yield + finally: + server.shutdown() + + class SuperSearchWithFields(SuperSearch): """Adds FIELDS to all .get() calls""" @@ -89,9 +116,14 @@ def test_get(self, es_helper): }, ) es_helper.refresh() - res = api.get( - _columns=["date", "build_id", "platform", "signature", "cpu_count"] + _columns=[ + "date", + "build_id", + "platform", + "signature", + "cpu_count", + ] ) assert "hits" in res @@ -103,10 +135,12 @@ def test_get(self, es_helper): assert res["hits"][0]["signature"] == "js::break_your_browser" assert 
list(res["facets"].keys()) == ["signature"] - assert res["facets"]["signature"][0] == { - "term": "js::break_your_browser", - "count": 1, - } + assert res["facets"]["signature"] == [ + { + "term": "js::break_your_browser", + "count": 1, + } + ] # Test fields are being renamed. assert "date" in res["hits"][0] # date_processed -> date @@ -458,7 +492,8 @@ def test_get_with_bool_operators(self, es_helper): assert resp["total"] == 2 assert len(resp["hits"]) == 2 - assert not resp["hits"][0]["accessibility"] + for hit in resp["hits"]: + assert not hit["accessibility"] def test_get_with_combined_operators(self, es_helper): now = utc_now() @@ -1193,11 +1228,11 @@ def test_get_with_date_histogram(self, es_helper): assert "histogram_date" in res["facets"] def dt_to_midnight(date): - return date.replace(hour=0, minute=0, second=0, microsecond=0) + return f"{date.replace(hour=0, minute=0, second=0, microsecond=0):%FT%H:%M:%SZ}" - today_str = dt_to_midnight(now).isoformat() - yesterday_str = dt_to_midnight(yesterday).isoformat() - day_before_str = dt_to_midnight(the_day_before).isoformat() + today_str = dt_to_midnight(now) + yesterday_str = dt_to_midnight(yesterday) + day_before_str = dt_to_midnight(the_day_before) expected_terms = [ { @@ -1327,13 +1362,9 @@ def test_get_with_date_histogram_with_bad_interval(self, es_helper): "_histogram_interval.date": "xdays", # Note! It's just wrong } - # Not using assert_raises here so we can do a check on the exception - # object when it does raise. - try: + with pytest.raises(BadArgumentError) as exc_info: api.get(**kwargs) - raise AssertionError("The line above is supposed to error out") - except BadArgumentError as exception: - assert exception.param == "_histogram_interval.date" + assert exc_info._excinfo[1].param == "_histogram_interval.date" def test_get_with_number_histogram(self, es_helper): crashstorage = self.build_crashstorage() @@ -1735,107 +1766,103 @@ def test_get_with_bad_regex(self, es_helper): crashstorage = self.build_crashstorage() api = SuperSearchWithFields(crashstorage=crashstorage) - # A bad regex kicks up a SearchParseException which supersearch converts + # regex is lazy-evaluated, so there has to be a crash matched by everything but + # the regex to produce a failure + now = utc_now() + es_helper.index_crash( + processed_crash={ + "uuid": create_new_ooid(timestamp=now), + "signature": "test.dll", + "date_processed": now, + }, + ) + + # A bad regex kicks up a shard failure which supersearch converts # to a BadArgumentError with pytest.raises(BadArgumentError): - api.get(signature='@"OOM | ".*" | ".*"()&%