Skip to content

Commit

Permalink
error handling for async scanning
Browse files Browse the repository at this point in the history
  • Loading branch information
juerkkil committed Dec 26, 2024
1 parent b96ea0f commit 36c74ad
Showing 1 changed file with 21 additions and 28 deletions.
49 changes: 21 additions & 28 deletions secheaders/secheaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,7 @@ def main():
parser.print_usage(sys.stderr)
sys.exit(1)

target_list = []
if args.target_list:
with open(args.target_list, encoding='utf-8') as file:
target_list = [line.rstrip() for line in file]
if args.url:
target_list.append(args.url)

target_list = list(set(target_list)) # Remove possible duplicates

if args.url:

try:
res = scan_target(args.url, args)
except SecurityHeadersException as e:
Expand All @@ -55,20 +45,18 @@ def main():


def async_scan_done(scan):
    """Done-callback for a completed scan task.

    The task resolves to a ``(result, args)`` tuple produced by
    ``scan_target_wrapper()``.  Failed scans arrive as a dict carrying
    an ``'error'`` key and are reported inline, so one failing target
    never aborts the rest of the batch.
    """
    res, args = scan.result()
    if 'error' in res:
        # Error results only carry 'target' and 'error' — no headers to render.
        print(f"Scanning target {res['target']}...")
        print(f"Error: {res['error']}\n")
    else:
        print(cmd_utils.output_text(res['target'], res['headers'], res['https'], args.no_color, args.verbose))


def scan_target(url, args):
try:
header_check = SecurityHeaders(url, args.max_redirects, args.insecure)
header_check.fetch_headers()
headers = header_check.check_headers()
except SecurityHeadersException as e:
raise e
header_check = SecurityHeaders(url, args.max_redirects, args.insecure)
header_check.fetch_headers()
headers = header_check.check_headers()

if not headers:
raise FailedToFetchHeaders("Failed to fetch headers")
Expand All @@ -78,33 +66,38 @@ def scan_target(url, args):


def scan_target_wrapper(url, args):
    """Run ``scan_target()`` and pair its result with ``args``.

    Returning ``args`` alongside the result is a workaround so the
    done-callback (``async_scan_done``) can access the CLI options.
    Scan failures are converted into an error dict instead of raised,
    which keeps executor tasks from propagating exceptions.
    """
    try:
        # Return the args also for the callback function
        return scan_target(url, args), args
    except SecurityHeadersException as e:
        return {'target': url, 'error': str(e)}, args


async def scan_multiple_targets(args):
    """Scan every target listed in ``args.target_list`` concurrently.

    Targets are read one per line, de-duplicated, and scanned in the
    default thread-pool executor.  Without ``--json`` each result is
    printed as soon as its task finishes (via ``async_scan_done``);
    with ``--json`` the results are aggregated and dumped once at the
    end.
    """
    with open(args.target_list, encoding='utf-8') as file:
        targets = [line.rstrip() for line in file]

    targets = list(set(targets))  # Remove possible duplicates
    # get_running_loop() is the supported way to obtain the loop from
    # inside a coroutine; get_event_loop() is deprecated there since
    # Python 3.10.
    loop = asyncio.get_running_loop()
    tasks = []
    for target in targets:
        task = loop.run_in_executor(None, scan_target_wrapper, target, args)
        if not args.json:
            # Output result of each scan immediately
            task.add_done_callback(async_scan_done)
        tasks.append(task)

    for task in tasks:
        await task

    # When json output, aggregate the results and output the json dump at the end
    if args.json:
        res = []
        for t in tasks:
            result, _args = t.result()
            res.append(result)

        print(json.dumps(res, indent=2))

# Script entry point: run the CLI only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()

0 comments on commit 36c74ad

Please sign in to comment.