Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft for OTP login #725

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
35 changes: 35 additions & 0 deletions djoser/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,41 @@ def __getattribute__(self, item):
"LOGIN_SERIALIZER": "djoser.webauthn.serializers.WebauthnLoginSerializer",
}
),
"PASSWORDLESS": {
"SHORT_TOKEN_LENGTH": 6,
"LONG_TOKEN_LENGTH": 64,
"SHORT_TOKEN_CHARS": "0123456789",
"LONG_TOKEN_CHARS": "abcdefghijklmnopqrstuvwxyz0123456789",
"TOKEN_LIFETIME": 600,
"REGISTER_NONEXISTENT_USERS": True,
"EMAIL_FIELD_NAME": "email",
"MOBILE_FIELD_NAME": "mobile",
# If true, an attempt to redeem a token with the wrong token type
# will count for the times a token has been used
"INCORRECT_SHORT_TOKEN_REDEEMS_TOKEN": False,
"ALLOWED_PASSWORDLESS_METHODS": ["EMAIL"], # or ["MOBILE"] or ["EMAIL", "MOBILE"]
"MAX_TOKEN_USES": 1,
"GENERATORS": ObjDict({
"username_generator": "djoser.passwordless.utils.username_generator",
}),
"EMAIL": ObjDict ({
"passwordless_request": "djoser.passwordless.email.PasswordlessRequestEmail",
}),
"SERIALIZERS": ObjDict({
"passwordless_request_email_token": "djoser.passwordless.serializers.EmailPasswordlessAuthSerializer",
"passwordless_request_mobile_token": "djoser.passwordless.serializers.MobilePasswordlessAuthSerializer",
"passwordless_token_exchange": "djoser.passwordless.serializers.PasswordlessTokenExchangeSerializer",
}),
"PERMISSIONS": ObjDict({
"passwordless_token_exchange": ["rest_framework.permissions.AllowAny"],
"passwordless_token_request": ["rest_framework.permissions.AllowAny"],
}),
"DECORATORS": ObjDict({
"token_request_rate_limit_decorator": "djoser.passwordless.utils.token_request_limiter",
"token_redeem_rate_limit_decorator": "djoser.passwordless.utils.token_redeem_limiter",
}),
"SMS_SENDER": "djoser.passwordless.sms.send_sms",
}
}

SETTINGS_TO_IMPORT = ["TOKEN_MODEL", "SOCIAL_AUTH_TOKEN_STRATEGY"]
Expand Down
3 changes: 3 additions & 0 deletions djoser/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ class Messages:
INVALID_PASSWORD_ERROR = _("Invalid password.")
EMAIL_NOT_FOUND = _("User with given email does not exist.")
CANNOT_CREATE_USER_ERROR = _("Unable to create account.")

TOKEN_SENT= _("A token has been sent to you")
CANNOT_SEND_TOKEN= _("Unable to send token")
Empty file added djoser/passwordless/__init__.py
Empty file.
5 changes: 5 additions & 0 deletions djoser/passwordless/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from django.apps import AppConfig


class PasswordlessConfig(AppConfig):
name = "djoser.passwordless"
16 changes: 16 additions & 0 deletions djoser/passwordless/email.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from templated_mail.mail import BaseEmailMessage

from djoser import utils
from djoser.conf import settings

class PasswordlessRequestEmail(BaseEmailMessage):
template_name = "email/passwordless_request.html"

def get_context_data(self):
context = super().get_context_data()
user = context.get("user")

if settings.PASSWORDLESS.get("PASSWORDLESS_EMAIL_LOGIN_URL", None):
# Eg magic links / Deep links for mobile apps
context["url"] = settings.PASSWORDLESS["PASSWORDLESS_EMAIL_LOGIN_URL"].format(**context)
return context
29 changes: 29 additions & 0 deletions djoser/passwordless/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 3.2.18 on 2023-04-23 07:18

from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

initial = True

dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]

operations = [
migrations.CreateModel(
name='PasswordlessChallengeToken',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('token', models.TextField(unique=True)),
('short_token', models.TextField()),
('created_at', models.DateTimeField(auto_now=True)),
('uses', models.IntegerField(default=0)),
('token_request_identifier', models.TextField()),
('user', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='djoser_passwordless_tokens', to=settings.AUTH_USER_MODEL)),
],
),
]
Empty file.
45 changes: 45 additions & 0 deletions djoser/passwordless/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from django.db import models
from django.conf import settings
from django.utils.timezone import now, timedelta


class PasswordlessChallengeTokenManager(models.Manager):
def delete_expired(self, token_lifetime_seconds, max_token_uses):
return self.filter(
models.Q(created_at__lt=now() - timedelta(seconds=token_lifetime_seconds))
| models.Q(uses__gte=max_token_uses)
).delete()

class PasswordlessChallengeToken(models.Model):
objects = PasswordlessChallengeTokenManager()

# We will deliver two tokens. One which is long and one which is short.
# The short one needs to be redeemed with the same identifier that it was
# sent to. This is a mitigation to brute force attacks, that might be able
# to break a short token.
# The long token can be redeemed without any other information, as it is
# significantly harder to brute force.

token = models.TextField(unique=True)
short_token = models.TextField()
created_at = models.DateTimeField(auto_now=True)
uses = models.IntegerField(default=0)
token_request_identifier = models.TextField()

user = models.ForeignKey(
settings.AUTH_USER_MODEL,
related_name="djoser_passwordless_tokens",
null=True,
on_delete=models.CASCADE,
)

def redeem(self):
self.uses += 1
return self.save()

def is_valid(self, token_lifetime_seconds, max_token_uses):
if self.created_at + timedelta(seconds=token_lifetime_seconds) < now():
return False
if self.uses >= max_token_uses:
return False
return True
124 changes: 124 additions & 0 deletions djoser/passwordless/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from django.contrib.auth import get_user_model
from rest_framework import serializers
from djoser.conf import settings
from .utils import username_generator
from djoser.constants import Messages
from .services import PasswordlessTokenService

User = get_user_model()

# Were serializing user model to ensure the same validation and cleaning up logic defined in the model is
# also happening in these fields. This is especially important for mobile numbers fields, which
# can be given in a variety of formats.
class AbstractPasswordlessSignupSerializer(serializers.ModelSerializer):
@property
def token_request_identifier_field(self):
raise NotImplementedError

def find_user_by_identifier(self, identifier_value):
try:
return User.objects.get(**{self.token_request_identifier_field+'__iexact': identifier_value})
except User.DoesNotExist:
return None

def validate(self, data):
identifier_value = data[self.token_request_identifier_field]
user = self.find_user_by_identifier(identifier_value)

if not settings.PASSWORDLESS["REGISTER_NONEXISTENT_USERS"] and not user:
raise serializers.ValidationError(Messages.CANNOT_SEND_TOKEN)
return super().validate(data)

def create(self, validated_data):
identifier_value = validated_data[self.token_request_identifier_field]
user = self.find_user_by_identifier(identifier_value)

if settings.PASSWORDLESS["REGISTER_NONEXISTENT_USERS"] is True and not user:
user = User.objects.create(**{
self.token_request_identifier_field: identifier_value,
# In many cases, the username is mandatory
User.USERNAME_FIELD: settings.PASSWORDLESS["GENERATORS"].username_generator(),
})
user.set_unusable_password()
user.save()

return user

class EmailPasswordlessAuthSerializer(AbstractPasswordlessSignupSerializer):
class Meta:
model = User
fields = (settings.PASSWORDLESS["EMAIL_FIELD_NAME"],)

@property
def token_request_identifier_field(self):
return settings.PASSWORDLESS["EMAIL_FIELD_NAME"]


class MobilePasswordlessAuthSerializer(AbstractPasswordlessSignupSerializer):
class Meta:
model = User
fields = (settings.PASSWORDLESS["MOBILE_FIELD_NAME"],)

@property
def token_request_identifier_field(self):
return settings.PASSWORDLESS["MOBILE_FIELD_NAME"]


# EXCHANGE (Turning a OTP into an Auth token)
# Again we're serializing the user model to ensure we get the basic validation and cleaning up
# logic (eg. standardizing phone numbers) as setup by the model fields
class PasswordlessTokenExchangeSerializer(serializers.ModelSerializer):

default_error_messages = {
"invalid_credentials": settings.CONSTANTS.messages.INVALID_CREDENTIALS_ERROR
}

token = serializers.CharField(required=True)

class Meta:
model = User
fields = ('token',)
if "EMAIL" in settings.PASSWORDLESS["ALLOWED_PASSWORDLESS_METHODS"]:
fields += (settings.PASSWORDLESS["EMAIL_FIELD_NAME"],)
if "MOBILE" in settings.PASSWORDLESS["ALLOWED_PASSWORDLESS_METHODS"]:
fields += (settings.PASSWORDLESS["MOBILE_FIELD_NAME"],)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Make both fields optional
if "EMAIL" in settings.PASSWORDLESS["ALLOWED_PASSWORDLESS_METHODS"]:
self.fields[settings.PASSWORDLESS["EMAIL_FIELD_NAME"]].required = False
if "MOBILE" in settings.PASSWORDLESS["ALLOWED_PASSWORDLESS_METHODS"]:
self.fields[settings.PASSWORDLESS["MOBILE_FIELD_NAME"]].required = False
self.user = None

def validate(self, attrs):
super().validate(attrs)

if "EMAIL" in settings.PASSWORDLESS["ALLOWED_PASSWORDLESS_METHODS"]:
valid_mobile_token = PasswordlessTokenService.check_token(
attrs.get("token", None),
settings.PASSWORDLESS["EMAIL_FIELD_NAME"],
attrs.get(settings.PASSWORDLESS["EMAIL_FIELD_NAME"]),
)
else:
valid_mobile_token = None

if "MOBILE" in settings.PASSWORDLESS["ALLOWED_PASSWORDLESS_METHODS"]:
valid_email_token = PasswordlessTokenService.check_token(
attrs.get("token", None),
settings.PASSWORDLESS["MOBILE_FIELD_NAME"],
attrs.get(settings.PASSWORDLESS["MOBILE_FIELD_NAME"]),
)
else:
valid_email_token = None

# WARNING - We're not checking that the user is valid, because
# they just confirmed their email/mobile number. User will be
# marked as active in the Action.
valid_token = valid_mobile_token or valid_email_token
if valid_token:
self.user = valid_token.user
return attrs

self.fail("invalid_credentials")
82 changes: 82 additions & 0 deletions djoser/passwordless/services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@

from django.utils.timezone import now, timedelta
from django.db.models import Q
from djoser.conf import settings
from django.db import transaction, IntegrityError
from .utils import (
create_challenge,
)
from .models import PasswordlessChallengeToken

class PasswordlessTokenService(object):
@staticmethod
def create_token(user, identifier_type):
# We need to ensure the token is unique, so we'll wrap it in a
# transaction that retries if the token is not unique.
tries = 0
PasswordlessChallengeToken.objects.delete_expired(settings.PASSWORDLESS["TOKEN_LIFETIME"], settings.PASSWORDLESS["MAX_TOKEN_USES"])
try:
with transaction.atomic():
return PasswordlessTokenService._generate_create_token(user, identifier_type)
except IntegrityError as exception:
if tries < 5:
tries += 1
return PasswordlessTokenService._generate_create_token(user, identifier_type)
else:
# If we've cannot generate a unique token after 5 tries, we'll
# raise the exception. Maybe add a message to the admin to cleanup
# expired stale tokens, or to increase the token length.
raise exception

@staticmethod
def _generate_create_token(user, identifier_type):
# Remove all tokens for this user when issuing a new one
user.djoser_passwordless_tokens.all().delete()
token = PasswordlessChallengeToken.objects.create(
token = create_challenge(settings.PASSWORDLESS["LONG_TOKEN_LENGTH"], settings.PASSWORDLESS["LONG_TOKEN_CHARS"]),
short_token = create_challenge(settings.PASSWORDLESS["SHORT_TOKEN_LENGTH"], settings.PASSWORDLESS["SHORT_TOKEN_CHARS"]),
token_request_identifier=identifier_type,
user=user
)
return token


@staticmethod
def check_token(challenge, identifier_field, identifier_value):
if not challenge:
return None
try:
token = PasswordlessChallengeToken.objects.get(
Q(token=challenge) | Q(
**{
"short_token": challenge,
"token_request_identifier": identifier_field,
"user__"+identifier_field: identifier_value,
}
)
)
except PasswordlessChallengeToken.DoesNotExist:
if identifier_value and identifier_field and settings.PASSWORDLESS["INCORRECT_SHORT_TOKEN_REDEEMS_TOKEN"]:
# If the token is not found, we'll check if the identifier_value
# and identifier_field match an existing token. If so, we'll increment the
# number of attempts for the user. If the user has reached the
# max number of attempts, we'll lock the user out.
try:
tokens = PasswordlessChallengeToken.objects.filter(
**{
"token_request_identifier": identifier_field,
"user__"+identifier_field: identifier_value,
}
)
for token in tokens:
token.redeem()
except PasswordlessChallengeToken.DoesNotExist:
pass

return None

if not token.is_valid(settings.PASSWORDLESS["TOKEN_LIFETIME"], settings.PASSWORDLESS["MAX_TOKEN_USES"]):
return None

token.redeem()
return token
5 changes: 5 additions & 0 deletions djoser/passwordless/sms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
def send_sms(request, token):
# TODO
print("TODO: send_sms")
print(token)
pass
21 changes: 21 additions & 0 deletions djoser/passwordless/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from django.urls import re_path

from . import views

urlpatterns = [
re_path(
r"^request/email/$",
views.PasswordlessEmailTokenRequestView.as_view(),
name="passwordless_email_signup_request",
),
re_path(
r"^request/mobile/$",
views.PasswordlessMobileTokenRequestView.as_view(),
name="passwordless_mobile_signup_request",
),
re_path(
r"^exchange/$",
views.ExchangePasswordlessTokenForAuthTokenView.as_view(),
name="passwordless_token_exchange",
)
]
Loading