From 2bdf5739e45a77ef054f54a97627ca3ccc10130c Mon Sep 17 00:00:00 2001 From: tijsziere <39266480+tijsziere@users.noreply.github.com> Date: Fri, 15 Mar 2024 15:43:29 +0100 Subject: [PATCH 1/4] Create 121 Programme from Kobo form (incl. REST service) --- main.py | 592 ++++++++++++++++++++++----------- mappings/kobo121fieldtypes.csv | 17 + 2 files changed, 424 insertions(+), 185 deletions(-) create mode 100644 mappings/kobo121fieldtypes.csv diff --git a/main.py b/main.py index 6ee401a..8bfa881 100644 --- a/main.py +++ b/main.py @@ -7,9 +7,12 @@ from enum import Enum from clients.espo_api_client import EspoAPI import requests +import csv +import pandas as pd +from datetime import datetime import os -from azure.cosmos.exceptions import CosmosResourceExistsError -import azure.cosmos.cosmos_client as cosmos_client +# from azure.cosmos.exceptions import CosmosResourceExistsError +# import azure.cosmos.cosmos_client as cosmos_client from enum import Enum import base64 import logging @@ -24,7 +27,7 @@ logger.addHandler(handler) logging.getLogger("requests").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) -logging.getLogger("azure").setLevel(logging.WARNING) +# logging.getLogger("azure").setLevel(logging.WARNING) logging.getLogger("requests_oauthlib").setLevel(logging.WARNING) from dotenv import load_dotenv load_dotenv() @@ -46,15 +49,15 @@ }, ) -# initialize CosmosDB -client_ = cosmos_client.CosmosClient( - os.getenv('COSMOS_URL'), - {'masterKey': os.getenv('COSMOS_KEY')}, - user_agent="kobo-connect", - user_agent_overwrite=True -) -cosmos_db = client_.get_database_client('kobo-connect') -cosmos_container_client = cosmos_db.get_container_client('kobo-submissions') +# # initialize CosmosDB +# client_ = cosmos_client.CosmosClient( +# os.getenv('COSMOS_URL'), +# {'masterKey': os.getenv('COSMOS_KEY')}, +# user_agent="kobo-connect", +# user_agent_overwrite=True +# ) +# cosmos_db = client_.get_database_client('kobo-connect') +# cosmos_container_client = cosmos_db.get_container_client('kobo-submissions') @app.get("/", include_in_schema=False) @@ -63,41 +66,41 @@ async def docs_redirect(): return RedirectResponse(url='/docs') -def add_submission(kobo_data): - """Add submission to CosmosDB. If submission already exists and status is pending, raise HTTPException.""" - submission = { - 'id': str(kobo_data['_uuid']), - 'uuid': str(kobo_data['formhub/uuid']), - 'status': 'pending' - } - try: - submission = cosmos_container_client.create_item(body=submission) - except CosmosResourceExistsError: - submission = cosmos_container_client.read_item( - item=str(kobo_data['_uuid']), - partition_key=str(kobo_data['formhub/uuid']), - ) - if submission['status'] == 'pending': - raise HTTPException( - status_code=400, - detail=f"Submission is still being processed." - ) - return submission - - -def update_submission_status(submission, status, error_message=None): - """Update submission status in CosmosDB. If error_message is not none, raise HTTPException.""" - submission['status'] = status - submission['error_message'] = error_message - cosmos_container_client.replace_item( - item=str(submission['id']), - body=submission - ) - if status == 'failed': - raise HTTPException( - status_code=400, - detail=error_message - ) +# def add_submission(kobo_data): +# """Add submission to CosmosDB. If submission already exists and status is pending, raise HTTPException.""" +# submission = { +# 'id': str(kobo_data['_uuid']), +# 'uuid': str(kobo_data['formhub/uuid']), +# 'status': 'pending' +# } +# try: +# submission = cosmos_container_client.create_item(body=submission) +# except CosmosResourceExistsError: +# submission = cosmos_container_client.read_item( +# item=str(kobo_data['_uuid']), +# partition_key=str(kobo_data['formhub/uuid']), +# ) +# if submission['status'] == 'pending': +# raise HTTPException( +# status_code=400, +# detail=f"Submission is still being processed." +# ) +# return submission + + +# def update_submission_status(submission, status, error_message=None): +# """Update submission status in CosmosDB. If error_message is not none, raise HTTPException.""" +# submission['status'] = status +# submission['error_message'] = error_message +# cosmos_container_client.replace_item( +# item=str(submission['id']), +# body=submission +# ) +# if status == 'failed': +# raise HTTPException( +# status_code=400, +# detail=error_message +# ) def get_kobo_attachment(URL, kobo_token): @@ -131,157 +134,157 @@ def clean_kobo_data(kobo_data): return kobo_data_clean -def espo_request(submission, espo_client, method, action, params=None): - """Make a request to EspoCRM. If the request fails, update submission status in CosmosDB.""" - try: - response = espo_client.request(method, action, params) - return response - except HTTPException as e: - update_submission_status(submission, 'failed', e.detail) +# def espo_request(submission, espo_client, method, action, params=None): +# """Make a request to EspoCRM. If the request fails, update submission status in CosmosDB.""" +# try: +# response = espo_client.request(method, action, params) +# return response +# except HTTPException as e: +# update_submission_status(submission, 'failed', e.detail) -def required_headers_espocrm( - targeturl: str = Header(), - targetkey: str = Header()): - return targeturl, targetkey +# def required_headers_espocrm( +# targeturl: str = Header(), +# targetkey: str = Header()): +# return targeturl, targetkey -@app.post("/kobo-to-espocrm") -async def kobo_to_espocrm(request: Request, dependencies=Depends(required_headers_espocrm)): - """Send a Kobo submission to EspoCRM.""" +# @app.post("/kobo-to-espocrm") +# async def kobo_to_espocrm(request: Request, dependencies=Depends(required_headers_espocrm)): +# """Send a Kobo submission to EspoCRM.""" - kobo_data = await request.json() - target_response = {} - - # store the submission uuid and status, to avoid duplicate submissions - submission = add_submission(kobo_data) - if submission['status'] == 'success': - return JSONResponse( - status_code=200, - content={"detail": "Submission has already been successfully processed."} - ) - - kobo_data = clean_kobo_data(kobo_data) - client = EspoAPI(request.headers['targeturl'], request.headers['targetkey']) - attachments = get_attachment_dict(kobo_data) - - # check if records need to be updated - update_record_payload = {} - if 'updaterecordby' in request.headers.keys(): - if 'updaterecordby' in kobo_data.keys(): - if kobo_data['updaterecordby'] != "" and kobo_data['updaterecordby'] is not None: - update_record_entity = request.headers['updaterecordby'].split('.')[0] - update_record_field = request.headers['updaterecordby'].split('.')[1] - update_record_payload[update_record_entity] = { - 'field': update_record_field, - 'value': kobo_data['updaterecordby'] - } - kobo_data.pop('updaterecordby') +# kobo_data = await request.json() +# target_response = {} - # Create API payload body - payload, target_entity = {}, "" - for kobo_field, target_field in request.headers.items(): +# # store the submission uuid and status, to avoid duplicate submissions +# submission = add_submission(kobo_data) +# if submission['status'] == 'success': +# return JSONResponse( +# status_code=200, +# content={"detail": "Submission has already been successfully processed."} +# ) + +# kobo_data = clean_kobo_data(kobo_data) +# client = EspoAPI(request.headers['targeturl'], request.headers['targetkey']) +# attachments = get_attachment_dict(kobo_data) + +# # check if records need to be updated +# update_record_payload = {} +# if 'updaterecordby' in request.headers.keys(): +# if 'updaterecordby' in kobo_data.keys(): +# if kobo_data['updaterecordby'] != "" and kobo_data['updaterecordby'] is not None: +# update_record_entity = request.headers['updaterecordby'].split('.')[0] +# update_record_field = request.headers['updaterecordby'].split('.')[1] +# update_record_payload[update_record_entity] = { +# 'field': update_record_field, +# 'value': kobo_data['updaterecordby'] +# } +# kobo_data.pop('updaterecordby') + +# # Create API payload body +# payload, target_entity = {}, "" +# for kobo_field, target_field in request.headers.items(): - kobo_value, multi, repeat, repeat_no, repeat_question = "", False, False, 0, "" - - # determine if kobo_field is of type multi or repeat - if "multi." in kobo_field: - kobo_field = kobo_field.split(".")[1] - multi = True - if "repeat." in kobo_field: - split = kobo_field.split(".") - kobo_field = split[1] - repeat_no = int(split[2]) - repeat_question = split[3] - repeat = True +# kobo_value, multi, repeat, repeat_no, repeat_question = "", False, False, 0, "" + +# # determine if kobo_field is of type multi or repeat +# if "multi." in kobo_field: +# kobo_field = kobo_field.split(".")[1] +# multi = True +# if "repeat." in kobo_field: +# split = kobo_field.split(".") +# kobo_field = split[1] +# repeat_no = int(split[2]) +# repeat_question = split[3] +# repeat = True - # check if kobo_field is in submission - if kobo_field not in kobo_data.keys(): - continue +# # check if kobo_field is in submission +# if kobo_field not in kobo_data.keys(): +# continue - # check if entity is nested in target_field - if len(target_field.split('.')) == 2: - target_entity = target_field.split('.')[0] - target_field = target_field.split('.')[1] - if target_entity not in payload.keys(): - payload[target_entity] = {} - else: - continue - - # get kobo_value based on kobo_field type - if multi: - kobo_value = kobo_data[kobo_field].split(" ") - elif repeat: - if 0 <= repeat_no < len(kobo_data[kobo_field]): - kobo_data[kobo_field][repeat_no] = clean_kobo_data(kobo_data[kobo_field][repeat_no]) - if repeat_question not in kobo_data[kobo_field][repeat_no].keys(): - continue - kobo_value = kobo_data[kobo_field][repeat_no][repeat_question] - else: - continue - else: - kobo_value = kobo_data[kobo_field] +# # check if entity is nested in target_field +# if len(target_field.split('.')) == 2: +# target_entity = target_field.split('.')[0] +# target_field = target_field.split('.')[1] +# if target_entity not in payload.keys(): +# payload[target_entity] = {} +# else: +# continue + +# # get kobo_value based on kobo_field type +# if multi: +# kobo_value = kobo_data[kobo_field].split(" ") +# elif repeat: +# if 0 <= repeat_no < len(kobo_data[kobo_field]): +# kobo_data[kobo_field][repeat_no] = clean_kobo_data(kobo_data[kobo_field][repeat_no]) +# if repeat_question not in kobo_data[kobo_field][repeat_no].keys(): +# continue +# kobo_value = kobo_data[kobo_field][repeat_no][repeat_question] +# else: +# continue +# else: +# kobo_value = kobo_data[kobo_field] - # process individual field; if it's an attachment, upload it to EspoCRM - kobo_value_url = str(kobo_value).replace(" ", "_") - if kobo_value_url not in attachments.keys(): - payload[target_entity][target_field] = kobo_value - else: - file_url = attachments[kobo_value_url]['url'] - if 'kobotoken' not in request.headers.keys(): - update_submission_status(submission, 'failed', - f"'kobotoken' needs to be specified in headers" - f" to upload attachments to EspoCRM") +# # process individual field; if it's an attachment, upload it to EspoCRM +# kobo_value_url = str(kobo_value).replace(" ", "_") +# if kobo_value_url not in attachments.keys(): +# payload[target_entity][target_field] = kobo_value +# else: +# file_url = attachments[kobo_value_url]['url'] +# if 'kobotoken' not in request.headers.keys(): +# update_submission_status(submission, 'failed', +# f"'kobotoken' needs to be specified in headers" +# f" to upload attachments to EspoCRM") - # encode attachment in base64 - file = get_kobo_attachment(file_url, request.headers['kobotoken']) - file_b64 = base64.b64encode(file).decode("utf8") - # upload attachment to target - attachment_payload = { - "name": kobo_value, - "type": attachments[kobo_value_url]['mimetype'], - "role": "Attachment", - "relatedType": target_entity, - "field": target_field, - "file": f"data:{attachments[kobo_value_url]['mimetype']};base64,{file_b64}" - } - attachment_record = espo_request(submission, client, 'POST', 'Attachment', attachment_payload) - # link field to attachment - payload[target_entity][f"{target_field}Id"] = attachment_record['id'] - - if len(payload) == 0: - update_submission_status(submission, 'failed', - f"No fields found in Kobo submission or" - f" no entities found in headers") +# # encode attachment in base64 +# file = get_kobo_attachment(file_url, request.headers['kobotoken']) +# file_b64 = base64.b64encode(file).decode("utf8") +# # upload attachment to target +# attachment_payload = { +# "name": kobo_value, +# "type": attachments[kobo_value_url]['mimetype'], +# "role": "Attachment", +# "relatedType": target_entity, +# "field": target_field, +# "file": f"data:{attachments[kobo_value_url]['mimetype']};base64,{file_b64}" +# } +# attachment_record = espo_request(submission, client, 'POST', 'Attachment', attachment_payload) +# # link field to attachment +# payload[target_entity][f"{target_field}Id"] = attachment_record['id'] + +# if len(payload) == 0: +# update_submission_status(submission, 'failed', +# f"No fields found in Kobo submission or" +# f" no entities found in headers") - for target_entity in payload.keys(): - logger.info(payload) - if target_entity not in update_record_payload.keys(): - # create new record of target entity - response = espo_request(submission, client, 'POST', target_entity, payload[target_entity]) - else: - # find target record - params = {"where": [{ - "type": "contains", - "attribute": update_record_payload[target_entity]['field'], - "value": update_record_payload[target_entity]['value'] - }]} - records = espo_request(submission, client, 'GET', target_entity, params)['list'] - if len(records) != 1: - update_submission_status(submission, 'failed', - f"Found {len(records)} records of entity {target_entity} " - f"with field {update_record_payload[target_entity]['field']} " - f"equal to {update_record_payload[target_entity]['value']}") - else: - # update target record - response = espo_request(submission, client, 'PUT', f"{target_entity}/{records[0]['id']}", payload[target_entity]) - if 'id' not in response.keys(): - update_submission_status(submission, 'failed', response.content.decode("utf-8")) - else: - target_response[target_entity] = response +# for target_entity in payload.keys(): +# logger.info(payload) +# if target_entity not in update_record_payload.keys(): +# # create new record of target entity +# response = espo_request(submission, client, 'POST', target_entity, payload[target_entity]) +# else: +# # find target record +# params = {"where": [{ +# "type": "contains", +# "attribute": update_record_payload[target_entity]['field'], +# "value": update_record_payload[target_entity]['value'] +# }]} +# records = espo_request(submission, client, 'GET', target_entity, params)['list'] +# if len(records) != 1: +# update_submission_status(submission, 'failed', +# f"Found {len(records)} records of entity {target_entity} " +# f"with field {update_record_payload[target_entity]['field']} " +# f"equal to {update_record_payload[target_entity]['value']}") +# else: +# # update target record +# response = espo_request(submission, client, 'PUT', f"{target_entity}/{records[0]['id']}", payload[target_entity]) +# if 'id' not in response.keys(): +# update_submission_status(submission, 'failed', response.content.decode("utf-8")) +# else: +# target_response[target_entity] = response - update_submission_status(submission, 'success') - return JSONResponse(status_code=200, content=target_response) +# update_submission_status(submission, 'success') +# return JSONResponse(status_code=200, content=target_response) ######################################################################################################################## @@ -434,6 +437,225 @@ def remove_keys(data, keys_to_remove): ######################################################################################################################## +@app.post("/create-121-from-kobo") +async def create_kobo_headers(request: Request, koboToken: str, koboassetId: str, dependencies=Depends(required_headers_121)): + """Utility endpoint to automatically create a 121 Program in 121 from a koboform, including REST Service \n + Does only support the IFRC server kobonew.ifrc.org \n + ***NB: if you want to duplicate an endpoint, please also use the Hook ID query param***""" + + koboUrl = f'https://kobo.ifrc.org/api/v2/assets/{koboassetId}' + koboheaders = {'Authorization': f'Token {koboToken}'} + data_request = requests.get(f'{koboUrl}/?format=json', headers=koboheaders) + if data_request.status_code >= 400: + raise HTTPException( + status_code=data_request.status_code, + detail=data_request.content.decode("utf-8") + ) + data = data_request.json() + + survey = pd.DataFrame(data['content']['survey']) + choices = pd.DataFrame(data['content']['choices']) + + type_mapping = {} + with open('mappings/kobo121fieldtypes.csv', newline='') as csvfile: + reader = csv.reader(csvfile, delimiter='\t') + for row in reader: + if len(row) == 2: + type_mapping[row[0]] = row[1] + + mappingdf = pd.read_csv('mappings/kobo121fieldtypes.csv', delimiter='\t') + + CHECKFIELDS = ['validation', 'phase', 'location', 'ngo', 'language', 'titlePortal', 'description', + 'startDate', 'endDate', 'currency', 'distributionFrequency', 'distributionDuration', 'fixedTransferValue', + 'financialServiceProviders', 'targetNrRegistrations', 'tryWhatsAppFirst', 'phoneNumberPlaceholder', 'aboutProgram', + 'fullnameNamingConvention', 'enableMaxPayments', 'phoneNumber','preferredLanguage','maxPayments','fspName'] + + # First check if all setup fields are in the xlsform + FIELDNAMES = survey["name"].to_list() + MISSINGFIELDS = [] + for checkfield in CHECKFIELDS: + if checkfield not in FIELDNAMES: + MISSINGFIELDS.append(checkfield) + + if len(MISSINGFIELDS) != 0: + print('Missing hidden fields in the template: ', MISSINGFIELDS) + + lookupdict = dict(zip(survey['name'], survey['default'])) + + if 'tags'in survey.columns: + dedupedict = dict(zip(survey['name'], survey['tags'])) + + for key, value in dedupedict.items(): + if isinstance(value, list) and any('dedupe' in item for item in value): + dedupedict[key] = True + else: + dedupedict[key] = False + else: + survey['tags'] = False + dedupedict = dict(zip(survey['name'], survey['tags'])) + + # Create the JSON structure + data = { + "published": True, + "validation": lookupdict['validation'].upper() == 'TRUE', + "phase": lookupdict['phase'], + "location": lookupdict['location'], + "ngo": lookupdict['ngo'], + "titlePortal": { + lookupdict['language']: lookupdict['titlePortal'] + }, + "titlePaApp": { + lookupdict['language']: lookupdict['titlePortal'] + }, + "description": { + "en": "" + }, + "startDate": datetime.strptime(lookupdict['startDate'], '%d/%m/%Y').isoformat(), + "endDate": datetime.strptime(lookupdict['endDate'], '%d/%m/%Y').isoformat(), + "currency": lookupdict['currency'], + "distributionFrequency": lookupdict['distributionFrequency'], + "distributionDuration": int(lookupdict['distributionDuration']), + "fixedTransferValue": int(lookupdict['fixedTransferValue']), + "paymentAmountMultiplierFormula": "", + "financialServiceProviders": [ + { + "fsp": lookupdict['financialServiceProviders'] + } + ], + "targetNrRegistrations": int(lookupdict['targetNrRegistrations']), + "tryWhatsAppFirst": lookupdict['tryWhatsAppFirst'].upper() == 'TRUE', + "phoneNumberPlaceholder": lookupdict['phoneNumberPlaceholder'], + "programCustomAttributes": [], + "programQuestions": [], + "aboutProgram": { + lookupdict['language']: lookupdict['aboutProgram'] + }, + "fullnameNamingConvention": [ + lookupdict['fullnameNamingConvention'] + ], + "languages": [ + lookupdict['language'] + ], + "enableMaxPayments": lookupdict['enableMaxPayments'].upper() == 'TRUE', + "allowEmptyPhoneNumber": False, + "enableScope": False + } + + koboConnectHeader = ['fspName','preferredLanguage','maxPayments'] + + for index, row in survey.iterrows(): + if row['type'].split()[0] in mappingdf['kobotype'].tolist() and row['name'] not in CHECKFIELDS: + koboConnectHeader.append(row['name']) + question = { + "name": row['name'], + "label": { + "en": str(row['label'][0]) + }, + "answerType": type_mapping[row['type'].split()[0]], + "questionType": "standard", + "options": [], + "scoring": {}, + "persistence": True, + "pattern": "", + "phases": [], + "editableInPortal": True, + "export": [ + "all-people-affected", + "included" + ], + "shortLabel": { + "en": row['name'], + }, + "duplicateCheck": dedupedict[row['name']], + "placeholder": "" + } + if type_mapping[row['type'].split()[0]] == 'dropdown': + filtered_df = choices[choices['list_name'] == row['select_from_list_name']] + for index, row in filtered_df.iterrows(): + option = { + "option": row['name'], + "label": { + "en": str(row['label'][0]) + } + } + question["options"].append(option) + data["programQuestions"].append(question) + if row['name'] == 'phoneNumber': + koboConnectHeader.append('phoneNumber') + question = { + "name": 'phoneNumber', + "label": { + "en": 'Phone Number' + }, + "answerType": "tel", + "questionType": "standard", + "options": [], + "scoring": {}, + "persistence": True, + "pattern": "", + "phases": [], + "editableInPortal": True, + "export": [ + "all-people-affected", + "included" + ], + "shortLabel": { + "en": row['name'], + }, + "duplicateCheck": dedupedict[row['name']], + "placeholder": "" + } + data["programQuestions"].append(question) + + # Create program in 121 + body = {'username': {request.headers['username121']}, 'password': {request.headers['password121']}} + url = f"{request.headers['url121']}/api/users/login" + login = requests.post(url, data=body) + if login.status_code >= 400: + raise HTTPException( + status_code=login.status_code, + detail=login.content.decode("utf-8") + ) + access_token = login.json()['access_token_general'] + + # POST to target API + response = requests.post( + f"{request.headers['url121']}/api/programs", + headers={'Cookie': f"access_token_general={access_token}"}, + json=data + ) + if response.status_code >= 400: + raise HTTPException( + status_code=response.status_code, + detail=response.content.decode("utf-8") + ) + + # Create kobo-connect rest service + restServicePayload = { + "name": 'Kobo Connect', + "endpoint": 'https://kobo-connect.azurewebsites.net/kobo-to-121', + "active": True, + "email_notification": True, + "export_type": 'json', + "settings": { + "custom_headers": { + } + } + } + customHeaders = dict(zip(koboConnectHeader, koboConnectHeader)) + restServicePayload['settings']['custom_headers'] = customHeaders + + kobo_response = requests.post(f'{koboUrl}/hooks/', + headers=koboheaders, + json=restServicePayload + ) + + if kobo_response.status_code == 200 or 201: + return JSONResponse(content={"message": "Sucess"}) + else: + return JSONResponse(content={"message": "Failed"}, status_code=response.status_code) + +######################################################################################################################## @app.post("/kobo-to-generic") async def kobo_to_generic(request: Request): diff --git a/mappings/kobo121fieldtypes.csv b/mappings/kobo121fieldtypes.csv new file mode 100644 index 0000000..4ecc70b --- /dev/null +++ b/mappings/kobo121fieldtypes.csv @@ -0,0 +1,17 @@ +kobotype 121type +integer numeric +decimal numeric +range numeric +text text +select_one dropdown +select_multiple text +geopoint text +geotrace text +geoshape text +date text +time text +dateTime text +image text +calculate text +hidden text +tel tel \ No newline at end of file From 660cb2a56825242a04a0d5f26ee019392bcbab06 Mon Sep 17 00:00:00 2001 From: tijsziere <39266480+tijsziere@users.noreply.github.com> Date: Fri, 15 Mar 2024 15:46:30 +0100 Subject: [PATCH 2/4] fix comments --- main.py | 370 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 185 insertions(+), 185 deletions(-) diff --git a/main.py b/main.py index 8bfa881..1eb8a9a 100644 --- a/main.py +++ b/main.py @@ -11,8 +11,8 @@ import pandas as pd from datetime import datetime import os -# from azure.cosmos.exceptions import CosmosResourceExistsError -# import azure.cosmos.cosmos_client as cosmos_client +from azure.cosmos.exceptions import CosmosResourceExistsError +import azure.cosmos.cosmos_client as cosmos_client from enum import Enum import base64 import logging @@ -27,7 +27,7 @@ logger.addHandler(handler) logging.getLogger("requests").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) -# logging.getLogger("azure").setLevel(logging.WARNING) +logging.getLogger("azure").setLevel(logging.WARNING) logging.getLogger("requests_oauthlib").setLevel(logging.WARNING) from dotenv import load_dotenv load_dotenv() @@ -49,15 +49,15 @@ }, ) -# # initialize CosmosDB -# client_ = cosmos_client.CosmosClient( -# os.getenv('COSMOS_URL'), -# {'masterKey': os.getenv('COSMOS_KEY')}, -# user_agent="kobo-connect", -# user_agent_overwrite=True -# ) -# cosmos_db = client_.get_database_client('kobo-connect') -# cosmos_container_client = cosmos_db.get_container_client('kobo-submissions') +# initialize CosmosDB +client_ = cosmos_client.CosmosClient( + os.getenv('COSMOS_URL'), + {'masterKey': os.getenv('COSMOS_KEY')}, + user_agent="kobo-connect", + user_agent_overwrite=True +) +cosmos_db = client_.get_database_client('kobo-connect') +cosmos_container_client = cosmos_db.get_container_client('kobo-submissions') @app.get("/", include_in_schema=False) @@ -66,41 +66,41 @@ async def docs_redirect(): return RedirectResponse(url='/docs') -# def add_submission(kobo_data): -# """Add submission to CosmosDB. If submission already exists and status is pending, raise HTTPException.""" -# submission = { -# 'id': str(kobo_data['_uuid']), -# 'uuid': str(kobo_data['formhub/uuid']), -# 'status': 'pending' -# } -# try: -# submission = cosmos_container_client.create_item(body=submission) -# except CosmosResourceExistsError: -# submission = cosmos_container_client.read_item( -# item=str(kobo_data['_uuid']), -# partition_key=str(kobo_data['formhub/uuid']), -# ) -# if submission['status'] == 'pending': -# raise HTTPException( -# status_code=400, -# detail=f"Submission is still being processed." -# ) -# return submission - - -# def update_submission_status(submission, status, error_message=None): -# """Update submission status in CosmosDB. If error_message is not none, raise HTTPException.""" -# submission['status'] = status -# submission['error_message'] = error_message -# cosmos_container_client.replace_item( -# item=str(submission['id']), -# body=submission -# ) -# if status == 'failed': -# raise HTTPException( -# status_code=400, -# detail=error_message -# ) +def add_submission(kobo_data): + """Add submission to CosmosDB. If submission already exists and status is pending, raise HTTPException.""" + submission = { + 'id': str(kobo_data['_uuid']), + 'uuid': str(kobo_data['formhub/uuid']), + 'status': 'pending' + } + try: + submission = cosmos_container_client.create_item(body=submission) + except CosmosResourceExistsError: + submission = cosmos_container_client.read_item( + item=str(kobo_data['_uuid']), + partition_key=str(kobo_data['formhub/uuid']), + ) + if submission['status'] == 'pending': + raise HTTPException( + status_code=400, + detail=f"Submission is still being processed." + ) + return submission + + +def update_submission_status(submission, status, error_message=None): + """Update submission status in CosmosDB. If error_message is not none, raise HTTPException.""" + submission['status'] = status + submission['error_message'] = error_message + cosmos_container_client.replace_item( + item=str(submission['id']), + body=submission + ) + if status == 'failed': + raise HTTPException( + status_code=400, + detail=error_message + ) def get_kobo_attachment(URL, kobo_token): @@ -134,157 +134,157 @@ def clean_kobo_data(kobo_data): return kobo_data_clean -# def espo_request(submission, espo_client, method, action, params=None): -# """Make a request to EspoCRM. If the request fails, update submission status in CosmosDB.""" -# try: -# response = espo_client.request(method, action, params) -# return response -# except HTTPException as e: -# update_submission_status(submission, 'failed', e.detail) +def espo_request(submission, espo_client, method, action, params=None): + """Make a request to EspoCRM. If the request fails, update submission status in CosmosDB.""" + try: + response = espo_client.request(method, action, params) + return response + except HTTPException as e: + update_submission_status(submission, 'failed', e.detail) -# def required_headers_espocrm( -# targeturl: str = Header(), -# targetkey: str = Header()): -# return targeturl, targetkey +def required_headers_espocrm( + targeturl: str = Header(), + targetkey: str = Header()): + return targeturl, targetkey -# @app.post("/kobo-to-espocrm") -# async def kobo_to_espocrm(request: Request, dependencies=Depends(required_headers_espocrm)): -# """Send a Kobo submission to EspoCRM.""" +@app.post("/kobo-to-espocrm") +async def kobo_to_espocrm(request: Request, dependencies=Depends(required_headers_espocrm)): + """Send a Kobo submission to EspoCRM.""" -# kobo_data = await request.json() -# target_response = {} - -# # store the submission uuid and status, to avoid duplicate submissions -# submission = add_submission(kobo_data) -# if submission['status'] == 'success': -# return JSONResponse( -# status_code=200, -# content={"detail": "Submission has already been successfully processed."} -# ) + kobo_data = await request.json() + target_response = {} + + # store the submission uuid and status, to avoid duplicate submissions + submission = add_submission(kobo_data) + if submission['status'] == 'success': + return JSONResponse( + status_code=200, + content={"detail": "Submission has already been successfully processed."} + ) -# kobo_data = clean_kobo_data(kobo_data) -# client = EspoAPI(request.headers['targeturl'], request.headers['targetkey']) -# attachments = get_attachment_dict(kobo_data) - -# # check if records need to be updated -# update_record_payload = {} -# if 'updaterecordby' in request.headers.keys(): -# if 'updaterecordby' in kobo_data.keys(): -# if kobo_data['updaterecordby'] != "" and kobo_data['updaterecordby'] is not None: -# update_record_entity = request.headers['updaterecordby'].split('.')[0] -# update_record_field = request.headers['updaterecordby'].split('.')[1] -# update_record_payload[update_record_entity] = { -# 'field': update_record_field, -# 'value': kobo_data['updaterecordby'] -# } -# kobo_data.pop('updaterecordby') - -# # Create API payload body -# payload, target_entity = {}, "" -# for kobo_field, target_field in request.headers.items(): + kobo_data = clean_kobo_data(kobo_data) + client = EspoAPI(request.headers['targeturl'], request.headers['targetkey']) + attachments = get_attachment_dict(kobo_data) + + # check if records need to be updated + update_record_payload = {} + if 'updaterecordby' in request.headers.keys(): + if 'updaterecordby' in kobo_data.keys(): + if kobo_data['updaterecordby'] != "" and kobo_data['updaterecordby'] is not None: + update_record_entity = request.headers['updaterecordby'].split('.')[0] + update_record_field = request.headers['updaterecordby'].split('.')[1] + update_record_payload[update_record_entity] = { + 'field': update_record_field, + 'value': kobo_data['updaterecordby'] + } + kobo_data.pop('updaterecordby') + + # Create API payload body + payload, target_entity = {}, "" + for kobo_field, target_field in request.headers.items(): -# kobo_value, multi, repeat, repeat_no, repeat_question = "", False, False, 0, "" - -# # determine if kobo_field is of type multi or repeat -# if "multi." in kobo_field: -# kobo_field = kobo_field.split(".")[1] -# multi = True -# if "repeat." in kobo_field: -# split = kobo_field.split(".") -# kobo_field = split[1] -# repeat_no = int(split[2]) -# repeat_question = split[3] -# repeat = True + kobo_value, multi, repeat, repeat_no, repeat_question = "", False, False, 0, "" + + # determine if kobo_field is of type multi or repeat + if "multi." in kobo_field: + kobo_field = kobo_field.split(".")[1] + multi = True + if "repeat." in kobo_field: + split = kobo_field.split(".") + kobo_field = split[1] + repeat_no = int(split[2]) + repeat_question = split[3] + repeat = True -# # check if kobo_field is in submission -# if kobo_field not in kobo_data.keys(): -# continue + # check if kobo_field is in submission + if kobo_field not in kobo_data.keys(): + continue -# # check if entity is nested in target_field -# if len(target_field.split('.')) == 2: -# target_entity = target_field.split('.')[0] -# target_field = target_field.split('.')[1] -# if target_entity not in payload.keys(): -# payload[target_entity] = {} -# else: -# continue - -# # get kobo_value based on kobo_field type -# if multi: -# kobo_value = kobo_data[kobo_field].split(" ") -# elif repeat: -# if 0 <= repeat_no < len(kobo_data[kobo_field]): -# kobo_data[kobo_field][repeat_no] = clean_kobo_data(kobo_data[kobo_field][repeat_no]) -# if repeat_question not in kobo_data[kobo_field][repeat_no].keys(): -# continue -# kobo_value = kobo_data[kobo_field][repeat_no][repeat_question] -# else: -# continue -# else: -# kobo_value = kobo_data[kobo_field] + # check if entity is nested in target_field + if len(target_field.split('.')) == 2: + target_entity = target_field.split('.')[0] + target_field = target_field.split('.')[1] + if target_entity not in payload.keys(): + payload[target_entity] = {} + else: + continue + + # get kobo_value based on kobo_field type + if multi: + kobo_value = kobo_data[kobo_field].split(" ") + elif repeat: + if 0 <= repeat_no < len(kobo_data[kobo_field]): + kobo_data[kobo_field][repeat_no] = clean_kobo_data(kobo_data[kobo_field][repeat_no]) + if repeat_question not in kobo_data[kobo_field][repeat_no].keys(): + continue + kobo_value = kobo_data[kobo_field][repeat_no][repeat_question] + else: + continue + else: + kobo_value = kobo_data[kobo_field] -# # process individual field; if it's an attachment, upload it to EspoCRM -# kobo_value_url = str(kobo_value).replace(" ", "_") -# if kobo_value_url not in attachments.keys(): -# payload[target_entity][target_field] = kobo_value -# else: -# file_url = attachments[kobo_value_url]['url'] -# if 'kobotoken' not in request.headers.keys(): -# update_submission_status(submission, 'failed', -# f"'kobotoken' needs to be specified in headers" -# f" to upload attachments to EspoCRM") + # process individual field; if it's an attachment, upload it to EspoCRM + kobo_value_url = str(kobo_value).replace(" ", "_") + if kobo_value_url not in attachments.keys(): + payload[target_entity][target_field] = kobo_value + else: + file_url = attachments[kobo_value_url]['url'] + if 'kobotoken' not in request.headers.keys(): + update_submission_status(submission, 'failed', + f"'kobotoken' needs to be specified in headers" + f" to upload attachments to EspoCRM") -# # encode attachment in base64 -# file = get_kobo_attachment(file_url, request.headers['kobotoken']) -# file_b64 = base64.b64encode(file).decode("utf8") -# # upload attachment to target -# attachment_payload = { -# "name": kobo_value, -# "type": attachments[kobo_value_url]['mimetype'], -# "role": "Attachment", -# "relatedType": target_entity, -# "field": target_field, -# "file": f"data:{attachments[kobo_value_url]['mimetype']};base64,{file_b64}" -# } -# attachment_record = espo_request(submission, client, 'POST', 'Attachment', attachment_payload) -# # link field to attachment -# payload[target_entity][f"{target_field}Id"] = attachment_record['id'] - -# if len(payload) == 0: -# update_submission_status(submission, 'failed', -# f"No fields found in Kobo submission or" -# f" no entities found in headers") + # encode attachment in base64 + file = get_kobo_attachment(file_url, request.headers['kobotoken']) + file_b64 = base64.b64encode(file).decode("utf8") + # upload attachment to target + attachment_payload = { + "name": kobo_value, + "type": attachments[kobo_value_url]['mimetype'], + "role": "Attachment", + "relatedType": target_entity, + "field": target_field, + "file": f"data:{attachments[kobo_value_url]['mimetype']};base64,{file_b64}" + } + attachment_record = espo_request(submission, client, 'POST', 'Attachment', attachment_payload) + # link field to attachment + payload[target_entity][f"{target_field}Id"] = attachment_record['id'] + + if len(payload) == 0: + update_submission_status(submission, 'failed', + f"No fields found in Kobo submission or" + f" no entities found in headers") -# for target_entity in payload.keys(): -# logger.info(payload) -# if target_entity not in update_record_payload.keys(): -# # create new record of target entity -# response = espo_request(submission, client, 'POST', target_entity, payload[target_entity]) -# else: -# # find target record -# params = {"where": [{ -# "type": "contains", -# "attribute": update_record_payload[target_entity]['field'], -# "value": update_record_payload[target_entity]['value'] -# }]} -# records = espo_request(submission, client, 'GET', target_entity, params)['list'] -# if len(records) != 1: -# update_submission_status(submission, 'failed', -# f"Found {len(records)} records of entity {target_entity} " -# f"with field {update_record_payload[target_entity]['field']} " -# f"equal to {update_record_payload[target_entity]['value']}") -# else: -# # update target record -# response = espo_request(submission, client, 'PUT', f"{target_entity}/{records[0]['id']}", payload[target_entity]) -# if 'id' not in response.keys(): -# update_submission_status(submission, 'failed', response.content.decode("utf-8")) -# else: -# target_response[target_entity] = response + for target_entity in payload.keys(): + logger.info(payload) + if target_entity not in update_record_payload.keys(): + # create new record of target entity + response = espo_request(submission, client, 'POST', target_entity, payload[target_entity]) + else: + # find target record + params = {"where": [{ + "type": "contains", + "attribute": update_record_payload[target_entity]['field'], + "value": update_record_payload[target_entity]['value'] + }]} + records = espo_request(submission, client, 'GET', target_entity, params)['list'] + if len(records) != 1: + update_submission_status(submission, 'failed', + f"Found {len(records)} records of entity {target_entity} " + f"with field {update_record_payload[target_entity]['field']} " + f"equal to {update_record_payload[target_entity]['value']}") + else: + # update target record + response = espo_request(submission, client, 'PUT', f"{target_entity}/{records[0]['id']}", payload[target_entity]) + if 'id' not in response.keys(): + update_submission_status(submission, 'failed', response.content.decode("utf-8")) + else: + target_response[target_entity] = response -# update_submission_status(submission, 'success') -# return JSONResponse(status_code=200, content=target_response) + update_submission_status(submission, 'success') + return JSONResponse(status_code=200, content=target_response) ######################################################################################################################## From 55467b275ac8736b9407db5d19e02223eb43e25e Mon Sep 17 00:00:00 2001 From: jmargutti Date: Fri, 15 Mar 2024 16:49:40 +0100 Subject: [PATCH 3/4] bug fixes and formatting --- main.py | 23 +++++++++++++++++------ requirements.txt | 1 + 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/main.py b/main.py index 1eb8a9a..6d89a7a 100644 --- a/main.py +++ b/main.py @@ -437,14 +437,24 @@ def remove_keys(data, keys_to_remove): ######################################################################################################################## -@app.post("/create-121-from-kobo") -async def create_kobo_headers(request: Request, koboToken: str, koboassetId: str, dependencies=Depends(required_headers_121)): + +def required_headers_121_kobo( + url121: str = Header(), + username121: str = Header(), + password121: str = Header(), + kobotoken: str = Header(), + koboasset: str = Header()): + return url121, username121, password121, kobotoken, koboasset + + +@app.post("/create-121-program-from-kobo") +async def create_121_program_from_kobo(request: Request, dependencies=Depends(required_headers_121_kobo)): """Utility endpoint to automatically create a 121 Program in 121 from a koboform, including REST Service \n Does only support the IFRC server kobonew.ifrc.org \n ***NB: if you want to duplicate an endpoint, please also use the Hook ID query param***""" - koboUrl = f'https://kobo.ifrc.org/api/v2/assets/{koboassetId}' - koboheaders = {'Authorization': f'Token {koboToken}'} + koboUrl = f"https://kobo.ifrc.org/api/v2/assets/{request.headers['koboasset']}" + koboheaders = {"Authorization": f"Token {request.headers['kobotoken']}"} data_request = requests.get(f'{koboUrl}/?format=json', headers=koboheaders) if data_request.status_code >= 400: raise HTTPException( @@ -541,7 +551,7 @@ async def create_kobo_headers(request: Request, koboToken: str, koboassetId: str "enableScope": False } - koboConnectHeader = ['fspName','preferredLanguage','maxPayments'] + koboConnectHeader = ['fspName', 'preferredLanguage', 'maxPayments'] for index, row in survey.iterrows(): if row['type'].split()[0] in mappingdf['kobotype'].tolist() and row['name'] not in CHECKFIELDS: @@ -645,7 +655,8 @@ async def create_kobo_headers(request: Request, koboToken: str, koboassetId: str customHeaders = dict(zip(koboConnectHeader, koboConnectHeader)) restServicePayload['settings']['custom_headers'] = customHeaders - kobo_response = requests.post(f'{koboUrl}/hooks/', + kobo_response = requests.post( + f'{koboUrl}/hooks/', headers=koboheaders, json=restServicePayload ) diff --git a/requirements.txt b/requirements.txt index f2d56fa..18ad1fc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,7 @@ idna==3.4 lxml==4.9.3 multidict==6.0.4 openai==0.27.8 +pandas==2.2.1 pypdf==3.12.2 python-docx==0.8.11 python-dotenv==1.0.0 From 94bb0f2f23641ab8e6c2b120aa48ae0d8f1b865c Mon Sep 17 00:00:00 2001 From: jmargutti Date: Fri, 15 Mar 2024 16:53:05 +0100 Subject: [PATCH 4/4] fix dockerfile --- Dockerfile | 1 + main.py | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index 155b60c..e8851a8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,6 +3,7 @@ FROM python:3.9-slim # copy files to the /app folder in the container ADD clients /app/clients +ADD mappings /app/mappings COPY ./main.py /app/main.py COPY ./requirements.txt /app/requirements.txt diff --git a/main.py b/main.py index 6d89a7a..8c3b0df 100644 --- a/main.py +++ b/main.py @@ -450,7 +450,7 @@ def required_headers_121_kobo( @app.post("/create-121-program-from-kobo") async def create_121_program_from_kobo(request: Request, dependencies=Depends(required_headers_121_kobo)): """Utility endpoint to automatically create a 121 Program in 121 from a koboform, including REST Service \n - Does only support the IFRC server kobonew.ifrc.org \n + Does only support the IFRC server kobo.ifrc.org \n ***NB: if you want to duplicate an endpoint, please also use the Hook ID query param***""" koboUrl = f"https://kobo.ifrc.org/api/v2/assets/{request.headers['koboasset']}" @@ -493,7 +493,7 @@ async def create_121_program_from_kobo(request: Request, dependencies=Depends(re lookupdict = dict(zip(survey['name'], survey['default'])) if 'tags'in survey.columns: - dedupedict = dict(zip(survey['name'], survey['tags'])) + dedupedict = dict(zip(survey['name'], survey['tags'])) for key, value in dedupedict.items(): if isinstance(value, list) and any('dedupe' in item for item in value): @@ -502,7 +502,7 @@ async def create_121_program_from_kobo(request: Request, dependencies=Depends(re dedupedict[key] = False else: survey['tags'] = False - dedupedict = dict(zip(survey['name'], survey['tags'])) + dedupedict = dict(zip(survey['name'], survey['tags'])) # Create the JSON structure data = {