-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
380 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
import base64

import requests
from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import JSONResponse

from utils.utilsKobo import clean_kobo_data, get_attachment_dict, get_kobo_attachment
|
||
router = APIRouter() | ||
|
||
@router.post("/kobo-to-generic")
async def kobo_to_generic(request: Request):
    """Send a Kobo submission to a generic API.
    API Key is passed as 'x-api-key' in headers.

    The request headers double as the field mapping: every header whose name
    matches a (cleaned) Kobo field is forwarded, with the header's value used
    as the target field name. Special headers: 'targeturl' (destination),
    'targetkey' (sent as 'x-api-key'), and 'kobotoken' (required only when the
    submission contains attachments).
    """
    kobo_data = await request.json()
    kobo_data = clean_kobo_data(kobo_data)
    attachments = get_attachment_dict(kobo_data)

    # Create API payload body from the header-driven field mapping
    payload = {}
    for kobo_field, target_field in request.headers.items():
        if kobo_field not in kobo_data:
            continue
        kobo_value = kobo_data[kobo_field].replace(" ", "_")
        if kobo_value not in attachments:
            payload[target_field] = kobo_value
        else:
            file_url = attachments[kobo_value]["url"]
            if "kobotoken" not in request.headers:
                raise HTTPException(
                    status_code=400,
                    detail="'kobotoken' needs to be specified in headers to upload attachments",
                )
            # BUG FIX: base64 and get_kobo_attachment were used here but never
            # imported in this module, so any attachment raised NameError
            # (imports added at the top of the file).
            # Encode attachment as a base64 data URI
            file = get_kobo_attachment(file_url, request.headers["kobotoken"])
            file_b64 = base64.b64encode(file).decode("utf8")
            payload[target_field] = (
                f"data:{attachments[kobo_value]['mimetype']};base64,{file_b64}"
            )

    # POST the mapped payload to the target API
    response = requests.post(
        request.headers["targeturl"],
        headers={"x-api-key": request.headers["targetkey"]},
        data=payload,
    )
    target_response = response.content.decode("utf-8")

    return JSONResponse(status_code=200, content=target_response)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
from fastapi import APIRouter, Request, Depends, HTTPException | ||
from fastapi.responses import JSONResponse | ||
import requests | ||
import base64 | ||
import csv | ||
import io | ||
import json | ||
from enum import Enum | ||
from utils.utils121 import login121 | ||
from utils.utilsKobo import required_headers_121_kobo | ||
|
||
router = APIRouter() | ||
|
||
@router.post("/update-kobo-csv")
async def prepare_kobo_validation(request: Request, programId: int, kobousername: str, dependencies=Depends(required_headers_121_kobo)):
    """
    Prepare Kobo validation by fetching data from 121 platform,
    converting it to CSV, and uploading to Kobo.

    Required headers (enforced by required_headers_121_kobo): the 121
    credentials ('url121', 'username121', 'password121') plus 'kobotoken'
    and 'koboasset'. Raises HTTPException when any remote call fails.
    """
    access_token = login121(request.headers["url121"], request.headers["username121"], request.headers["password121"])

    # Fetch the full people-affected export from the 121 platform
    response = requests.get(
        f"{request.headers['url121']}/api/programs/{programId}/metrics/export-list/all-people-affected",
        headers={'Cookie': f"access_token_general={access_token}"}
    )
    if response.status_code != 200:
        raise HTTPException(status_code=response.status_code, detail="Failed to fetch data from 121 platform")

    data = response.json()

    # Convert JSON to CSV
    output = io.StringIO()
    writer = csv.writer(output)

    # Ensure we have data to process; an empty export yields an empty CSV
    if data and 'data' in data and len(data['data']) > 0:
        # Get the keys (column names) from the first row
        fieldnames = list(data['data'][0].keys())

        # Write header
        writer.writerow(fieldnames)

        # Write rows
        for row in data['data']:
            # Create a list of values in the same order as fieldnames
            row_data = [row.get(field, '') for field in fieldnames]
            writer.writerow(row_data)

    csv_content = output.getvalue().encode('utf-8')

    # Prepare the payload for Kobo (form-media upload expects a base64 data URI)
    base64_encoded_csv = base64.b64encode(csv_content).decode('utf-8')
    metadata = json.dumps({"filename": "ValidationDataFrom121.csv"})

    payload = {
        "description": "default",
        "file_type": "form_media",
        "metadata": metadata,
        "base64Encoded": f"data:text/csv;base64,{base64_encoded_csv}"
    }

    # Kobo headers
    headers = {
        "Authorization": f"Token {request.headers['kobotoken']}",
        "Content-Type": "application/x-www-form-urlencoded"
    }

    # If it exists, remove the previous ValidationDataFrom121.csv
    media_response = requests.get(
        f"https://kobo.ifrc.org/api/v2/assets/{request.headers['koboasset']}/files/",
        headers=headers
    )
    if media_response.status_code != 200:
        # BUG FIX: previously raised with `response.status_code` (the 121
        # export call, guaranteed 200 at this point) instead of the failing
        # Kobo media call's status code.
        raise HTTPException(status_code=media_response.status_code, detail="Failed to fetch media from kobo")

    media = media_response.json()

    # Check if ValidationDataFrom121.csv exists and get its uid
    existing_file_uid = None
    for file in media.get('results', []):
        if file.get('metadata', {}).get('filename') == "ValidationDataFrom121.csv":
            existing_file_uid = file.get('uid')
            break

    # If the file exists, delete it
    if existing_file_uid:
        delete_response = requests.delete(
            f"https://kobo.ifrc.org/api/v2/assets/{request.headers['koboasset']}/files/{existing_file_uid}/",
            headers={"Authorization": f"Token {request.headers['kobotoken']}"}
        )
        if delete_response.status_code != 204:
            raise HTTPException(status_code=delete_response.status_code, detail="Failed to delete existing file from Kobo")

    # Upload the fresh CSV as form media
    upload_response = requests.post(
        f"https://kobo.ifrc.org/api/v2/assets/{request.headers['koboasset']}/files/",
        headers=headers,
        data=payload
    )

    if upload_response.status_code != 201:
        raise HTTPException(status_code=upload_response.status_code, detail="Failed to upload file to Kobo")

    # Redeploy the Kobo form so the new media file is picked up
    redeploy_url = f"https://kobo.ifrc.org/api/v2/assets/{request.headers['koboasset']}/deployment/"
    redeploy_payload = {"active": True}

    redeploy_response = requests.patch(
        redeploy_url,
        headers=headers,
        json=redeploy_payload
    )

    if redeploy_response.status_code != 200:
        raise HTTPException(status_code=redeploy_response.status_code, detail="Failed to redeploy Kobo form")

    return {"message": "Validation data prepared and uploaded successfully", "kobo_response": upload_response.json()}
|
||
|
||
############### | ||
|
||
class system(str, Enum):
    """Target systems supported by the kobo-to-<system> forwarding endpoints."""

    # Values are used verbatim in the generated endpoint URL
    # (https://kobo-connect.azurewebsites.net/kobo-to-<value>).
    system_generic = "generic"
    system_espo = "espocrm"
    system_121 = "121"
|
||
|
||
@router.post("/create-kobo-headers")
async def create_kobo_headers(
    json_data: dict,
    system: system,
    koboassetId: str,
    kobotoken: str,
    hookId: str = None,
):
    """Utility endpoint to automatically create the necessary headers in Kobo. \n
    Does only support the IFRC server kobo.ifrc.org \n
    ***NB: if you want to duplicate an endpoint, please also use the Hook ID query param***
    """
    if json_data is None:
        raise HTTPException(status_code=400, detail="JSON data is required")

    target_url = f"https://kobo.ifrc.org/api/v2/assets/{koboassetId}/hooks/"
    koboheaders = {"Authorization": f"Token {kobotoken}"}

    if hookId is None:
        # Create a brand-new REST hook pointing at the kobo-to-<system> endpoint,
        # with the supplied JSON body as its custom headers.
        payload = {
            "name": "koboconnect",
            "endpoint": f"https://kobo-connect.azurewebsites.net/kobo-to-{system}",
            "active": True,
            "subset_fields": [],
            "email_notification": True,
            "export_type": "json",
            "auth_level": "no_auth",
            "settings": {"custom_headers": {}},
            "payload_template": "",
        }
        payload["settings"]["custom_headers"] = json_data
    else:
        # Duplicate an existing hook: fetch it, rename it, and strip the
        # read-only bookkeeping fields Kobo would reject on re-POST.
        get_url = f"https://kobo.ifrc.org/api/v2/assets/{koboassetId}/hooks/{hookId}"
        hook = requests.get(get_url, headers=koboheaders)
        hook = hook.json()
        hook["name"] = "Duplicate of " + hook["name"]

        def remove_keys(data, keys_to_remove):
            # Drop the given keys from the hook dict (in place) if present
            for key in keys_to_remove:
                if key in data:
                    del data[key]
            return data

        keys_to_remove = [
            "url",
            "logs_url",
            "asset",
            "uid",
            "success_count",
            "failed_count",
            "pending_count",
            "date_modified",
        ]
        payload = remove_keys(hook, keys_to_remove)

    response = requests.post(target_url, headers=koboheaders, json=payload)

    # BUG FIX: original condition was `response.status_code == 200 or 201`,
    # which is always truthy (the literal 201 is truthy on its own), so every
    # failure was reported as success. Also fixed the "Sucess" typo.
    if response.status_code in (200, 201):
        return JSONResponse(content={"message": "Success"})
    else:
        return JSONResponse(
            content={"message": "Failed to post data to the target endpoint"},
            status_code=response.status_code,
        )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
import os
from dotenv import load_dotenv
import azure.cosmos.cosmos_client as cosmos_client

# load environment variables
load_dotenv()

# initialize CosmosDB client from COSMOS_URL / COSMOS_KEY environment
# variables (both must be set; CosmosClient will fail to authenticate
# otherwise)
client_ = cosmos_client.CosmosClient(
    os.getenv("COSMOS_URL"),
    {"masterKey": os.getenv("COSMOS_KEY")},
    user_agent="kobo-connect",  # identifies this app in Cosmos diagnostics
    user_agent_overwrite=True,
)
# module-level handles shared by the rest of the app: the 'kobo-connect'
# database and its 'kobo-submissions' container
cosmos_db = client_.get_database_client("kobo-connect")
cosmos_container_client = cosmos_db.get_container_client("kobo-submissions")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
import logging
import os
from dotenv import load_dotenv
from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter

# load environment variables
load_dotenv()

# Set up logs export to Azure Application Insights.
# NOTE: APPLICATIONINSIGHTS_CONNECTION_STRING must be set — os.environ[...]
# raises KeyError at import time if it is missing.
logger_provider = LoggerProvider()
set_logger_provider(logger_provider)
exporter = AzureMonitorLogExporter(
    connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
)
logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))

# Attach LoggingHandler to root logger so records from every module are
# exported; NOTSET on the root lets handlers decide filtering
handler = LoggingHandler()
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.NOTSET)
logger = logging.getLogger(__name__)

# Silence noisy loggers
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("azure").setLevel(logging.WARNING)
logging.getLogger("requests_oauthlib").setLevel(logging.WARNING)
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("opentelemetry").setLevel(logging.ERROR)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
import requests | ||
import unicodedata | ||
from fastapi import HTTPException, Header | ||
from datetime import datetime, timedelta | ||
from utils.logger import logger | ||
|
||
|
||
def clean_text(text):
    """Return *text* lowercased with all accents (combining marks) removed."""
    # NFD splits each accented character into base char + combining mark(s)
    decomposed = unicodedata.normalize("NFD", text)
    # Drop the combining marks, keep the base characters
    base_chars = [ch for ch in decomposed if not unicodedata.combining(ch)]
    return "".join(base_chars).lower()
|
||
|
||
def required_headers_121(
    url121: str = Header(), username121: str = Header(), password121: str = Header()
):
    """FastAPI dependency declaring the required 121-platform headers.

    Forces 'url121', 'username121' and 'password121' to be present on the
    request and hands them back as a tuple.
    """
    return (url121, username121, password121)
|
||
# Dictionary to store cookies, credentials, and expiration times | ||
cookie121 = {} | ||
|
||
def login121(url121, username, password):
    """Log in to a 121-platform instance and return its access token.

    Tokens are cached per URL in the module-level `cookie121` dict and reused
    as long as the stored credentials match and the token remains valid for
    at least 24 more hours. Raises HTTPException when the login call fails.
    """
    # Check if URL exists in the dictionary (cache hit path)
    if url121 in cookie121:
        cookie_data = cookie121[url121]
        # Check if the stored username and password match
        if cookie_data['username'] == username and cookie_data['password'] == password:
            cookie_expiry = cookie_data['expiry']
            current_time = datetime.utcnow()

            # Check if the cookie is valid for at least 24 more hours
            if (cookie_expiry - current_time) >= timedelta(hours=24):
                logger.info(f"Using cached cookie for {url121}")
                return cookie_data['cookie']
            else:
                logger.info(f"Cookie for {url121} is valid for less than 24 hours, refreshing cookie...")

    # Otherwise, request a new cookie
    body = {'username': username, 'password': password}
    url = f'{url121}/api/users/login'

    # BUG FIX: if requests.post itself raised (e.g. a connection error),
    # `login_response` was unbound and the except-handler crashed with
    # UnboundLocalError. Pre-bind it and fall back to 502 when there is no
    # HTTP response to take a status code from.
    login_response = None
    try:
        login_response = requests.post(url, data=body)
        login_response.raise_for_status()
    except requests.RequestException as e:
        error_message = str(e)
        status_code = login_response.status_code if login_response is not None else 502
        logger.error(
            f"Failed: 121 login returned {status_code} {error_message}",
            extra=None,
        )
        raise HTTPException(status_code=status_code, detail=error_message)

    # Parse the response
    response_data = login_response.json()
    cookie = response_data['access_token_general']

    # Store the new cookie, username, password, and expiration time.
    # 'expires' is ISO-8601 with a trailing "Z", which fromisoformat cannot
    # parse on older Pythons — hence the replace(); the result is naive UTC,
    # consistent with datetime.utcnow() above.
    expiry_datetime = datetime.fromisoformat(response_data['expires'].replace("Z", ""))

    cookie121[url121] = {
        'username': username,
        'password': password,
        'cookie': cookie,
        'expiry': expiry_datetime
    }

    logger.info(f"New cookie stored for {url121} with credentials.")
    return cookie
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
from fastapi import HTTPException, Header | ||
from utils.logger import logger | ||
from utils.utilsKobo import update_submission_status | ||
|
||
def espo_request(submission, espo_client, method, entity, params=None, logs=None):
    """Make a request to EspoCRM. If the request fails, update submission status in CosmosDB.

    Args:
        submission: the Kobo submission record being processed.
        espo_client: client whose .request(method, entity, params) performs the call.
        method: HTTP method / EspoCRM action name passed through to the client.
        entity: EspoCRM entity the request targets.
        params: optional request parameters forwarded to the client.
        logs: optional dict passed as `extra` to the logger on failure.
    """
    try:
        response = espo_client.request(method, entity, params)
        return response
    except HTTPException as e:
        # Strip EspoCRM's generic "Unknown Error" noise from the log line
        detail = e.detail if "Unknown Error" not in e.detail else ""
        logger.error(f"Failed: EspoCRM returned {e.status_code} {detail}", extra=logs)
        update_submission_status(submission, "failed", e.detail)
        # NOTE(review): the exception is swallowed here, so callers receive
        # None on failure — presumably deliberate since the failure is already
        # recorded in CosmosDB; confirm callers handle a None response.
|
||
|
||
def required_headers_espocrm(targeturl: str = Header(), targetkey: str = Header()):
    """FastAPI dependency declaring the required EspoCRM headers.

    Forces 'targeturl' and 'targetkey' to be present on the request and
    returns them as a tuple.
    """
    return (targeturl, targetkey)