Implement probing discovery (#1)
sherifnada authored Feb 24, 2021
1 parent 82ede05 commit 08e4fe8
Showing 3 changed files with 60 additions and 53 deletions.
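
The gist of the change: discovery no longer emits a catalog entry for every bundled schema. discover() now takes a live appstoreconnect Api client and probes the App Store API once per report type; only report types that actually return data for the configured vendor become catalog streams. A distilled sketch of the pattern (function names come from the diff below; the body is simplified and the date handling abbreviated):

    def discover(api):
        streams = []
        for schema_name, schema in load_schemas().items():
            # Probe: try to fetch one report of this type for the bookmarked date.
            report_date = get_bookmark(schema_name)[:10]  # "%Y-%m-%d"; the real code round-trips through strptime/strftime
            filters = get_api_request_fields(report_date, schema_name)
            if _attempt_download_report(api, filters):  # returns None when the API errors out
                streams.append({'stream': schema_name, 'tap_stream_id': schema_name,
                                'schema': schema, 'key_properties': []})
        return {'streams': streams}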
3 changes: 2 additions & 1 deletion .gitignore
@@ -106,4 +106,5 @@ venv.bak/
 
 #idea
 .idea
-*.iml
\ No newline at end of file
+*.iml
+secrets
2 changes: 1 addition & 1 deletion setup.py
@@ -11,7 +11,7 @@
     py_modules=['tap-appstore'],
     install_requires=[
         'singer-python==5.2.3',
-        'appstoreconnect==0.3.0',
+        'appstoreconnect==0.9.0',
         'pytz==2018.4'
     ],
     entry_points='''
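
The appstoreconnect bump from 0.3.0 to 0.9.0 is what makes probing possible: the tap now constructs an Api client up front and calls download_sales_and_trends_reports directly (both calls appear in the __init__.py diff below). A minimal sketch of that usage; reportDate and vendorNumber mirror get_api_request_fields in this diff, while the reportType/reportSubType/frequency names are assumptions based on Apple's Sales and Trends reporting filters (the tap's actual per-stream values live in API_REQUEST_FIELDS, which is not shown here):

    from appstoreconnect import Api, APIError

    api = Api('KEYID12345', 'secrets/AuthKey.p8', 'ISSUER-UUID')  # key_id, key_file, issuer_id
    try:
        tsv = api.download_sales_and_trends_reports(filters={
            'reportType': 'SALES',       # assumed example value
            'reportSubType': 'SUMMARY',  # assumed example value
            'frequency': 'DAILY',        # assumed example value
            'reportDate': '2021-02-01',
            'vendorNumber': '12345678',
        })
    except APIError:
        tsv = None  # the tap logs the error and treats the report type as unavailable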
108 changes: 57 additions & 51 deletions tap_appstore/__init__.py
Expand Up @@ -4,6 +4,7 @@
from datetime import timedelta
import os
import json
from typing import Dict, Union, List

import singer
from singer import utils, metadata, Transformer
@@ -54,6 +55,7 @@
     }
 }
 
+
 class Context:
     config = {}
     state = {}
@@ -110,28 +112,33 @@ def load_schemas():
     return schemas
 
 
-def discover():
+def discover(api: Api):
     raw_schemas = load_schemas()
     streams = []
 
     for schema_name, schema in raw_schemas.items():
-        # create and add catalog entry
-        catalog_entry = {
-            'stream': schema_name,
-            'tap_stream_id': schema_name,
-            'schema': schema,
-            'key_properties': []
-        }
-        streams.append(catalog_entry)
+        report_date = datetime.strptime(get_bookmark(schema_name), "%Y-%m-%dT%H:%M:%SZ").strftime("%Y-%m-%d")
+        filters = get_api_request_fields(report_date, schema_name)
+
+        report = _attempt_download_report(api, filters)
+        if report:
+            # create and add catalog entry
+            catalog_entry = {
+                'stream': schema_name,
+                'tap_stream_id': schema_name,
+                'schema': schema,
+                'key_properties': []
+            }
+            streams.append(catalog_entry)
 
+    if len(streams) == 0:
+        LOGGER.warning("Could not find any report types to download for the input configuration.")
+
     return {'streams': streams}
 
 
 def tsv_to_list(tsv):
     lines = tsv.split('\n')
-    header = [s.lower().replace(' ', '_').replace('-','_')\
-              for s in lines[0].split('\t')]
-
+    header = [s.lower().replace(' ', '_').replace('-', '_') for s in lines[0].split('\t')]
     data = []
     for line in lines[1:]:
         if len(line) == 0:
@@ -146,25 +153,23 @@ def tsv_to_list(tsv):
     return data
 
 
-def get_api_request_fields(report_date,stream_name):
+def get_api_request_fields(report_date, stream_name) -> Dict[str, Any]:
     """Get fields to be used in appstore API request """
     report_filters = {
         'reportDate': report_date,
-        'vendorNumber': "{}".format(Context.config['vendor'])
-    }
+        'vendorNumber': f"{Context.config['vendor']}"
+    }
 
     api_fields = API_REQUEST_FIELDS.get(stream_name)
-    if api_fields == None:
-        raise Exception('API request fields not set to stream "{}" '.\
-            format(stream_name))
+    if api_fields is None:
+        raise Exception(f'API request fields not set for stream "{stream_name}"')
     else:
         report_filters.update(API_REQUEST_FIELDS[stream_name])
 
-    return report_filters
-
+    return report_filters
 
 
-def sync(api):
+def sync(api: Api):
     # Write all schemas and init count to 0
     for catalog_entry in Context.catalog['streams']:
         stream_name = catalog_entry["tap_stream_id"]
@@ -176,7 +181,22 @@ def sync(api):
             query_report(api, catalog_entry)
 
 
-def query_report(api,catalog_entry):
+def _attempt_download_report(api: Api, report_filters: Dict[str, Any]) -> Union[List[Dict], None]:
+    # fetch data from appstore api
+    try:
+        rep_tsv = api.download_sales_and_trends_reports(filters=report_filters)
+    except APIError as e:
+        LOGGER.error(e)
+        return None
+
+    # parse api response
+    if isinstance(rep_tsv, dict):
+        LOGGER.warning(f"Received a JSON response instead of the report: {rep_tsv}")
+    else:
+        return tsv_to_list(rep_tsv)
+
+
+def query_report(api: Api, catalog_entry):
     stream_name = catalog_entry["tap_stream_id"]
     stream_schema = catalog_entry['schema']
 
@@ -194,32 +214,18 @@
 
     with Transformer(singer.UNIX_SECONDS_INTEGER_DATETIME_PARSING) as transformer:
         while iterator + delta <= extraction_time:
-            iterator_str = iterator.strftime("%Y-%m-%d")
-            LOGGER.info("Requesting Appstore data for: %s on %s", stream_name, iterator_str)
+            report_date = iterator.strftime("%Y-%m-%d")
+            LOGGER.info("Requesting Appstore data for: %s on %s", stream_name, report_date)
             # setting report filters for each stream
-            report_filters = get_api_request_fields(iterator_str,stream_name)
-
-            # fetch data from appstore api
-            try:
-                rep_tsv = api.download_sales_and_trends_reports(filters=
-                    report_filters)
-            except APIError as e:
-                LOGGER.error(e)
-                break
-
-            # parse api response
-            if isinstance(rep_tsv, dict):
-                LOGGER.warning("Received a JSON response instead of the report: %s", str(rep_tsv))
-                break
-            else:
-                rep = tsv_to_list(rep_tsv)
+            report_filters = get_api_request_fields(report_date, stream_name)
+            rep = _attempt_download_report(api, report_filters)
 
             # write records
             for index, line in enumerate(rep, start=1):
                 data = line
                 data['_line_id'] = index
                 data['_time_extracted'] = extraction_time.strftime(TIME_EXTRACTED_FORMAT)
-                data['_api_report_date'] = iterator_str
+                data['_api_report_date'] = report_date
                 rec = transformer.transform(data, stream_schema)
 
                 singer.write_record(
@@ -255,9 +261,16 @@ def main():
     # Parse command line arguments
     args = utils.parse_args(REQUIRED_CONFIG_KEYS)
 
+    Context.config = args.config
+    api = Api(
+        Context.config['key_id'],
+        Context.config['key_file'],
+        Context.config['issuer_id']
+    )
+
     # If discover flag was passed, run discovery mode and dump output to stdout
     if args.discover:
-        catalog = discover()
-        Context.config = args.config
+        catalog = discover(api)
         print(json.dumps(catalog, indent=2))
 
@@ -266,16 +279,9 @@
         if args.catalog:
             Context.catalog = args.catalog.to_dict()
         else:
-            Context.catalog = discover()
+            Context.catalog = discover(api)
 
-        Context.config = args.config
         Context.state = args.state
-        api = Api(
-            Context.config['key_id'],
-            Context.config['key_file'],
-            Context.config['issuer_id']
-        )
-
         sync(api)
 
 
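An operational consequence worth noting: main() now builds the Api client before handling --discover, so discovery requires working App Store Connect credentials rather than just the bundled schemas. A hypothetical config showing the keys this diff reads from Context.config ('start_date' is an assumption; the bookmark logic that would consume it sits outside this diff):

    config = {
        'key_id': 'KEYID12345',                # App Store Connect API key ID
        'key_file': 'secrets/AuthKey.p8',      # private key, kept out of git by the new 'secrets' ignore rule
        'issuer_id': 'ISSUER-UUID',
        'vendor': '12345678',                  # becomes vendorNumber in the report filters
        'start_date': '2021-02-01T00:00:00Z',  # assumed; matches the timestamp format parsed in discover()
    }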
