Skip to content

Commit

Permalink
[#2041] Added hard database constraints on User's identifier/login_ty…
Browse files Browse the repository at this point in the history
…pe fields
  • Loading branch information
Bart van der Schoor authored and stevenbal committed Mar 11, 2024
1 parent a0afa39 commit c9fb425
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 27 deletions.
88 changes: 88 additions & 0 deletions src/open_inwoner/accounts/migrations/0071_auto_20240125_1227.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Generated by Django 3.2.23 on 2024-01-25 11:27

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("accounts", "0070_auto_20231205_1657"),
]

operations = [
migrations.AddConstraint(
model_name="user",
constraint=models.UniqueConstraint(
condition=models.Q(("login_type", "digid")),
fields=("bsn",),
name="unique_bsn_when_login_digid",
),
),
migrations.AddConstraint(
model_name="user",
constraint=models.UniqueConstraint(
condition=models.Q(
("login_type", "eherkenning"), models.Q(("rsin", ""), _negated=True)
),
fields=("rsin",),
name="unique_rsin_when_login_eherkenning",
),
),
migrations.AddConstraint(
model_name="user",
constraint=models.UniqueConstraint(
condition=models.Q(
("login_type", "eherkenning"), models.Q(("kvk", ""), _negated=True)
),
fields=("kvk",),
name="unique_kvk_when_login_eherkenning",
),
),
migrations.AddConstraint(
model_name="user",
constraint=models.UniqueConstraint(
condition=models.Q(("login_type", "oidc")),
fields=("oidc_id",),
name="unique_oidc_id_when_login_oidc",
),
),
migrations.AddConstraint(
model_name="user",
constraint=models.CheckConstraint(
check=models.Q(("bsn", ""), ("login_type", "digid"), _connector="OR"),
name="check_bsn_only_set_when_login_digid",
),
),
migrations.AddConstraint(
model_name="user",
constraint=models.CheckConstraint(
check=models.Q(
("oidc_id", ""), ("login_type", "oidc"), _connector="OR"
),
name="check_oidc_id_only_set_when_login_oidc",
),
),
migrations.AddConstraint(
model_name="user",
constraint=models.CheckConstraint(
check=models.Q(
models.Q(("kvk", ""), ("rsin", "")),
("login_type", "eherkenning"),
_connector="OR",
),
name="check_kvk_or_rsin_only_set_when_login_eherkenning",
),
),
migrations.AddConstraint(
model_name="user",
constraint=models.CheckConstraint(
check=models.Q(
models.Q(("kvk", ""), models.Q(("rsin", ""), _negated=True)),
models.Q(models.Q(("kvk", ""), _negated=True), ("rsin", "")),
models.Q(("login_type", "eherkenning"), _negated=True),
_connector="OR",
),
name="check_kvk_or_rsin_exclusive_when_login_eherkenning",
),
),
]
66 changes: 39 additions & 27 deletions src/open_inwoner/accounts/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ class User(AbstractBaseUser, PermissionsMixin):
date_joined = models.DateTimeField(
verbose_name=_("Date joined"), default=timezone.now
)
# TODO shouldn't rsin & bsn be unique? (possibly fixed in model constraints)
# TODO fix rsin & bsn to not be both null AND blank (!)
rsin = models.CharField(
verbose_name=_("Rsin"), max_length=9, blank=True, default=""
)
Expand Down Expand Up @@ -246,31 +244,45 @@ class Meta:
& ~Q(login_type=LoginTypeChoices.eherkenning),
name="unique_email_when_not_digid_or_eherkenning",
),
# UniqueConstraint(
# fields=["bsn"],
# condition=Q(login_type=LoginTypeChoices.digid)
# # not quite sure if we want this (bsn shouldn't be null AND blank)
# & ~(Q(bsn="") | Q(bsn__isnull=True)),
# name="unique_bsn_when_digid",
# ),
# UniqueConstraint(
# fields=["rsin"],
# condition=Q(login_type=LoginTypeChoices.eherkenning)
# # not quite sure if we want this (rsin shouldn't be null AND blank)
# & ~(Q(rsin="") | Q(rsin__isnull=True)),
# name="unique_rsin_when_eherkenning",
# ),
# UniqueConstraint(
# fields=["oidc_id"],
# condition=Q(login_type=LoginTypeChoices.oidc),
# name="unique_bsn_when_digid",
# ),
# CheckConstraint(
# # maybe this is not correct?
# check=(Q(bsn="") | Q(bsn__isnull=True))
# | ~Q(login_type=LoginTypeChoices.digid),
# name="check_digid_bsn_required_when_digid",
# ),
UniqueConstraint(
fields=["bsn"],
condition=Q(login_type=LoginTypeChoices.digid),
name="unique_bsn_when_login_digid",
),
UniqueConstraint(
fields=["rsin"],
condition=Q(login_type=LoginTypeChoices.eherkenning) & ~Q(rsin=""),
name="unique_rsin_when_login_eherkenning",
),
UniqueConstraint(
fields=["kvk"],
condition=Q(login_type=LoginTypeChoices.eherkenning) & ~Q(kvk=""),
name="unique_kvk_when_login_eherkenning",
),
UniqueConstraint(
fields=["oidc_id"],
condition=Q(login_type=LoginTypeChoices.oidc),
name="unique_oidc_id_when_login_oidc",
),
# --- --- ---
CheckConstraint(
check=Q(bsn="") | Q(login_type=LoginTypeChoices.digid),
name="check_bsn_only_set_when_login_digid",
),
CheckConstraint(
check=Q(oidc_id="") | Q(login_type=LoginTypeChoices.oidc),
name="check_oidc_id_only_set_when_login_oidc",
),
CheckConstraint(
check=(Q(kvk="") & Q(rsin=""))
| Q(login_type=LoginTypeChoices.eherkenning),
name="check_kvk_or_rsin_only_set_when_login_eherkenning",
),
CheckConstraint(
check=((Q(kvk="") & ~Q(rsin="")) | (~Q(kvk="") & Q(rsin="")))
| ~Q(login_type=LoginTypeChoices.eherkenning),
name="check_kvk_or_rsin_exclusive_when_login_eherkenning",
),
]

def __init__(self, *args, **kwargs):
Expand Down
119 changes: 119 additions & 0 deletions src/open_inwoner/accounts/tests/test_user_constraints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from django.db import IntegrityError, transaction
from django.test import TestCase

from open_inwoner.accounts.choices import LoginTypeChoices

from .factories import UserFactory


class UserIdentifierConstraintTests(TestCase):
def test_unique_email_when_login_default(self):
UserFactory(email="[email protected]", login_type=LoginTypeChoices.default)
UserFactory(email="[email protected]", login_type=LoginTypeChoices.default)

with self.assertRaises(IntegrityError):
UserFactory(email="[email protected]", login_type=LoginTypeChoices.default)

def test_not_unique_email_when_login_not_default(self):
UserFactory(email="[email protected]", login_type=LoginTypeChoices.default)

UserFactory(email="[email protected]", login_type=LoginTypeChoices.digid)
UserFactory(email="[email protected]", login_type=LoginTypeChoices.eherkenning)
UserFactory(email="[email protected]", login_type=LoginTypeChoices.oidc)

def test_unique_bsn_when_login_digid(self):
UserFactory(bsn="123456782", login_type=LoginTypeChoices.digid)
UserFactory(bsn="123456783", login_type=LoginTypeChoices.digid)

with self.assertRaises(IntegrityError):
UserFactory(bsn="123456782", login_type=LoginTypeChoices.digid)

def test_unique_oidc_id_when_login_oidc(self):
UserFactory(oidc_id="1234", login_type=LoginTypeChoices.oidc)
UserFactory(oidc_id="1235", login_type=LoginTypeChoices.oidc)

with self.assertRaises(IntegrityError):
UserFactory(oidc_id="1234", login_type=LoginTypeChoices.oidc)

def test_unique_kvk_when_login_eherkenning(self):
UserFactory(kvk="12345678", login_type=LoginTypeChoices.eherkenning)
UserFactory(kvk="12345679", login_type=LoginTypeChoices.eherkenning)

with self.assertRaises(IntegrityError):
UserFactory(kvk="12345678", login_type=LoginTypeChoices.eherkenning)

def test_unique_rsin_when_login_herkenning(self):
UserFactory(rsin="12345678", login_type=LoginTypeChoices.eherkenning)
UserFactory(rsin="12345679", login_type=LoginTypeChoices.eherkenning)

with self.assertRaises(IntegrityError):
UserFactory(rsin="12345678", login_type=LoginTypeChoices.eherkenning)

def test_not_both_kvk_and_rsin_when_login_eherkenning(self):
with self.assertRaises(IntegrityError):
UserFactory(
kvk="12345678", rsin="12345678", login_type=LoginTypeChoices.eherkenning
)

def test_not_bsn_when_login_not_digid(self):
for login_type in [
LoginTypeChoices.default,
LoginTypeChoices.eherkenning,
LoginTypeChoices.oidc,
]:
with self.subTest(login_type):
# start transaction block because we're testing multiple IntegrityErrors
with transaction.atomic():
with self.assertRaises(IntegrityError):
UserFactory(bsn="123456782", login_type=login_type)

def test_not_rsin_when_login_not_eherkenning(self):
for login_type in [
LoginTypeChoices.default,
LoginTypeChoices.digid,
LoginTypeChoices.oidc,
]:
with self.subTest(login_type):
# start transaction block because we're testing multiple IntegrityErrors
with transaction.atomic():
with self.assertRaises(IntegrityError):
UserFactory(rsin="12345678", login_type=login_type)

def test_not_kvk_when_login_not_eherkenning(self):
for login_type in [
LoginTypeChoices.default,
LoginTypeChoices.digid,
LoginTypeChoices.oidc,
]:
with self.subTest(login_type):
# start transaction block because we're testing multiple IntegrityErrors
with transaction.atomic():
with self.assertRaises(IntegrityError):
UserFactory(kvk="12345678", login_type=login_type)

def test_not_oidc_id_when_login_not_oidc(self):
for login_type in [
LoginTypeChoices.default,
LoginTypeChoices.digid,
LoginTypeChoices.eherkenning,
]:
with self.subTest(login_type):
# start transaction block because we're testing multiple IntegrityErrors
with transaction.atomic():
with self.assertRaises(IntegrityError):
UserFactory(oidc_id="12345678", login_type=login_type)

def test_login_types_not_changed(self):
"""
The constraints and tests depend on a known set of login_types,
so make sure this suite fails if we ever change them.
"""
self.assertEqual(
LoginTypeChoices.values,
[
LoginTypeChoices.default,
LoginTypeChoices.digid,
LoginTypeChoices.eherkenning,
LoginTypeChoices.oidc,
],
)

0 comments on commit c9fb425

Please sign in to comment.