Skip to content

Commit

Permalink
Add idempotent handler decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
BenjaminPelletier committed Oct 11, 2023
1 parent 552910c commit ed78dd7
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 43 deletions.
4 changes: 0 additions & 4 deletions monitoring/mock_uss/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,3 @@ class Database(ImplicitDict):
Database(one_time_tasks=[], task_errors=[], periodic_tasks={}),
decoder=lambda b: ImplicitDict.parse(json.loads(b.decode("utf-8")), Database),
)

fulfilled_request_ids = SynchronizedValue(
[], decoder=lambda b: json.loads(b.decode("utf-8"))
)
15 changes: 2 additions & 13 deletions monitoring/mock_uss/ridsp/routes_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from . import database
from .database import db
from monitoring.monitorlib import geo
from ..database import fulfilled_request_ids
from monitoring.monitorlib.idempotency import idempotent_request

require_config_value(KEY_BASE_URL)
require_config_value(KEY_RID_VERSION)
Expand All @@ -34,6 +34,7 @@ class ErrorResponse(ImplicitDict):

@webapp.route("/ridsp/injection/tests/<test_id>", methods=["PUT"])
@requires_scope([injection_api.SCOPE_RID_QUALIFIER_INJECT])
@idempotent_request()
def ridsp_create_test(test_id: str) -> Tuple[str, int]:
"""Implements test creation in RID automated testing injection API."""
logger.info(f"Create test {test_id}")
Expand All @@ -51,18 +52,6 @@ def ridsp_create_test(test_id: str) -> Tuple[str, int]:
except ValueError as e:
msg = "Create test {} unable to parse JSON: {}".format(test_id, e)
return msg, 400
if "request_id" in json:
logger.debug(f"[ridsp_create_test] Request ID {json['request_id']}")
with fulfilled_request_ids as tx:
if json["request_id"] in tx:
logger.debug(
f"[ridsp_create_test] Already processed request ID {json['request_id']}"
)
return (
f"Request ID {json['request_id']} has already been fulfilled",
400,
)
tx.append(json["request_id"])

# Create ISA in DSS
(t0, t1) = req_body.get_span()
Expand Down
29 changes: 4 additions & 25 deletions monitoring/mock_uss/scdsc/routes_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import traceback
from datetime import datetime, timedelta
import time
from functools import wraps
from typing import List, Tuple
import uuid

import flask
from implicitdict import ImplicitDict, StringBasedDateTime
from loguru import logger
import requests.exceptions
from uas_standards.interuss.automated_testing.scd.v1 import api as scd_api

from monitoring.monitorlib.idempotency import idempotent_request
from uas_standards.interuss.automated_testing.scd.v1.api import (
InjectFlightRequest,
InjectFlightResponse,
Expand All @@ -32,7 +32,6 @@
PutOperationalIntentReferenceParameters,
)

from monitoring.mock_uss.database import fulfilled_request_ids
from monitoring.mock_uss.scdsc.flight_planning import (
validate_request,
check_for_disallowed_conflicts,
Expand Down Expand Up @@ -150,6 +149,7 @@ def scd_capabilities() -> Tuple[dict, int]:

@webapp.route("/scdsc/v1/flights/<flight_id>", methods=["PUT"])
@requires_scope([SCOPE_SCD_QUALIFIER_INJECT])
@idempotent_request()
def scdsc_inject_flight(flight_id: str) -> Tuple[str, int]:
"""Implements flight injection in SCD automated testing injection API."""
logger.debug(f"[inject_flight/{os.getpid()}:{flight_id}] Starting handler")
Expand All @@ -161,20 +161,6 @@ def scdsc_inject_flight(flight_id: str) -> Tuple[str, int]:
except ValueError as e:
msg = "Create flight {} unable to parse JSON: {}".format(flight_id, e)
return msg, 400
if "request_id" in json:
logger.debug(
f"[inject_flight/{os.getpid()}:{flight_id}] Request ID {json['request_id']}"
)
with fulfilled_request_ids as tx:
if json["request_id"] in tx:
logger.debug(
f"[inject_flight/{os.getpid()}:{flight_id}] Already processed request ID {json['request_id']}"
)
return (
f"Request ID {json['request_id']} has already been fulfilled",
400,
)
tx.append(json["request_id"])
json, code = inject_flight(flight_id, req_body)
return flask.jsonify(json), code

Expand Down Expand Up @@ -480,6 +466,7 @@ def delete_flight(flight_id) -> Tuple[dict, int]:

@webapp.route("/scdsc/v1/clear_area_requests", methods=["POST"])
@requires_scope([SCOPE_SCD_QUALIFIER_INJECT])
@idempotent_request()
def scdsc_clear_area() -> Tuple[str, int]:
try:
json = flask.request.json
Expand All @@ -489,14 +476,6 @@ def scdsc_clear_area() -> Tuple[str, int]:
except ValueError as e:
msg = "Unable to parse ClearAreaRequest JSON request: {}".format(e)
return msg, 400
with fulfilled_request_ids as tx:
logger.debug(f"[scdsc_clear_area] Processing request ID {req.request_id}")
if req.request_id in tx:
logger.debug(
f"[scdsc_clear_area] Already processed request ID {req.request_id}"
)
return f"Request ID {req.request_id} has already been fulfilled", 400
tx.append(json["request_id"])
json, code = clear_area(req)
return flask.jsonify(json), code

Expand Down
157 changes: 157 additions & 0 deletions monitoring/monitorlib/idempotency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import base64
import hashlib
from functools import wraps
import json
from typing import Callable, Optional, Dict

import arrow
import flask
from loguru import logger

from implicitdict import ImplicitDict, StringBasedDateTime
from monitoring.monitorlib.multiprocessing import SynchronizedValue


_max_request_buffer_size = int(10e6)
"""Number of bytes to dedicate to caching responses"""


class Response(ImplicitDict):
"""Information about a previously-returned response.
Note that this object is never actually used (in order to maximize performance); instead it serves as documentation
of the structure of the fields within a plain JSON dict/object."""

json: Optional[dict]
body: Optional[str]
code: int
timestamp: StringBasedDateTime


def _get_responses(raw: bytes) -> Dict[str, Response]:
return json.loads(raw.decode("utf-8"))


def _set_responses(responses: Dict[str, Response]) -> bytes:
while True:
s = json.dumps(responses)
if len(s) <= _max_request_buffer_size:
break

# Remove oldest cached response
oldest_id = None
oldest_timestamp = None
for request_id, response in responses.items():
t = arrow.get(response["timestamp"])
if oldest_timestamp is None or t < oldest_timestamp:
oldest_id = request_id
oldest_timestamp = t

del responses[oldest_id]
return s.encode("utf-8")


_fulfilled_requests = SynchronizedValue(
{},
decoder=_get_responses,
encoder=_set_responses,
capacity_bytes=_max_request_buffer_size,
)


def get_hashed_request_id() -> Optional[str]:
"""Retrieves an identifier for the request by hashing key characteristics of the request."""
characteristics = flask.request.method + flask.request.url
if flask.request.json:
characteristics += json.dumps(flask.request.json)
else:
characteristics += flask.request.data.decode("utf-8")
return base64.b64encode(
hashlib.sha512(characteristics.encode("utf-8")).digest()
).decode("utf-8")


def idempotent_request(get_request_id: Optional[Callable[[], Optional[str]]] = None):
"""Decorator for idempotent Flask view handlers.
When subsequent requests are received with the same request identifier, this decorator will use a recent cached
response instead of invoking the underlying handler when possible. Note that there is no verification that the rest
of the request (apart from the request ID) is identical, so a request with different content but the same request ID
will receive the cached response from the first request. A developer could compute a request ID based on a hash of
important request characteristics to control this behavior.
Note that cached response characteristics are limited and the full original response is not produced verbatim.
"""
if get_request_id is None:
get_request_id = get_hashed_request_id

def outer_wrapper(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
request_id = get_request_id()

cached_requests = _fulfilled_requests.value
if request_id in cached_requests:
endpoint = (
flask.request.url_rule.rule
if flask.request.url_rule is not None
else "unknown endpoint"
)
logger.warning(
"Fulfilling {} {} with cached response for request {}",
flask.request.method,
endpoint,
request_id,
)
response = cached_requests[request_id]
if response["body"] is not None:
return response["body"], response["code"]
else:
return flask.jsonify(response["json"]), response["code"]

result = fn(*args, **kwargs)

response = {
"timestamp": arrow.utcnow().isoformat(),
"code": 200,
"body": None,
"json": None,
}
keep_code = False
if isinstance(result, tuple):
if len(result) == 2:
if not isinstance(result[1], int):
raise NotImplementedError(
f"Unable to cache Flask view handler result where the second 2-tuple element is a '{type(result[1]).__name__}'"
)
response["code"] = result[1]
keep_code = True
result = result[0]
else:
raise NotImplementedError(
f"Unable to cache Flask view handler result which is a tuple of ({', '.join(type(v).__name__ for v in result)})"
)

if isinstance(result, str):
response["body"] = result
response["json"] = None
elif isinstance(result, flask.Response):
try:
response["json"] = result.get_json()
except ValueError:
response["body"] = result.get_data(as_text=True)
if not keep_code:
response["code"] = result.status_code
else:
raise NotImplementedError(
f"Unable to cache Flask view handler result of type '{type(result).__name__}'"
)

with _fulfilled_requests as cached_requests:
cached_requests[request_id] = response

return result

return wrapper

return outer_wrapper
2 changes: 1 addition & 1 deletion monitoring/monitorlib/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(
"""
self._lock = multiprocessing.RLock()
self._shared_memory = multiprocessing.shared_memory.SharedMemory(
create=True, size=capacity_bytes
create=True, size=capacity_bytes + 4
)
self._encoder = (
encoder
Expand Down

0 comments on commit ed78dd7

Please sign in to comment.