Skip to content

Commit

Permalink
Do not reuse username generation logic in migration
Browse files Browse the repository at this point in the history
  • Loading branch information
jdavcs committed Jun 12, 2024
1 parent 1ba476e commit 7f4c0f8
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 16 deletions.
20 changes: 6 additions & 14 deletions lib/galaxy/managers/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,17 +886,10 @@ def get_user_by_username(session, username: str, model_class=User):

def username_from_email(session, email, model_class=User):
"""Get next available username generated based on email"""
engine = session.bind
with engine.connect() as connection:
return username_from_email_with_connection(connection, email, model_class)


def username_from_email_with_connection(connection, email, model_class=User):
# This function is also called from database revision scripts, which do not provide a session.
username = email.split("@", 1)[0].lower()
username = filter_out_invalid_username_characters(username)
if username_exists(connection, username, model_class):
username = generate_next_available_username(connection, username, model_class)
if username_exists(session, username, model_class):
username = generate_next_available_username(session, username, model_class)
return username


Expand All @@ -907,14 +900,13 @@ def filter_out_invalid_username_characters(username):
return username


def username_exists(connection, username: str, model_class=User):
stmt = select(model_class).filter(model_class.username == username).limit(1)
return bool(connection.execute(stmt).first())
def username_exists(session, username: str, model_class=User):
return bool(get_user_by_username(session, username, model_class))


def generate_next_available_username(connection, username, model_class=User):
def generate_next_available_username(session, username, model_class=User):
"""Generate unique username; user can change it later"""
i = 1
while connection.execute(select(model_class).where(model_class.username == f"{username}-{i}")).first():
while session.execute(select(model_class).where(model_class.username == f"{username}-{i}")).first():
i += 1
return f"{username}-{i}"
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
"""

import string

from alembic import op
from sqlalchemy import (
or_,
select,
update,
)

from galaxy.managers.users import username_from_email_with_connection
from galaxy.model import User
from galaxy.model.migrations.util import (
alter_column,
Expand Down Expand Up @@ -48,6 +49,41 @@ def _generate_missing_usernames():
connection = op.get_bind()
users = connection.execute(stmt).all()
for id, email in users:
new_username = username_from_email_with_connection(connection, email)
new_username = username_from_email(connection, email)
update_stmt = update(User).where(User.id == id).values(username=new_username)
connection.execute(update_stmt)


# The code below is a near-duplicate of similar code in managers.users. The duplication is
# intentional: we want to preserve this logic in the migration script. The only differences are:
# (1) this code uses a Connection instead of a Session;
# (2) the username_exists function inlines the Select statement from managers.users::get_user_by_username.


def username_from_email(connection, email, model_class=User):
# This function is also called from database revision scripts, which do not provide a session.
username = email.split("@", 1)[0].lower()
username = filter_out_invalid_username_characters(username)
if username_exists(connection, username, model_class):
username = generate_next_available_username(connection, username, model_class)
return username


def filter_out_invalid_username_characters(username):
"""Replace invalid characters in username"""
for char in [x for x in username if x not in f"{string.ascii_lowercase + string.digits}-."]:
username = username.replace(char, "-")
return username


def username_exists(connection, username: str, model_class=User):
stmt = select(model_class).filter(model_class.username == username).limit(1)
return bool(connection.execute(stmt).first())


def generate_next_available_username(connection, username, model_class=User):
"""Generate unique username; user can change it later"""
i = 1
while connection.execute(select(model_class).where(model_class.username == f"{username}-{i}")).first():
i += 1
return f"{username}-{i}"

0 comments on commit 7f4c0f8

Please sign in to comment.