search: introduce a new hyperscan-backed searchdef type: `HyperscanSearchDef`

Searchkit currently uses Python's re module, which is not known for its "blow
your socks off" pattern scanning performance, so there is an opportunity for
optimization by simply swapping the regex engine.

Hyperscan is a highly optimized, performant regex engine that is typically
used in high-throughput network packet inspection systems (e.g. DPI and
IDS/IPS systems) for pattern recognition. The work that searchkit does is
well aligned with hyperscan's strengths, so it would be beneficial for
searchkit to allow downstream users to leverage hyperscan, especially when
searching large files.

This patch introduces a hyperscan-backed SearchDef type which can be used as
a drop-in replacement for the existing SearchDef type. The patch also adds
hyperscan as a dependency and moves the searchkit tests to a base class so
the same tests can exercise both SearchDef and HyperscanSearchDef.

Signed-off-by: Mustafa Kemal Gilor <[email protected]>
mustafakemalgilor committed May 7, 2024
1 parent f2c5882 commit 05444e3
Showing 6 changed files with 238 additions and 67 deletions.
8 changes: 8 additions & 0 deletions pylintrc
@@ -45,3 +45,11 @@ disable=
broad-exception-raised,
unspecified-encoding,
consider-using-f-string,

[TYPECHECK]
# List of module names for which member attributes should not be checked.
#
# hyperscan is on the ignore list because pylint has trouble locating
# module members for hyperscan and raises invalid "no-member" errors, e.g.:
# Module 'hyperscan' has no 'Database' member (no-member)
ignored-modules=hyperscan
1 change: 1 addition & 0 deletions pyproject.toml
@@ -23,4 +23,5 @@ dynamic = ["version"]
dependencies = [
'importlib-metadata; python_version >= "3.8"',
'fasteners',
'hyperscan >= 0.7.0'
]
1 change: 1 addition & 0 deletions requirements.txt
@@ -1 +1,2 @@
fasteners
hyperscan >= 0.7.0
1 change: 1 addition & 0 deletions searchkit/__init__.py
@@ -2,5 +2,6 @@
FileSearcher,
ResultFieldInfo,
SearchDef,
HyperscanSearchDef,
SequenceSearchDef,
)
145 changes: 136 additions & 9 deletions searchkit/search.py
@@ -4,16 +4,20 @@
import gzip
import multiprocessing
import os
import io
import queue
import re
import signal
import subprocess
import threading
import time
import uuid
import sys
from pathlib import Path
from collections import namedtuple, UserDict, UserList
from functools import cached_property

import hyperscan
from searchkit.log import log
from searchkit.constraints import CouldNotApplyConstraint

@@ -115,6 +119,117 @@ def run(self, line):
return ret


class HyperscanSearchDef(SearchDefBase):

"""
Simple search definition, backed by hyperscan re.
@param pattern: pattern or list of patterns to search for
@param tag: optional user-friendly identifier for this search term.
This is useful for retrieving results.
@param store_result_contents: by default the content of a search result
is saved but if it is not needed this
can be set to False. This effectively
makes the result True/False.
@param field_info: optional ResultFieldInfo object
"""

# This is defined as a class-level variable
# so that hyperscan databases can be shared
# between processes without having to deal
# with pickling. Hyperscan databases are
# multithread/multiprocess safe by default.
hs_databases = {}

def _compile_hs_db(self, patterns, *args, **kwargs):
db = hyperscan.Database(*args, **kwargs)
expressions, ids, flags = [], [], []
group_names: dict[int, str] = {}
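# Register each pattern under a numeric id so the scan callbacks can
# tell which expression matched; hyperscan expects bytes, hence the
# encode(). group_names retains the id -> pattern-key mapping.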
for i, (group_name, expr) in enumerate(patterns.items()):
ids.append(i)
expressions.append(expr.encode())
flags.append(0)
group_names[i] = group_name

db.compile(expressions=expressions, ids=ids, flags=flags)
log.debug(
    "compiled hyperscan db for tag %s: "
    "%s, size: %d byte(s)",
    self.tag, db.info().decode(), db.size())
return db

def link_to_sequence(self, sequence_def, tag):
"""
If this search definition is part of a sequence, the parent
SequenceSearchDef must link itself to this object.
@param sequence_def: SequenceSearchDef object
@param tag: SequenceSearchDef object tag for this section def
"""
self.sequence_def = sequence_def
self.tag = tag

def __init__(self, pattern, tag=None, hint=None,
store_result_contents=True, field_info=None,
**kwargs) -> None:
self.tag = tag
self.hint = hint  # not currently used
self.sequence_def = None
self.field_info = field_info
self.store_result_contents = store_result_contents

if isinstance(pattern, list):
self.patterns = dict(enumerate(pattern))
else:
self.patterns = {0: pattern}
if self.id not in HyperscanSearchDef.hs_databases:
dbs = {}
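# Two databases per search definition: 'prefilter' is a plain
# block-mode hyperscan db used as a fast first pass (hyperscan
# itself cannot report capture groups), while 'group' is a chimera
# db (CH_MODE_GROUPS) that can extract capture groups but is more
# expensive to scan, so run() only consults it after the prefilter
# db reports a match.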
dbs['prefilter'] = self._compile_hs_db(
self.patterns, mode=hyperscan.HS_MODE_BLOCK)
dbs['group'] = self._compile_hs_db(
self.patterns, chimera=True, mode=hyperscan.CH_MODE_GROUPS)
HyperscanSearchDef.hs_databases[self.id] = dbs
# do this last
super().__init__(**kwargs)

def run(self, line):
match_result = None

class match_object:
"""An object type that mimics the
re.match's return type.
"""
def __init__(self, mg) -> None:
self._line = mg[0]
self._groups = {k: mg[k]
for k in set(list(mg.keys())) - set([0])}

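# NOTE: unlike re.Match.groups(), which returns a tuple, this
# returns a dict of capture-group index -> matched text (group 0,
# the whole match, is excluded).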
def groups(self):
return self._groups

def group(self, idx):
if idx == 0:
return self._line
return self.groups()[idx]

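# Chimera match callback: `captured` is a sequence of
# (flags, from, to) tuples, one per capture group, where index 0
# spans the whole match and from/to are byte offsets into the
# scanned data.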
def gm_on_match(_id, _start, _end, _flags, captured, _ctx):
nonlocal match_result
nonlocal line
match_result = {}

for i, (_cap_flags, cap_from, cap_to) in enumerate(captured):
match_result[i] = line[cap_from:cap_to]

def on_match(_id, _start, _end, _flags, _ctx):
HyperscanSearchDef.hs_databases[self.id]['group'].scan(
line.encode(), gm_on_match)

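# Cheap first pass: only when the prefilter db reports a match does
# on_match() trigger the more expensive capture-group (chimera) scan.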
HyperscanSearchDef.hs_databases[self.id]['prefilter'].scan(
line.encode(), on_match)

return match_object(match_result) if match_result else None


class SequenceSearchDef(SearchDefBase):

def __init__(self, start, tag, end=None, body=None, **kwargs):
@@ -977,17 +1092,25 @@ def _run_search(self, fd):
self.stats.reset()
sequence_results = SequenceSearchResults()
search_ids = set([s.id for s in self.search_defs]) # noqa, pylint: disable=R1718
search_tags = set([s.tag for s in self.search_defs]) # noqa, pylint: disable=R1718
offset = self.constraints_manager.apply_global(search_ids, fd)
log.debug("starting search of %s (offset=%s, pos=%s)", fd.name, offset,
fd.tell())
log.debug("starting search of %s (offset=%s, pos=%s) for tags [%s]",
fd.name, offset, fd.tell(), search_tags)

# Record the EOF offset for the progress logging below, then restore
# the position set by the constraints manager (seeking back to 0
# would discard the offset applied above).
pos = fd.tell()
fd.seek(0, 2)
eof_offset = fd.tell()
fd.seek(pos)

runnable = {s.id: _runnable
for s, _runnable in self.search_defs.items()}
ln = 0
# NOTE: line numbers start at 1 hence offset + 1
for ln, line in enumerate(fd, start=offset + 1):
# This could be helpful to show progress for large files
if ln % 100000 == 0:
log.debug("%s lines searched in %s", ln, fd.name)
log.debug("%s lines searched in %s (%g%%)", ln,
Path(*Path(fd.name).parts[-2:]),
(fd.tell()/eof_offset)*100.0)

self.stats['lines_searched'] += 1
line = line.decode("utf-8", **self.decode_kwargs)
@@ -1042,9 +1165,10 @@ def execute(self):
# test if file is gzip
fd.read(1)
fd.seek(0)
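# NOTE: buffer_size=1024 ** 3 requests a ~1 GiB read buffer to
# reduce the number of read syscalls when searching very large files.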
with io.BufferedReader(fd, buffer_size=1024 ** 3) as fd_b:
stats = self._run_search(fd_b)
except OSError:
with open(path, 'rb') as fd:
with open(path, 'rb', buffering=1024 ** 3) as fd:
stats = self._run_search(fd)
except UnicodeDecodeError:
log.exception("caught UnicodeDecodeError while searching %s", path)
@@ -1261,9 +1385,12 @@ def _get_results(self, results, results_queue, event, stats):
log.debug("exiting results thread")
break
else:
log.debug("total %s results received, %s/%s jobs completed - "
"waiting for more", len(results),
stats['jobs_completed'], stats['total_jobs'])
# Avoid spamming the debug output - log at most about once every 15
# seconds. time.time() returns a float, so compare whole seconds;
# `(time.time() % 15) == 0` would almost never be true.
if int(time.time()) % 15 == 0:
    log.debug("total %s results received, %s/%s jobs completed"
              " - waiting for more. Size = %d", len(results),
              stats['jobs_completed'], stats['total_jobs'],
              sys.getsizeof(results))
# yield
time.sleep(0.1)

