From 791379561ed829b804b6a0f0ebe338d3c7ee3425 Mon Sep 17 00:00:00 2001 From: tijsziere <39266480+tijsziere@users.noreply.github.com> Date: Wed, 2 Oct 2024 14:20:33 +0200 Subject: [PATCH] clean up --- main.py | 53 +----- routes/121_routes.py | 423 +++++++++++++++++++++++++++++++++++++++++++ routes/routes121.py | 9 +- test_submission.py | 38 ---- utils/cosmos.py | 16 ++ utils/logger.py | 32 ++++ utils/utils121.py | 5 +- utils/utilsEspo.py | 7 +- utils/utilsKobo.py | 4 +- 9 files changed, 484 insertions(+), 103 deletions(-) create mode 100644 routes/121_routes.py delete mode 100644 test_submission.py create mode 100644 utils/cosmos.py create mode 100644 utils/logger.py diff --git a/main.py b/main.py index 83ca40b..5a9168a 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,5 @@ # pylint: disable=invalid-name import uvicorn -import time from fastapi import ( Security, Depends, @@ -13,56 +12,20 @@ from fastapi.responses import RedirectResponse, JSONResponse from fastapi.security.api_key import APIKeyHeader, APIKey from pydantic import BaseModel -import re import requests -import csv import pandas as pd from datetime import datetime, timedelta import os -from azure.cosmos.exceptions import CosmosResourceExistsError -import azure.cosmos.cosmos_client as cosmos_client from enum import Enum -import base64 -import sys -import unicodedata -import io -import json from dotenv import load_dotenv -import logging -from opentelemetry._logs import set_logger_provider -from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor -from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter +from utils.logger import logger from routes import routes121, routesEspo, routesGeneric, routesKobo - # load environment variables load_dotenv() port = os.environ["PORT"] -# Set up logs export to Azure Application Insights -logger_provider = LoggerProvider() -set_logger_provider(logger_provider) -exporter = AzureMonitorLogExporter( - connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"] -) -logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter)) - -# Attach LoggingHandler to root logger -handler = LoggingHandler() -logging.getLogger().addHandler(handler) -logging.getLogger().setLevel(logging.NOTSET) -logger = logging.getLogger(__name__) - -# Silence noisy loggers -logging.getLogger("requests").setLevel(logging.WARNING) -logging.getLogger("urllib3").setLevel(logging.WARNING) -logging.getLogger("azure").setLevel(logging.WARNING) -logging.getLogger("requests_oauthlib").setLevel(logging.WARNING) -logging.getLogger("asyncio").setLevel(logging.WARNING) -logging.getLogger("opentelemetry").setLevel(logging.ERROR) - # initialize FastAPI app = FastAPI( title="kobo-connect", @@ -78,18 +41,6 @@ ) - -# initialize CosmosDB -client_ = cosmos_client.CosmosClient( - os.getenv("COSMOS_URL"), - {"masterKey": os.getenv("COSMOS_KEY")}, - user_agent="kobo-connect", - user_agent_overwrite=True, -) -cosmos_db = client_.get_database_client("kobo-connect") -cosmos_container_client = cosmos_db.get_container_client("kobo-submissions") - - @app.get("/", include_in_schema=False) async def docs_redirect(): """Redirect base URL to docs.""" @@ -101,8 +52,6 @@ async def docs_redirect(): app.include_router(routesGeneric.router) app.include_router(routesKobo.router) - - @app.get("/health") async def health(): """Get health of instance.""" diff --git a/routes/121_routes.py b/routes/121_routes.py new file mode 100644 index 0000000..0726607 --- /dev/null +++ b/routes/121_routes.py @@ -0,0 +1,423 @@ +import requests +import os +from fastapi import APIRouter, Request, Depends, HTTPException +from fastapi.responses import JSONResponse +from utils.utilsKobo import clean_kobo_data, get_attachment_dict +from utils.utils121 import login121, required_headers_121 + + +router = APIRouter() + +@router.post("/kobo-to-121") +async def kobo_to_121(request: Request, dependencies=Depends(required_headers_121)): + """Send a Kobo submission to 121.""" + + kobo_data = await request.json() + extra_logs = {"environment": os.getenv("ENV")} + try: + extra_logs["kobo_form_id"] = str(kobo_data["_xform_id_string"]) + extra_logs["kobo_form_version"] = str(kobo_data["__version__"]) + extra_logs["kobo_submission_id"] = str(kobo_data["_id"]) + except KeyError: + return JSONResponse( + status_code=422, + content={"detail": "Not a valid Kobo submission"}, + ) + extra_logs["121_url"] = request.headers["url121"] + + kobo_data = clean_kobo_data(kobo_data) + + # Check if 'skipConnect' is present and set to True in kobo_data + if "skipconnect" in kobo_data.keys() and kobo_data["skipconnect"] == "1": + logger.info("Skipping connection to 121", extra=extra_logs) + return JSONResponse( + status_code=200, content={"message": "Skipping connection to 121"} + ) + kobotoken, koboasset = None, None + if "kobotoken" in request.headers.keys(): + kobotoken = request.headers["kobotoken"] + if "koboasset" in request.headers.keys(): + koboasset = request.headers["koboasset"] + attachments = get_attachment_dict(kobo_data, kobotoken, koboasset) + + if "programid" in request.headers.keys(): + programid = request.headers["programid"] + elif "programid" in kobo_data.keys(): + programid = kobo_data["programid"] + else: + error_message = ( + "'programid' needs to be specified in headers or submission body" + ) + logger.info(f"Failed: {error_message}", extra=extra_logs) + raise HTTPException(status_code=400, detail=error_message) + extra_logs["121_program_id"] = programid + + if "referenceId" in request.headers.keys(): + referenceId = request.headers["referenceId"] + else: + referenceId = kobo_data["_uuid"] + + # Create API payload body + intvalues = ["maxPayments", "paymentAmountMultiplier", "inclusionScore"] + payload = {} + for kobo_field, target_field in request.headers.items(): + if kobo_field in kobo_data.keys(): + kobo_value_url = kobo_data[kobo_field].replace(" ", "_") + kobo_value_url = re.sub(r"[(,),']", "", kobo_value_url) + if target_field in intvalues: + payload[target_field] = int(kobo_data[kobo_field]) + elif target_field == "scope": + payload[target_field] = clean_text(kobo_data[kobo_field]) + elif kobo_value_url not in attachments.keys(): + payload[target_field] = kobo_data[kobo_field] + else: + payload[target_field] = attachments[kobo_value_url]["url"] + else: + payload[target_field] = "" + + payload["referenceId"] = referenceId + + access_token = login121(request.headers["url121"], request.headers["username121"], request.headers["password121"]) + + + # POST to 121 import endpoint + import_response = requests.post( + f"{request.headers['url121']}/api/programs/{programid}/registrations/import", + headers={"Cookie": f"access_token_general={access_token}"}, + json=[payload], + ) + + import_response_message = import_response.content.decode("utf-8") + if 200 <= import_response.status_code <= 299: + logger.info( + f"Success: 121 import returned {import_response.status_code} {import_response_message}", + extra=extra_logs, + ) + elif import_response.status_code >= 400: + logger.error( + f"Failed: 121 import returned {import_response.status_code} {import_response_message}", + extra=extra_logs, + ) + raise HTTPException( + status_code=import_response.status_code, detail=import_response_message + ) + else: + logger.warning( + f"121 import returned {import_response.status_code} {import_response_message}", + extra=extra_logs, + ) + + return JSONResponse( + status_code=import_response.status_code, content=import_response_message + ) + +######################################################################################################################## + +@router.post("/kobo-update-121") +async def kobo_update_121(request: Request, dependencies=Depends(required_headers_121)): + """Update a 121 record from a Kobo submission""" + + kobo_data = await request.json() + extra_logs = {"environment": os.getenv("ENV")} + try: + extra_logs["kobo_form_id"] = str(kobo_data["_xform_id_string"]) + extra_logs["kobo_form_version"] = str(kobo_data["__version__"]) + extra_logs["kobo_submission_id"] = str(kobo_data["_id"]) + except KeyError: + return JSONResponse( + status_code=422, + content={"detail": "Not a valid Kobo submission"}, + ) + extra_logs["121_url"] = request.headers["url121"] + + kobo_data = clean_kobo_data(kobo_data) + + kobotoken, koboasset = None, None + if 'kobotoken' in request.headers.keys(): + kobotoken = request.headers['kobotoken'] + if 'koboasset' in request.headers.keys(): + koboasset = request.headers['koboasset'] + attachments = get_attachment_dict(kobo_data, kobotoken, koboasset) + + if 'programid' in request.headers.keys(): + programid = request.headers['programid'] + elif 'programid' in kobo_data.keys(): + programid = kobo_data['programid'] + else: + raise HTTPException( + status_code=400, + detail=f"'programid' needs to be specified in headers or submission body" + ) + extra_logs["121_program_id"] = programid + + referenceId = kobo_data['referenceid'] + + access_token = login121(request.headers["url121"], request.headers["username121"], request.headers["password121"]) + + # Create API payload body + intvalues = ['maxPayments', 'paymentAmountMultiplier', 'inclusionScore'] + + for kobo_field, target_field in request.headers.items(): + payload = { + "data": {}, + "reason": "Validated during field validation" + } + if kobo_field in kobo_data.keys(): + kobo_value_url = kobo_data[kobo_field].replace(" ", "_") + kobo_value_url = re.sub(r"[(,),']", "", kobo_value_url) + if target_field in intvalues: + payload["data"][target_field] = int(kobo_data[kobo_field]) + elif target_field == 'scope': + payload["data"][target_field] = clean_text(kobo_data[kobo_field]) + elif kobo_value_url not in attachments.keys(): + payload["data"][target_field] = kobo_data[kobo_field] + else: + payload["data"][target_field] = attachments[kobo_value_url]['url'] + + # POST to target API + if target_field != 'referenceId': + response = requests.patch( + f"{request.headers['url121']}/api/programs/{programid}/registrations/{referenceId}", + headers={'Cookie': f"access_token_general={access_token}"}, + json=payload + ) + + target_response = response.content.decode("utf-8") + logger.info(target_response) + + status_response = requests.patch( + f"{request.headers['url121']}/api/programs/{programid}/registrations/status?dryRun=false&filter.referenceId=$in:{referenceId}", + headers={'Cookie': f"access_token_general={access_token}"}, + json={"status": "validated"} + ) + + if status_response.status_code != 202: + raise HTTPException(status_code=response.status_code, detail="Failed to set status of PA to validated") + + update_response_message = status_response.content.decode("utf-8") + if 200 <= status_response.status_code <= 299: + logger.info( + f"Success: 121 update returned {status_response.status_code} {update_response_message}", + extra=extra_logs, + ) + elif status_response.status_code >= 400: + logger.error( + f"Failed: 121 update returned {status_response.status_code} {update_response_message}", + extra=extra_logs, + ) + raise HTTPException( + status_code=status_response.status_code, detail=update_response_message + ) + else: + logger.warning( + f"121 update returned {status_response.status_code} {update_response_message}", + extra=extra_logs, + ) + + return JSONResponse( + status_code=status_response.status_code, content=update_response_message + ) + +########### +@router.get("/121-program") +async def create_121_program_from_kobo( + request: Request, dependencies=Depends(required_headers_kobo) +): + """Utility endpoint to automatically create a 121 Program in 121 from a koboform, including REST Service \n + Does only support the IFRC server kobo.ifrc.org \n + ***NB: if you want to duplicate an endpoint, please also use the Hook ID query param*** + """ + + koboUrl = f"https://kobo.ifrc.org/api/v2/assets/{request.headers['koboasset']}" + koboheaders = {"Authorization": f"Token {request.headers['kobotoken']}"} + data_request = requests.get(f"{koboUrl}/?format=json", headers=koboheaders) + if data_request.status_code >= 400: + raise HTTPException( + status_code=data_request.status_code, + detail=data_request.content.decode("utf-8"), + ) + data = data_request.json() + + survey = pd.DataFrame(data["content"]["survey"]) + choices = pd.DataFrame(data["content"]["choices"]) + + type_mapping = {} + with open("mappings/kobo121fieldtypes.csv", newline="") as csvfile: + reader = csv.reader(csvfile, delimiter="\t") + for row in reader: + if len(row) == 2: + type_mapping[row[0]] = row[1] + + mappingdf = pd.read_csv("mappings/kobo121fieldtypes.csv", delimiter="\t") + + CHECKFIELDS = [ + "validation", + "phase", + "location", + "ngo", + "language", + "titlePortal", + "description", + "startDate", + "endDate", + "currency", + "distributionFrequency", + "distributionDuration", + "fixedTransferValue", + "financialServiceProviders", + "targetNrRegistrations", + "tryWhatsAppFirst", + "phoneNumberPlaceholder", + "aboutProgram", + "fullnameNamingConvention", + "enableMaxPayments", + "phoneNumber", + "preferredLanguage", + "budget", + "maxPayments", + "fspName", + ] + + # First check if all setup fields are in the xlsform + FIELDNAMES = survey["name"].to_list() + MISSINGFIELDS = [] + for checkfield in CHECKFIELDS: + if checkfield not in FIELDNAMES: + MISSINGFIELDS.append(checkfield) + + if len(MISSINGFIELDS) != 0: + print("Missing hidden fields in the template: ", MISSINGFIELDS) + + lookupdict = dict(zip(survey["name"], survey["default"])) + fspquestions = [] + + if "tags" in survey.columns: + dedupedict = dict(zip(survey["name"], survey["tags"])) + + for key, value in dedupedict.items(): + if isinstance(value, list) and any("fsp" in item for item in value): + fspquestions.append(key) + elif isinstance(value, list) and any("dedupe" in item for item in value): + dedupedict[key] = True + else: + dedupedict[key] = False + + else: + survey["tags"] = False + dedupedict = dict(zip(survey["name"], survey["tags"])) + + # Create the JSON structure + data = { + "published": True, + "validation": lookupdict["validation"].upper() == "TRUE", + "phase": lookupdict["phase"], + "location": lookupdict["location"], + "ngo": lookupdict["ngo"], + "titlePortal": {lookupdict["language"]: lookupdict["titlePortal"]}, + "titlePaApp": {lookupdict["language"]: lookupdict["titlePortal"]}, + "description": {"en": ""}, + "startDate": datetime.strptime(lookupdict["startDate"], "%d/%m/%Y").isoformat(), + "endDate": datetime.strptime(lookupdict["endDate"], "%d/%m/%Y").isoformat(), + "currency": lookupdict["currency"], + "distributionFrequency": lookupdict["distributionFrequency"], + "distributionDuration": int(lookupdict["distributionDuration"]), + "fixedTransferValue": int(lookupdict["fixedTransferValue"]), + "paymentAmountMultiplierFormula": "", + "financialServiceProviders": [{"fsp": lookupdict["financialServiceProviders"]}], + "targetNrRegistrations": int(lookupdict["targetNrRegistrations"]), + "tryWhatsAppFirst": lookupdict["tryWhatsAppFirst"].upper() == "TRUE", + "phoneNumberPlaceholder": lookupdict["phoneNumberPlaceholder"], + "programCustomAttributes": [], + "programQuestions": [], + "aboutProgram": {lookupdict["language"]: lookupdict["aboutProgram"]}, + "fullnameNamingConvention": [lookupdict["fullnameNamingConvention"]], + "languages": [lookupdict["language"]], + "enableMaxPayments": lookupdict["enableMaxPayments"].upper() == "TRUE", + "allowEmptyPhoneNumber": False, + "enableScope": False, + } + + koboConnectHeader = ["fspName", "preferredLanguage", "maxPayments"] + + for index, row in survey.iterrows(): + if ( + row["type"].split()[0] in mappingdf["kobotype"].tolist() + and row["name"] not in CHECKFIELDS + and row["name"] not in fspquestions + ): + koboConnectHeader.append(row["name"]) + question = { + "name": row["name"], + "label": {"en": str(row["label"][0])}, + "answerType": type_mapping[row["type"].split()[0]], + "questionType": "standard", + "options": [], + "scoring": {}, + "persistence": True, + "pattern": "", + "phases": [], + "editableInPortal": True, + "export": ["all-people-affected", "included"], + "shortLabel": { + "en": row["name"], + }, + "duplicateCheck": dedupedict[row["name"]], + "placeholder": "", + } + if type_mapping[row["type"].split()[0]] == "dropdown": + filtered_df = choices[ + choices["list_name"] == row["select_from_list_name"] + ] + for index, row in filtered_df.iterrows(): + option = { + "option": row["name"], + "label": {"en": str(row["label"][0])}, + } + question["options"].append(option) + data["programQuestions"].append(question) + if row["name"] == "phoneNumber": + koboConnectHeader.append("phoneNumber") + question = { + "name": "phoneNumber", + "label": {"en": "Phone Number"}, + "answerType": "tel", + "questionType": "standard", + "options": [], + "scoring": {}, + "persistence": True, + "pattern": "", + "phases": [], + "editableInPortal": True, + "export": ["all-people-affected", "included"], + "shortLabel": { + "en": row["name"], + }, + "duplicateCheck": dedupedict[row["name"]], + "placeholder": "", + } + data["programQuestions"].append(question) + + # Create kobo-connect rest service + restServicePayload = { + "name": "Kobo Connect", + "endpoint": "https://kobo-connect.azurewebsites.net/kobo-to-121", + "active": True, + "email_notification": True, + "export_type": "json", + "settings": {"custom_headers": {}}, + } + koboConnectHeader = koboConnectHeader + fspquestions + customHeaders = dict(zip(koboConnectHeader, koboConnectHeader)) + restServicePayload["settings"]["custom_headers"] = customHeaders + + kobo_response = requests.post( + f"{koboUrl}/hooks/", headers=koboheaders, json=restServicePayload + ) + + if kobo_response.status_code == 200 or 201: + return JSONResponse(content=data) + else: + return JSONResponse( + content={"message": "Failed"}, status_code=kobo_response.status_code + ) + diff --git a/routes/routes121.py b/routes/routes121.py index f37c277..0d16efa 100644 --- a/routes/routes121.py +++ b/routes/routes121.py @@ -1,9 +1,11 @@ from fastapi import APIRouter, Request, Depends, HTTPException import requests +import re +import os from fastapi.responses import JSONResponse from utils.utilsKobo import clean_kobo_data, get_attachment_dict, required_headers_kobo from utils.utils121 import login121, required_headers_121 -import os +from utils.logger import logger router = APIRouter() @@ -71,10 +73,11 @@ async def kobo_to_121(request: Request, dependencies=Depends(required_headers_12 payload[target_field] = kobo_data[kobo_field] else: payload[target_field] = attachments[kobo_value_url]["url"] - else: - payload[target_field] = "" + # else: + # payload[target_field] = "" payload["referenceId"] = referenceId + print(payload) access_token = login121(request.headers["url121"], request.headers["username121"], request.headers["password121"]) diff --git a/test_submission.py b/test_submission.py deleted file mode 100644 index c17048f..0000000 --- a/test_submission.py +++ /dev/null @@ -1,38 +0,0 @@ -from selenium import webdriver -from selenium.webdriver.firefox.options import Options -from selenium.webdriver.common.by import By -import time -import click -import random -import string - - -@click.command() -@click.option("--form", help="Kobo form ID") -@click.option("--url", help="Kobo form URL") -def test(form, url): - options = Options() - options.add_argument("-headless") - driver = webdriver.Firefox(options=options) - driver.get(url) - time.sleep(5) - - questions = { - "firstname": "".join(random.choices(string.ascii_uppercase, k=6)), - "lastname": "".join(random.choices(string.ascii_uppercase, k=6)), - } - - for question, answer in questions.items(): - text_input = driver.find_element( - By.XPATH, f"//input[@name='/{form}/{question}']" - ) - text_input.send_keys(answer) - - search_button = driver.find_element(By.ID, "submit-form") - search_button.click() - - driver.close() - - -if __name__ == "__main__": - test() diff --git a/utils/cosmos.py b/utils/cosmos.py new file mode 100644 index 0000000..568529c --- /dev/null +++ b/utils/cosmos.py @@ -0,0 +1,16 @@ +import os +from dotenv import load_dotenv +import azure.cosmos.cosmos_client as cosmos_client + +# load environment variables +load_dotenv() + +# initialize CosmosDB +client_ = cosmos_client.CosmosClient( + os.getenv("COSMOS_URL"), + {"masterKey": os.getenv("COSMOS_KEY")}, + user_agent="kobo-connect", + user_agent_overwrite=True, +) +cosmos_db = client_.get_database_client("kobo-connect") +cosmos_container_client = cosmos_db.get_container_client("kobo-submissions") diff --git a/utils/logger.py b/utils/logger.py new file mode 100644 index 0000000..ac0aa2a --- /dev/null +++ b/utils/logger.py @@ -0,0 +1,32 @@ +import logging +import os +from dotenv import load_dotenv +from opentelemetry._logs import set_logger_provider +from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter + +# load environment variables +load_dotenv() + +# Set up logs export to Azure Application Insights +logger_provider = LoggerProvider() +set_logger_provider(logger_provider) +exporter = AzureMonitorLogExporter( + connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"] +) +logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter)) + +# Attach LoggingHandler to root logger +handler = LoggingHandler() +logging.getLogger().addHandler(handler) +logging.getLogger().setLevel(logging.NOTSET) +logger = logging.getLogger(__name__) + +# Silence noisy loggers +logging.getLogger("requests").setLevel(logging.WARNING) +logging.getLogger("urllib3").setLevel(logging.WARNING) +logging.getLogger("azure").setLevel(logging.WARNING) +logging.getLogger("requests_oauthlib").setLevel(logging.WARNING) +logging.getLogger("asyncio").setLevel(logging.WARNING) +logging.getLogger("opentelemetry").setLevel(logging.ERROR) diff --git a/utils/utils121.py b/utils/utils121.py index bff1b20..ad8d211 100644 --- a/utils/utils121.py +++ b/utils/utils121.py @@ -1,8 +1,9 @@ import requests +import unicodedata from fastapi import HTTPException, Header from datetime import datetime, timedelta -from utils.utilsKobo import required_headers_kobo -import unicodedata +from utils.logger import logger + def clean_text(text): # Normalize text to remove accents diff --git a/utils/utilsEspo.py b/utils/utilsEspo.py index b9e3535..8c9b729 100644 --- a/utils/utilsEspo.py +++ b/utils/utilsEspo.py @@ -1,9 +1,6 @@ -from clients.espo_api_client import EspoAPI -import requests -import time from fastapi import HTTPException, Header -from datetime import datetime, timedelta -from azure.cosmos.exceptions import CosmosResourceExistsError +from utils.logger import logger +from utils.utilsKobo import update_submission_status def espo_request(submission, espo_client, method, entity, params=None, logs=None): """Make a request to EspoCRM. If the request fails, update submission status in CosmosDB.""" diff --git a/utils/utilsKobo.py b/utils/utilsKobo.py index 387c82f..e86d1cf 100644 --- a/utils/utilsKobo.py +++ b/utils/utilsKobo.py @@ -1,10 +1,8 @@ import requests import time -import base64 -import logging from fastapi import HTTPException, Header -from datetime import datetime, timedelta from azure.cosmos.exceptions import CosmosResourceExistsError +from utils.cosmos import cosmos_container_client def required_headers_kobo(kobotoken: str = Header(), koboasset: str = Header()): return kobotoken, koboasset