forked from elastic/detection-rules
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kbwrap.py
75 lines (60 loc) · 2.9 KB
/
kbwrap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
"""Kibana cli commands."""
import click
from kibana import Kibana, RuleResource
from .main import root
from .misc import client_error, getdefault
from .rule_loader import load_rule_files, load_rules
from .utils import format_command_options
@root.group('kibana')
@click.option('--kibana-url', '-k', default=getdefault('kibana_url'))
@click.option('--cloud-id', default=getdefault('cloud_id'))
@click.option('--kibana-user', '-u', default=getdefault('kibana_user'))
@click.option('--kibana-password', '-p', default=getdefault('kibana_password'))
@click.option('--space', default=None)
@click.pass_context
def kibana_group(ctx: click.Context, **kibana_kwargs):
"""Commands for integrating with Kibana."""
ctx.ensure_object(dict)
# only initialize an kibana client if the subcommand is invoked without help (hacky)
if click.get_os_args()[-1] in ctx.help_option_names:
click.echo('Kibana client:')
click.echo(format_command_options(ctx))
else:
if not (kibana_kwargs['cloud_id'] or kibana_kwargs['kibana_url']):
client_error("Missing required --cloud-id or --kibana-url")
# don't prompt for these until there's a cloud id or Kibana URL
kibana_user = kibana_kwargs.pop('kibana_user', None) or click.prompt("kibana_user")
kibana_password = kibana_kwargs.pop('kibana_password', None) or click.prompt("kibana_password", hide_input=True)
with Kibana(**kibana_kwargs) as kibana:
kibana.login(kibana_user, kibana_password)
ctx.obj['kibana'] = kibana
@kibana_group.command("upload-rule")
@click.argument("toml-files", nargs=-1, required=True)
@click.pass_context
def upload_rule(ctx, toml_files):
"""Upload a list of rule .toml files to Kibana."""
from uuid import uuid4
from .packaging import manage_versions
from .schemas import downgrade
kibana = ctx.obj['kibana']
file_lookup = load_rule_files(paths=toml_files)
rules = list(load_rules(file_lookup=file_lookup).values())
# assign the versions from etc/versions.lock.json
# rules that have changed in hash get incremented, others stay as-is.
# rules that aren't in the lookup default to version 1
manage_versions(rules, verbose=False)
api_payloads = []
for rule in rules:
payload = rule.contents.copy()
meta = payload.setdefault("meta", {})
meta["original"] = dict(id=rule.id, **rule.metadata)
payload["rule_id"] = str(uuid4())
payload = downgrade(payload, kibana.version)
rule = RuleResource(payload)
api_payloads.append(rule)
with kibana:
rules = RuleResource.bulk_create(api_payloads)
click.echo(f"Successfully uploaded {len(rules)} rules")