Skip to content

Commit

Permalink
Get /users/N/authorized_tokens/ working
Browse files Browse the repository at this point in the history
Signed-off-by: Rick Elrod <[email protected]>
  • Loading branch information
relrod committed May 1, 2024
1 parent c694713 commit 7d35c33
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 36 deletions.
2 changes: 1 addition & 1 deletion ansible_base/oauth2_provider/serializers/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def get_refresh_token(self, obj):
try:
if not obj.refresh_token:
return None
elif request.method == 'POST':
elif request and request.method == 'POST':
return getattr(obj.refresh_token, 'token', '')
else:
return ENCRYPTED_STRING
Expand Down
16 changes: 12 additions & 4 deletions ansible_base/oauth2_provider/views/user_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,23 @@ class DABOAuth2UserViewsetMixin:
def extra_related_fields(self, obj) -> dict[str, str]:
fields = super().extra_related_fields(obj)
fields['personal_tokens'] = reverse(f'{self.basename}-personal-tokens-list', kwargs={"pk": obj.pk})
fields['authorized_tokens'] = reverse(f'{self.basename}-authorized-tokens-list', kwargs={"pk": obj.pk})
return fields

@action(detail=True, methods=["get"], url_name="personal-tokens-list")
def personal_tokens(self, request, pk=None):
tokens = OAuth2AccessToken.objects.filter(application__isnull=True, user=pk)
def _user_token_response(self, request, application_isnull, pk):
tokens = OAuth2AccessToken.objects.filter(application__isnull=application_isnull, user=pk)
page = self.paginate_queryset(tokens)
if page is not None:
serializer = OAuth2TokenSerializer(page, many=True)
serializer = OAuth2TokenSerializer(page, many=True, context={"request": request})
return self.get_paginated_response(serializer.data)

serializer = OAuth2TokenSerializer(tokens, many=True)
return Response(serializer.data)

@action(detail=True, methods=["get"], url_name="personal-tokens-list")
def personal_tokens(self, request, pk=None):
return self._user_token_response(request, True, pk)

@action(detail=True, methods=["get"], url_name="authorized-tokens-list")
def authorized_tokens(self, request, pk=None):
return self._user_token_response(request, False, pk)
2 changes: 1 addition & 1 deletion test_app/tests/oauth2_provider/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def oauth2_admin_access_token(oauth2_application, admin_api_client, admin_user):
url = reverse('token-list')
response = admin_api_client.post(url, {'application': oauth2_application[0].pk})
assert response.status_code == 201
return response.data['token']
return OAuth2AccessToken.objects.get(token=response.data['token'])


@copy_fixture(copies=3)
Expand Down
12 changes: 6 additions & 6 deletions test_app/tests/oauth2_provider/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ def test_oauth2_bearer_get_user_correct(unauthenticated_api_client, oauth2_admin
url = reverse("user-me")
response = unauthenticated_api_client.get(
url,
headers={'Authorization': f'Bearer {oauth2_admin_access_token}'},
headers={'Authorization': f'Bearer {oauth2_admin_access_token.token}'},
)
assert response.status_code == 200
assert response.data['username'] == 'admin'
assert response.data['username'] == oauth2_admin_access_token.user.username


@pytest.mark.parametrize(
Expand All @@ -28,7 +28,7 @@ def test_oauth2_bearer_get(unauthenticated_api_client, oauth2_admin_access_token
GET an animal with a bearer token.
"""
url = reverse("animal-detail", kwargs={"pk": animal.pk})
token = oauth2_admin_access_token if token == 'fixture' else generate_token()
token = oauth2_admin_access_token.token if token == 'fixture' else generate_token()
response = unauthenticated_api_client.get(
url,
headers={'Authorization': f'Bearer {token}'},
Expand All @@ -50,7 +50,7 @@ def test_oauth2_bearer_post(unauthenticated_api_client, oauth2_admin_access_toke
POST an animal with a bearer token.
"""
url = reverse("animal-list")
token = oauth2_admin_access_token if token == 'fixture' else generate_token()
token = oauth2_admin_access_token.token if token == 'fixture' else generate_token()
data = {
"name": "Fido",
"owner": admin_user.pk,
Expand All @@ -77,7 +77,7 @@ def test_oauth2_bearer_patch(unauthenticated_api_client, oauth2_admin_access_tok
PATCH an animal with a bearer token.
"""
url = reverse("animal-detail", kwargs={"pk": animal.pk})
token = oauth2_admin_access_token if token == 'fixture' else generate_token()
token = oauth2_admin_access_token.token if token == 'fixture' else generate_token()
data = {
"name": "Fido",
}
Expand All @@ -103,7 +103,7 @@ def test_oauth2_bearer_put(unauthenticated_api_client, oauth2_admin_access_token
PUT an animal with a bearer token.
"""
url = reverse("animal-detail", kwargs={"pk": animal.pk})
token = oauth2_admin_access_token if token == 'fixture' else generate_token()
token = oauth2_admin_access_token.token if token == 'fixture' else generate_token()
data = {
"name": "Fido",
"owner": admin_user.pk,
Expand Down
126 changes: 102 additions & 24 deletions test_app/tests/oauth2_provider/views/test_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,8 @@
from django.utils.http import urlencode

from ansible_base.authentication.models import AuthenticatorUser
from ansible_base.oauth2_provider.models import OAuth2AccessToken


@pytest.mark.django_db
@pytest.mark.parametrize(
'client_fixture, user_fixture',
[
pytest.param('user_api_client', 'user', id='user'),
pytest.param('admin_api_client', 'admin_user', id='admin'),
],
)
def test_oauth2_provider_list_user_tokens(request, client_fixture, user_fixture):
client = request.getfixturevalue(client_fixture)
user = request.getfixturevalue(user_fixture)
url = reverse('token-list')
response = client.post(url, data={'scope': 'read'})
assert response.status_code == 201
assert response.data['scope'] == 'read'
assert response.data['user'] == user.pk

get_response = client.get(url)
assert get_response.status_code == 200
assert len(get_response.data['results']) == 1
from ansible_base.lib.utils.encryption import ENCRYPTED_STRING
from ansible_base.oauth2_provider.models import OAuth2AccessToken, OAuth2RefreshToken


@pytest.mark.django_db
Expand Down Expand Up @@ -113,6 +92,31 @@ def test_oauth2_existing_token_enabled_for_external_accounts(
assert resp.json()['username'] == user.username


@pytest.mark.django_db
@pytest.mark.parametrize(
'client_fixture, user_fixture',
[
pytest.param('user_api_client', 'user', id='user'),
pytest.param('admin_api_client', 'admin_user', id='admin'),
],
)
def test_oauth2_pat_create_and_list(request, client_fixture, user_fixture):
"""
A user can create and list personal access tokens.
"""
client = request.getfixturevalue(client_fixture)
user = request.getfixturevalue(user_fixture)
url = reverse('token-list')
response = client.post(url, data={'scope': 'read'})
assert response.status_code == 201
assert response.data['scope'] == 'read'
assert response.data['user'] == user.pk

get_response = client.get(url)
assert get_response.status_code == 200
assert len(get_response.data['results']) == 1


@pytest.mark.django_db
def test_oauth2_pat_creation(oauth2_application_password, user, unauthenticated_api_client):
app = oauth2_application_password[0]
Expand Down Expand Up @@ -203,4 +207,78 @@ def test_oauth2_pat_list_is_user_related_field(user, admin_api_client):
response = admin_api_client.get(url)
assert response.status_code == 200
assert 'personal_tokens' in response.data['related']
assert response.data['delated']['personal_tokens'] == reverse('user-personal-tokens-list', kwargs={"pk": user.pk})
assert response.data['related']['personal_tokens'] == reverse('user-personal-tokens-list', kwargs={"pk": user.pk})


@pytest.mark.django_db
def test_oauth2_token_create(oauth2_application, admin_api_client, admin_user):
oauth2_application = oauth2_application[0]
url = reverse('token-list')
response = admin_api_client.post(url, {'scope': 'read', 'application': oauth2_application.pk})
assert response.status_code == 201
assert 'modified' in response.data and response.data['modified'] is not None
assert 'updated' not in response.data
token = OAuth2AccessToken.objects.get(token=response.data['token'])
refresh_token = OAuth2RefreshToken.objects.get(token=response.data['refresh_token'])
assert token.application == oauth2_application
assert refresh_token.application == oauth2_application
assert token.user == admin_user
assert refresh_token.user == admin_user
assert refresh_token.access_token == token
assert token.scope == 'read'

url = reverse('application-access_tokens-list', kwargs={'pk': oauth2_application.pk})
response = admin_api_client.get(url)
assert response.status_code == 200
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == token.pk
assert response.data['results'][0]['scope'] == token.scope


def test_oauth2_application_token_summary_fields(admin_api_client, oauth2_admin_access_token, oauth2_application):
url = reverse('application-detail', kwargs={'pk': oauth2_application[0].pk})
response = admin_api_client.get(url)
assert response.status_code == 200
assert response.data['summary_fields']['tokens']['count'] == 1
assert response.data['summary_fields']['tokens']['results'][0] == {'id': oauth2_admin_access_token.pk, 'scope': 'write', 'token': ENCRYPTED_STRING}


@pytest.mark.django_db
def test_oauth2_authorized_list_for_user(oauth2_application, oauth2_user_pat, oauth2_user_pat_1, user, admin_api_client):
"""
Tests that we can list a user's authorized tokens via API.
"""
# Turn the PATs into authorized tokens by attaching an application
oauth2_application = oauth2_application[0]
oauth2_user_pat.application = oauth2_application
oauth2_user_pat.save()
oauth2_user_pat_1.application = oauth2_application
oauth2_user_pat_1.save()

url = reverse('user-authorized-tokens-list', kwargs={"pk": user.pk})
response = admin_api_client.get(url)
assert response.status_code == 200
assert len(response.data['results']) == 2


def test_oauth2_authorized_list_for_invalid_user(oauth2_user_pat, oauth2_user_pat_1, user, admin_api_client):
"""
Ensure we don't fatal if we give a bad user PK.
We return an empty list.
"""
url = reverse('user-authorized-tokens-list', kwargs={"pk": 1000})
response = admin_api_client.get(url)
assert response.status_code == 200
assert response.data['results'] == []


def test_oauth2_authorized_list_is_user_related_field(user, admin_api_client):
"""
Ensure 'authorized_tokens' shows up in the user's related fields.
"""
url = reverse('user-detail', kwargs={"pk": user.pk})
response = admin_api_client.get(url)
assert response.status_code == 200
assert 'authorized_tokens' in response.data['related']
assert response.data['related']['authorized_tokens'] == reverse('user-authorized-tokens-list', kwargs={"pk": user.pk})

0 comments on commit 7d35c33

Please sign in to comment.