Skip to content

Commit

Permalink
Revert to old version of code and add only type hinting and docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
maxachis committed Mar 16, 2024
1 parent 5cd138c commit c4a0fa7
Showing 1 changed file with 21 additions and 53 deletions.
74 changes: 21 additions & 53 deletions middleware/security.py
Original file line number Diff line number Diff line change
@@ -1,85 +1,56 @@
import functools
from hmac import compare_digest
from typing import Callable

from flask import request, jsonify
from typing import Tuple, Callable, Any
from psycopg2.extensions import cursor as PgCursor, connection as PgConnection
from middleware.initialize_psycopg2_connection import initialize_psycopg2_connection
from datetime import datetime as dt
from middleware.login_queries import is_admin
import os


def is_valid(api_key: str, endpoint: str, method: str) -> Tuple[bool, bool]:
def is_valid(api_key: str) -> bool:
"""
Validates the provided API key against various security checks and determines if it matches a user, session, or access token.
Check if the provided API key is valid by comparing it against the users and access_tokens in the database.
Parameters:
- api_key: The API key to validate.
- endpoint: The endpoint being requested.
- method: The HTTP method of the request.
api_key (str): The API key to validate.
Returns:
- A tuple of two booleans. The first indicates if the API key is valid, the second if the API key has expired.
bool: True if the API key is valid, False otherwise.
"""
if not api_key:
return False, False

psycopg2_connection = initialize_psycopg2_connection()
# Get the user data that matches the API key from the request
cursor = psycopg2_connection.cursor()
cursor.execute(f"select id, api_key, role from users where api_key = '{api_key}'")
cursor.execute(f"select id, api_key from users where api_key = '{api_key}'")
results = cursor.fetchall()
if len(results) > 0:
role = results[0][2]

user_data = {}
if not results:
cursor.execute(
f"select email, expiration_date from session_tokens where token = '{api_key}'"
f"delete from access_tokens where expiration_date < '{dt.now()}'"
)
results = cursor.fetchall()
if len(results) > 0:
email = results[0][0]
expiration_date = results[0][1]
print(expiration_date, dt.utcnow())

if expiration_date < dt.utcnow():
return False, True

if is_admin(cursor, email):
role = "admin"

if not results:
psycopg2_connection.commit()
cursor.execute(f"select id, token from access_tokens where token = '{api_key}'")
results = cursor.fetchall()
cursor.execute(
f"delete from access_tokens where expiration_date < '{dt.utcnow()}'"
)
psycopg2_connection.commit()
role = "user"

if not results:
return False, False

if endpoint in ("datasources", "datasourcebyid") and method in ("PUT", "POST"):
if role != "admin":
return False, False
return False

# Compare the API key in the user table to the API in the request header and proceed
# through the protected route if it's valid. Otherwise, compare_digest will return False
# and api_required will send an error message to provide a valid API key
return True, False
user_data = dict(zip(("id", "api_key"), results[0]))
# Compare the API key in the user table to the API in the request header and proceed through the protected route if it's valid. Otherwise, compare_digest will return False and api_required will send an error message to provide a valid API key
if compare_digest(user_data.get("api_key"), api_key):
return True


def api_required(func: Callable[..., Any]) -> Callable[..., Any]:
def api_required(func: Callable) -> Callable:
"""
Decorator to protect a route, ensuring it can only be accessed with a valid and non-expired API key.
Decorator to protect routes, ensuring that only requests with a valid API key can access the decorated function.
Parameters:
- func: The function to decorate.
func (Callable): The function to be decorated.
Returns:
- The decorator function.
Callable: The decorated function which will now require a valid API key to be accessed.
"""

@functools.wraps(func)
def decorator(*args, **kwargs):
api_key = None
Expand All @@ -98,12 +69,9 @@ def decorator(*args, **kwargs):
"message": "Please provide an 'Authorization' key in the request header"
}, 400
# Check if API key is correct and valid
valid, expired = is_valid(api_key, request.endpoint, request.method)
if valid:
if is_valid(api_key):
return func(*args, **kwargs)
else:
if expired:
return {"message": "The provided API key has expired"}, 401
return {"message": "The provided API key is not valid"}, 403

return decorator

0 comments on commit c4a0fa7

Please sign in to comment.