Skip to content

Commit

Permalink
Merge pull request #1 from Muriloo/master
Browse files Browse the repository at this point in the history
adding support for multiple streams and other minor tweaks
  • Loading branch information
justedro authored Oct 6, 2020
2 parents 1420303 + 5ee32e7 commit 82ede05
Show file tree
Hide file tree
Showing 7 changed files with 857 additions and 210 deletions.
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup

setup(name='tap-appstore',
version='0.0.1',
version='0.2.0',
description='Singer.io tap for extracting data from the App Store Connect API',
author='JustEdro',
url='https://github.com/JustEdro',
Expand All @@ -12,7 +12,7 @@
install_requires=[
'singer-python==5.2.3',
'appstoreconnect==0.3.0',
'pytz'
'pytz==2018.4'
],
entry_points='''
[console_scripts]
Expand Down
100 changes: 76 additions & 24 deletions tap_appstore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import json

import singer
from singer import utils, Transformer
from singer import metadata
from singer import utils, metadata, Transformer

from appstoreconnect import Api
from appstoreconnect.api import APIError
import pytz

REQUIRED_CONFIG_KEYS = [
Expand All @@ -19,12 +19,40 @@
'vendor',
'start_date'
]

# Module-level state dict (NOTE(review): appears unused here — query_report
# reads Context.state instead; confirm before removing).
STATE = {}

# Shared Singer logger for this tap.
LOGGER = singer.get_logger()

# Timestamp format for bookmark values (matches the literal format string
# used when parsing bookmarks in query_report).
BOOKMARK_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

# Timestamp format written into each record's '_time_extracted' field.
TIME_EXTRACTED_FORMAT = '%Y-%m-%dT%H:%M:%S%z'

# Fixed per-stream filter values for the App Store Connect sales/trends
# report download API.  get_api_request_fields() merges one of these dicts
# with the per-request reportDate/vendorNumber filters.
API_REQUEST_FIELDS = {
    'subscription_event_report': {
        'reportType': 'SUBSCRIPTION_EVENT',
        'frequency': 'DAILY',
        'reportSubType': 'SUMMARY',
        'version': '1_2'
    },
    'subscriber_report': {
        'reportType': 'SUBSCRIBER',
        'frequency': 'DAILY',
        'reportSubType': 'DETAILED',
        'version': '1_2'
    },
    'subscription_report': {
        'reportType': 'SUBSCRIPTION',
        'frequency': 'DAILY',
        'reportSubType': 'SUMMARY',
        'version': '1_2'
    },
    'sales_report': {
        'reportType': 'SALES',
        'frequency': 'DAILY',
        'reportSubType': 'SUMMARY',
        'version': '1_0'
    }
}

class Context:
config = {}
Expand Down Expand Up @@ -92,13 +120,7 @@ def discover():
'stream': schema_name,
'tap_stream_id': schema_name,
'schema': schema,
# TODO Events may have a different key property than this. Change
# if it's appropriate.
'key_properties': [
'line_id', # artificial
'begin_date',
'end_date'
]
'key_properties': []
}
streams.append(catalog_entry)

Expand All @@ -107,7 +129,8 @@ def discover():

def tsv_to_list(tsv):
lines = tsv.split('\n')
header = [s.lower().replace(' ', '_') for s in lines[0].split('\t')]
header = [s.lower().replace(' ', '_').replace('-','_')\
for s in lines[0].split('\t')]

data = []
for line in lines[1:]:
Expand All @@ -123,6 +146,24 @@ def tsv_to_list(tsv):
return data


def get_api_request_fields(report_date, stream_name):
    """Build the filter dict for an App Store Connect report request.

    Args:
        report_date: report date string to request (passed through verbatim
            as the 'reportDate' filter).
        stream_name: tap stream id; must have an entry in API_REQUEST_FIELDS.

    Returns:
        dict: 'reportDate' and 'vendorNumber' (from Context.config['vendor'])
        merged with the stream's fixed fields from API_REQUEST_FIELDS
        (reportType, frequency, reportSubType, version).

    Raises:
        Exception: if stream_name has no entry in API_REQUEST_FIELDS.
    """
    # Single lookup; the original fetched the entry twice.
    api_fields = API_REQUEST_FIELDS.get(stream_name)
    # Identity check — `== None` can be fooled by a custom __eq__.
    if api_fields is None:
        raise Exception(
            'API request fields not set for stream "{}"'.format(stream_name))

    report_filters = {
        'reportDate': report_date,
        # str() is equivalent to "{}".format() here; vendor may be an int.
        'vendorNumber': str(Context.config['vendor']),
    }
    report_filters.update(api_fields)
    return report_filters



def sync(api):
# Write all schemas and init count to 0
for catalog_entry in Context.catalog['streams']:
Expand All @@ -132,20 +173,17 @@ def sync(api):
Context.new_counts[stream_name] = 0
Context.updated_counts[stream_name] = 0

query_report(api)
query_report(api, catalog_entry)


def query_report(api):
stream_name = 'summary_sales_report'
catalog_entry = Context.get_catalog_entry(stream_name)
def query_report(api,catalog_entry):
stream_name = catalog_entry["tap_stream_id"]
stream_schema = catalog_entry['schema']

# bookmark = datetime.fromisoformat(get_bookmark(stream_name)).replace(tzinfo=pytz.UTC)
bookmark = datetime.strptime(get_bookmark(stream_name), "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=pytz.UTC)
# get bookmark from when data will be pulled
bookmark = datetime.strptime(get_bookmark(stream_name), "%Y-%m-%dT%H:%M:%SZ").astimezone()
delta = timedelta(days=1)

extraction_time = singer.utils.now()

extraction_time = singer.utils.now().astimezone()
iterator = bookmark
singer.write_bookmark(
Context.state,
Expand All @@ -156,18 +194,32 @@ def query_report(api):

with Transformer(singer.UNIX_SECONDS_INTEGER_DATETIME_PARSING) as transformer:
while iterator + delta <= extraction_time:

iterator_str = iterator.strftime("%Y-%m-%d")
rep_tsv = api.sales_report('SALES', 'SUMMARY', 'DAILY', Context.config['vendor'], iterator_str, '1_0')
LOGGER.info("Requesting Appstore data for: %s on %s", stream_name, iterator_str)
# setting report filters for each stream
report_filters = get_api_request_fields(iterator_str,stream_name)

# fetch data from appstore api
try:
rep_tsv = api.download_sales_and_trends_reports(filters=
report_filters)
except APIError as e:
LOGGER.error(e)
break

# parse api response
if isinstance(rep_tsv, dict):
LOGGER.warning("Received a JSON response instead of the report: %s", str(rep_tsv))
break
else:
rep = tsv_to_list(rep_tsv)

# write records
for index, line in enumerate(rep, start=1):
data = line
data['line_id'] = index
data['_line_id'] = index
data['_time_extracted'] = extraction_time.strftime(TIME_EXTRACTED_FORMAT)
data['_api_report_date'] = iterator_str
rec = transformer.transform(data, stream_schema)

singer.write_record(
Expand Down Expand Up @@ -206,6 +258,7 @@ def main():
# If discover flag was passed, run discovery mode and dump output to stdout
if args.discover:
catalog = discover()
Context.config = args.config
print(json.dumps(catalog, indent=2))

else:
Expand All @@ -217,7 +270,6 @@ def main():

Context.config = args.config
Context.state = args.state

api = Api(
Context.config['key_id'],
Context.config['key_file'],
Expand Down
Loading

0 comments on commit 82ede05

Please sign in to comment.