Skip to content

Commit

Permalink
Merge pull request #5 from abbishal/main
Browse files Browse the repository at this point in the history
Main
  • Loading branch information
shamimrezasohag authored Oct 11, 2024
2 parents c796c94 + c67ef80 commit 8901ff6
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 37 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ This script should now provide comprehensive DNS security analysis, including op

## Usage
Run the script from the command line by specifying the input file containing domain names and other optional parameters:
```python3 dns_security_analysis_tool.py --domains-file domains.txt --format csv --output dns_security_report```
```python3 dns_security_analysis_tool.py domains.txt --format csv --output dns_security_report```

### Command-Line Arguments
- `--domains-file`: Path to the file containing the list of domains for analysis.
- `domains-file`: Path to the file containing the list of domains for analysis.
- `--dns-server`: (Optional) DNS server to use for queries (default: 8.8.8.8).
- `--format`: (Optional) output report format (`json`, `csv`, `html`).
- `--output`: (Optional) Name of the output file (without extension).
Expand Down
56 changes: 21 additions & 35 deletions dns_security_analysis_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import logging
import pandas as pd
import subprocess
import requests
import re
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
Expand All @@ -27,37 +26,27 @@ def query_all_records(self, domain):
# Query multiple DNS record types for a given domain
record_types = ['A', 'AAAA', 'MX', 'TXT', 'NS', 'CNAME', 'SOA', 'SPF', 'DKIM', 'DNSKEY', 'DS']
results = {'domain': domain}

for record_type in record_types:
record, response_code = self.query_record(domain, record_type)
if response_code == "NXDOMAIN":
results['Error'] = 'Domain does not exist'
break # Skip remaining checks as domain doesn't exist
if record:
results[record_type] = record
results[f'{record_type}_response_code'] = response_code

for record_type in record_types:
results[record_type], results[f'{record_type}_response_code'] = self.query_record(domain, record_type)
results['PTR'], _ = self.check_reverse_dns(results.get('A', '') + results.get('AAAA', ''))
results[record_type]= self.query_record(domain, record_type)
results['PTR'] = self.check_reverse_dns(results.get('A', '') + results.get('AAAA', ''))
results['Open Resolver'] = self.check_open_resolver(domain)
results['DNSSEC'] = self.check_dnssec(domain, results.get('NS', ''))
results['Anomalies'] = self.analyze_records(results)
# Empty and unnecessary data filtering
results = {k: v for k, v in results.items() if v}
return results
for record_type in record_types:
record, response_code = self.query_record(domain, record_type)
if record: # Only add non-empty records
results[record_type] = record
results[f'{record_type}_response_code'] = response_code

def query_record(self, domain, record_type):
# Perform a DNS query for a specific record type
try:
answers = self.resolver.resolve(domain, record_type)
return "; ".join([answer.to_text() for answer in answers]), "NOERROR"
except dns.resolver.NXDOMAIN:
return "Domain does not exist", "NXDOMAIN"
return "; ".join([answer.to_text() for answer in answers])
# removed NXDOMAIN as it doesn't require further processing
except Exception as e:
return "", str(e)
# instead of return error as result that would be logged as error
logging.error(f"Error querying {record_type} record for {domain}: {e}")
return ""

def check_reverse_dns(self, ip_addresses):
# Perform reverse DNS lookups for given IP addresses/got from the A record
Expand All @@ -69,7 +58,7 @@ def check_reverse_dns(self, ip_addresses):
ptr_records.extend([record.to_text() for record in ptr_record])
except Exception as e:
ptr_records.append(str(e))
return "; ".join(ptr_records), "NOERROR"
return "; ".join(ptr_records)

# Revised the logic to check the open_dns_resolver, Check if a DNS server is an open resolver using the 'dig' command
def check_open_resolver(self, domain):
Expand All @@ -79,7 +68,7 @@ def check_open_resolver(self, domain):
capture_output=True,
text=True
)
return "Open Resolver" if "ANSWER" in result.stdout else "Not an Open Resolver"
return "Open Resolver" if "ANSWER" in result.stdout else ""
except subprocess.CalledProcessError:
return "Check Failed"

Expand Down Expand Up @@ -133,43 +122,40 @@ def generate_report(all_data, output_format, output_filename):

def process_domains(domains, dns_server, output_format, output_filename):

def is_valid_domain(domain):
pattern = r'^((?!-)[A-Za-z0-9-]{1,63}(?<!-)\.)+[A-Za-z]{2,6}$'
return re.match(pattern, domain) is not None

pattern = r'^((?!-)[A-Za-z0-9-]{1,63}(?<!-)\.)+[A-Za-z]{2,6}$'
# ... inside your process_domains function ...
valid_domains = [domain for domain in domains if is_valid_domain(domain)]

valid_domains = [domain for domain in domains if re.match(pattern, domain) is not None and domain.strip()]

# Process a list of domains for DNS security analysis
tool = DNSQueryTool(dns_server)
all_results = []
max_threads = 10 # You can adjust the number of threads

# Filter out empty domains and prepare for progress tracking
filtered_domains = [domain for domain in domains if domain.strip()]
skipped_domains_count = len(domains) - len(filtered_domains)
skipped_domains_count = len(domains) - len(valid_domains)

with tqdm(total=len(filtered_domains), desc="Analyzing Domains") as pbar:
with tqdm(total=len(valid_domains), desc="Analyzing Domains") as pbar:
with ThreadPoolExecutor(max_workers=max_threads) as executor:
# Create a future for each domain in the filtered list
futures = {executor.submit(tool.query_all_records, domain): domain for domain in filtered_domains}
futures = {executor.submit(tool.query_all_records, domain): domain for domain in valid_domains}

for future in as_completed(futures):
dns_results = future.result()
all_results.append(dns_results)
pbar.update(1)

if skipped_domains_count > 0:
logging.warning(f"Skipped {skipped_domains_count} empty domain rows")
logging.warning(f"Skipped {skipped_domains_count} invalid domain rows")

generate_report(all_results, output_format, output_filename)

def main():
parser = argparse.ArgumentParser(description='Comprehensive DNS Security Analysis Tool')
parser.add_argument('--domains-file', required=True, help='Input file for bulk domain security analysis')
parser.add_argument('domains-file', help='Input file for bulk domain security analysis')
parser.add_argument('--dns-server', default='8.8.8.8', help='DNS server to use for queries')
parser.add_argument('--format', default='json', choices=['json', 'csv', 'html'], help='Format of the security report')
parser.add_argument('--output', default='dns_security_report', help='Output file name for the DNS security report')
parser.add_argument('--output', '-o', default='dns_security_report', help='Output file name for the DNS security report')
args = parser.parse_args()

with open(args.domains_file, 'r') as file:
Expand Down

0 comments on commit 8901ff6

Please sign in to comment.