Skip to content

Commit

Permalink
Merge branch 'release/0.2.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed May 3, 2017
2 parents ebb72ce + d970a47 commit d47346f
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 39 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ python:
- 3.4
- 3.5
install:
pip install flake8
- pip install flake8
- pip install -e .
before_script:
flake8 .
script: python -m pytest --doctest-modules --ignore setup.py
Expand Down
5 changes: 5 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
Version 0.2.1 (2017-05-03)
--------------------------
Block until run manifests table is in created state (#20)
List all run ids regardless max-keys (#21)

Version 0.2.0 (2017-04-11)
--------------------------
Add flake linting to CI process (#4)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

## Overview

The **[Snowplow][snowplow]** Analytics SDK for Python lets you work with **[Snowplow enriched events] [enriched-events]** in your Python event processing and data modeling jobs.
The **[Snowplow][snowplow]** Analytics SDK for Python lets you work with **[Snowplow enriched events][enriched-events]** in your Python event processing and data modeling jobs.

Use this SDK with **[Apache Spark][spark]**, **[AWS Lambda][lambda]**, **[Databricks][databricks]** and other Python-compatible data processing frameworks.

Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
License: Apache License Version 2.0
"""

from distutils.core import setup
from setuptools import setup

setup(
name='snowplow_analytics_sdk',
version='0.2.0',
version='0.2.1',
description='Snowplow Analytics Python SDK',
author='Fred Blundun',
url='https://github.com/snowplow/snowplow-python-analytics-sdk',
Expand Down
103 changes: 68 additions & 35 deletions snowplow_analytics_sdk/run_manifests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
language governing permissions and limitations there under.
"""

import sys

from datetime import datetime

from botocore.exceptions import ClientError

DYNAMODB_RUNID_ATTRIBUTE = 'RunId'

def create_manifest_table(dynamodb_client, table_name):
    """Create a DynamoDB run-manifests table and block until it is usable.

    Idempotent: if the table already exists the call is treated as success.
    Blocks until the table reaches ACTIVE state so that callers can write
    to it immediately (#20).

    Arguments:
    dynamodb_client - boto3 DynamoDB client (not service)
    table_name - string representing existing table name
    """
    try:
        dynamodb_client.create_table(
            AttributeDefinitions=[
                {
                    'AttributeName': DYNAMODB_RUNID_ATTRIBUTE,
                    'AttributeType': 'S'
                },
            ],
            TableName=table_name,
            KeySchema=[
                {
                    'AttributeName': DYNAMODB_RUNID_ATTRIBUTE,
                    'KeyType': 'HASH'
                },
            ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': 5
            }
        )
        # Wait until the table is fully created before returning
        dynamodb_client.get_waiter('table_exists').wait(TableName=table_name)
    except ClientError as e:
        # Table already exists - nothing to do
        if e.response['Error']['Code'] != 'ResourceInUseException':
            # Bare `raise` re-raises with the original traceback intact
            # (unlike `raise e`)
            raise


def list_runids(s3_client, full_path):
    """Return a list of all run ids inside an S3 folder.

    Transparently follows S3 pagination (`ContinuationToken`), so **all**
    run ids are returned regardless of the per-request `MaxKeys` limit (#21).

    Arguments:
    s3_client - boto3 S3 client (not service)
    full_path - full valid S3 path to events (such as enriched-archive)
                example: s3://acme-events-bucket/main-pipeline/enriched-archive
    """
    run_ids_buffer = []
    last_continuation_token = None

    (bucket, prefix) = split_full_path(full_path)

    while True:
        # clean_dict drops None values: Prefix may be absent for a
        # bucket-root path, and ContinuationToken is absent on the first page
        options = clean_dict({
            'Bucket': bucket,
            'Prefix': prefix,
            'Delimiter': '/',
            'ContinuationToken': last_continuation_token
        })

        response = s3_client.list_objects_v2(**options)
        keys = [extract_run_id(key['Prefix'])
                for key in response.get('CommonPrefixes', [])]
        run_ids_buffer.extend([key for key in keys if key is not None])
        last_continuation_token = response.get('NextContinuationToken', None)

        # IsTruncated is False on the last page of results
        if not response['IsTruncated']:
            break

    return run_ids_buffer


def split_full_path(path):
Expand All @@ -106,7 +125,7 @@ def split_full_path(path):
parts = path.split('/')
bucket = parts[0]
path = '/'.join(parts[1:])
return (bucket, normalize_prefix(path))
return bucket, normalize_prefix(path)


def extract_run_id(key):
Expand Down Expand Up @@ -146,6 +165,20 @@ def normalize_prefix(path):
return path + '/'


def clean_dict(dictionary):
    """Return a copy of *dictionary* with all None-valued keys removed.

    Falsy-but-not-None values (such as '' or 0) are preserved.

    >>> clean_dict({'key': None})
    {}
    >>> clean_dict({'empty_s': ''})
    {'empty_s': ''}
    """
    # `iteritems` keeps Python 2 support without materializing a list;
    # parameter renamed from `dict` to avoid shadowing the builtin
    if sys.version_info[0] < 3:
        return {k: v for k, v in dictionary.iteritems() if v is not None}
    else:
        return {k: v for k, v in dictionary.items() if v is not None}


def add_to_manifest(dynamodb_client, table_name, run_id):
"""Add run_id into DynamoDB manifest table
Expand Down

0 comments on commit d47346f

Please sign in to comment.