diff --git a/datapackage_pipelines_budgetkey/pipelines/budgetkey/emails/fetch_subscriptions.py b/datapackage_pipelines_budgetkey/pipelines/budgetkey/emails/fetch_subscriptions.py new file mode 100644 index 00000000..34ac55be --- /dev/null +++ b/datapackage_pipelines_budgetkey/pipelines/budgetkey/emails/fetch_subscriptions.py @@ -0,0 +1,42 @@ +import os +import sqlalchemy +import itertools + +from datapackage_pipelines.wrapper import ingest, spew +from datapackage_pipelines.utilities.resources import PROP_STREAMING + +if __name__ == '__main__': + params, dp, res_iter = ingest() + + dp['resources'].append( + dict( + name='subs', + path='subs.csv', + schema=dict( + fields=[ + dict(name='email', type='string'), + dict(name='items', type='array') + ] + ) + ) + ) + dp['resources'][0][PROP_STREAMING] = True + + e = sqlalchemy.create_engine(os.environ['PRIVATE_DATABASE_URL']) + r = map(dict, + e.execute(""" +select email, title, url, properties +from items join lists on(items.list_id=lists.id) +join users on(lists.user_id=users.id) +where lists.name='searches' +order by email +""" + )) + + r = (dict(email=email, items=list(items)) + for email, items in itertools.groupby(r, lambda x: x['email'])) + spew(dp, [r]) + + + + diff --git a/datapackage_pipelines_budgetkey/pipelines/budgetkey/emails/pipeline-spec.yaml b/datapackage_pipelines_budgetkey/pipelines/budgetkey/emails/pipeline-spec.yaml new file mode 100644 index 00000000..5634c6ed --- /dev/null +++ b/datapackage_pipelines_budgetkey/pipelines/budgetkey/emails/pipeline-spec.yaml @@ -0,0 +1,7 @@ +emails: + title: Send periodic emails + + pipeline: + - run: fetch_subscriptions + - run: sample + - run: send_emails diff --git a/datapackage_pipelines_budgetkey/pipelines/budgetkey/emails/send_emails.py b/datapackage_pipelines_budgetkey/pipelines/budgetkey/emails/send_emails.py new file mode 100644 index 00000000..312cd83b --- /dev/null +++ b/datapackage_pipelines_budgetkey/pipelines/budgetkey/emails/send_emails.py @@ -0,0 +1,13 @@ +from datapackage_pipelines.wrapper import process + +import logging +import requests + + +def process_row(row, *_): + logging.info('ROW: %r', row) + +if __name__ == '__main__': + process(process_row=process_row) + +