Skip to content

Commit

Permalink
Shared threadpool (smicallef#1489)
Browse files Browse the repository at this point in the history
* do not store return values by default

* pass callback with every submit()

* flake8 fixes

* shared threadpool between all modules

* flake8 fixes

* change taskName to help avoid conflicts

* minor bug fixes + performance improvements

* move threadpool tests to new file

* better error handling in scan status checks
  • Loading branch information
TheTechromancer authored Oct 10, 2021
1 parent 156b1f2 commit 1f35646
Show file tree
Hide file tree
Showing 7 changed files with 427 additions and 281 deletions.
2 changes: 2 additions & 0 deletions sf.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def main():
# be overridden from saved configuration settings stored in the DB.
sfConfig = {
'_debug': False, # Debug
'_maxthreads': 3, # Number of modules to run concurrently
'__logging': True, # Logging in general
'__outputfilter': None, # Event types to filter from modules' output
'_useragent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0', # User-Agent to use for HTTP requests
Expand All @@ -71,6 +72,7 @@ def main():

sfOptdescs = {
'_debug': "Enable debugging?",
'_maxthreads': "Max number of modules to run concurrently",
'_useragent': "User-Agent string to use for HTTP requests. Prefix with an '@' to randomly select the User Agent from a file containing user agent strings for each request, e.g. @C:\\useragents.txt or @/home/bob/useragents.txt. Or supply a URL to load the list from there.",
'_dnsserver': "Override the default resolver with another DNS server. For example, 8.8.8.8 is Google's open DNS server.",
'_fetchtimeout': "Number of seconds before giving up on a HTTP request.",
Expand Down
90 changes: 45 additions & 45 deletions sfscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import dns.resolver

from sflib import SpiderFoot
from spiderfoot import SpiderFootDb, SpiderFootEvent, SpiderFootPlugin, SpiderFootTarget, SpiderFootHelpers, logger
from spiderfoot import SpiderFootDb, SpiderFootEvent, SpiderFootPlugin, SpiderFootTarget, SpiderFootHelpers, SpiderFootThreadPool, logger


def startSpiderFootScanner(loggingQueue, *args, **kwargs):
Expand Down Expand Up @@ -210,6 +210,8 @@ def __init__(self, scanName, scanId, targetValue, targetType, moduleList, global

self.__setStatus("INITIALIZING", time.time() * 1000, None)

self.__sharedThreadPool = SpiderFootThreadPool(threads=self.__config.get("_maxthreads", 3), name='sharedThreadPool')

# Used when module threading is enabled
self.eventQueue = None

Expand Down Expand Up @@ -255,19 +257,17 @@ def __setStatus(self, status, started=None, ended=None):
self.__status = status
self.__dbh.scanInstanceSet(self.__scanId, started, ended, status)

def __startScan(self, threaded=True):
def __startScan(self):
"""Start running a scan.
Args:
threaded (bool): whether to thread modules
"""
aborted = False

self.__setStatus("STARTING", time.time() * 1000, None)
self.__sf.status(f"Scan [{self.__scanId}] for '{self.__target.targetValue}' initiated.")

if threaded:
self.eventQueue = queue.Queue()
self.eventQueue = queue.Queue()

self.__sharedThreadPool.start()

# moduleList = list of modules the user wants to run
self.__sf.debug(f"Loading {len(self.__moduleList)} modules ...")
Expand Down Expand Up @@ -305,6 +305,7 @@ def __startScan(self, threaded=True):
mod.setScanId(self.__scanId)
mod.setup(self.__sf, self.__modconfig[modName])
mod.setDbh(self.__dbh)
mod.setSharedThreadPool(self.__sharedThreadPool)
except Exception:
self.__sf.error(f"Module {modName} initialization failed: {traceback.format_exc()}")
mod.errorState = True
Expand Down Expand Up @@ -343,13 +344,12 @@ def __startScan(self, threaded=True):
continue

# Set up the outgoing event queue
if threaded:
try:
mod.outgoingEventQueue = self.eventQueue
mod.incomingEventQueue = queue.Queue()
except Exception as e:
self.__sf.error(f"Module {modName} event queue setup failed: {e}")
continue
try:
mod.outgoingEventQueue = self.eventQueue
mod.incomingEventQueue = queue.Queue()
except Exception as e:
self.__sf.error(f"Module {modName} event queue setup failed: {e}")
continue

self.__moduleInstances[modName] = mod
self.__sf.status(f"{modName} module loaded.")
Expand All @@ -365,19 +365,6 @@ def __startScan(self, threaded=True):
# sort modules by priority
self.__moduleInstances = OrderedDict(sorted(self.__moduleInstances.items(), key=lambda m: m[-1]._priority))

if not threaded:
# Register listener modules and then start all modules sequentially
for module in list(self.__moduleInstances.values()):
for listenerModule in list(self.__moduleInstances.values()):
# Careful not to register twice or you will get duplicate events
if listenerModule in module._listenerModules:
continue
# Note the absence of a check for whether a module can register
# to itself. That is intentional because some modules will
# act on their own notifications (e.g. sfp_dns)!
if listenerModule.watchedEvents() is not None:
module.registerListener(listenerModule)

# Now we are ready to roll..
self.__setStatus("RUNNING")

Expand All @@ -387,13 +374,8 @@ def __startScan(self, threaded=True):
psMod.setTarget(self.__target)
psMod.setDbh(self.__dbh)
psMod.clearListeners()
if threaded:
psMod.outgoingEventQueue = self.eventQueue
psMod.incomingEventQueue = queue.Queue()
else:
for mod in list(self.__moduleInstances.values()):
if mod.watchedEvents() is not None:
psMod.registerListener(mod)
psMod.outgoingEventQueue = self.eventQueue
psMod.incomingEventQueue = queue.Queue()

# Create the "ROOT" event which un-triggered modules will link events to
rootEvent = SpiderFootEvent("ROOT", self.__targetValue, "", None)
Expand Down Expand Up @@ -422,13 +404,9 @@ def __startScan(self, threaded=True):
break

# start threads
if threaded and not aborted:
if not aborted:
self.waitForThreads()

if not threaded:
for mod in list(self.__moduleInstances.values()):
mod.finish()

if aborted:
self.__sf.status(f"Scan [{self.__scanId}] aborted.")
self.__setStatus("ABORTED", None, time.time() * 1000)
Expand Down Expand Up @@ -514,18 +492,40 @@ def waitForThreads(self):
# tell the modules to stop
for mod in self.__moduleInstances.values():
mod._stopScanning = True
self.__sharedThreadPool.shutdown(wait=True)

def threadsFinished(self, log_status=False):
if self.eventQueue is None:
return True

modules_waiting = {
m.__name__: m.incomingEventQueue.qsize() for m in
self.__moduleInstances.values() if m.incomingEventQueue is not None
}
modules_waiting = dict()
for m in self.__moduleInstances.values():
try:
if m.incomingEventQueue is not None:
modules_waiting[m.__name__] = m.incomingEventQueue.qsize()
except Exception:
with suppress(Exception):
m.errorState = True
modules_waiting = sorted(modules_waiting.items(), key=lambda x: x[-1], reverse=True)
modules_running = [m.__name__ for m in self.__moduleInstances.values() if m.running]
modules_errored = [m.__name__ for m in self.__moduleInstances.values() if m.errorState]

modules_running = []
for m in self.__moduleInstances.values():
try:
if m.running:
modules_running.append(m.__name__)
except Exception:
with suppress(Exception):
m.errorState = True

modules_errored = []
for m in self.__moduleInstances.values():
try:
if m.errorState:
modules_errored.append(m.__name__)
except Exception:
with suppress(Exception):
m.errorState = True

queues_empty = [qsize == 0 for m, qsize in modules_waiting]

for mod in self.__moduleInstances.values():
Expand Down
1 change: 1 addition & 0 deletions spiderfoot/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .db import SpiderFootDb
from .event import SpiderFootEvent
from .threadpool import SpiderFootThreadPool
from .plugin import SpiderFootPlugin
from .target import SpiderFootTarget
from .helpers import SpiderFootHelpers
Expand Down
Loading

0 comments on commit 1f35646

Please sign in to comment.