diff --git a/.gitignore b/.gitignore index 98ff62c..8c3501f 100644 --- a/.gitignore +++ b/.gitignore @@ -106,4 +106,5 @@ venv.bak/ #idea .idea -*.iml \ No newline at end of file +*.iml +secrets diff --git a/setup.py b/setup.py index e497f68..b62a99d 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ py_modules=['tap-appstore'], install_requires=[ 'singer-python==5.2.3', - 'appstoreconnect==0.3.0', + 'appstoreconnect==0.9.0', 'pytz==2018.4' ], entry_points=''' diff --git a/tap_appstore/__init__.py b/tap_appstore/__init__.py index b08d30b..8ea47db 100644 --- a/tap_appstore/__init__.py +++ b/tap_appstore/__init__.py @@ -4,6 +4,7 @@ from datetime import timedelta import os import json +from typing import Dict, Union, List import singer from singer import utils, metadata, Transformer @@ -54,6 +55,7 @@ } } + class Context: config = {} state = {} @@ -110,28 +112,33 @@ def load_schemas(): return schemas -def discover(): +def discover(api: Api): raw_schemas = load_schemas() streams = [] - for schema_name, schema in raw_schemas.items(): - # create and add catalog entry - catalog_entry = { - 'stream': schema_name, - 'tap_stream_id': schema_name, - 'schema': schema, - 'key_properties': [] - } - streams.append(catalog_entry) + report_date = datetime.strptime(get_bookmark(schema_name), "%Y-%m-%dT%H:%M:%SZ").strftime("%Y-%m-%d") + filters = get_api_request_fields(report_date, schema_name) + + report = _attempt_download_report(api, filters) + if report: + # create and add catalog entry + catalog_entry = { + 'stream': schema_name, + 'tap_stream_id': schema_name, + 'schema': schema, + 'key_properties': [] + } + streams.append(catalog_entry) + + if len(streams) == 0: + LOGGER.warning("Could not find any reports types to download for the input configuration.") return {'streams': streams} def tsv_to_list(tsv): lines = tsv.split('\n') - header = [s.lower().replace(' ', '_').replace('-','_')\ - for s in lines[0].split('\t')] - + header = [s.lower().replace(' ', '_').replace('-', '_') for s in lines[0].split('\t')] data = [] for line in lines[1:]: if len(line) == 0: @@ -146,25 +153,23 @@ def tsv_to_list(tsv): return data -def get_api_request_fields(report_date,stream_name): +def get_api_request_fields(report_date, stream_name) -> Dict[str, any]: """Get fields to be used in appstore API request """ report_filters = { 'reportDate': report_date, - 'vendorNumber': "{}".format(Context.config['vendor']) - } + 'vendorNumber': f"{Context.config['vendor']}" + } api_fields = API_REQUEST_FIELDS.get(stream_name) - if api_fields == None: - raise Exception('API request fields not set to stream "{}" '.\ - format(stream_name)) + if api_fields is None: + raise Exception(f'API request fields not set to stream "{stream_name}"') else: report_filters.update(API_REQUEST_FIELDS[stream_name]) - - return report_filters + return report_filters -def sync(api): +def sync(api: Api): # Write all schemas and init count to 0 for catalog_entry in Context.catalog['streams']: stream_name = catalog_entry["tap_stream_id"] @@ -176,7 +181,22 @@ def sync(api): query_report(api, catalog_entry) -def query_report(api,catalog_entry): +def _attempt_download_report(api: Api, report_filters: Dict[str, any]) -> Union[List[Dict], None]: + # fetch data from appstore api + try: + rep_tsv = api.download_sales_and_trends_reports(filters=report_filters) + except APIError as e: + LOGGER.error(e) + return None + + # parse api response + if isinstance(rep_tsv, dict): + LOGGER.warning(f"Received a JSON response instead of the report: {rep_tsv}") + else: + return tsv_to_list(rep_tsv) + + +def query_report(api: Api, catalog_entry): stream_name = catalog_entry["tap_stream_id"] stream_schema = catalog_entry['schema'] @@ -194,32 +214,18 @@ def query_report(api,catalog_entry): with Transformer(singer.UNIX_SECONDS_INTEGER_DATETIME_PARSING) as transformer: while iterator + delta <= extraction_time: - iterator_str = iterator.strftime("%Y-%m-%d") - LOGGER.info("Requesting Appstore data for: %s on %s", stream_name, iterator_str) + report_date = iterator.strftime("%Y-%m-%d") + LOGGER.info("Requesting Appstore data for: %s on %s", stream_name, report_date) # setting report filters for each stream - report_filters = get_api_request_fields(iterator_str,stream_name) - - # fetch data from appstore api - try: - rep_tsv = api.download_sales_and_trends_reports(filters= - report_filters) - except APIError as e: - LOGGER.error(e) - break - - # parse api response - if isinstance(rep_tsv, dict): - LOGGER.warning("Received a JSON response instead of the report: %s", str(rep_tsv)) - break - else: - rep = tsv_to_list(rep_tsv) + report_filters = get_api_request_fields(report_date, stream_name) + rep = _attempt_download_report(api, report_filters) # write records for index, line in enumerate(rep, start=1): data = line data['_line_id'] = index data['_time_extracted'] = extraction_time.strftime(TIME_EXTRACTED_FORMAT) - data['_api_report_date'] = iterator_str + data['_api_report_date'] = report_date rec = transformer.transform(data, stream_schema) singer.write_record( @@ -255,9 +261,16 @@ def main(): # Parse command line arguments args = utils.parse_args(REQUIRED_CONFIG_KEYS) + Context.config = args.config + api = Api( + Context.config['key_id'], + Context.config['key_file'], + Context.config['issuer_id'] + ) + # If discover flag was passed, run discovery mode and dump output to stdout if args.discover: - catalog = discover() + catalog = discover(api) Context.config = args.config print(json.dumps(catalog, indent=2)) @@ -266,16 +279,9 @@ def main(): if args.catalog: Context.catalog = args.catalog.to_dict() else: - Context.catalog = discover() + Context.catalog = discover(api) - Context.config = args.config Context.state = args.state - api = Api( - Context.config['key_id'], - Context.config['key_file'], - Context.config['issuer_id'] - ) - sync(api)