diff --git a/CLI.md b/CLI.md index 59d02dd95f6..367846573b9 100644 --- a/CLI.md +++ b/CLI.md @@ -111,11 +111,11 @@ Usage: detection_rules es [OPTIONS] COMMAND [ARGS]... Commands for integrating with Elasticsearch. Options: - -e, --elasticsearch-url TEXT + -et, --timeout INTEGER Timeout for elasticsearch client + -ep, --es-password TEXT + -eu, --es-user TEXT --cloud-id TEXT - -u, --es-user TEXT - -p, --es-password TEXT - -t, --timeout INTEGER Timeout for elasticsearch client + -e, --elasticsearch-url TEXT -h, --help Show this message and exit. Commands: @@ -130,12 +130,12 @@ Usage: detection_rules kibana [OPTIONS] COMMAND [ARGS]... Commands for integrating with Kibana. Options: - -k, --kibana-url TEXT + --space TEXT Kibana space + -kp, --kibana-password TEXT + -ku, --kibana-user TEXT --cloud-id TEXT - -u, --kibana-user TEXT - -p, --kibana-password TEXT - -t, --timeout INTEGER Timeout for kibana client - -h, --help Show this message and exit. + -k, --kibana-url TEXT + -h, --help Show this message and exit. Commands: upload-rule Upload a list of rule .toml files to Kibana. @@ -153,11 +153,11 @@ python -m detection_rules kibana upload-rule -h Kibana client: Options: - -k, --kibana-url TEXT + --space TEXT Kibana space + -kp, --kibana-password TEXT + -ku, --kibana-user TEXT --cloud-id TEXT - -u, --kibana-user TEXT - -p, --kibana-password TEXT - -t, --timeout INTEGER Timeout for kibana client + -k, --kibana-url TEXT Usage: detection_rules kibana upload-rule [OPTIONS] TOML_FILES... diff --git a/detection_rules/devtools.py b/detection_rules/devtools.py index 22886c3c14a..0f1ccc21185 100644 --- a/detection_rules/devtools.py +++ b/detection_rules/devtools.py @@ -9,15 +9,20 @@ import os import shutil import subprocess +import time import click +from elasticsearch import Elasticsearch from eql import load_dump +from kibana.connector import Kibana from . 
import rule_loader +from .eswrap import CollectEvents, add_range_to_dsl from .main import root -from .misc import PYTHON_LICENSE, client_error +from .misc import PYTHON_LICENSE, add_client, client_error from .packaging import PACKAGE_FILE, Package, manage_versions, RELEASE_DIR from .rule import Rule +from .rule_loader import get_rule from .utils import get_path @@ -201,3 +206,151 @@ def license_check(ctx): click.echo(relative_path, err=True) ctx.exit(int(failed)) + + +@dev_group.group('test') +def test_group(): + """Commands for testing against stack resources.""" + + +@test_group.command('event-search') +@click.argument('query') +@click.option('--index', '-i', multiple=True, help='Index patterns to search against') +@click.option('--eql/--lucene', '-e/-l', 'language', default=None, help='Query language used (default: kql)') +@click.option('--date-range', '-d', type=(str, str), default=('now-7d', 'now'), help='Date range to scope search') +@click.option('--count', '-c', is_flag=True, help='Return count of results only') +@click.option('--max-results', '-m', type=click.IntRange(1, 1000), default=100, + help='Max results to return (capped at 1000)') +@click.option('--verbose', '-v', is_flag=True, default=True) +@add_client('elasticsearch') +def event_search(query, index, language, date_range, count, max_results, verbose=True, + elasticsearch_client: Elasticsearch = None): + """Search using a query against an Elasticsearch instance.""" + start_time, end_time = date_range + index = index or ('*',) + language_used = "kql" if language is None else "eql" if language is True else "lucene" + collector = CollectEvents(elasticsearch_client, max_results) + + if verbose: + click.echo(f'searching {",".join(index)} from {start_time} to {end_time}') + click.echo(f'{language_used}: {query}') + + if count: + results = collector.count(query, language_used, index, start_time, end_time) + click.echo(f'total results: {results}') + else: + results = collector.search(query, language_used, index, start_time, end_time, max_results) + click.echo(f'total results: {len(results)} (capped at {max_results})') + click.echo_via_pager(json.dumps(results, indent=2, sort_keys=True)) + + return results + + +@test_group.command('rule-event-search') +@click.argument('rule-file', type=click.Path(dir_okay=False), required=False) +@click.option('--rule-id', '-id') +@click.option('--date-range', '-d', type=(str, str), default=('now-7d', 'now'), help='Date range to scope search') +@click.option('--count', '-c', is_flag=True, help='Return count of results only') +@click.option('--max-results', '-m', type=click.IntRange(1, 1000), default=100, + help='Max results to return (capped at 1000)') +@click.option('--verbose', '-v', is_flag=True) +@click.pass_context +@add_client('elasticsearch') +def rule_event_search(ctx, rule_file, rule_id, date_range, count, max_results, verbose, + elasticsearch_client: Elasticsearch = None): + """Search using a rule file against an Elasticsearch instance.""" + rule = None + + if rule_id: + rule = get_rule(rule_id, verbose=False) + elif rule_file: + rule = Rule(rule_file, load_dump(rule_file)) + else: + client_error('Must specify a rule file or rule ID') + + if rule.query and rule.contents.get('language'): + if verbose: + click.echo(f'Searching rule: {rule.name}') + + rule_lang = rule.contents.get('language') + if rule_lang == 'kuery': + language = None + elif rule_lang == 'eql': + language = True + else: + language = False + ctx.invoke(event_search, query=rule.query, index=rule.contents.get('index', 
['*']), language=language,
+                   date_range=date_range, count=count, max_results=max_results, verbose=verbose,
+                   elasticsearch_client=elasticsearch_client)
+    else:
+        client_error('Rule is not a query rule!')
+
+
+@test_group.command('rule-survey')
+@click.argument('query', required=False)
+@click.option('--date-range', '-d', type=(str, str), default=('now-7d', 'now'), help='Date range to scope search')
+@click.option('--dump-file', type=click.Path(dir_okay=False),
+              default=get_path('surveys', f'{time.strftime("%Y%m%dT%H%M%SL")}.json'),
+              help='Save details of results (capped at 1000 results/rule)')
+@click.option('--hide-zero-counts', '-z', is_flag=True, help='Exclude rules with zero hits from printing')
+@click.option('--hide-errors', '-e', is_flag=True, help='Exclude rules with errors from printing')
+@click.pass_context
+@add_client('elasticsearch', 'kibana', add_to_ctx=True)
+def rule_survey(ctx: click.Context, query, date_range, dump_file, hide_zero_counts, hide_errors,
+                elasticsearch_client: Elasticsearch = None, kibana_client: Kibana = None):
+    """Survey rule counts."""
+    from eql.table import Table
+    from kibana.resources import Signal
+    from . import rule_loader
+    from .main import search_rules
+
+    survey_results = []
+    start_time, end_time = date_range
+
+    if query:
+        rule_paths = [r['file'] for r in ctx.invoke(search_rules, query=query, verbose=False)]
+        rules = rule_loader.load_rules(rule_loader.load_rule_files(paths=rule_paths, verbose=False), verbose=False)
+        rules = rules.values()
+    else:
+        rules = rule_loader.load_rules(verbose=False).values()
+
+    click.echo(f'Running survey against {len(rules)} rules')
+    click.echo(f'Saving detailed dump to: {dump_file}')
+
+    collector = CollectEvents(elasticsearch_client)
+    details = collector.search_from_rule(*rules, start_time=start_time, end_time=end_time)
+    counts = collector.count_from_rule(*rules, start_time=start_time, end_time=end_time)
+
+    # group alert hits by rule_id so alert_count reflects the number of alerts, not the fields of a single alert
+    with kibana_client:
+        range_dsl = {'query': {'bool': {'filter': []}}}
+        add_range_to_dsl(range_dsl['query']['bool']['filter'], start_time, end_time)
+        alerts = {}
+        for alert in Signal.search(range_dsl)['hits']['hits']:
+            alerts.setdefault(alert['_source']['signal']['rule']['rule_id'], []).append(alert['_source'])
+
+    for rule_id, count in counts.items():
+        alert_count = len(alerts.get(rule_id, []))
+        if alert_count > 0:
+            count['alert_count'] = alert_count
+
+        details[rule_id].update(count)
+
+        search_count = count['search_count']
+        if not alert_count and (hide_zero_counts and search_count == 0) or (hide_errors and search_count == -1):
+            continue
+
+        survey_results.append(count)
+
+    fields = ['rule_id', 'name', 'search_count', 'alert_count']
+    table = Table.from_list(fields, survey_results)
+
+    if len(survey_results) > 200:
+        click.echo_via_pager(table)
+    else:
+        click.echo(table)
+
+    os.makedirs(get_path('surveys'), exist_ok=True)
+    with open(dump_file, 'w') as f:
+        json.dump(details, f, indent=2, sort_keys=True)
+
+    return survey_results
diff --git a/detection_rules/eswrap.py b/detection_rules/eswrap.py
index c954b1b3cb6..a3ecd4c9d05 100644
--- a/detection_rules/eswrap.py
+++ b/detection_rules/eswrap.py
@@ -6,37 +6,56 @@
 import json
 import os
 import time
+from collections import defaultdict
+from typing import Union
 
 import click
-from elasticsearch import AuthenticationException, Elasticsearch
+import elasticsearch
+from elasticsearch import Elasticsearch
+from elasticsearch.client import AsyncSearchClient
+import kql
 
 from .main import root
-from .misc import client_error, getdefault
+from .misc import add_params, 
client_error, elasticsearch_options
 from .utils import format_command_options, normalize_timing_and_sort, unix_time_to_formatted, get_path
+from .rule import Rule
 from .rule_loader import get_rule, rta_mappings
 
 COLLECTION_DIR = get_path('collections')
+MATCH_ALL = {'bool': {'filter': [{'match_all': {}}]}}
 
 
-def get_es_client(es_user, es_password, elasticsearch_url=None, cloud_id=None, **kwargs):
-    """Get an auth-validated elsticsearch client."""
-    assert elasticsearch_url or cloud_id, \
-        'You must specify a host or cloud_id to authenticate to an elasticsearch instance'
+def get_elasticsearch_client(cloud_id=None, elasticsearch_url=None, es_user=None, es_password=None, ctx=None, **kwargs):
+    """Get an authenticated elasticsearch client."""
+    if not (cloud_id or elasticsearch_url):
+        client_error("Missing required --cloud-id or --elasticsearch-url")
 
-    hosts = [elasticsearch_url] if elasticsearch_url else elasticsearch_url
+    # don't prompt for these until there's a cloud id or elasticsearch URL
+    es_user = es_user or click.prompt("es_user")
+    es_password = es_password or click.prompt("es_password", hide_input=True)
+    hosts = [elasticsearch_url] if elasticsearch_url else None
+
+    try:
+        client = Elasticsearch(hosts=hosts, cloud_id=cloud_id, http_auth=(es_user, es_password), **kwargs)
+        # force login to test auth
+        client.info()
+        return client
+    except elasticsearch.AuthenticationException as e:
+        error_msg = f'Failed authentication for {elasticsearch_url or cloud_id}'
+        client_error(error_msg, e, ctx=ctx, err=True)
 
-    client = Elasticsearch(hosts=hosts, cloud_id=cloud_id, http_auth=(es_user, es_password), **kwargs)
-    # force login to test auth
-    client.info()
-    return client
 
+def add_range_to_dsl(dsl_filter, start_time, end_time='now'):
+    """Append a @timestamp range filter to a DSL filter list."""
+    dsl_filter.append(
+        {"range": {"@timestamp": {"gt": start_time, "lte": end_time, "format": "strict_date_optional_time"}}}
+    )
 
-class Events(object):
+
+class RtaEvents(object):
     """Events collected from Elasticsearch."""
 
-    def __init__(self, agent_hostname, events):
-        self.agent_hostname = agent_hostname
-        self.events = self._normalize_event_timing(events)
+    def __init__(self, events):
+        self.events: dict = self._normalize_event_timing(events)
 
     @staticmethod
     def _normalize_event_timing(events):
@@ -46,7 +65,8 @@ def _normalize_event_timing(events):
 
         return events
 
-    def _get_dump_dir(self, rta_name=None):
+    @staticmethod
+    def _get_dump_dir(rta_name=None, host_id=None):
         """Prepare and get the dump path."""
         if rta_name:
             dump_dir = get_path('unit_tests', 'data', 'true_positives', rta_name)
@@ -54,7 +74,7 @@ def _get_dump_dir(self, rta_name=None):
             return dump_dir
         else:
             time_str = time.strftime('%Y%m%dT%H%M%SL')
-            dump_dir = os.path.join(COLLECTION_DIR, self.agent_hostname, time_str)
+            dump_dir = os.path.join(COLLECTION_DIR, host_id or 'unknown_host', time_str)
             os.makedirs(dump_dir, exist_ok=True)
             return dump_dir
 
@@ -81,11 +101,11 @@ def echo_events(self, pager=False, pretty=True):
         echo_fn = click.echo_via_pager if pager else click.echo
         echo_fn(json.dumps(self.events, indent=2 if pretty else None, sort_keys=True))
 
-    def save(self, rta_name=None, dump_dir=None):
+    def save(self, rta_name=None, dump_dir=None, host_id=None):
         """Save collected events."""
-        assert self.events, 'Nothing to save. Run Collector.run() method first'
+        assert self.events, 'Nothing to save. Run Collector.run() method first or verify logging'
 
-        dump_dir = dump_dir or self._get_dump_dir(rta_name)
+        dump_dir = dump_dir or self._get_dump_dir(rta_name=rta_name, host_id=host_id)
 
         for source, events in self.events.items():
             path = os.path.join(dump_dir, source + '.jsonl')
@@ -98,8 +118,8 @@ class CollectEvents(object):
     """Event collector for elastic stack."""
 
     def __init__(self, client, max_events=3000):
-        self.client = client
-        self.MAX_EVENTS = max_events
+        self.client: Elasticsearch = client
+        self.max_events = max_events
 
     def _build_timestamp_map(self, index_str):
         """Build a mapping of indexes to timestamp data formats."""
@@ -107,16 +127,17 @@
         timestamp_map = {n: m['mappings'].get('properties', {}).get('@timestamp', {}) for n, m in mappings.items()}
         return timestamp_map
 
-    def _get_current_time(self, agent_hostname, index_str):
+    def _get_last_event_time(self, index_str, dsl=None):
         """Get timestamp of most recent event."""
-        # https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html
-        timestamp_map = self._build_timestamp_map(index_str)
-
-        last_event = self._search_window(agent_hostname, index_str, start_time='now-1m', size=1, sort='@timestamp:desc')
-        last_event = last_event['hits']['hits'][0]
+        last_event = self.client.search(dsl, index_str, size=1, sort='@timestamp:desc')['hits']['hits']
+        if not last_event:
+            return
+        last_event = last_event[0]
         index = last_event['_index']
         timestamp = last_event['_source']['@timestamp']
+
+        timestamp_map = self._build_timestamp_map(index_str)
         event_date_format = timestamp_map[index].get('format', '').split('||')
 
         # there are many native supported date formats and even custom data formats, but most, including beats use the
@@ -127,44 +148,183 @@
 
         return timestamp
 
-    def _search_window(self, agent_hostname, index_str, start_time, end_time='now', size=None, sort='@timestamp:asc',
-                       **match):
-        """Collect all events within a time window and parse by source."""
-        match = match.copy()
-        match.update({"agent.hostname": agent_hostname})
-        body = {"query": {"bool": {"filter": [
-            {"match": {"agent.hostname": agent_hostname}},
-            {"range": {"@timestamp": {"gt": start_time, "lte": end_time, "format": "strict_date_optional_time"}}}]
-        }}}
+    @staticmethod
+    def _prep_query(query, language, index, start_time=None, end_time=None):
+        """Prep a query for search."""
+        index_str = ','.join(index if isinstance(index, (list, tuple)) else index.split(','))
+        lucene_query = query if language == 'lucene' else None
+
+        if language in ('kql', 'kuery'):
+            formatted_dsl = {'query': kql.to_dsl(query)}
+        elif language == 'eql':
+            # build a fresh filter per call; appending the date range to the shared MATCH_ALL constant
+            # would leak previous ranges into subsequent searches
+            formatted_dsl = {'query': query, 'filter': {'bool': {'filter': [{'match_all': {}}]}}}
+        elif language == 'lucene':
+            formatted_dsl = {'query': {'bool': {'filter': []}}}
+        elif language == 'dsl':
+            formatted_dsl = {'query': query}
+        else:
+            raise ValueError('Unknown search language')
+
+        if start_time or end_time:
+            end_time = end_time or 'now'
+            dsl = formatted_dsl['filter']['bool']['filter'] if language == 'eql' else \
+                formatted_dsl['query']['bool'].setdefault('filter', [])
+            add_range_to_dsl(dsl, start_time, end_time)
+
+        return index_str, formatted_dsl, lucene_query
+
+    def search(self, query, language, index: Union[str, list] = '*', start_time=None, end_time=None, size=None,
+               **kwargs):
+        """Search an elasticsearch instance."""
+        index_str, formatted_dsl, lucene_query = self._prep_query(query=query, language=language, index=index,
+                                                                  
start_time=start_time, end_time=end_time) + formatted_dsl.update(size=size or self.max_events) + + if language == 'eql': + results = self.client.eql.search(body=formatted_dsl, index=index_str, **kwargs)['hits'] + results = results.get('events') or results.get('sequences', []) + else: + results = self.client.search(body=formatted_dsl, q=lucene_query, index=index_str, + allow_no_indices=True, ignore_unavailable=True, **kwargs)['hits']['hits'] + + return results + + def search_from_rule(self, *rules: Rule, start_time=None, end_time='now', size=None): + """Search an elasticsearch instance using a rule.""" + from .misc import nested_get + + async_client = AsyncSearchClient(self.client) + survey_results = {} + + def parse_unique_field_results(rule_type, unique_fields, search_results): + parsed_results = defaultdict(lambda: defaultdict(int)) + hits = search_results['hits'] + hits = hits['hits'] if rule_type != 'eql' else hits.get('events') or hits.get('sequences', []) + for hit in hits: + for field in unique_fields: + match = nested_get(hit['_source'], field) + match = ','.join(sorted(match)) if isinstance(match, list) else match + parsed_results[field][match] += 1 + # if rule.type == eql, structure is different + return {'results': parsed_results} if parsed_results else {} + + multi_search = [] + multi_search_rules = [] + async_searches = {} + eql_searches = {} + + for rule in rules: + if not rule.query: + continue + + index_str, formatted_dsl, lucene_query = self._prep_query(query=rule.query, + language=rule.contents.get('language'), + index=rule.contents.get('index', '*'), + start_time=start_time, + end_time=end_time) + formatted_dsl.update(size=size or self.max_events) + + # prep for searches: msearch for kql | async search for lucene | eql client search for eql + if rule.contents['language'] == 'kuery': + multi_search_rules.append(rule) + multi_search.append(json.dumps( + {'index': index_str, 'allow_no_indices': 'true', 'ignore_unavailable': 'true'})) + multi_search.append(json.dumps(formatted_dsl)) + elif rule.contents['language'] == 'lucene': + # wait for 0 to try and force async with no immediate results (not guaranteed) + result = async_client.submit(body=formatted_dsl, q=rule.query, index=index_str, + allow_no_indices=True, ignore_unavailable=True, + wait_for_completion_timeout=0) + if result['is_running'] is True: + async_searches[rule] = result['id'] + else: + survey_results[rule.id] = parse_unique_field_results(rule.type, rule.unique_fields, + result['response']) + elif rule.contents['language'] == 'eql': + eql_body = { + 'index': index_str, + 'params': {'ignore_unavailable': 'true', 'allow_no_indices': 'true'}, + 'body': {'query': rule.query, 'filter': formatted_dsl['filter']} + } + eql_searches[rule] = eql_body + + # assemble search results + multi_search_results = self.client.msearch('\n'.join(multi_search) + '\n') + for index, result in enumerate(multi_search_results['responses']): + try: + rule = multi_search_rules[index] + survey_results[rule.id] = parse_unique_field_results(rule.type, rule.unique_fields, result) + except KeyError: + survey_results[multi_search_rules[index].id] = {'error_retrieving_results': True} + + for rule, search_args in eql_searches.items(): + try: + result = self.client.eql.search(**search_args) + survey_results[rule.id] = parse_unique_field_results(rule.type, rule.unique_fields, result) + except (elasticsearch.NotFoundError, elasticsearch.RequestError) as e: + survey_results[rule.id] = {'error_retrieving_results': True, 'error': 
e.info['error']['reason']} + + for rule, async_id in async_searches.items(): + result = async_client.get(async_id)['response'] + survey_results[rule.id] = parse_unique_field_results(rule.type, rule.unique_fields, result) + + return survey_results + + def count(self, query, language, index: Union[str, list], start_time=None, end_time='now'): + """Get a count of documents from elasticsearch.""" + index_str, formatted_dsl, lucene_query = self._prep_query(query=query, language=language, index=index, + start_time=start_time, end_time=end_time) + + # EQL API has no count endpoint + if language == 'eql': + results = self.search(query=query, language=language, index=index, start_time=start_time, end_time=end_time, + size=1000) + return len(results) + else: + return self.client.count(body=formatted_dsl, index=index_str, q=lucene_query, allow_no_indices=True, + ignore_unavailable=True)['count'] + + def count_from_rule(self, *rules, start_time=None, end_time='now'): + """Get a count of documents from elasticsearch using a rule.""" + survey_results = {} - if match: - body['query']['bool']['filter'].extend([{'match': {k: v}} for k, v in match.items()]) + for rule in rules: + rule_results = {'rule_id': rule.id, 'name': rule.name} - return self.client.search(index=index_str, body=body, size=size or self.MAX_EVENTS, sort=sort) + if not rule.query: + continue + + try: + rule_results['search_count'] = self.count(query=rule.query, language=rule.contents.get('language'), + index=rule.contents.get('index', '*'), start_time=start_time, + end_time=end_time) + except (elasticsearch.NotFoundError, elasticsearch.RequestError): + rule_results['search_count'] = -1 + + survey_results[rule.id] = rule_results + + return survey_results + + +class CollectRtaEvents(CollectEvents): + """Collect RTA events from elasticsearch.""" @staticmethod def _group_events_by_type(events): """Group events by agent.type.""" event_by_type = {} - for event in events['hits']['hits']: + for event in events: event_by_type.setdefault(event['_source']['agent']['type'], []).append(event['_source']) return event_by_type - def run(self, agent_hostname, indexes, verbose=True, **match): + def run(self, dsl, indexes, start_time): """Collect the events.""" - index_str = ','.join(indexes) - start_time = self._get_current_time(agent_hostname, index_str) - - if verbose: - click.echo('Setting start of event capture to: {}'.format(click.style(start_time, fg='yellow'))) - - click.pause('Press any key once detonation is complete ...') - time.sleep(5) - events = self._group_events_by_type(self._search_window(agent_hostname, index_str, start_time, **match)) - - return Events(agent_hostname, events) + results = self.search(dsl, language='dsl', index=indexes, start_time=start_time, end_time='now', size=5000, + sort='@timestamp:asc') + events = self._group_events_by_type(results) + return RtaEvents(events) @root.command('normalize-data') @@ -172,18 +332,14 @@ def run(self, agent_hostname, indexes, verbose=True, **match): def normalize_data(events_file): """Normalize Elasticsearch data timestamps and sort.""" file_name = os.path.splitext(os.path.basename(events_file.name))[0] - events = Events('_', {file_name: [json.loads(e) for e in events_file.readlines()]}) + events = RtaEvents({file_name: [json.loads(e) for e in events_file.readlines()]}) events.save(dump_dir=os.path.dirname(events_file.name)) @root.group('es') -@click.option('--elasticsearch-url', '-e', default=getdefault("elasticsearch_url")) -@click.option('--cloud-id', default=getdefault("cloud_id")) 
-@click.option('--es-user', '-u', default=getdefault("es_user"))
-@click.option('--es-password', '-p', default=getdefault("es_password"))
-@click.option('--timeout', '-t', default=60, help='Timeout for elasticsearch client')
+@add_params(*elasticsearch_options)
 @click.pass_context
-def es_group(ctx: click.Context, **es_kwargs):
+def es_group(ctx: click.Context, **kwargs):
     """Commands for integrating with Elasticsearch."""
     ctx.ensure_object(dict)
 
@@ -193,38 +349,30 @@ def es_group(ctx: click.Context, **es_kwargs):
         click.echo(format_command_options(ctx))
 
     else:
-        if not (es_kwargs['cloud_id'] or es_kwargs['elasticsearch_url']):
-            client_error("Missing required --cloud-id or --elasticsearch-url")
-
-        # don't prompt for these until there's a cloud id or elasticsearch URL
-        es_kwargs['es_user'] = es_kwargs['es_user'] or click.prompt("es_user")
-        es_kwargs['es_password'] = es_kwargs['es_password'] or click.prompt("es_password", hide_input=True)
-
-        try:
-            client = get_es_client(use_ssl=True, **es_kwargs)
-            ctx.obj['es'] = client
-        except AuthenticationException as e:
-            error_msg = f'Failed authentication for {es_kwargs.get("elasticsearch_url") or es_kwargs.get("cloud_id")}'
-            client_error(error_msg, e, ctx=ctx, err=True)
+        ctx.obj['es'] = get_elasticsearch_client(ctx=ctx, **kwargs)
 
 
 @es_group.command('collect-events')
-@click.argument('agent-hostname')
+@click.argument('host-id')
+@click.option('--query', '-q', help='KQL query to scope search')
 @click.option('--index', '-i', multiple=True, help='Index(es) to search against (default: all indexes)')
-@click.option('--agent-type', '-a', help='Restrict results to a source type (agent.type) ex: auditbeat')
 @click.option('--rta-name', '-r', help='Name of RTA in order to save events directly to unit tests data directory')
 @click.option('--rule-id', help='Updates rule mapping in rule-mapping.yml file (requires --rta-name)')
 @click.option('--view-events', is_flag=True, help='Print events after saving')
 @click.pass_context
-def collect_events(ctx, agent_hostname, index, agent_type, rta_name, rule_id, view_events):
+def collect_events(ctx, host_id, query, index, rta_name, rule_id, view_events):
     """Collect events from Elasticsearch."""
-    match = {'agent.type': agent_type} if agent_type else {}
     client = ctx.obj['es']
+    # build a fresh DSL dict each run; mutating the shared MATCH_ALL constant would leak the host.id filter
+    dsl = kql.to_dsl(query) if query else {'bool': {'filter': [{'match_all': {}}]}}
+    dsl['bool'].setdefault('filter', []).append({'bool': {'should': [{'match_phrase': {'host.id': host_id}}]}})
 
     try:
-        collector = CollectEvents(client)
-        events = collector.run(agent_hostname, index, **match)
-        events.save(rta_name)
+        collector = CollectRtaEvents(client)
+        start = time.time()
+        click.pause('Press any key once detonation is complete ...')
+        start_time = f'now-{round(time.time() - start) + 5}s'
+        events = collector.run(dsl, index or '*', start_time)
+        events.save(rta_name=rta_name, host_id=host_id)
 
         if rta_name and rule_id:
             events.evaluate_against_rule_and_update_mapping(rule_id, rta_name)
 
diff --git a/detection_rules/kbwrap.py b/detection_rules/kbwrap.py
index 3083793d3f8..92130565053 100644
--- a/detection_rules/kbwrap.py
+++ b/detection_rules/kbwrap.py
@@ -4,20 +4,31 @@
 """Kibana cli commands."""
 import click
 
-from kibana import Kibana, RuleResource
+import kql
+from kibana import Kibana, Signal, RuleResource
 
 from .main import root
-from .misc import client_error, getdefault
+from .misc import add_params, client_error, kibana_options
 from .rule_loader import load_rule_files, load_rules
 from .utils import format_command_options
 
 
+def get_kibana_client(cloud_id, kibana_url, 
kibana_user, kibana_password, **kwargs):
+    """Get an authenticated Kibana client."""
+    if not (cloud_id or kibana_url):
+        client_error("Missing required --cloud-id or --kibana-url")
+
+    # don't prompt for these until there's a cloud id or Kibana URL
+    kibana_user = kibana_user or click.prompt("kibana_user")
+    kibana_password = kibana_password or click.prompt("kibana_password", hide_input=True)
+
+    with Kibana(cloud_id=cloud_id, kibana_url=kibana_url, **kwargs) as kibana:
+        kibana.login(kibana_user, kibana_password)
+        return kibana
+
+
 @root.group('kibana')
-@click.option('--kibana-url', '-k', default=getdefault('kibana_url'))
-@click.option('--cloud-id', default=getdefault('cloud_id'))
-@click.option('--kibana-user', '-u', default=getdefault('kibana_user'))
-@click.option('--kibana-password', '-p', default=getdefault('kibana_password'))
-@click.option('--space', default=None)
+@add_params(*kibana_options)
 @click.pass_context
 def kibana_group(ctx: click.Context, **kibana_kwargs):
     """Commands for integrating with Kibana."""
@@ -29,16 +40,7 @@ def kibana_group(ctx: click.Context, **kibana_kwargs):
         click.echo(format_command_options(ctx))
 
     else:
-        if not (kibana_kwargs['cloud_id'] or kibana_kwargs['kibana_url']):
-            client_error("Missing required --cloud-id or --kibana-url")
-
-        # don't prompt for these until there's a cloud id or Kibana URL
-        kibana_user = kibana_kwargs.pop('kibana_user', None) or click.prompt("kibana_user")
-        kibana_password = kibana_kwargs.pop('kibana_password', None) or click.prompt("kibana_password", hide_input=True)
-
-        with Kibana(**kibana_kwargs) as kibana:
-            kibana.login(kibana_user, kibana_password)
-            ctx.obj['kibana'] = kibana
+        ctx.obj['kibana'] = get_kibana_client(**kibana_kwargs)
 
 
 @kibana_group.command("upload-rule")
@@ -73,3 +75,30 @@ def upload_rule(ctx, toml_files):
     with kibana:
         rules = RuleResource.bulk_create(api_payloads)
     click.echo(f"Successfully uploaded {len(rules)} rules")
+
+
+@kibana_group.command('search-alerts')
+@click.argument('query', required=False)
+@click.option('--date-range', '-d', type=(str, str), default=('now-7d', 'now'), help='Date range to scope search')
+@click.option('--columns', '-c', multiple=True, help='Columns to display in table')
+@click.option('--extend', '-e', is_flag=True, help='If columns are specified, extend the original columns')
+@click.pass_context
+def search_alerts(ctx, query, date_range, columns, extend):
+    """Search detection engine alerts with KQL."""
+    from copy import deepcopy
+    from eql.table import Table
+    from .eswrap import MATCH_ALL, add_range_to_dsl
+
+    kibana = ctx.obj['kibana']
+    start_time, end_time = date_range
+    # deep copy so that appending the range filter does not mutate the shared MATCH_ALL template
+    kql_query = kql.to_dsl(query) if query else deepcopy(MATCH_ALL)
+    add_range_to_dsl(kql_query['bool'].setdefault('filter', []), start_time, end_time)
+
+    with kibana:
+        alerts = [a['_source'] for a in Signal.search({'query': kql_query})['hits']['hits']]
+
+    table_columns = ['host.hostname', 'signal.rule.name', 'signal.status', 'signal.original_time']
+    if columns:
+        columns = list(columns)
+        table_columns = table_columns + columns if extend else columns
+    click.echo(Table.from_list(table_columns, alerts))
+    return alerts
diff --git a/detection_rules/misc.py b/detection_rules/misc.py
index 2418e8c2a7d..7ef9949dc8e 100644
--- a/detection_rules/misc.py
+++ b/detection_rules/misc.py
@@ -8,11 +8,12 @@
 import re
 import time
 import uuid
+from functools import wraps
 
 import click
 import requests
 
-from .utils import cached, get_path
+from .utils import add_params, cached, get_path
 
 _CONFIG = {}
 
@@ -37,12 +38,13 @@ class 
ClientError(click.ClickException):
     def __init__(self, message, original_error=None):
         super(ClientError, self).__init__(message)
         self.original_error = original_error
+        self.original_error_type = type(original_error).__name__ if original_error else ''
 
     def show(self, file=None, err=True):
         """Print the error to the console."""
-        err = f' ({self.original_error})' if self.original_error else ''
-        click.echo(f'{click.style(f"CLI Error{err}", fg="red", bold=True)}: {self.format_message()}',
-                   err=err, file=file)
+        err = f' {self.original_error_type}' if self.original_error else ''
+        msg = f'{click.style(f"CLI Error{err}", fg="red", bold=True)}: {self.format_message()}'
+        click.echo(msg, err=err, file=file)
 
 
 def client_error(message, exc: Exception = None, debug=None, ctx: click.Context = None, file=None, err=None):
@@ -53,7 +55,7 @@ def client_error(message, exc: Exception = None, debug=None, ctx: click.Context
         click.echo(click.style('DEBUG: ', fg='yellow') + message, err=err, file=file)
         raise
     else:
-        raise ClientError(message, original_error=exc and type(exc).__name__)
+        raise ClientError(message, original_error=exc)
 
 
 def nested_get(_dict, dot_key, default=None):
@@ -246,3 +248,90 @@ def getdefault(name):
     envvar = f"DR_{name.upper()}"
     config = parse_config()
     return lambda: os.environ.get(envvar, config.get(name))
+
+
+client_options = {
+    'kibana': {
+        'kibana_url': click.Option(['--kibana-url', '-k'], default=getdefault('kibana_url')),
+        'cloud_id': click.Option(['--cloud-id'], default=getdefault('cloud_id')),
+        'kibana_user': click.Option(['--kibana-user', '-ku'], default=getdefault('kibana_user')),
+        'kibana_password': click.Option(['--kibana-password', '-kp'], default=getdefault('kibana_password')),
+        'space': click.Option(['--space'], default=None, help='Kibana space')
+    },
+    'elasticsearch': {
+        'elasticsearch_url': click.Option(['--elasticsearch-url', '-e'], default=getdefault("elasticsearch_url")),
+        'cloud_id': click.Option(['--cloud-id'], default=getdefault("cloud_id")),
+        'es_user': click.Option(['--es-user', '-eu'], default=getdefault("es_user")),
+        'es_password': click.Option(['--es-password', '-ep'], default=getdefault("es_password")),
+        'timeout': click.Option(['--timeout', '-et'], default=60, help='Timeout for elasticsearch client')
+    }
+}
+kibana_options = list(client_options['kibana'].values())
+elasticsearch_options = list(client_options['elasticsearch'].values())
+
+
+def add_client(*client_type, add_to_ctx=True):
+    """Wrapper to add authed client."""
+    from elasticsearch import Elasticsearch, ElasticsearchException
+    from kibana import Kibana
+    from .eswrap import get_elasticsearch_client
+    from .kbwrap import get_kibana_client
+
+    def _wrapper(func):
+        client_ops_dict = {}
+        client_ops_keys = {}
+        for c_type in client_type:
+            # default to an empty dict so an unknown client type falls through to the ValueError below
+            ops = client_options.get(c_type, {})
+            client_ops_dict.update(ops)
+            client_ops_keys[c_type] = list(ops)
+
+        if not client_ops_dict:
+            raise ValueError(f'Unknown client: {client_type} in {func.__name__}')
+
+        client_ops = list(client_ops_dict.values())
+
+        @wraps(func)
+        @add_params(*client_ops)
+        def _wrapped(*args, **kwargs):
+            ctx: click.Context = next((a for a in args if isinstance(a, click.Context)), None)
+            es_client_args = {k: kwargs.pop(k, None) for k in client_ops_keys.get('elasticsearch', [])}
+            # shared args (e.g. cloud_id) fall back to the elasticsearch values
+            kibana_client_args = {k: kwargs.pop(k, es_client_args.get(k)) for k in client_ops_keys.get('kibana', [])}
+
+            if 'elasticsearch' in client_type:
+                # for nested ctx invocation, no need to re-auth if an existing client is 
already passed
+                elasticsearch_client: Elasticsearch = kwargs.get('elasticsearch_client')
+                try:
+                    if elasticsearch_client and isinstance(elasticsearch_client, Elasticsearch) and \
+                            elasticsearch_client.info():
+                        pass
+                    else:
+                        elasticsearch_client = get_elasticsearch_client(use_ssl=True, **es_client_args)
+                except ElasticsearchException:
+                    elasticsearch_client = get_elasticsearch_client(use_ssl=True, **es_client_args)
+
+                kwargs['elasticsearch_client'] = elasticsearch_client
+                if ctx and add_to_ctx:
+                    ctx.obj['es'] = elasticsearch_client
+
+            if 'kibana' in client_type:
+                # for nested ctx invocation, no need to re-auth if an existing client is already passed
+                kibana_client: Kibana = kwargs.get('kibana_client')
+                try:
+                    with kibana_client:
+                        if kibana_client and isinstance(kibana_client, Kibana) and kibana_client.version:
+                            pass
+                        else:
+                            kibana_client = get_kibana_client(**kibana_client_args)
+                except (requests.HTTPError, AttributeError):
+                    kibana_client = get_kibana_client(**kibana_client_args)
+
+                kwargs['kibana_client'] = kibana_client
+                if ctx and add_to_ctx:
+                    ctx.obj['kibana'] = kibana_client
+
+            return func(*args, **kwargs)
+
+        return _wrapped
+
+    return _wrapper
diff --git a/detection_rules/utils.py b/detection_rules/utils.py
index 81726b0cee3..11d0b36c58c 100644
--- a/detection_rules/utils.py
+++ b/detection_rules/utils.py
@@ -252,3 +252,14 @@ def format_command_options(ctx):
         formatter.write_dl(opts)
 
     return formatter.getvalue()
+
+
+def add_params(*params):
+    """Add parameters to a click command."""
+    def decorator(f):
+        if not hasattr(f, '__click_params__'):
+            f.__click_params__ = []
+        f.__click_params__.extend(params)
+        return f
+
+    return decorator
diff --git a/kibana/connector.py b/kibana/connector.py
index e79c15a28cb..b8863036491 100644
--- a/kibana/connector.py
+++ b/kibana/connector.py
@@ -29,7 +29,7 @@ def __init__(self, cloud_id=None, kibana_url=None, verify=True, elasticsearch=No
         self.cloud_id = cloud_id
         self.kibana_url = kibana_url
         self.elastic_url = None
-        self.space = space
+        self.space = space if space and space.lower() != 'default' else None
         self.status = None
 
         if self.cloud_id:
@@ -62,7 +62,7 @@ def url(self, uri):
             uri = "s/{}/{}".format(self.space, uri)
         return f"{self.kibana_url}/{uri}"
 
-    def request(self, method, uri, params=None, data=None, error=True, verbose=True):
+    def request(self, method, uri, params=None, data=None, error=True, verbose=True, raw=False):
         """Perform a RESTful HTTP request with JSON responses."""
         params = params or {}
         url = self.url(uri)
@@ -83,7 +83,7 @@ def request(self, method, uri, params=None, data=None, error=True, verbose=True)
         if not response.content:
             return
 
-        return response.json()
+        return response.content if raw else response.json()
 
     def get(self, uri, params=None, data=None, error=True, **kwargs):
         """Perform an HTTP GET."""
@@ -120,6 +120,10 @@ def login(self, kibana_username, kibana_password):
             else:
                 raise
 
+        # Kibana will still authenticate requests against a URL that references a nonexistent space,
+        # so verify the space explicitly
+        if self.space:
+            self.verify_space(self.space)
+
         self.authenticated = True
         self.status = self.get("/api/status")
 
@@ -133,7 +137,14 @@
     def logout(self):
         """Quit the current session."""
-        # TODO: implement session logout
+        try:
+            self.get('/logout', raw=True, error=False)
+        except requests.exceptions.ConnectionError:
+            # sessions scoped very briefly from buildup to teardown can hit a max-retries ConnectionError
+            pass
+        self.status = None
+        self.authenticated = False
+        self.session = requests.Session()
+        
self.elasticsearch = None def __del__(self): @@ -161,3 +172,10 @@ def current(cls) -> 'Kibana': raise RuntimeError("No Kibana connector in scope!") return stack[-1] + + def verify_space(self, space): + """Verify a space is valid.""" + spaces = self.get('/api/spaces/space') + space_names = [s['name'] for s in spaces] + if space not in space_names: + raise ValueError(f'Unknown Kibana space: {space}') diff --git a/requirements.txt b/requirements.txt index 068fb3a3e33..ffdfe66af53 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ requests==2.22.0 Click==7.0 PyYAML~=5.3 eql~=0.9.5 -elasticsearch~=7.5.1 +elasticsearch~=7.9 XlsxWriter==1.3.6 # test deps diff --git a/tests/test_utils.py b/tests/test_utils.py index 312b6b0bc2a..eceeaaa3b00 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -8,7 +8,7 @@ import unittest from detection_rules.utils import normalize_timing_and_sort, cached -from detection_rules.eswrap import Events +from detection_rules.eswrap import RtaEvents from detection_rules.ecs import get_kql_schema @@ -56,7 +56,7 @@ def test_event_class_normalization(self): """Test that events are normalized properly within Events.""" events_data = self.get_events() for date_format, events in events_data.items(): - normalized = Events('_', {'winlogbeat': events}) + normalized = RtaEvents({'winlogbeat': events}) self.assert_sort(normalized.events['winlogbeat'], date_format) def test_schema_multifields(self):
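

Example usage of the new commands (illustrative only: the hosts and queries below are placeholders, the "dev" group name comes from the existing devtools command group, and missing credentials are prompted for interactively):

    python -m detection_rules dev test event-search "event.category:process" -i "winlogbeat-*" --count --elasticsearch-url http://localhost:9200
    python -m detection_rules dev test rule-survey -z --elasticsearch-url http://localhost:9200 --kibana-url http://localhost:5601
    python -m detection_rules kibana --kibana-url http://localhost:5601 search-alerts "signal.rule.name:*PowerShell*"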