Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IP and email scoring bot #1

Open
wants to merge 11 commits into
base: 7.0.x
Choose a base branch
from
314 changes: 314 additions & 0 deletions sopel/modules/emailcheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
# coding=utf-8
"""
emailcheck.py - Watch oper messages for new nicks being registered
Copyright © 2021, Kufat <[email protected]>
Based on existing sopel code.
Licensed under the Eiffel Forum License 2.
"""

import logging
import re
import urllib

import sqlalchemy.sql

from dataclasses import dataclass

from sopel import db, module
from sopel.config.types import FilenameAttribute, StaticSection, ValidatedAttribute, ListAttribute
from sopel.tools import events, target, Identifier

from sqlalchemy import Column, String, Float, Boolean, TIMESTAMP
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.declarative import declarative_base

try:
from ip import get_exemption
from ip import ipqs_lock
Kufat marked this conversation as resolved.
Show resolved Hide resolved
except:
Kufat marked this conversation as resolved.
Show resolved Hide resolved
import threading

def get_exemption(ip):
return "Can't access exemptions; failing safe"

ipqs_lock = threading.Lock()


EMAIL_REGEX = re.compile(r"([a-zA-Z0-9_.+-]+)@([a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)")
IRCCLOUD_USER_REGEX = re.compile(r"[us]id[0-9]{4,}")
DOMAIN_LEN = 50

DEFAULT_EXEMPT_SUFFIXES = {
Kufat marked this conversation as resolved.
Show resolved Hide resolved
"@gmail.com",
"@hotmail.com",
"@protonmail.com",
".edu"
}

KILL_STR = ":Use of disposable email service for nick registration"

LOGGER = logging.getLogger(__name__)

BASE = declarative_base()

safe_mode = True
db_populated = False

#SQLAlchemy container class
class KnownEmails(BASE):
__tablename__ = 'known_emails'
domain = Column(String(DOMAIN_LEN), primary_key=True)
first_nick = Column(String(40))
score = Column(Float)
flag_disposable = Column(Boolean)
flag_recent_abuse = Column(Boolean)
first_seen = Column(TIMESTAMP, server_default=sqlalchemy.sql.func.now())

class EmailCheckSection(StaticSection):
IPQS_key = ValidatedAttribute('IPQS_key')
disallow_threshold = ValidatedAttribute("disallow_threshold", parse=float, default=50.0)
malicious_threshold = ValidatedAttribute("malicious_threshold", parse=float, default=75.0)
gline_time = ValidatedAttribute('gline_time', default="24h")
exempt_suffixes = ListAttribute("exempt_suffixes")
warn_chans = ListAttribute("warn_chans")
protect_chans = ListAttribute("protect_chans")
Kufat marked this conversation as resolved.
Show resolved Hide resolved

def configure(config):
config.define_section('emailcheck', EmailCheckSection)
config.emailcheck.configure_setting('IPQS_key',
'Access key for IPQS service')
config.emailcheck.configure_setting('disallow_threshold',
'Addresses with scores >= this will be disallowed; no punishment')
config.emailcheck.configure_setting('malicious_threshold',
'Addresses with scores >= this will be interpreted as attacks')
config.emailcheck.configure_setting('gline_time',
'Users attempting to register with malicious addresses will be '
'glined for this priod of time.')
config.emailcheck.configure_setting('exempt_suffixes',
'Suffixes (TLD, whole domain, etc.) to exempt from checking')
config.emailcheck.configure_setting('warn_chans',
'List of channels to warn when a suspicious user is detected. '
'May be empty.')
config.emailcheck.configure_setting('protect_chans',
'List of channels to +R after malicious attempt to reg. '
'May be empty.')

def setup(bot):
bot.config.define_section('emailcheck', EmailCheckSection)

@dataclass
class Email:
user: str
domain: str
def get_address(self):
return f'{self.user}@{self.domain}'
def __str__(self):
return self.get_address()
def __post_init__(self):
self.domain = self.domain.lower()

@dataclass
class DomainInfo:
score: float
flag_disposable: bool
flag_recent_abuse: bool

def alert(bot, alert_msg: str, log_err: bool = False):
for channel in bot.config.emailcheck.warn_chans:
bot.say(alert_msg, channel)
if log_err:
LOGGER.error(alert_msg)

def add_badmail(bot, email):
#Right now we're BADMAILing whole domains. This might change.
if safe_mode:
LOGGER.info(f"SAFE MODE: Would badmail {email}")
else:
bot.write("NICKSERV", "badmail", "add", f'*@{email.domain}')

def fdrop(bot, nick: str):
if safe_mode:
LOGGER.info(f"SAFE MODE: Would fdrop {nick}")
else:
bot.write("NICKSERV", "fdrop", nick.lower())

def gline_ip(bot, ip: str, duration: str):
if safe_mode:
LOGGER.info(f"SAFE MODE: Would gline {ip} for {duration}")
else:
bot.write("GLINE", f'*@{ip}', duration, KILL_STR)

def gline_username(bot, nick: str, duration: str):
Kufat marked this conversation as resolved.
Show resolved Hide resolved
if known_user := bot.users.get(Identifier(nick)):
username = known_user.user.lower() # Should already be lowercase
if IRCCLOUD_USER_REGEX.match(username):
if safe_mode:
LOGGER.info(f"SAFE MODE: Would gline {username} for {duration}")
else:
bot.write("GLINE", f'{username}@*', duration, KILL_STR)
return
else:
alert(bot, f"User {nick} had unexpected non-IRCCloud username {username}", true)
else:
alert(bot, f"Couldn't find irccloud uid/sid for {nick} to G-line!", true)
kill_nick(bot, nick) # Something went wrong with G-line, so fall back to /kill

def kill_nick(bot, nick: str):
if safe_mode:
LOGGER.info(f"SAFE MODE: Would kill {nick}")
else:
bot.write("KILL", nick.lower(), KILL_STR)

def gline_strategy(bot, nick):
if (known_user := bot.users.get(Identifier(nick))):
if hasattr(known_user, "ip"):
ip = known_user.ip
exemption = get_exemption(ip)
Kufat marked this conversation as resolved.
Show resolved Hide resolved
if exemption:
if "irccloud" in exemption.lower():
# IRCCloud special case: ban uid/sid
return ["gline_username", known_user.user]
else: # Fully exempt, so no g-line
return None
else: # No exemption
return ["gline_ip", ip]
else: # Fail safely
return None

def gline_or_kill(bot, nick: str, duration: str):
if strategy := gline_strategy(bot, nick):
if strategy[0] == "gline_ip":
gline_ip(bot, strategy[1], duration)
elif strategy[0] == "gline_username":
gline_username(bot, strategy[1], duration)
else:
alert(bot, f"Unknown strategy {strategy} for nick {nick}", true)
kill_nick(bot, nick) # safest option
else:
kill_nick(bot, nick) # duration ignored

def protect_chans(bot):
Kufat marked this conversation as resolved.
Show resolved Hide resolved
if safe_mode:
LOGGER.info(f"SAFE MODE: Would protect chans")
return
for chan in bot.config.emailcheck.protect_chans:
bot.write("MODE", chan, "+R")

def malicious_response(bot, nick: str, email):
fdrop(bot, nick)
add_badmail(bot, email)
bot.say(f"You have been temporarily banned from this network because {email.domain} "
"has a history of spam or abuse, and/or is a disposable email domain. "
"If this is a legitimate domain, contact staff for assistance.",
nick.lower())
gline_or_kill(bot, nick, bot.config.emailcheck.gline_time)
protect_chans(bot)
alert(bot, f"ALERT: User {nick} attempted to register a nick with disposable/spam domain {email.domain}!")

def disallow_response(bot, nick: str, email):
fdrop(bot, nick)
add_badmail(bot, email)
bot.say(f"Your registration has been disallowed because {email.domain} appears to be suspicious. "
"If this is a legitimate domain, contact staff for assistance.",
nick.lower())
alert(bot, f"WARNING: User {nick} attempted to register a nick with suspicious domain {email.domain}.")

def fetch_IPQS_email_score(
email_addr: str,
key: str,
fast: bool = True
) -> tuple[float, bool, bool]: #score, disposable, has recent abuse flag set
'''Perform lookup on a specific email adress using ipqualityscore.com'''
email_str = urllib.parse.quote(email_addr)
faststr = str(bool(fast)).lower() #lower + handle None and other garbage
params = urllib.parse.urlencode({'fast': faststr})
with urllib.request.urlopen(
f"https://ipqualityscore.com/api/json/email/{key}/{email_str}?{params}") as url:
data = json.loads(url.read().decode())
LOGGER.debug(data)
Kufat marked this conversation as resolved.
Show resolved Hide resolved
if not data['success']:
errstr = f"{email_addr} lookup failed with {data['message']}"
LOGGER.error(errstr)
raise RuntimeError(errstr)
return (data['fraud_score'], data["disposable"], data["recent_abuse"])

def get_email_score_from_db(session, email):
query_result = session.query(KnownEmails)\
.filter(KnownEmails.domain == email.domain)\
.one_or_none()
if query_result:
#Any known problematic provider should've been BADMAILed by now, but...
return DomainInfo(query_result.score,
query_result.flag_disposable,
query_result.flag_recent_abuse)

def store_email_score_in_db(session, email, nick, IPQSresult):
new_known_email = KnownEmails(domain= email.doman[:DOMAIN_LEN],
first_nick= nick,
score= IPQSresult[0],
flag_disposable= IPQSresult[1],
flag_recent_abuse= IPQSresult[2])
session.add(new_known_email)
session.commit()

def retrieve_score(bot, email, nick):
session = bot.db.session()
try:
global db_populated
if not db_populated:
BASE.metadata.create_all(bot.db.engine)
Kufat marked this conversation as resolved.
Show resolved Hide resolved
db_populated = True
if retval := get_email_score_from_db(session, email):
return retval
else:
if IPQSresult := fetch_IPQS_email_score(email, bot.config.emailcheck.IPQS_key):
store_email_score_in_db(session, email, nick, IPQSresult)
return IPQSresult
else: #Shouldn't be possible
raise RuntimeError(f"Couldn't retrieve IPQS for {email}!")
except SQLAlchemyError:
session.rollback()
raise
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you have more than one function doing DB calls, I would suggest a wrapper that lets you do with ... syntax for transactions that automatically commits and rolls back.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do want to bang on this a bit more and figure out exactly what the hell I'm doing. There should probably also be a finally with session.close()


def check_email(bot, email, nick):
if any(map(email.endswith, DEFAULT_EXEMPT_SUFFIXES)):
#email is exempt
LOGGER.info(f'Email {email} used by {nick} is on the exemption list.')
return None # No lookup, no result
#Check database
else:
return retrieve_score(bot, email, nick)

@module.require_owner
@module.commands('toggle_safe_email')
def toggle_safe(bot, trigger):
global safe_mode
safe_mode = not safe_mode
return bot.reply(f"Email check module safe mode now {'on' if safe_mode else 'off'}")
Kufat marked this conversation as resolved.
Show resolved Hide resolved

# <NickServ> ExampleAccount REGISTER: ExampleNick to [email protected]
# (note the 0x02 bold chars)
@module.rule(r'(\S*)\s*REGISTER: \u0002?([\S]+?)\u0002? to \u0002?(\S+)@(\S+?)\u0002?$')
@module.event("PRIVMSG")
@module.priority("high")
def handle_ns_register(bot, trigger):
if "nickserv" != trigger.nick.lower():
LOGGER.warning(f"Fake registration notice from {trigger.nick.lower()}!")
return
#It's really from nickserv.
_, nick, email_user, email_domain = trigger.groups()
email = Email(email_user, email_domain)
try:
if res := check_email(bot, email_user, email_domain, nick): #may be None, in which case we're done
if res.flag_disposable or (
res.score >= bot.config.emailcheck.malicious_threshold):
malicious_response(bot, nick, email)
elif res.flag_recent_abuse or (
res.score >= bot.config.emailcheck.disallow_threshold):
disallow_response(bot, nick, email)
else:
#already logged server response
return LOGGER.debug(f'Registration of {nick} to {email} OK.')
except:
alert(f"Lookup for f{nick} with email @f{domain} failed! Keep an eye on them.")

Loading