Skip to content

Commit

Permalink
new: SIGTERM handling (PyLacus and LacusCore)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafiot committed Oct 28, 2022
1 parent ff68e29 commit a48c6e0
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 21 deletions.
44 changes: 38 additions & 6 deletions bin/async_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import asyncio
import json
import logging
import signal
import time

from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, Set, Union
from typing import Dict, Optional, Union

from lacuscore import LacusCore, CaptureStatus as CaptureStatusCore, CaptureResponse as CaptureResponseCore
from pylacus import PyLacus, CaptureStatus as CaptureStatusPy, CaptureResponse as CaptureResponsePy
Expand Down Expand Up @@ -48,7 +50,7 @@ def __init__(self, loglevel: int=logging.INFO):
self.lacus = LacusCore(self.redis, get_config('generic', 'tor_proxy'),
get_config('generic', 'only_global_lookups'))

self.captures: Set[asyncio.Task] = set()
self.captures: Dict[asyncio.Task, float] = {}

self.fox = FOX(get_config('modules', 'FOX'))
if not self.fox.available:
Expand Down Expand Up @@ -79,7 +81,10 @@ async def process_capture_queue(self) -> None:
break
entries = self.lacus.get_capture(uuid)
if entries['status'] == CaptureStatusPy.DONE:
self.logger.info(f'Got the capture for {uuid} from Lacus')
log = f'Got the capture for {uuid} from Lacus'
if runtime := entries.get('runtime'):
log = f'{log} - Runtime: {runtime}'
self.logger.info(log)
break
else:
# No captures are ready
Expand Down Expand Up @@ -175,17 +180,44 @@ async def process_capture_queue(self) -> None:
self.unset_running()
self.logger.info(f'Done with {uuid}')

async def cancel_old_captures(self):
    """Cancel tracked capture tasks that exceeded the allowed runtime.

    ``self.captures`` maps each running task to the ``time.time()`` value
    recorded when it was started. Every task whose age is at least the
    ``max_capture_time`` setting (section ``generic``) is cancelled, and the
    coroutine then waits for all cancelled tasks to finish.
    """
    expired = []
    for capture_task, started_at in self.captures.items():
        elapsed = time.time() - started_at
        if elapsed < get_config('generic', 'max_capture_time'):
            continue
        capture_task.cancel()
        expired.append(capture_task)
        self.logger.warning('A capture has been going for too long, canceling it.')
    if not expired:
        return
    # return_exceptions=True: collect the outcome of each cancelled task
    # instead of letting the first exception propagate out of gather().
    await asyncio.gather(*expired, return_exceptions=True)

async def _to_run_forever_async(self):
    """Start one capture task, then wait until a worker slot is available.

    Over-long captures are cancelled first. If a forced stop was requested,
    no new task is started. Otherwise a task running
    ``process_capture_queue`` is created and tracked in ``self.captures``
    (task -> start timestamp). The coroutine then polls once per second,
    cancelling over-long captures, for as long as the number of tracked
    tasks is at least the ``async_capture_processes`` setting.
    """
    await self.cancel_old_captures()
    if self.force_stop:
        return
    capture = asyncio.create_task(self.process_capture_queue())
    # self.captures is a dict keyed by task, so registration and cleanup must
    # use the mapping API; the set-style add()/discard() calls do not exist
    # on a dict and would raise AttributeError.
    self.captures[capture] = time.time()
    # The done callback receives the task itself, which is exactly the key
    # to remove from the mapping.
    capture.add_done_callback(self.captures.pop)
    while len(self.captures) >= get_config('generic', 'async_capture_processes'):
        await self.cancel_old_captures()
        await asyncio.sleep(1)

async def _wait_to_finish(self):
    """Block until no capture task is tracked anymore.

    While ``self.captures`` is non-empty, log how many captures are still
    pending and sleep five seconds before checking again. Logs a final
    message once nothing is left.
    """
    while True:
        if not self.captures:
            break
        self.logger.info(f'Waiting for {len(self.captures)} capture(s) to finish...')
        await asyncio.sleep(5)
    self.logger.info('No more captures')


def main():
    """Run the AsyncCapture manager on a dedicated event loop.

    A SIGTERM handler is registered on the loop before the manager starts,
    so a termination signal schedules ``stop_async()`` instead of killing
    the process outright. The loop is always closed on exit.
    """
    m = AsyncCapture()

    # The manager must be started exactly once, on the loop that owns the
    # signal handler. Running it through asyncio.run() beforehand would
    # complete a full run without SIGTERM handling and then start it again.
    loop = asyncio.new_event_loop()
    loop.add_signal_handler(signal.SIGTERM, lambda: loop.create_task(m.stop_async()))

    try:
        loop.run_until_complete(m.run_async(sleep_in_sec=1))
    finally:
        loop.close()


if __name__ == '__main__':
Expand Down
4 changes: 3 additions & 1 deletion config/generic.json.sample
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
},
"hide_captures_with_error": false,
"archive": 180,
"max_capture_time": 3600,
"_notes": {
"loglevel": "(lookyloo) Can be one of the value listed here: https://docs.python.org/3/library/logging.html#levels",
"only_global_lookups": "Set it to True if your instance is publicly available so users aren't able to scan your internal network",
Expand All @@ -75,6 +76,7 @@
"email": "Configuration for sending email notifications.",
"priority": "Define the priority of a new capture. A capture from the web interface has priority over a capture from the API, same for authenticated user vs. anonymous.",
"hide_captures_with_error": "Capturing a URL may result in an error (domain non-existent, HTTP error, ...). They may be useful to see, but if you have a public instance, they will clutter the index.",
"archive": "The captures older than this value (in days) will be archived. They're not cached by default in the Lookyloo class."
"archive": "The captures older than this value (in days) will be archived. They're not cached by default in the Lookyloo class.",
"max_capture_time": "The absolute maximum time (in seconds) a capture is allowed to keep going. Should only be triggered by captures that cause playwright to never quit."
}
}
25 changes: 21 additions & 4 deletions lookyloo/default/abstractmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def __init__(self, loglevel: int=logging.DEBUG):
self.process: Optional[Popen] = None
self.__redis = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)

self.force_stop = False

@staticmethod
def is_running() -> List[Tuple[str, float]]:
try:
Expand Down Expand Up @@ -81,7 +83,7 @@ def shutdown_requested(self) -> bool:
return True

def _to_run_forever(self) -> None:
    """Hook for the synchronous run loop; subclasses must override it.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError('This method must be implemented by the child')

def _kill_process(self):
if self.process is None:
Expand All @@ -103,7 +105,7 @@ def _kill_process(self):
def run(self, sleep_in_sec: int) -> None:
self.logger.info(f'Launching {self.__class__.__name__}')
try:
while True:
while not self.force_stop:
if self.shutdown_requested():
break
try:
Expand Down Expand Up @@ -135,13 +137,25 @@ def run(self, sleep_in_sec: int) -> None:
pass
self.logger.info(f'Shutting down {self.__class__.__name__}')

async def stop(self):
    """Set ``force_stop`` so the run loops exit at their next check."""
    self.force_stop = True

async def _to_run_forever_async(self) -> None:
    """Hook for the asynchronous run loop; subclasses must override it.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError('This method must be implemented by the child')

async def _wait_to_finish(self) -> None:
    """Default shutdown hook: has nothing to wait for and only logs that fact."""
    self.logger.info('Not implemented, nothing to wait for.')

async def stop_async(self):
    """Request a graceful stop by setting ``force_stop``.

    Intended to be scheduled from a signal handler, e.g.:
        loop.add_signal_handler(signal.SIGTERM, lambda: loop.create_task(p.stop_async()))
    """
    self.force_stop = True

async def run_async(self, sleep_in_sec: int) -> None:
self.logger.info(f'Launching {self.__class__.__name__}')
try:
while True:
while not self.force_stop:
if self.shutdown_requested():
break
try:
Expand All @@ -163,7 +177,10 @@ async def run_async(self, sleep_in_sec: int) -> None:
break
except KeyboardInterrupt:
self.logger.warning(f'{self.script_name} killed by user.')
except Exception as e:
self.logger.exception(e)
finally:
await self._wait_to_finish()
if self.process:
self._kill_process()
try:
Expand Down
16 changes: 8 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ har2tree = "^1.16.0"
passivetotal = "^2.5.9"
werkzeug = "2.1.2"
filetype = "^1.1.0"
pypandora = "^1.1.2"
lacuscore = "^1.1.1"
pypandora = "^1.2.0"
lacuscore = "^1.1.2"
pylacus = "^1.1.0"

[tool.poetry.extras]
Expand Down

0 comments on commit a48c6e0

Please sign in to comment.