From c4a0fa7257dcc928853392f23d8dd3c2ff74b660 Mon Sep 17 00:00:00 2001 From: maxachis Date: Sat, 16 Mar 2024 16:07:23 -0400 Subject: [PATCH] Revert to old version of code and add only type hinting and docstrings --- middleware/security.py | 74 ++++++++++++------------------------------ 1 file changed, 21 insertions(+), 53 deletions(-) diff --git a/middleware/security.py b/middleware/security.py index 8a0fd005..07ee3d0e 100644 --- a/middleware/security.py +++ b/middleware/security.py @@ -1,85 +1,56 @@ import functools from hmac import compare_digest +from typing import Callable + from flask import request, jsonify -from typing import Tuple, Callable, Any -from psycopg2.extensions import cursor as PgCursor, connection as PgConnection from middleware.initialize_psycopg2_connection import initialize_psycopg2_connection from datetime import datetime as dt -from middleware.login_queries import is_admin import os -def is_valid(api_key: str, endpoint: str, method: str) -> Tuple[bool, bool]: +def is_valid(api_key: str) -> bool: """ - Validates the provided API key against various security checks and determines if it matches a user, session, or access token. + Check if the provided API key is valid by comparing it against the users and access_tokens in the database. Parameters: - - api_key: The API key to validate. - - endpoint: The endpoint being requested. - - method: The HTTP method of the request. + api_key (str): The API key to validate. Returns: - - A tuple of two booleans. The first indicates if the API key is valid, the second if the API key has expired. + bool: True if the API key is valid, False otherwise. """ - if not api_key: - return False, False - psycopg2_connection = initialize_psycopg2_connection() + # Get the user data that matches the API key from the request cursor = psycopg2_connection.cursor() - cursor.execute(f"select id, api_key, role from users where api_key = '{api_key}'") + cursor.execute(f"select id, api_key from users where api_key = '{api_key}'") results = cursor.fetchall() - if len(results) > 0: - role = results[0][2] - + user_data = {} if not results: cursor.execute( - f"select email, expiration_date from session_tokens where token = '{api_key}'" + f"delete from access_tokens where expiration_date < '{dt.now()}'" ) - results = cursor.fetchall() - if len(results) > 0: - email = results[0][0] - expiration_date = results[0][1] - print(expiration_date, dt.utcnow()) - - if expiration_date < dt.utcnow(): - return False, True - - if is_admin(cursor, email): - role = "admin" - - if not results: + psycopg2_connection.commit() cursor.execute(f"select id, token from access_tokens where token = '{api_key}'") results = cursor.fetchall() - cursor.execute( - f"delete from access_tokens where expiration_date < '{dt.utcnow()}'" - ) - psycopg2_connection.commit() - role = "user" if not results: - return False, False - - if endpoint in ("datasources", "datasourcebyid") and method in ("PUT", "POST"): - if role != "admin": - return False, False + return False - # Compare the API key in the user table to the API in the request header and proceed - # through the protected route if it's valid. Otherwise, compare_digest will return False - # and api_required will send an error message to provide a valid API key - return True, False + user_data = dict(zip(("id", "api_key"), results[0])) + # Compare the API key in the user table to the API in the request header and proceed through the protected route if it's valid. Otherwise, compare_digest will return False and api_required will send an error message to provide a valid API key + if compare_digest(user_data.get("api_key"), api_key): + return True -def api_required(func: Callable[..., Any]) -> Callable[..., Any]: +def api_required(func: Callable) -> Callable: """ - Decorator to protect a route, ensuring it can only be accessed with a valid and non-expired API key. + Decorator to protect routes, ensuring that only requests with a valid API key can access the decorated function. Parameters: - - func: The function to decorate. + func (Callable): The function to be decorated. Returns: - - The decorator function. + Callable: The decorated function which will now require a valid API key to be accessed. """ - @functools.wraps(func) def decorator(*args, **kwargs): api_key = None @@ -98,12 +69,9 @@ def decorator(*args, **kwargs): "message": "Please provide an 'Authorization' key in the request header" }, 400 # Check if API key is correct and valid - valid, expired = is_valid(api_key, request.endpoint, request.method) - if valid: + if is_valid(api_key): return func(*args, **kwargs) else: - if expired: - return {"message": "The provided API key has expired"}, 401 return {"message": "The provided API key is not valid"}, 403 return decorator