Skip to content

Commit

Permalink
Feature: Threaded Modules (smicallef#1175)
Browse files Browse the repository at this point in the history
* threaded modules
  • Loading branch information
TheTechromancer authored Jul 4, 2021
1 parent d85b6d2 commit d9ad04c
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 50 deletions.
155 changes: 134 additions & 21 deletions sfscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
import socket
import sys
import time
import queue
import traceback
from time import sleep
from copy import deepcopy
from collections import OrderedDict

import dns.resolver

Expand Down Expand Up @@ -180,6 +183,9 @@ def __init__(self, scanName, scanId, targetValue, targetType, moduleList, global

self.__setStatus("INITIALIZING", time.time() * 1000, None)

# Used when module threading is enabled
self.eventQueue = None

if start:
self.__startScan()

Expand Down Expand Up @@ -222,14 +228,20 @@ def __setStatus(self, status, started=None, ended=None):
self.__status = status
self.__dbh.scanInstanceSet(self.__scanId, started, ended, status)

def __startScan(self):
"""Start running a scan."""
def __startScan(self, threaded=True):
"""Start running a scan.
Args:
threaded (bool): whether to thread modules
"""
aborted = False

self.__setStatus("STARTING", time.time() * 1000, None)
self.__sf.status(f"Scan [{self.__scanId}] initiated.")

if threaded:
self.eventQueue = queue.Queue()

try:
# moduleList = list of modules the user wants to run
for modName in self.__moduleList:
Expand Down Expand Up @@ -276,22 +288,32 @@ def __startScan(self):
if self.__config['__outputfilter']:
mod.setOutputFilter(self.__config['__outputfilter'])

# Register the target with the module
mod.setTarget(self.__target)

if threaded:
# Set up the outgoing event queue
mod.outgoingEventQueue = self.eventQueue
mod.incomingEventQueue = queue.Queue()

self.__sf.status(modName + " module loaded.")

# Register listener modules and then start all modules sequentially
for module in list(self.__moduleInstances.values()):
# Register the target with the module
module.setTarget(self.__target)

for listenerModule in list(self.__moduleInstances.values()):
# Careful not to register twice or you will get duplicate events
if listenerModule in module._listenerModules:
continue
# Note the absence of a check for whether a module can register
# to itself. That is intentional because some modules will
# act on their own notifications (e.g. sfp_dns)!
if listenerModule.watchedEvents() is not None:
module.registerListener(listenerModule)
# sort modules by priority
self.__moduleInstances = OrderedDict(sorted(self.__moduleInstances.items(), key=lambda m: m[-1]._priority))

if not threaded:
# Register listener modules and then start all modules sequentially
for module in list(self.__moduleInstances.values()):

for listenerModule in list(self.__moduleInstances.values()):
# Careful not to register twice or you will get duplicate events
if listenerModule in module._listenerModules:
continue
# Note the absence of a check for whether a module can register
# to itself. That is intentional because some modules will
# act on their own notifications (e.g. sfp_dns)!
if listenerModule.watchedEvents() is not None:
module.registerListener(listenerModule)

# Now we are ready to roll..
self.__setStatus("RUNNING")
Expand All @@ -302,9 +324,13 @@ def __startScan(self):
psMod.setTarget(self.__target)
psMod.setDbh(self.__dbh)
psMod.clearListeners()
for mod in list(self.__moduleInstances.values()):
if mod.watchedEvents() is not None:
psMod.registerListener(mod)
if threaded:
psMod.outgoingEventQueue = self.eventQueue
psMod.incomingEventQueue = queue.Queue()
else:
for mod in list(self.__moduleInstances.values()):
if mod.watchedEvents() is not None:
psMod.registerListener(mod)

# Create the "ROOT" event which un-triggered modules will link events to
rootEvent = SpiderFootEvent("ROOT", self.__targetValue, "", None)
Expand All @@ -326,12 +352,16 @@ def __startScan(self):

# Check in case the user requested to stop the scan between modules
# initializing
for module in list(self.__moduleInstances.values()):
if module.checkForStop():
for mod in list(self.__moduleInstances.values()):
if mod.checkForStop():
self.__setStatus('ABORTING')
aborted = True
break

# start threads
if threaded and not aborted:
self.waitForThreads()

if aborted:
self.__sf.status(f"Scan [{self.__scanId}] aborted.")
self.__setStatus("ABORTED", None, time.time() * 1000)
Expand All @@ -347,3 +377,86 @@ def __startScan(self):
self.__setStatus("ERROR-FAILED", None, time.time() * 1000)

self.__dbh.close()

def waitForThreads(self):
counter = 0

try:
if not self.eventQueue:
return

# start one thread for each module
for mod in self.__moduleInstances.values():
mod.start()

# watch for newly-generated events
while True:

# log status of threads every 100 iterations
log_status = counter % 100 == 0
counter += 1

try:
sfEvent = self.eventQueue.get_nowait()
self.__sf.debug(f"waitForThreads() got event, {sfEvent.eventType}, from eventQueue.")
except queue.Empty:
# check if we're finished
if self.threadsFinished(log_status):
sleep(.1)
# but are we really?
if self.threadsFinished(log_status):
break
else:
# save on CPU
sleep(.01)
continue

if not isinstance(sfEvent, SpiderFootEvent):
raise TypeError(f"sfEvent is {type(sfEvent)}; expected SpiderFootEvent")

# for every module
for mod in self.__moduleInstances.values():
# if it's been aborted
if mod._stopScanning:
# break out of the while loop
raise AssertionError(f"{mod.__name__} requested stop")

# send it the new event if applicable
watchedEvents = mod.watchedEvents()
if sfEvent.eventType in watchedEvents or "*" in watchedEvents:
mod.incomingEventQueue.put(deepcopy(sfEvent))

except (KeyboardInterrupt, AssertionError) as e:
self.__sf.status(f"Scan [{self.__scanId}] aborted, {e}.")

finally:
# tell the modules to stop
for mod in self.__moduleInstances.values():
mod._stopScanning = True

def threadsFinished(self, log_status=False):
if self.eventQueue is None:
return True

modules_waiting = {m.__name__: m.incomingEventQueue.qsize() for m in self.__moduleInstances.values()}
modules_waiting = sorted(modules_waiting.items(), key=lambda x: x[-1], reverse=True)
modules_running = [m.__name__ for m in self.__moduleInstances.values() if m.running]
queues_empty = [qsize == 0 for m, qsize in modules_waiting]

if not modules_running and not queues_empty:
self.__sf.debug("Clearing queues for stalled/aborted modules.")
for mod in self.__moduleInstances.values():
try:
while True:
mod.incomingEventQueue.get_nowait()
except Exception:
pass

if log_status and modules_running:
events_queued = ", ".join([f"{mod}: {qsize:,}" for mod, qsize in modules_waiting[:5] if qsize > 0])
if events_queued:
self.__sf.info(f"Events queued: {events_queued}")

if all(queues_empty) and not modules_running:
return True
return False
105 changes: 76 additions & 29 deletions spiderfoot/plugin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import logging
import threading
import queue
from time import sleep


class SpiderFootPlugin():
Expand Down Expand Up @@ -47,10 +50,18 @@ class SpiderFootPlugin():
errorState = False
# SOCKS proxy
socksProxy = None
# Queue for incoming events
incomingEventQueue = None
# Queue for produced events
outgoingEventQueue = None

def __init__(self):
"""Not really needed in most cases."""
pass

# Whether the module is currently executing
self.running = False
# Holds the thread object when module threading is enabled
self.thread = None

def _updateSocket(self, socksProxy):
"""Hack to override module's use of socket, replacing it with
Expand Down Expand Up @@ -194,6 +205,7 @@ def notifyListeners(self, sfEvent):
Raises:
TypeError: sfEvent argument was invalid type
"""

from spiderfoot import SpiderFootEvent

if not isinstance(sfEvent, SpiderFootEvent):
Expand Down Expand Up @@ -239,45 +251,53 @@ def notifyListeners(self, sfEvent):
break
prevEvent = prevEvent.sourceEvent

self._listenerModules.sort(key=lambda m: m._priority)
# output to queue if applicable
if self.outgoingEventQueue is not None:
self.outgoingEventQueue.put(sfEvent)
# otherwise, call other modules directly
else:
self._listenerModules.sort(key=lambda m: m._priority)

for listener in self._listenerModules:
if eventName not in listener.watchedEvents() and '*' not in listener.watchedEvents():
continue
for listener in self._listenerModules:
if eventName not in listener.watchedEvents() and '*' not in listener.watchedEvents():
continue

if storeOnly and "__stor" not in listener.__module__:
continue
if storeOnly and "__stor" not in listener.__module__:
continue

listener._currentEvent = sfEvent
listener._currentEvent = sfEvent

# Check if we've been asked to stop in the meantime, so that
# notifications stop triggering module activity.
if self.checkForStop():
return
# Check if we've been asked to stop in the meantime, so that
# notifications stop triggering module activity.
if self.checkForStop():
return

try:
listener.handleEvent(sfEvent)
except Exception as e:
self.log.exception(f"Module ({listener.__module__}) encountered an error: {e}")
try:
listener.handleEvent(sfEvent)
except Exception as e:
self.log.exception(f"Module ({listener.__module__}) encountered an error: {e}")

def checkForStop(self):
"""For modules to use to check for when they should give back control.
Returns:
bool
"""
if not self.__scanId__:
return False
if self.outgoingEventQueue and self.incomingEventQueue:
return self._stopScanning
else:
if not self.__scanId__:
return False

scanstatus = self.__sfdb__.scanInstanceGet(self.__scanId__)
scanstatus = self.__sfdb__.scanInstanceGet(self.__scanId__)

if not scanstatus:
return False
if not scanstatus:
return False

if scanstatus[5] == "ABORT-REQUESTED":
return True
if scanstatus[5] == "ABORT-REQUESTED":
return True

return False
return False

def watchedEvents(self):
"""What events is this module interested in for input. The format is a list
Expand Down Expand Up @@ -314,11 +334,38 @@ def handleEvent(self, sfEvent):
return

def start(self):
"""Kick off the work (for some modules nothing will happen here, but instead
the work will start from the handleEvent() method.
Will usually be overriden by the implementer.
"""
self.thread = threading.Thread(target=self.threadWorker)
self.thread.start()

return
def threadWorker(self):
try:
# create new database handle since we're in our own thread
from spiderfoot import SpiderFootDb
self.setDbh(SpiderFootDb(self.opts))

if not (self.incomingEventQueue and self.outgoingEventQueue):
self.log.error("Please set up queues before starting module as thread")
return

while not self.checkForStop():
try:
sfEvent = self.incomingEventQueue.get_nowait()
self.log.debug(f"{self.__name__}.threadWorker() got event, {sfEvent.eventType}, from incomingEventQueue.")
self.running = True
self.handleEvent(sfEvent)
self.running = False
except queue.Empty:
sleep(.3)
continue
except KeyboardInterrupt:
self.log.warning(f"Interrupted module {self.__name__}.")
self._stopScanning = True
except Exception as e:
import traceback
self.log.error(f"Exception ({e.__class__.__name__}) in module {self.__name__}."
+ traceback.format_exc())
self.errorState = True
finally:
self.running = False

# end of SpiderFootPlugin class

0 comments on commit d9ad04c

Please sign in to comment.