From 90aa65aed3b83e9826452b3b1ae04423dcae1555 Mon Sep 17 00:00:00 2001 From: David French <56409778+threat-punter@users.noreply.github.com> Date: Fri, 3 Sep 2021 14:35:59 -0600 Subject: [PATCH] Generate detection rule to alert on traffic to typosquatting/homonym domains (#1199) * create new cli commands * add kibana object to create_dnstwist_rule * Adding code for index-dnstwist-results * Changed es to es_client * Tested. it works! * flake8-ed * Adding timestamps * use eql.utils.load_dump to load json file * rename data to dnstwist_data * start working on create-dnstwist-rule command * add print statements for user * tweak formatting for line length * add template threat match rule file * continue working on threat match rule creation * create rule using TomlRuleContents * save rule to toml file * Moving rule creation to eswrap.py * Moving create dnstwist rule stuff to eswrap * Fixed imports * flake8 fixes * More flake8 fixes * fix usage of @add_client('kibana') * use ctx.invoke to upload rule * cleanup record assembly and use bulk api * swap order of notes in `note` for sample rule * small modifications * move command to root click group * remove unused click group * Update detection_rules/main.py Co-authored-by: Justin Ibarra * remove rule upload and convert template to ndjson * Adding docs for typosquatting rule * renaming the file * Adding a note * separate index and rule prep commands * Final changes Co-authored-by: Apoorva Co-authored-by: brokensound77 Co-authored-by: Apoorva Joshi <30438249+ajosh0504@users.noreply.github.com> --- detection_rules/eswrap.py | 27 +----- detection_rules/kbwrap.py | 22 +---- detection_rules/main.py | 95 ++++++++++++++++++++- detection_rules/misc.py | 54 ++++++++++-- docs/typosquatting_rule.md | 38 +++++++++ etc/rule_template_typosquatting_domain.json | 47 ++++++++++ 6 files changed, 232 insertions(+), 51 deletions(-) create mode 100644 docs/typosquatting_rule.md create mode 100644 etc/rule_template_typosquatting_domain.json diff --git a/detection_rules/eswrap.py b/detection_rules/eswrap.py index b6e610f0169..bc4ccae1f22 100644 --- a/detection_rules/eswrap.py +++ b/detection_rules/eswrap.py @@ -13,41 +13,20 @@ import click import elasticsearch from elasticsearch import Elasticsearch -from elasticsearch.client import AsyncSearchClient +from elasticsearch.client.async_search import AsyncSearchClient import kql from .main import root -from .misc import add_params, client_error, elasticsearch_options +from .misc import add_params, client_error, elasticsearch_options, get_elasticsearch_client from .rule import TOMLRule from .rule_loader import rta_mappings, RuleCollection from .utils import format_command_options, normalize_timing_and_sort, unix_time_to_formatted, get_path + COLLECTION_DIR = get_path('collections') MATCH_ALL = {'bool': {'filter': [{'match_all': {}}]}} -def get_elasticsearch_client(cloud_id=None, elasticsearch_url=None, es_user=None, es_password=None, ctx=None, **kwargs): - """Get an authenticated elasticsearch client.""" - if not (cloud_id or elasticsearch_url): - client_error("Missing required --cloud-id or --elasticsearch-url") - - # don't prompt for these until there's a cloud id or elasticsearch URL - es_user = es_user or click.prompt("es_user") - es_password = es_password or click.prompt("es_password", hide_input=True) - hosts = [elasticsearch_url] if elasticsearch_url else None - timeout = kwargs.pop('timeout', 60) - - try: - client = Elasticsearch(hosts=hosts, cloud_id=cloud_id, http_auth=(es_user, es_password), timeout=timeout, - **kwargs) - # force login to test auth - client.info() - return client - except elasticsearch.AuthenticationException as e: - error_msg = f'Failed authentication for {elasticsearch_url or cloud_id}' - client_error(error_msg, e, ctx=ctx, err=True) - - def add_range_to_dsl(dsl_filter, start_time, end_time='now'): dsl_filter.append( {"range": {"@timestamp": {"gt": start_time, "lte": end_time, "format": "strict_date_optional_time"}}} diff --git a/detection_rules/kbwrap.py b/detection_rules/kbwrap.py index 13895b4b66d..13484ff0fc8 100644 --- a/detection_rules/kbwrap.py +++ b/detection_rules/kbwrap.py @@ -9,32 +9,14 @@ import click import kql -from kibana import Kibana, Signal, RuleResource +from kibana import Signal, RuleResource from .cli_utils import multi_collection from .main import root -from .misc import add_params, client_error, kibana_options +from .misc import add_params, client_error, kibana_options, get_kibana_client from .schemas import downgrade from .utils import format_command_options -def get_kibana_client(cloud_id, kibana_url, kibana_user, kibana_password, kibana_cookie, **kwargs): - """Get an authenticated Kibana client.""" - if not (cloud_id or kibana_url): - client_error("Missing required --cloud-id or --kibana-url") - - if not kibana_cookie: - # don't prompt for these until there's a cloud id or Kibana URL - kibana_user = kibana_user or click.prompt("kibana_user") - kibana_password = kibana_password or click.prompt("kibana_password", hide_input=True) - - with Kibana(cloud_id=cloud_id, kibana_url=kibana_url, **kwargs) as kibana: - if kibana_cookie: - kibana.add_cookie(kibana_cookie) - else: - kibana.login(kibana_user, kibana_password) - return kibana - - @root.group('kibana') @add_params(*kibana_options) @click.pass_context diff --git a/detection_rules/main.py b/detection_rules/main.py index a72f0331a46..009ddb7daf0 100644 --- a/detection_rules/main.py +++ b/detection_rules/main.py @@ -10,6 +10,7 @@ import os import re import time +from datetime import datetime from pathlib import Path from typing import Dict from uuid import uuid4 @@ -17,12 +18,12 @@ import click from .cli_utils import rule_prompt, multi_collection -from .misc import nested_set, parse_config +from .misc import add_client, client_error, nested_set, parse_config from .rule import TOMLRule, TOMLRuleContents from .rule_formatter import toml_write from .rule_loader import RuleCollection from .schemas import all_versions -from .utils import get_path, clear_caches, load_rule_contents +from .utils import get_path, get_etc_path, clear_caches, load_dump, load_rule_contents RULES_DIR = get_path('rules') @@ -296,3 +297,93 @@ def test_rules(ctx): clear_caches() ctx.exit(pytest.main(["-v"])) + + +@root.group('typosquat') +def typosquat_group(): + """Commands for generating typosquat detections.""" + + +@typosquat_group.command('create-dnstwist-index') +@click.argument('input-file', type=click.Path(exists=True, dir_okay=False), required=True) +@click.pass_context +@add_client('elasticsearch', add_func_arg=False) +def create_dnstwist_index(ctx: click.Context, input_file: click.Path): + """Create a dnstwist index in Elasticsearch to work with a threat match rule.""" + from elasticsearch import Elasticsearch + + es_client: Elasticsearch = ctx.obj['es'] + + click.echo(f'Attempting to load dnstwist data from {input_file}') + dnstwist_data: dict = load_dump(input_file) + click.echo(f'{len(dnstwist_data)} records loaded') + + original_domain = next(r['domain-name'] for r in dnstwist_data if r.get('fuzzer', '') == 'original*') + click.echo(f'Original domain name identified: {original_domain}') + + domain = original_domain.split('.')[0] + domain_index = f'dnstwist-{domain}' + # If index already exists, prompt user to confirm if they want to overwrite + if es_client.indices.exists(index=domain_index): + if click.confirm( + f"dnstwist index: {domain_index} already exists for {original_domain}. Do you want to overwrite?", + abort=True): + es_client.indices.delete(index=domain_index) + + fields = [ + "dns-a", + "dns-aaaa", + "dns-mx", + "dns-ns", + "banner-http", + "fuzzer", + "original-domain", + "dns.question.registered_domain" + ] + timestamp_field = "@timestamp" + mappings = {"mappings": {"properties": {f: {"type": "keyword"} for f in fields}}} + mappings["mappings"]["properties"][timestamp_field] = {"type": "date"} + + es_client.indices.create(index=domain_index, body=mappings) + + # handle dns.question.registered_domain separately + fields.pop() + es_updates = [] + now = datetime.utcnow() + + for item in dnstwist_data: + if item['fuzzer'] == 'original*': + continue + + record = item.copy() + record.setdefault('dns', {}).setdefault('question', {}).setdefault('registered_domain', item.get('domain-name')) + + for field in fields: + record.setdefault(field, None) + + record['@timestamp'] = now + + es_updates.extend([{'create': {'_index': domain_index}}, record]) + + click.echo(f'Indexing data for domain {original_domain}') + + results = es_client.bulk(body=es_updates) + if results['errors']: + error = {r['create']['result'] for r in results['items'] if r['create']['status'] != 201} + client_error(f'Errors occurred during indexing:\n{error}') + + click.echo(f'{len(results["items"])} watchlist domains added to index') + click.echo('Run `prep-rule` and import to Kibana to create alerts on this index') + + +@typosquat_group.command('prep-rule') +@click.argument('author') +def prep_rule(author: str): + """Prep the detection threat match rule for dnstwist data with a rule_id and author.""" + rule_template_file = Path(get_etc_path('rule_template_typosquatting_domain.json')) + template_rule = json.loads(rule_template_file.read_text()) + template_rule.update(author=[author], rule_id=str(uuid4())) + updated_rule = Path(get_path('rule_typosquatting_domain.ndjson')) + updated_rule.write_text(json.dumps(template_rule, sort_keys=True)) + click.echo(f'Rule saved to: {updated_rule}. Import this to Kibana to create alerts on all dnstwist-* indexes') + click.echo('Note: you only need to import and enable this rule one time for all dnstwist-* indexes') diff --git a/detection_rules/misc.py b/detection_rules/misc.py index 2da9df46ca5..c21fe82e863 100644 --- a/detection_rules/misc.py +++ b/detection_rules/misc.py @@ -271,6 +271,50 @@ def getdefault(name): return lambda: os.environ.get(envvar, config.get(name)) +def get_elasticsearch_client(cloud_id=None, elasticsearch_url=None, es_user=None, es_password=None, ctx=None, **kwargs): + """Get an authenticated elasticsearch client.""" + from elasticsearch import AuthenticationException, Elasticsearch + + if not (cloud_id or elasticsearch_url): + client_error("Missing required --cloud-id or --elasticsearch-url") + + # don't prompt for these until there's a cloud id or elasticsearch URL + es_user = es_user or click.prompt("es_user") + es_password = es_password or click.prompt("es_password", hide_input=True) + hosts = [elasticsearch_url] if elasticsearch_url else None + timeout = kwargs.pop('timeout', 60) + + try: + client = Elasticsearch(hosts=hosts, cloud_id=cloud_id, http_auth=(es_user, es_password), timeout=timeout, + **kwargs) + # force login to test auth + client.info() + return client + except AuthenticationException as e: + error_msg = f'Failed authentication for {elasticsearch_url or cloud_id}' + client_error(error_msg, e, ctx=ctx, err=True) + + +def get_kibana_client(cloud_id, kibana_url, kibana_user, kibana_password, kibana_cookie, **kwargs): + """Get an authenticated Kibana client.""" + from kibana import Kibana + + if not (cloud_id or kibana_url): + client_error("Missing required --cloud-id or --kibana-url") + + if not kibana_cookie: + # don't prompt for these until there's a cloud id or Kibana URL + kibana_user = kibana_user or click.prompt("kibana_user") + kibana_password = kibana_password or click.prompt("kibana_password", hide_input=True) + + with Kibana(cloud_id=cloud_id, kibana_url=kibana_url, **kwargs) as kibana: + if kibana_cookie: + kibana.add_cookie(kibana_cookie) + else: + kibana.login(kibana_user, kibana_password) + return kibana + + client_options = { 'kibana': { 'cloud_id': click.Option(['--cloud-id'], default=getdefault('cloud_id')), @@ -293,12 +337,10 @@ def getdefault(name): elasticsearch_options = list(client_options['elasticsearch'].values()) -def add_client(*client_type, add_to_ctx=True): +def add_client(*client_type, add_to_ctx=True, add_func_arg=True): """Wrapper to add authed client.""" from elasticsearch import Elasticsearch, ElasticsearchException from kibana import Kibana - from .eswrap import get_elasticsearch_client - from .kbwrap import get_kibana_client def _wrapper(func): client_ops_dict = {} @@ -333,7 +375,8 @@ def _wrapped(*args, **kwargs): except ElasticsearchException: elasticsearch_client = get_elasticsearch_client(use_ssl=True, **es_client_args) - kwargs['elasticsearch_client'] = elasticsearch_client + if add_func_arg: + kwargs['elasticsearch_client'] = elasticsearch_client if ctx and add_to_ctx: ctx.obj['es'] = elasticsearch_client @@ -349,7 +392,8 @@ def _wrapped(*args, **kwargs): except (requests.HTTPError, AttributeError): kibana_client = get_kibana_client(**kibana_client_args) - kwargs['kibana_client'] = kibana_client + if add_func_arg: + kwargs['kibana_client'] = kibana_client if ctx and add_to_ctx: ctx.obj['kibana'] = kibana_client diff --git a/docs/typosquatting_rule.md b/docs/typosquatting_rule.md new file mode 100644 index 00000000000..1ca5c8a4397 --- /dev/null +++ b/docs/typosquatting_rule.md @@ -0,0 +1,38 @@ +# Generating detection rule to alert on traffic to typosquatting or homonym domains + +## What does the rule do? + +This rule helps detect spoofing attacks on domains that you want to protect. + + +## Steps + +### 1. Run [dnstwist](https://github.com/elceef/dnstwist) on the domain you want to watch + +Eg: `dnstwist --format json elastic.co | jq` + +This should give you a json file consisting of potentially malicious lookalike domains for your domain. + +### 2. Index the lookalike domains into Elasticsearch + +In order to detect network activity on the lookalike domains using a threat match rule, you would first need to index these domains into an Elasticsearch index using the following CLI command: + +`python -m detection_rules typosquat create-dnstwist-index [OPTIONS] INPUT_FILE` + +### 3. Prep rule to alert on generated indexes + +Run the following CLI command to generate the typosquat rule file, which you will then import into Kibana. + +`python -m detection_rules typosquat prep-rule [OPTIONS] AUTHOR` + + +### 4. Import the rule into Kibana + +Import the ndjson rule file generated in the previous step, into Kibana, via the Detection rules UI. + +### 5. Detect potentially malicious network activity targeting your organization! + + +## Note + +You DO NOT need to re-import the rule file each time you have an additional domain to track. For each new domain, you'd run Step 1 to generate the json file consisting of lookalike domains for that domain, followed by the CLI command in Step 2 to index these domains into a new index. This index will automatically be picked up by the rule you imported the very first time. \ No newline at end of file diff --git a/etc/rule_template_typosquatting_domain.json b/etc/rule_template_typosquatting_domain.json new file mode 100644 index 00000000000..6dfe1566689 --- /dev/null +++ b/etc/rule_template_typosquatting_domain.json @@ -0,0 +1,47 @@ +{ + "author": ["THIS WILL BE POPULATED BY create-dnstwist-index COMMAND"], + "description": "This rule is triggered when a DNS request is made for a domain in the list of typosquatting domains generated by\ndnstwist. Adversaries may register homonym or homoglyph domains for the organization that they're targeting before\nsending a phishing lure to a user in an attempt to infect their endpoint with malware or steal credentials.\n", + "from": "now-10m", + "index": [ + "packetbeat-*", + "winlogbeat-*" + ], + "interval": "9m", + "language": "kuery", + "license": "Elastic License v2", + "name": "DNS Request for Typosquatting Domain", + "note": "## Config\n\n- Packetbeat or Winlogbeat must be configured to log DNS request events to be compatible with this rule.\n\n\n## Triage and Analysis\n\n- Determine the reason that the DNS request was made by the affected endpoint. For example, did the user visit the domain\nafter receiving a phishing email or did they mistype one of the organization's registered domains?\n- Take appropriate security measures when investigating the domain in question, as it may host malware or an attacker\nmay be monitoring for potential victims visiting the domain. For example, Use open source intelligence such as the\nWHOIS domain database to obtain information about the domain or interact with it using a malware sandbox service that\nis segmented from any of your production systems.\n", + "query": "dns.question.registered_domain:*\n", + "references": [], + "risk_score": 73, + "rule_id": "THIS WILL BE POPULATED BY create-dnstwist-index COMMAND", + "severity": "high", + "tags": [ + "Elastic", + "Network", + "Windows", + "Continuous Monitoring", + "SecOps", + "Monitoring" + ], + "threat_index": [ + "dnstwist-*" + ], + "threat_indicator_path": "", + "threat_language": "kuery", + "threat_mapping": [ + { + "entries": [ + { + "field": "dns.question.registered_domain", + "type": "mapping", + "value": "dns.question.registered_domain" + } + ] + } + ], + "threat_query": "dns.question.registered_domain:*", + "timeline_id": "495ad7a7-316e-4544-8a0f-9c098daee76e", + "timeline_title": "Generic Threat Match Timeline", + "type": "threat_match" +} \ No newline at end of file