chg: Improve strict typing
Rafiot committed Jan 26, 2024
1 parent fcfe975 commit c67f01c
Showing 13 changed files with 103 additions and 103 deletions.
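Every hunk in this commit does one of two things: it narrows a blanket # type: ignore comment to an error-code-scoped one such as # type: ignore[arg-type], or it drops the ignore entirely by narrowing the runtime type with isinstance(). A scoped ignore silences only the named error code, so mypy keeps reporting anything else that later goes wrong on the same line, and running mypy with --enable-error-code ignore-without-code rejects any bare ignore that slips back in. A minimal sketch of the difference, reusing the s3fs import from the first hunk below:

# A bare ignore suppresses every mypy error on the line, present and future:
import s3fs  # type: ignore

# A scoped ignore suppresses only the named code; any other error still
# surfaces, and mypy --enable-error-code ignore-without-code flags the
# bare form above:
import s3fs  # type: ignore[import-untyped]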
6 changes: 3 additions & 3 deletions bin/archiver.py
@@ -14,7 +14,7 @@
from pathlib import Path

from redis import Redis
-import s3fs # type: ignore
+import s3fs # type: ignore[import-untyped]

from lookyloo.default import AbstractManager, get_config, get_homedir, get_socket_path, try_make_file
from lookyloo.helpers import get_captures_dir, is_locked, make_ts_from_dirname, make_dirs_list
@@ -371,7 +371,7 @@ def _load_indexes(self) -> None:
self.logger.debug(f'Loading {index}')
if recent_uuids := self.__load_index(index):
self.logger.debug(f'{len(recent_uuids)} captures in directory {index.parent}.')
-self.redis.hset('lookup_dirs', mapping=recent_uuids) # type: ignore
+self.redis.hset('lookup_dirs', mapping=recent_uuids) # type: ignore[arg-type]
else:
index.unlink()
total_recent_captures = self.redis.hlen('lookup_dirs')
@@ -385,7 +385,7 @@ def _load_indexes(self) -> None:
self.logger.debug(f'Loading {index}')
if archived_uuids := self.__load_index(index):
self.logger.debug(f'{len(archived_uuids)} captures in directory {index.parent}.')
-self.redis.hset('lookup_dirs_archived', mapping=archived_uuids) # type: ignore
+self.redis.hset('lookup_dirs_archived', mapping=archived_uuids) # type: ignore[arg-type]
else:
index.unlink()
total_archived_captures = self.redis.hlen('lookup_dirs_archived')
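Both hset() hunks above scope the ignore to arg-type: the redis type stubs constrain the value types of the mapping argument, and the dict coming out of __load_index() is typed more loosely than the stubs accept. A hedged stand-in for that error class (the hset() below is a simplified fake, not the real redis signature):

from typing import Mapping

def hset(name: str, mapping: Mapping[str, str]) -> int:
    # Simplified stand-in for redis.Redis.hset
    return len(mapping)

recent_uuids: dict[str, object] = {'uuid1': 'some/capture/dir'}
# mypy: Argument "mapping" to "hset" has incompatible type "dict[str, object]";
# expected "Mapping[str, str]"  [arg-type]
hset('lookup_dirs', mapping=recent_uuids)  # type: ignore[arg-type]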
4 changes: 2 additions & 2 deletions bin/async_capture.py
@@ -79,10 +79,10 @@ def process_capture_queue(self) -> None:

if get_config('generic', 'default_public'):
# By default, the captures are on the index, unless the user marks them as un-listed
-listing = False if ('listing' in to_capture and to_capture['listing'].lower() in ['false', '0', '']) else True # type: ignore
+listing = False if ('listing' in to_capture and isinstance(to_capture['listing'], str) and to_capture['listing'].lower() in ['false', '0', '']) else True
else:
# By default, the captures are not on the index, unless the user marks them as listed
-listing = True if ('listing' in to_capture and to_capture['listing'].lower() in ['true', '1']) else False # type: ignore
+listing = True if ('listing' in to_capture and isinstance(to_capture['listing'], str) and to_capture['listing'].lower() in ['true', '1']) else False

self.lookyloo.store_capture(
uuid, listing,
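This file is the exception in the commit: instead of scoping the ignore, the isinstance() guard narrows to_capture['listing'] to str, so the .lower() call type-checks with no suppression at all. The same pattern in isolation (the helper below is illustrative, not the project's API):

def is_listed(value: str | int | None) -> bool:
    # Without the isinstance() guard, mypy rejects value.lower(),
    # since int and None have no .lower() method.
    if isinstance(value, str):
        return value.lower() in ('true', '1')
    return bool(value)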
2 changes: 1 addition & 1 deletion lookyloo/capturecache.py
@@ -448,7 +448,7 @@ def _set_capture_cache(self, capture_dir_str: str) -> CaptureCache:
p.hset('lookup_dirs_archived', uuid, capture_dir_str)

p.delete(capture_dir_str)
-p.hset(capture_dir_str, mapping=cache) # type: ignore
+p.hset(capture_dir_str, mapping=cache) # type: ignore[arg-type]
p.execute()
return CaptureCache(cache)

4 changes: 2 additions & 2 deletions lookyloo/helpers.py
@@ -19,9 +19,9 @@

from har2tree import CrawledTree, HostNode, URLNode # type: ignore[attr-defined]
from playwrightcapture import get_devices
-from publicsuffixlist import PublicSuffixList # type: ignore
+from publicsuffixlist import PublicSuffixList # type: ignore[import-untyped]
from pytaxonomies import Taxonomies # type: ignore[attr-defined]
-from ua_parser import user_agent_parser # type: ignore
+from ua_parser import user_agent_parser # type: ignore[import-untyped]
from werkzeug.user_agent import UserAgent
from werkzeug.utils import cached_property

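import-untyped is the code mypy attaches to imports of packages that are installed but ship neither a py.typed marker nor a stub package; scoping the ignore records exactly why the import is excused. Without the comment, mypy reports something like the following (message wording as in recent mypy releases):

# error: Skipping analyzing "publicsuffixlist": module is installed, but
# missing library stubs or py.typed marker  [import-untyped]
from publicsuffixlist import PublicSuffixList  # type: ignore[import-untyped]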
14 changes: 7 additions & 7 deletions lookyloo/lookyloo.py
@@ -568,7 +568,7 @@ def get_crawled_tree(self, capture_uuid: str, /) -> CrawledTree:

def _prepare_lacus_query(self, query: CaptureSettings) -> CaptureSettings:
# Remove the none, it makes redis unhappy
-query = {k: v for k, v in query.items() if v is not None} # type: ignore
+query = {k: v for k, v in query.items() if v is not None} # type: ignore[assignment]

if 'url' in query and query['url'] is not None:
# Make sure the URL does not have any space or newline
@@ -626,9 +626,9 @@ def get_priority(source: str, user: str, authenticated: bool) -> int:

for key, value in query.items():
if isinstance(value, bool):
-query[key] = 1 if value else 0 # type: ignore
+query[key] = 1 if value else 0 # type: ignore[literal-required]
elif isinstance(value, (list, dict)):
-query[key] = json.dumps(value) if value else None # type: ignore
+query[key] = json.dumps(value) if value else None # type: ignore[literal-required]

query = self._prepare_lacus_query(query)

@@ -679,7 +679,7 @@ def get_priority(source: str, user: str, authenticated: bool) -> int:
if value:
mapping_capture[key] = json.dumps(value)
elif value is not None:
-mapping_capture[key] = value # type: ignore
+mapping_capture[key] = value # type: ignore[assignment]

p = self.redis.pipeline()
p.zadd('to_capture', {perma_uuid: priority})
@@ -1157,7 +1157,7 @@ def misp_export(self, capture_uuid: str, /, with_parent: bool=False) -> list[MIS
event = self.misps.export(cache, self.is_public_instance)
screenshot: MISPAttribute = event.add_attribute('attachment', 'screenshot_landing_page.png',
data=self.get_screenshot(cache.uuid),
-disable_correlation=True) # type: ignore
+disable_correlation=True) # type: ignore[assignment]
# If the last object attached to that event is a file, it is the rendered page
if event.objects and event.objects[-1].name == 'file':
event.objects[-1].add_reference(screenshot, 'rendered-as', 'Screenshot of the page')
@@ -1180,7 +1180,7 @@ def misp_export(self, capture_uuid: str, /, with_parent: bool=False) -> list[MIS
pt_entry = self.phishtank.get_url_lookup(urls[0].value)
if not pt_entry or not pt_entry.get('phish_detail_url'):
continue
-pt_attribute: MISPAttribute = event.add_attribute('link', value=pt_entry['phish_detail_url'], comment='Phishtank permalink') # type: ignore
+pt_attribute: MISPAttribute = event.add_attribute('link', value=pt_entry['phish_detail_url'], comment='Phishtank permalink') # type: ignore[assignment]
e_obj.add_reference(pt_attribute, 'known-as', 'Permalink on Phishtank')

if self.urlscan.available:
@@ -1491,7 +1491,7 @@ def get_stats(self) -> dict[str, list[Any]]:
month_stats['uniq_urls'] = len(urls)
month_stats['uniq_domains'] = len(uniq_domains(urls))

-year_stats['months'].append(month_stats) # type: ignore
+year_stats['months'].append(month_stats) # type: ignore[union-attr]
year_stats['yearly_submissions'] += month_stats['submissions']
statistics['years'].append(year_stats)

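Two codes recur in lookyloo.py: assignment, where a dict comprehension over a TypedDict comes back typed as a plain dict (or an API's declared return type is wider than the annotation), and literal-required, where a TypedDict is indexed with a loop variable instead of a string literal. A small sketch of the literal-required case (this CaptureSettings is a stand-in, not the project's real definition):

from typing import TypedDict

class CaptureSettings(TypedDict, total=False):
    url: str
    listing: bool

query: CaptureSettings = {'url': 'https://example.com', 'listing': True}
for key, value in query.items():
    if isinstance(value, bool):
        # mypy: TypedDict key must be a string literal;
        # expected one of ("url", "listing")  [literal-required]
        query[key] = 1 if value else 0  # type: ignore[literal-required]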
8 changes: 4 additions & 4 deletions lookyloo/modules/misp.py
@@ -100,7 +100,7 @@ def export(self, cache: CaptureCache, is_public_instance: bool=False,
self.__misp_add_ips_to_URLObject(initial_url, cache.tree.root_hartree.hostname_tree)
initial_obj = event.add_object(initial_url)

-lookyloo_link: MISPAttribute = event.add_attribute('link', f'https://{public_domain}/tree/{cache.uuid}') # type: ignore
+lookyloo_link: MISPAttribute = event.add_attribute('link', f'https://{public_domain}/tree/{cache.uuid}') # type: ignore[assignment]
if not is_public_instance:
lookyloo_link.distribution = 0
initial_obj.add_reference(lookyloo_link, 'captured-by', 'Capture on lookyloo')
@@ -165,7 +165,7 @@ def module_init(self) -> bool:
self.enable_push = bool(self.config.get('enable_push', False))
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))

-self.default_tags: list[str] = self.config.get('default_tags') # type: ignore
+self.default_tags: list[str] = self.config.get('default_tags') # type: ignore[assignment]
self.auto_publish = bool(self.config.get('auto_publish', False))
self.storage_dir_misp = get_homedir() / 'misp'
self.storage_dir_misp.mkdir(parents=True, exist_ok=True)
@@ -270,11 +270,11 @@ def lookup(self, node: URLNode, hostnode: HostNode) -> dict[str, set[str]] | dic
to_return: dict[str, set[str]] = defaultdict(set)
# NOTE: We have MISPAttribute in that list
for a in attributes:
-to_return[a.event_id].add(a.value) # type: ignore
+to_return[a.event_id].add(a.value) # type: ignore[union-attr,index]
return to_return
else:
# The request returned an error
-return attributes # type: ignore
+return attributes # type: ignore[return-value]
return {'info': 'No hits.'}
else:
return {'error': 'Module not available or lookup not enabled.'}
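The lookup() hunk stacks two codes on one line because, judging by the surrounding comments, the search result is a union: each element may be a MISPAttribute or something looser, so both the attribute access and using it as a dict key need excusing. The shape of the union-attr half, reduced to a sketch (Hit is illustrative, not a pymisp class):

class Hit:
    event_id: str = '1'
    value: str = 'https://example.com'

def collect(results: list[Hit] | dict[str, str]) -> dict[str, set[str]]:
    out: dict[str, set[str]] = {}
    for a in results:  # iterating the dict side yields plain str keys
        # mypy: Item "str" of "Hit | str" has no attribute "event_id"  [union-attr]
        out.setdefault(a.event_id, set()).add(a.value)  # type: ignore[union-attr]
    return out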
2 changes: 1 addition & 1 deletion lookyloo/modules/riskiq.py
@@ -8,7 +8,7 @@
from typing import Any, TYPE_CHECKING
from urllib.parse import urlparse

-from passivetotal import AccountClient, DnsRequest, WhoisRequest # type: ignore
+from passivetotal import AccountClient, DnsRequest, WhoisRequest # type: ignore[import-untyped]
from requests import Response

from ..default import ConfigError, get_homedir
6 changes: 3 additions & 3 deletions lookyloo/modules/vt.py
@@ -7,9 +7,9 @@
from datetime import date
from typing import Any, TYPE_CHECKING

-import vt # type: ignore
-from vt.error import APIError # type: ignore
-from vt.object import WhistleBlowerDict # type: ignore
+import vt # type: ignore[import-untyped]
+from vt.error import APIError # type: ignore[import-untyped]
+from vt.object import WhistleBlowerDict # type: ignore[import-untyped]

from ..default import ConfigError, get_homedir
from ..helpers import get_cache_directory