From 3e963143196eac9b690b04ff822a64616f468068 Mon Sep 17 00:00:00 2001
From: Renan Butkeraites
Date: Wed, 25 Sep 2024 19:56:58 -0300
Subject: [PATCH] Parallelize streams sync

---
 tap_salesforce/__init__.py | 335 ++++++++++++++++++++++---------------
 tap_salesforce/sync.py     | 273 +++++++++++++++++++--------------------
 2 files changed, 339 insertions(+), 269 deletions(-)

diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py
index 58545259..940a6275 100644
--- a/tap_salesforce/__init__.py
+++ b/tap_salesforce/__init__.py
@@ -12,6 +12,10 @@ from tap_salesforce.salesforce.exceptions import (
     TapSalesforceException,
     TapSalesforceQuotaExceededException,
     TapSalesforceBulkAPIDisabledException)
+import multiprocessing
+from functools import partial
+
+
 LOGGER = singer.get_logger()

 REQUIRED_CONFIG_KEYS = ['refresh_token',
@@ -52,40 +56,39 @@ def get_replication_key(sobject_name, fields):
 def stream_is_selected(mdata):
     return mdata.get((), {}).get('selected', False)

-def build_state(raw_state, catalog):
+def build_state(raw_state, catalog_entry):
     state = {}
-    for catalog_entry in catalog['streams']:
-        tap_stream_id = catalog_entry['tap_stream_id']
-        catalog_metadata = metadata.to_map(catalog_entry['metadata'])
-        replication_method = catalog_metadata.get((), {}).get('replication-method')
-
-        version = singer.get_bookmark(raw_state,
-                                      tap_stream_id,
-                                      'version')
-
-        # Preserve state that deals with resuming an incomplete bulk job
-        if singer.get_bookmark(raw_state, tap_stream_id, 'JobID'):
-            job_id = singer.get_bookmark(raw_state, tap_stream_id, 'JobID')
-            batches = singer.get_bookmark(raw_state, tap_stream_id, 'BatchIDs')
-            current_bookmark = singer.get_bookmark(raw_state, tap_stream_id, 'JobHighestBookmarkSeen')
-            state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id)
-            state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batches)
-            state = singer.write_bookmark(state, tap_stream_id, 'JobHighestBookmarkSeen', current_bookmark)
-
-        if replication_method == 'INCREMENTAL':
-            replication_key = catalog_metadata.get((), {}).get('replication-key')
-            replication_key_value = singer.get_bookmark(raw_state,
-                                                        tap_stream_id,
-                                                        replication_key)
-            if version is not None:
-                state = singer.write_bookmark(
-                    state, tap_stream_id, 'version', version)
-            if replication_key_value is not None:
-                state = singer.write_bookmark(
-                    state, tap_stream_id, replication_key, replication_key_value)
-        elif replication_method == 'FULL_TABLE' and version is None:
-            state = singer.write_bookmark(state, tap_stream_id, 'version', version)
+    tap_stream_id = catalog_entry['tap_stream_id']
+    catalog_metadata = metadata.to_map(catalog_entry['metadata'])
+    replication_method = catalog_metadata.get((), {}).get('replication-method')
+
+    version = singer.get_bookmark(raw_state,
+                                  tap_stream_id,
+                                  'version')
+
+    # Preserve state that deals with resuming an incomplete bulk job
+    if singer.get_bookmark(raw_state, tap_stream_id, 'JobID'):
+        job_id = singer.get_bookmark(raw_state, tap_stream_id, 'JobID')
+        batches = singer.get_bookmark(raw_state, tap_stream_id, 'BatchIDs')
+        current_bookmark = singer.get_bookmark(raw_state, tap_stream_id, 'JobHighestBookmarkSeen')
+        state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id)
+        state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batches)
+        state = singer.write_bookmark(state, tap_stream_id, 'JobHighestBookmarkSeen', current_bookmark)
+
+    if replication_method == 'INCREMENTAL':
+        replication_key = catalog_metadata.get((), {}).get('replication-key')
+
replication_key_value = singer.get_bookmark(raw_state, + tap_stream_id, + replication_key) + if version is not None: + state = singer.write_bookmark( + state, tap_stream_id, 'version', version) + if replication_key_value is not None: + state = singer.write_bookmark( + state, tap_stream_id, replication_key, replication_key_value) + elif replication_method == 'FULL_TABLE' and version is None: + state = singer.write_bookmark(state, tap_stream_id, 'version', version) return state @@ -397,7 +400,7 @@ def do_discover(sf): result = {'streams': entries} json.dump(result, sys.stdout, indent=4) -def do_sync(sf, catalog, state,config=None): +def do_sync(sf, catalog_entry, state, catalog,config=None): input_state = state.copy() starting_stream = state.get("current_stream") @@ -405,112 +408,131 @@ def do_sync(sf, catalog, state,config=None): LOGGER.info("Resuming sync from %s", starting_stream) else: LOGGER.info("Starting sync") - catalog = prepare_reports_streams(catalog) - # Set ListView as first stream to sync to avoid issues with replication-keys - list_view = [c for c in catalog["streams"] if c["stream"]=="ListView"] - catalog["streams"] = [c for c in catalog["streams"] if c["stream"]!="ListView"] - catalog["streams"] = list_view + catalog["streams"] + stream_version = get_stream_version(catalog_entry, state) + stream = catalog_entry['stream'] + stream_alias = catalog_entry.get('stream_alias') + stream_name = catalog_entry["tap_stream_id"].replace("/","_") + activate_version_message = singer.ActivateVersionMessage( + stream=(stream_alias or stream.replace("/","_")), version=stream_version) - # Sync Streams - for catalog_entry in catalog["streams"]: - stream_version = get_stream_version(catalog_entry, state) - stream = catalog_entry['stream'] - stream_alias = catalog_entry.get('stream_alias') - stream_name = catalog_entry["tap_stream_id"].replace("/","_") - activate_version_message = singer.ActivateVersionMessage( - stream=(stream_alias or stream.replace("/","_")), version=stream_version) + catalog_metadata = metadata.to_map(catalog_entry['metadata']) + replication_key = catalog_metadata.get((), {}).get('replication-key') - catalog_metadata = metadata.to_map(catalog_entry['metadata']) - replication_key = catalog_metadata.get((), {}).get('replication-key') + mdata = metadata.to_map(catalog_entry['metadata']) - mdata = metadata.to_map(catalog_entry['metadata']) + if not stream_is_selected(mdata): + LOGGER.info("%s: Skipping - not selected", stream_name) + return - if not stream_is_selected(mdata): - LOGGER.info("%s: Skipping - not selected", stream_name) - continue - - if starting_stream: - if starting_stream == stream_name: - LOGGER.info("%s: Resuming", stream_name) - starting_stream = None - else: - LOGGER.info("%s: Skipping - already synced", stream_name) - continue - else: - LOGGER.info("%s: Starting", stream_name) - - state["current_stream"] = stream_name - singer.write_state(state) - key_properties = metadata.to_map(catalog_entry['metadata']).get((), {}).get('table-key-properties') - singer.write_schema( - stream.replace("/","_"), - catalog_entry['schema'], - key_properties, - replication_key, - stream_alias) - - job_id = singer.get_bookmark(state, catalog_entry['tap_stream_id'], 'JobID') - if job_id: - with metrics.record_counter(stream) as counter: - LOGGER.info("Found JobID from previous Bulk Query. 
Resuming sync for job: %s", job_id) - # Resuming a sync should clear out the remaining state once finished - counter = resume_syncing_bulk_query(sf, catalog_entry, job_id, state, counter) - LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value) - # Remove Job info from state once we complete this resumed query. One of a few cases could have occurred: - # 1. The job succeeded, in which case make JobHighestBookmarkSeen the new bookmark - # 2. The job partially completed, in which case make JobHighestBookmarkSeen the new bookmark, or - # existing bookmark if no bookmark exists for the Job. - # 3. The job completely failed, in which case maintain the existing bookmark, or None if no bookmark - state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobID', None) - state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('BatchIDs', None) - bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \ - .pop('JobHighestBookmarkSeen', None) - existing_bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \ - .pop(replication_key, None) - state = singer.write_bookmark( - state, - catalog_entry['tap_stream_id'], - replication_key, - bookmark or existing_bookmark) # If job is removed, reset to existing bookmark or None - singer.write_state(state) + if starting_stream: + if starting_stream == stream_name: + LOGGER.info("%s: Resuming", stream_name) + starting_stream = None else: - # Tables with a replication_key or an empty bookmark will emit an - # activate_version at the beginning of their sync - bookmark_is_empty = state.get('bookmarks', {}).get( - catalog_entry['tap_stream_id']) is None - - if "/" in state["current_stream"]: - # get current name - old_key = state["current_stream"] - # get the new key name - new_key = old_key.replace("/","_") - state["current_stream"] = new_key - - catalog_entry['tap_stream_id'] = catalog_entry['tap_stream_id'].replace("/","_") - if replication_key or bookmark_is_empty: - singer.write_message(activate_version_message) - state = singer.write_bookmark(state, - catalog_entry['tap_stream_id'], - 'version', - stream_version) - counter = sync_stream(sf, catalog_entry, state, input_state, catalog,config) + LOGGER.info("%s: Skipping - already synced", stream_name) + return + else: + LOGGER.info("%s: Starting", stream_name) + + state["current_stream"] = stream_name + singer.write_state(state) + key_properties = metadata.to_map(catalog_entry['metadata']).get((), {}).get('table-key-properties') + singer.write_schema( + stream.replace("/","_"), + catalog_entry['schema'], + key_properties, + replication_key, + stream_alias) + + job_id = singer.get_bookmark(state, catalog_entry['tap_stream_id'], 'JobID') + if job_id: + with metrics.record_counter(stream) as counter: + LOGGER.info("Found JobID from previous Bulk Query. Resuming sync for job: %s", job_id) + # Resuming a sync should clear out the remaining state once finished + counter = resume_syncing_bulk_query(sf, catalog_entry, job_id, state, counter) LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value) + # Remove Job info from state once we complete this resumed query. One of a few cases could have occurred: + # 1. The job succeeded, in which case make JobHighestBookmarkSeen the new bookmark + # 2. The job partially completed, in which case make JobHighestBookmarkSeen the new bookmark, or + # existing bookmark if no bookmark exists for the Job. + # 3. 
The job completely failed, in which case maintain the existing bookmark, or None if no bookmark + state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobID', None) + state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('BatchIDs', None) + bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \ + .pop('JobHighestBookmarkSeen', None) + existing_bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \ + .pop(replication_key, None) + state = singer.write_bookmark( + state, + catalog_entry['tap_stream_id'], + replication_key, + bookmark or existing_bookmark) # If job is removed, reset to existing bookmark or None + singer.write_state(state) + else: + # Tables with a replication_key or an empty bookmark will emit an + # activate_version at the beginning of their sync + bookmark_is_empty = state.get('bookmarks', {}).get( + catalog_entry['tap_stream_id']) is None + + if "/" in state["current_stream"]: + # get current name + old_key = state["current_stream"] + # get the new key name + new_key = old_key.replace("/","_") + state["current_stream"] = new_key + + catalog_entry['tap_stream_id'] = catalog_entry['tap_stream_id'].replace("/","_") + if replication_key or bookmark_is_empty: + singer.write_message(activate_version_message) + state = singer.write_bookmark(state, + catalog_entry['tap_stream_id'], + 'version', + stream_version) + counter = sync_stream(sf, catalog_entry, state, input_state, catalog, config) + LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value) state["current_stream"] = None singer.write_state(state) LOGGER.info("Finished sync") +def process_catalog_entry(catalog_entry, sf_data, state, catalog, config): + # Reinitialize Salesforce object in the child process using parent's session + sf = Salesforce( + refresh_token=sf_data['refresh_token'], # Still keep refresh_token + sf_client_id=sf_data['client_id'], + sf_client_secret=sf_data['client_secret'], + quota_percent_total=sf_data.get('quota_percent_total'), + quota_percent_per_run=sf_data.get('quota_percent_per_run'), + is_sandbox=sf_data.get('is_sandbox'), + select_fields_by_default=sf_data.get('select_fields_by_default'), + default_start_date=sf_data.get('start_date'), + api_type=sf_data.get('api_type'), + list_reports=sf_data.get('list_reports'), + list_views=sf_data.get('list_views'), + api_version=sf_data.get('api_version') + ) + + # No need to log in again; set the session directly + sf.access_token = sf_data['access_token'] + sf.instance_url = sf_data['instance_url'] + + state = {key: value for key, value in build_state(state, catalog_entry).items()} + LOGGER.info(f"Processing stream: {catalog_entry}") + do_sync(sf, catalog_entry, state, catalog, config) + + def main_impl(): args = singer_utils.parse_args(REQUIRED_CONFIG_KEYS) CONFIG.update(args.config) - sf = None is_sandbox = ( CONFIG.get("base_uri") == "https://test.salesforce.com" if CONFIG.get("base_uri") else CONFIG.get("is_sandbox") ) + CONFIG["is_sandbox"] = is_sandbox + try: sf = Salesforce( refresh_token=CONFIG['refresh_token'], @@ -525,27 +547,66 @@ def main_impl(): list_reports=CONFIG.get('list_reports'), list_views=CONFIG.get('list_views'), api_version=CONFIG.get('api_version') - ) + ) sf.login() + if sf.login_timer: + sf.login_timer.cancel() # Ensure the login timer is cancelled if needed + except Exception as e: + raise e + + if not sf: + return + + if args.discover: + do_discover(sf) + return + + if not args.properties: + return + + catalog = 
prepare_reports_streams(args.properties) + + list_view = [c for c in catalog["streams"] if c["stream"] == "ListView"] + catalog["streams"] = [c for c in catalog["streams"] if c["stream"] != "ListView"] + catalog["streams"] = list_view + catalog["streams"] + + # Create a dictionary with session details to pass to child processes + sf_data = { + 'access_token': sf.access_token, + 'instance_url': sf.instance_url, + 'refresh_token': CONFIG['refresh_token'], + 'client_id': CONFIG['client_id'], + 'client_secret': CONFIG['client_secret'], + 'quota_percent_total': CONFIG.get('quota_percent_total'), + 'quota_percent_per_run': CONFIG.get('quota_percent_per_run'), + 'is_sandbox': is_sandbox, + 'select_fields_by_default': CONFIG.get('select_fields_by_default'), + 'start_date': CONFIG.get('start_date'), + 'api_type': CONFIG.get('api_type'), + 'list_reports': CONFIG.get('list_reports'), + 'list_views': CONFIG.get('list_views'), + 'api_version': CONFIG.get('api_version'), + } - if args.discover: - do_discover(sf) - elif args.properties: - catalog = args.properties - state = build_state(args.state, catalog) - do_sync(sf, catalog, state,CONFIG) - finally: - if sf: - if sf.rest_requests_attempted > 0: - LOGGER.debug( - "This job used %s REST requests towards the Salesforce quota.", - sf.rest_requests_attempted) - if sf.jobs_completed > 0: - LOGGER.debug( - "Replication used %s Bulk API jobs towards the Salesforce quota.", - sf.jobs_completed) - if sf.login_timer: - sf.login_timer.cancel() + # Use multiprocessing to process the catalog entries in parallel + with multiprocessing.Manager() as manager: + managed_state = manager.dict(args.state) # Shared state + + # Create a partial function with shared session and config + process_func = partial(process_catalog_entry, sf_data=sf_data, state=managed_state, catalog=catalog, config=CONFIG) + + # Parallel execution using multiprocessing.Pool + with multiprocessing.Pool(processes=8) as pool: + pool.map(process_func, catalog["streams"]) + + if sf.rest_requests_attempted > 0: + LOGGER.debug( + "This job used %s REST requests towards the Salesforce quota.", + sf.rest_requests_attempted) + if sf.jobs_completed > 0: + LOGGER.debug( + "Replication used %s Bulk API jobs towards the Salesforce quota.", + sf.jobs_completed) def prepare_reports_streams(catalog): streams = catalog["streams"] diff --git a/tap_salesforce/sync.py b/tap_salesforce/sync.py index 08210674..92f9586d 100644 --- a/tap_salesforce/sync.py +++ b/tap_salesforce/sync.py @@ -248,128 +248,63 @@ def sync_records(sf, catalog_entry, state, input_state, counter, catalog,config= singer_utils.strftime(chunked_bookmark)) if catalog_entry["stream"].startswith("Report_"): - report_name = catalog_entry["stream"].split("Report_", 1)[1] + sync_report_streams(sf, catalog_entry, stream, schema, stream_alias, stream_version, start_time) + return + if "ListViews" == catalog_entry["stream"]: + sync_list_views_stream(sf, catalog_entry, state, input_state, catalog, replication_key, start_time) + return - reports = [] - done = False - headers = sf._get_standard_headers() - endpoint = "queryAll" - params = {'q': 'SELECT Id,DeveloperName FROM Report'} - url = sf.data_url.format(sf.instance_url, endpoint) - - while not done: - response = sf._make_request('GET', url, headers=headers, params=params) - response_json = response.json() - done = response_json.get("done") - reports.extend(response_json.get("records", [])) - if not done: - url = sf.instance_url+response_json.get("nextRecordsUrl") - - report = [r for r in reports if 
report_name==r["DeveloperName"]][0] - report_id = report["Id"] - - endpoint = f"analytics/reports/{report_id}" - url = sf.data_url.format(sf.instance_url, endpoint) - response = sf._make_request('GET', url, headers=headers) + query_response = sf.query(catalog_entry, state) + if catalog_entry["stream"] in ACTIVITY_STREAMS: + start_date_str = sf.get_start_date(state, catalog_entry) + start_date = singer_utils.strptime_with_tz(start_date_str) + start_date = singer_utils.strftime(start_date) - with Transformer(pre_hook=transform_bulk_data_hook) as transformer: - rec = transformer.transform(response.json(), schema) - rec = fix_record_anytype(rec, schema) - stream = stream.replace("/","_") - singer.write_message( - singer.RecordMessage( - stream=( - stream_alias or stream), - record=rec, - version=stream_version, - time_extracted=start_time)) - - elif "ListViews" == catalog_entry["stream"]: - headers = sf._get_standard_headers() - endpoint = "queryAll" - - params = {'q': f'SELECT Name,Id,SobjectType,DeveloperName FROM ListView'} - url = sf.data_url.format(sf.instance_url, endpoint) - response = sf._make_request('GET', url, headers=headers, params=params) - - Id_Sobject = [{"Id":r["Id"],"SobjectType": r["SobjectType"],"DeveloperName":r["DeveloperName"],"Name":r["Name"]} - for r in response.json().get('records',[]) if r["Name"]] - - selected_lists_names = [] - for ln in catalog_entry.get("metadata",[])[:-1]: - if ln.get("metadata",[])['selected']: - selected_list = ln.get('breadcrumb',[])[1] - for isob in Id_Sobject: - if selected_list==f"ListView_{isob['SobjectType']}_{isob['DeveloperName']}": - selected_lists_names.append(isob) - - replication_key_value = replication_key and singer_utils.strptime_with_tz(rec[replication_key]) - - for list_info in selected_lists_names: - - sobject = list_info['SobjectType'] - lv_name = list_info['DeveloperName'] - lv_id = list_info['Id'] - - lv_catalog = [x for x in catalog["streams"] if x["stream"] == sobject] + selected_properties = sf._get_selected_properties(catalog_entry) - if lv_catalog: - lv_catalog_entry = lv_catalog[0].copy() - try: - handle_ListView(sf,lv_id,sobject,lv_name,lv_catalog_entry,state,input_state,start_time) - except RequestException as e: - LOGGER.warning(f"No existing /'results/' endpoint was found for SobjectType:{sobject}, Id:{lv_id}") + query_map = { + "ActivityHistory": "ActivityHistories", + "OpenActivity": "OpenActivities" + } - else: - if catalog_entry["stream"] in ACTIVITY_STREAMS: - start_date_str = sf.get_start_date(state, catalog_entry) - start_date = singer_utils.strptime_with_tz(start_date_str) - start_date = singer_utils.strftime(start_date) + query_field = query_map[catalog_entry['stream']] - selected_properties = sf._get_selected_properties(catalog_entry) + query = "SELECT {} FROM {}".format(",".join(selected_properties), query_field) + query = f"SELECT ({query}) FROM Contact" - query_map = { - "ActivityHistory": "ActivityHistories", - "OpenActivity": "OpenActivities" - } + catalog_metadata = metadata.to_map(catalog_entry['metadata']) + replication_key = catalog_metadata.get((), {}).get('replication-key') - query_field = query_map[catalog_entry['stream']] + order_by = "" + if replication_key: + where_clause = " WHERE {} > {} ".format( + replication_key, + start_date) + order_by = " ORDER BY {} ASC".format(replication_key) + query = query + where_clause + order_by - query = "SELECT {} FROM {}".format(",".join(selected_properties), query_field) - query = f"SELECT ({query}) FROM Contact" + def unwrap_query(query_response, 
query_field): + for q in query_response: + if q.get(query_field): + for f in q[query_field]["records"]: + yield f - catalog_metadata = metadata.to_map(catalog_entry['metadata']) - replication_key = catalog_metadata.get((), {}).get('replication-key') + query_response = sf.query(catalog_entry, state, query_override=query) + query_response = unwrap_query(query_response, query_field) - order_by = "" - if replication_key: - where_clause = " WHERE {} > {} ".format( - replication_key, - start_date) - order_by = " ORDER BY {} ASC".format(replication_key) - query = query + where_clause + order_by - - def unwrap_query(query_response, query_field): - for q in query_response: - if q.get(query_field): - for f in q[query_field]["records"]: - yield f - - query_response = sf.query(catalog_entry, state, query_override=query) - query_response = unwrap_query(query_response, query_field) - else: - query_response = sf.query(catalog_entry, state) + sync_others(sf, catalog_entry, state, input_state, counter, catalog, download_files, chunked_bookmark, stream, schema, stream_alias, replication_key, stream_version, start_time, query_response) - for rec in query_response: - counter.increment() - with Transformer(pre_hook=transform_bulk_data_hook) as transformer: - rec = transformer.transform(rec, schema) - rec = fix_record_anytype(rec, schema) - if stream=='ContentVersion': - if "IsLatest" in rec: - if rec['IsLatest']==True and download_files==True: - rec['TextPreview'] = base64.b64encode(get_content_document_file(sf,rec['Id'])).decode('utf-8') - singer.write_message( +def sync_others(sf, catalog_entry, state, input_state, counter, catalog, download_files, chunked_bookmark, stream, schema, stream_alias, replication_key, stream_version, start_time, query_response): + for rec in query_response: + counter.increment() + with Transformer(pre_hook=transform_bulk_data_hook) as transformer: + rec = transformer.transform(rec, schema) + rec = fix_record_anytype(rec, schema) + if stream=='ContentVersion': + if "IsLatest" in rec: + if rec['IsLatest']==True and download_files==True: + rec['TextPreview'] = base64.b64encode(get_content_document_file(sf,rec['Id'])).decode('utf-8') + singer.write_message( singer.RecordMessage( stream=( stream_alias or stream), @@ -377,41 +312,115 @@ def unwrap_query(query_response, query_field): version=stream_version, time_extracted=start_time)) - replication_key_value = replication_key and singer_utils.strptime_with_tz(rec[replication_key]) + replication_key_value = replication_key and singer_utils.strptime_with_tz(rec[replication_key]) - if sf.pk_chunking: - if replication_key_value and replication_key_value <= start_time and replication_key_value > chunked_bookmark: + if sf.pk_chunking: + if replication_key_value and replication_key_value <= start_time and replication_key_value > chunked_bookmark: # Replace the highest seen bookmark and save the state in case we need to resume later - chunked_bookmark = singer_utils.strptime_with_tz(rec[replication_key]) - state = singer.write_bookmark( + chunked_bookmark = singer_utils.strptime_with_tz(rec[replication_key]) + state = singer.write_bookmark( state, catalog_entry['tap_stream_id'], 'JobHighestBookmarkSeen', singer_utils.strftime(chunked_bookmark)) - singer.write_state(state) + singer.write_state(state) # Before writing a bookmark, make sure Salesforce has not given us a # record with one outside our range - elif replication_key_value and replication_key_value <= start_time: - state = singer.write_bookmark( + elif replication_key_value and 
replication_key_value <= start_time: + state = singer.write_bookmark( state, catalog_entry['tap_stream_id'], replication_key, rec[replication_key]) - singer.write_state(state) + singer.write_state(state) - selected = get_selected_streams(catalog) - if stream == "ListView" and rec.get("SobjectType") in selected and rec.get("Id") is not None: + selected = get_selected_streams(catalog) + if stream == "ListView" and rec.get("SobjectType") in selected and rec.get("Id") is not None: # Handle listview - try: - sobject = rec["SobjectType"] - lv_name = rec["DeveloperName"] - lv_catalog = [x for x in catalog["streams"] if x["stream"] == sobject] - rec_id = rec["Id"] - lv_catalog_entry = lv_catalog[0].copy() - if len(lv_catalog) > 0: - handle_ListView(sf,rec_id,sobject,lv_name,lv_catalog_entry,state,input_state,start_time) - except RequestException as e: - pass + try: + sobject = rec["SobjectType"] + lv_name = rec["DeveloperName"] + lv_catalog = [x for x in catalog["streams"] if x["stream"] == sobject] + rec_id = rec["Id"] + lv_catalog_entry = lv_catalog[0].copy() + if len(lv_catalog) > 0: + handle_ListView(sf,rec_id,sobject,lv_name,lv_catalog_entry,state,input_state,start_time) + except RequestException as e: + pass + +def sync_list_views_stream(sf, catalog_entry, state, input_state, catalog, replication_key, start_time): + headers = sf._get_standard_headers() + endpoint = "queryAll" + + params = {'q': f'SELECT Name,Id,SobjectType,DeveloperName FROM ListView'} + url = sf.data_url.format(sf.instance_url, endpoint) + response = sf._make_request('GET', url, headers=headers, params=params) + + Id_Sobject = [{"Id":r["Id"],"SobjectType": r["SobjectType"],"DeveloperName":r["DeveloperName"],"Name":r["Name"]} + for r in response.json().get('records',[]) if r["Name"]] + + selected_lists_names = [] + for ln in catalog_entry.get("metadata",[])[:-1]: + if ln.get("metadata",[])['selected']: + selected_list = ln.get('breadcrumb',[])[1] + for isob in Id_Sobject: + if selected_list==f"ListView_{isob['SobjectType']}_{isob['DeveloperName']}": + selected_lists_names.append(isob) + + replication_key_value = replication_key and singer_utils.strptime_with_tz(rec[replication_key]) + + for list_info in selected_lists_names: + sobject = list_info['SobjectType'] + lv_name = list_info['DeveloperName'] + lv_id = list_info['Id'] + + lv_catalog = [x for x in catalog["streams"] if x["stream"] == sobject] + + if lv_catalog: + lv_catalog_entry = lv_catalog[0].copy() + try: + handle_ListView(sf,lv_id,sobject,lv_name,lv_catalog_entry,state,input_state,start_time) + except RequestException as e: + LOGGER.warning(f"No existing /'results/' endpoint was found for SobjectType:{sobject}, Id:{lv_id}") + +def sync_report_streams(sf, catalog_entry, stream, schema, stream_alias, stream_version, start_time): + report_name = catalog_entry["stream"].split("Report_", 1)[1] + + reports = [] + done = False + headers = sf._get_standard_headers() + endpoint = "queryAll" + params = {'q': 'SELECT Id,DeveloperName FROM Report'} + url = sf.data_url.format(sf.instance_url, endpoint) + + while not done: + response = sf._make_request('GET', url, headers=headers, params=params) + response_json = response.json() + reports.extend(response_json.get("records", [])) + done = response_json.get("done") + if not done: + url = sf.instance_url+response_json.get("nextRecordsUrl") + + report = [r for r in reports if report_name==r["DeveloperName"]][0] + report_id = report["Id"] + + endpoint = f"analytics/reports/{report_id}" + url = sf.data_url.format(sf.instance_url, 
endpoint) + response = sf._make_request('GET', url, headers=headers) + + with Transformer(pre_hook=transform_bulk_data_hook) as transformer: + rec = transformer.transform(response.json(), schema) + rec = fix_record_anytype(rec, schema) + stream = stream.replace("/","_") + singer.write_message( + singer.RecordMessage( + stream=( + stream_alias or stream), + record=rec, + version=stream_version, + time_extracted=start_time)) + + return stream,rec def fix_record_anytype(rec, schema):
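
Note on the fan-out introduced in __init__.py: the parent process logs in once, packs the session details (access token, instance URL) and config into a plain dict, and each pooled worker rebuilds its own Salesforce client from that dict, derives the state for its single catalog entry, and runs do_sync for that entry. The sketch below is a minimal, self-contained illustration of that pattern only; DummyClient, sync_one_stream, and the sample catalog entries are illustrative stand-ins, not part of the tap.

import multiprocessing
from functools import partial


class DummyClient:
    """Stand-in for the Salesforce client: reuses the session created by the parent."""

    def __init__(self, access_token, instance_url):
        self.access_token = access_token      # set directly, no second login
        self.instance_url = instance_url

    def fetch(self, stream_name):
        # A real client would page through the API here.
        return [{"stream": stream_name, "row": i} for i in range(3)]


def sync_one_stream(catalog_entry, session, shared_state):
    # Each worker rebuilds its own client from the parent's session data,
    # mirroring process_catalog_entry() above.
    client = DummyClient(session["access_token"], session["instance_url"])
    stream_id = catalog_entry["tap_stream_id"]
    rows = client.fetch(stream_id)
    # Record progress in the shared dict so the parent can inspect it afterwards
    # (the patch itself keeps per-stream bookmarks local to each worker).
    shared_state[stream_id] = f"synced {len(rows)} rows"
    return stream_id, len(rows)


if __name__ == "__main__":
    catalog_entries = [{"tap_stream_id": name} for name in ("Account", "Contact", "ListView")]
    session = {"access_token": "token-from-parent",
               "instance_url": "https://example.my.salesforce.com"}

    with multiprocessing.Manager() as manager:
        shared_state = manager.dict()                    # shared, like managed_state above
        worker = partial(sync_one_stream, session=session, shared_state=shared_state)
        with multiprocessing.Pool(processes=3) as pool:  # the patch uses processes=8
            results = pool.map(worker, catalog_entries)  # one worker call per catalog entry
        print(results, dict(shared_state))

In the patch itself, managed_state is a multiprocessing.Manager dict so every worker can read the incoming state, while each worker builds its own per-entry state via build_state and emits bookmarks with singer.write_state; with several processes writing to stdout at once, RECORD and STATE messages interleave rather than follow the serial stream-by-stream order.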
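Similarly, for the activity streams the refactored sync_records builds a nested SOQL query (the inner SELECT wrapped in "SELECT (...) FROM Contact") and then flattens the nested result sets with the small unwrap_query generator. A hedged, self-contained illustration of that flattening step, using fabricated response data instead of a live API call:

from typing import Any, Dict, Iterator, List


def unwrap_query(query_response: List[Dict[str, Any]], query_field: str) -> Iterator[Dict[str, Any]]:
    # Same shape as the helper in sync.py: each outer record may carry a nested
    # result set under query_field; yield only the inner records.
    for outer in query_response:
        nested = outer.get(query_field)
        if nested:
            for record in nested["records"]:
                yield record


if __name__ == "__main__":
    # Fabricated example of what a "SELECT (...) FROM Contact" response can look like.
    response = [
        {"Id": "003A", "ActivityHistories": {"records": [{"Id": "00TA", "Subject": "Call"}]}},
        {"Id": "003B", "ActivityHistories": None},
    ]
    print(list(unwrap_query(response, "ActivityHistories")))

The same shape applies to both ActivityHistories and OpenActivities, which is why the helper takes query_field as a parameter (see query_map above).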