diff --git a/esrally/rally.py b/esrally/rally.py index 1dfd55c83..3bd56a197 100644 --- a/esrally/rally.py +++ b/esrally/rally.py @@ -179,12 +179,17 @@ def add_track_source(subparser): required=True, help="Name of the generated track", ) - create_track_parser.add_argument( + indices_or_data_streams_group = create_track_parser.add_mutually_exclusive_group(required=True) + indices_or_data_streams_group.add_argument( "--indices", type=non_empty_list, - required=True, help="Comma-separated list of indices to include in the track", ) + indices_or_data_streams_group.add_argument( + "--data-streams", + type=non_empty_list, + help="Comma-separated list of data streams to include in the track", + ) create_track_parser.add_argument( "--target-hosts", default="", @@ -987,9 +992,16 @@ def dispatch_sub_command(arg_parser, args, cfg): cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path) generate(cfg) elif sub_command == "create-track": - cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices) - cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path) - cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track) + if args.data_streams is not None: + cfg.add(config.Scope.applicationOverride, "generator", "indices", "*") + cfg.add(config.Scope.applicationOverride, "generator", "data_streams", args.data_streams) + cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path) + cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track) + elif args.indices is not None: + cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices) + cfg.add(config.Scope.applicationOverride, "generator", "data_streams", args.data_streams) + cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path) + cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track) 
def is_valid(index_name, index_pattern):
    """
    Decides whether an index that was returned for ``index_pattern`` may be included in the track.

    :param index_name: name of a concrete index as returned by Elasticsearch
    :param index_pattern: the pattern that was used to request the index
    :return: a tuple ``(valid, reason)``; ``reason`` is ``None`` when the index is valid and a
             human-readable explanation otherwise
    """
    if len(index_name) == 0:
        return False, "Index name is empty"
    # When the indices are requested directly (with --data-streams or --indices) then we honor the
    # request, even if it includes hidden indices. But when asking for all indices we skip hidden
    # indices as they could be system indices and restoring them to another cluster would break it.
    if index_pattern in ("_all", "*") and index_name.startswith("."):
        return False, f"Index [{index_name}] is hidden"
    return True, None


def extract_indices_from_data_stream(client, data_stream_pattern):
    """
    Calls the Elasticsearch client's get_data_stream function to resolve a data stream pattern.

    Despite the function name, the result contains the names of the matching data streams and
    not the names of their backing indices.

    :param client: Elasticsearch client
    :param data_stream_pattern: name of a data stream; may contain wildcards
    :return: list of names of all data streams matching the pattern (empty if there is none)
    """
    # only the names are used, so ask Elasticsearch to strip everything else from the response
    params_defined = {"expand_wildcards": "all", "filter_path": "data_streams.name"}
    response = client.indices.get_data_stream(name=data_stream_pattern, params=params_defined)

    # The "data_streams" key can be missing altogether (presumably when response filtering leaves
    # nothing behind for a pattern without matches - TODO confirm), so don't index into it blindly.
    # Entries without a name are dropped instead of leaking ``None`` to the caller.
    data_streams = response.get("data_streams") or []
    return [data_stream["name"] for data_stream in data_streams if data_stream.get("name")]
@mock.patch("elasticsearch.Elasticsearch")
def test_extract_indices_from_data_stream(client):
    """
    A wildcard data stream pattern is resolved to the names of the matching data streams and
    the request sent to Elasticsearch is restricted to exactly those names.
    """
    # the function under test takes a single pattern, not a list of patterns
    data_stream_pattern = "metrics-kubernetes-*"
    client.indices.get_data_stream.return_value = {
        "data_streams": [
            {
                "name": "metrics-kubernetes.event-default",
                "timestamp_field": {"name": "@timestamp"},
                "indices": [
                    {"index_name": ".ds-metrics-kubernetes.event-default-2022.06.20-000001", "index_uuid": "0W8L56dKQoGXjkGQc8mfzg"}
                ],
                "generation": 1,
                "_meta": {"description": "default metrics template installed by x-pack", "managed": "true"},
                "status": "GREEN",
                "template": "metrics",
                "ilm_policy": "metrics",
                "hidden": "false",
                "system": "false",
                "allow_custom_routing": "false",
                "replicated": "false",
            }
        ]
    }

    res = extract_indices_from_data_stream(client, data_stream_pattern)

    # only the data stream name is reported, not the hidden backing index
    assert res == ["metrics-kubernetes.event-default"]
    # pin the request as well, otherwise a broken pattern or filter would go unnoticed
    client.indices.get_data_stream.assert_called_once_with(
        name="metrics-kubernetes-*",
        params={"expand_wildcards": "all", "filter_path": "data_streams.name"},
    )