From 7738e61e179e4cc71f14cebc29b488982fe4dbb7 Mon Sep 17 00:00:00 2001
From: Abhishek Mahawar
Date: Fri, 4 Mar 2016 10:42:44 +0530
Subject: [PATCH 1/8] Added support for 15-minute and 30-minute pipeline
 frequencies

---
 dataduct/pipeline/schedule.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/dataduct/pipeline/schedule.py b/dataduct/pipeline/schedule.py
index 4909efa..9044a95 100644
--- a/dataduct/pipeline/schedule.py
+++ b/dataduct/pipeline/schedule.py
@@ -37,6 +37,8 @@
     '8-hours': ('8 hours', None),
     '12-hours': ('12 hours', None),
     'one-time': ('15 minutes', 1),
+    '30-min': ('30 minutes', None),
+    '15-min': ('15 minutes', None),
 }

From 46ab8ee776e7af6b8667a40aa98f6693bf304ead Mon Sep 17 00:00:00 2001
From: Vivek Gupta
Date: Fri, 6 May 2016 15:43:14 -0700
Subject: [PATCH 2/8] Added support for on-demand scheduling

---
 dataduct/etl/etl_pipeline.py         |  5 +++
 dataduct/pipeline/data_pipeline.py   |  2 +-
 dataduct/pipeline/pipeline_object.py | 52 +++++++++++++++++-----------
 dataduct/pipeline/schedule.py        |  6 ++++
 4 files changed, 44 insertions(+), 21 deletions(-)

diff --git a/dataduct/etl/etl_pipeline.py b/dataduct/etl/etl_pipeline.py
index 5cd5aed..9fbd170 100644
--- a/dataduct/etl/etl_pipeline.py
+++ b/dataduct/etl/etl_pipeline.py
@@ -196,9 +196,14 @@ def create_base_objects(self):
                 topic_arn=self.topic_arn,
                 pipeline_name=self.name,
             )
+        if self.frequency == 'on-demand':
+            scheduleType = 'ONDEMAND'
+        else:
+            scheduleType = 'cron'
         self.default = self.create_pipeline_object(
             object_class=DefaultObject,
             pipeline_log_uri=self.s3_log_dir,
+            scheduleType=scheduleType,
         )
 
     @property

diff --git a/dataduct/pipeline/data_pipeline.py b/dataduct/pipeline/data_pipeline.py
index acc5488..de85cc7 100644
--- a/dataduct/pipeline/data_pipeline.py
+++ b/dataduct/pipeline/data_pipeline.py
@@ -65,7 +65,7 @@ def aws_format(self):
         Returns:
             result(list of dict): list of AWS-readable dict of all objects
         """
-        return [x.aws_format() for x in self.objects]
+        return [x.aws_format() for x in self.objects if hasattr(x, 'fields')]
 
     def add_object(self, pipeline_object):
         """Add an object to the datapipeline

diff --git a/dataduct/pipeline/pipeline_object.py b/dataduct/pipeline/pipeline_object.py
index 1878532..c19cefb 100644
--- a/dataduct/pipeline/pipeline_object.py
+++ b/dataduct/pipeline/pipeline_object.py
@@ -8,7 +8,7 @@
 from ..s3 import S3Path
 from ..utils.exceptions import ETLInputError
 
-
+scheduleType = ''
 class PipelineObject(object):
     """DataPipeline class with steps and metadata.
 
@@ -56,12 +56,15 @@ def s3_files(self):
         Returns:
             result(list of S3Files): List of files to be uploaded to s3
         """
-        result = self.additional_s3_files
-        for _, values in self.fields.iteritems():
-            for value in values:
-                if isinstance(value, S3File) or isinstance(value, S3Directory):
-                    result.append(value)
-        return result
+        if hasattr(self, 'additional_s3_files'):
+            result = self.additional_s3_files
+            for _, values in self.fields.iteritems():
+                for value in values:
+                    if isinstance(value, S3File) or isinstance(value, S3Directory):
+                        result.append(value)
+            return result
+        else:
+            return []
 
     def __getitem__(self, key):
         """Fetch the items associated with a key
@@ -130,16 +133,25 @@ def aws_format(self):
             result: The AWS-readable dict format of the object
         """
         fields = []
-        for key, values in self.fields.iteritems():
-            for value in values:
-                if isinstance(value, PipelineObject):
-                    fields.append({'key': key, 'refValue': value.id})
-                elif isinstance(value, S3Path):
-                    fields.append({'key': key, 'stringValue': value.uri})
-                elif isinstance(value, S3File) or \
-                        isinstance(value, S3Directory):
-                    fields.append({'key': key,
-                                   'stringValue': value.s3_path.uri})
-                else:
-                    fields.append({'key': key, 'stringValue': str(value)})
-        return {'id': self._id, 'name': self._id, 'fields': fields}
+        global scheduleType
+        if hasattr(self, 'fields'):
+            for key, values in self.fields.iteritems():
+                for value in values:
+                    if isinstance(value, PipelineObject):
+                        if scheduleType == 'ONDEMAND' and key == 'schedule':
+                            pass
+                        else:
+                            fields.append({'key': key, 'refValue': value.id})
+                    elif isinstance(value, S3Path):
+                        fields.append({'key': key, 'stringValue': value.uri})
+                    elif isinstance(value, S3File) or \
+                            isinstance(value, S3Directory):
+                        fields.append({'key': key,
+                                       'stringValue': value.s3_path.uri})
+                    else:
+                        if key == 'scheduleType' and str(value) == 'ONDEMAND':
+                            scheduleType = 'ONDEMAND'
+                        fields.append({'key': key, 'stringValue': str(value)})
+            return {'id': self._id, 'name': self._id, 'fields': fields}
+        else:
+            return None

diff --git a/dataduct/pipeline/schedule.py b/dataduct/pipeline/schedule.py
index 4909efa..e756e43 100644
--- a/dataduct/pipeline/schedule.py
+++ b/dataduct/pipeline/schedule.py
@@ -37,9 +37,11 @@
     '8-hours': ('8 hours', None),
     '12-hours': ('12 hours', None),
     'one-time': ('15 minutes', 1),
+    'on-demand': ('ondemand',None),
 }
 
+
 class Schedule(PipelineObject):
     """Schedule object added to all pipelines
     """
@@ -62,6 +64,10 @@ def __init__(self,
             load_minutes(int): Minutes at which the pipeline should be run
             **kwargs(optional): Keyword arguments directly passed to base class
         """
+        if frequency == 'on-demand':
+            logger.debug("Don't create schedule object")
+            return None
+
         current_time = datetime.utcnow()
 
         # Set the defaults for load hour and minutes
         if load_minutes is None:

From bdc9fb1e7ae994eabd692ea6f04578c83f4e1432 Mon Sep 17 00:00:00 2001
From: Vivek Gupta
Date: Tue, 10 May 2016 14:11:39 -0700
Subject: [PATCH 3/8] Code cleanup in schedule.py

---
 dataduct/pipeline/schedule.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dataduct/pipeline/schedule.py b/dataduct/pipeline/schedule.py
index 0c2818d..a62aa6e 100644
--- a/dataduct/pipeline/schedule.py
+++ b/dataduct/pipeline/schedule.py
@@ -37,7 +37,7 @@
     '8-hours': ('8 hours', None),
     '12-hours': ('12 hours', None),
     'one-time': ('15 minutes', 1),
-    'on-demand': ('ondemand',None),
+    'on-demand': ('ondemand', None),
     '30-min': ('30 minutes', None),
     '15-min': ('15 minutes', None),
 }
@@ -66,7 +66,7 @@ def __init__(self,
             **kwargs(optional): Keyword arguments directly passed to base class
         """
         if frequency == 'on-demand':
-            logger.debug("Don't create schedule object")
+            logger.debug("On-demand frequency requested; skipping Schedule object creation")
             return None
 
         current_time = datetime.utcnow()

From d21518fd4d9a47cd608d36a5a7a96c5f44fcd5c3 Mon Sep 17 00:00:00 2001
From: Vivek Gupta
Date: Tue, 10 May 2016 18:14:06 -0700
Subject: [PATCH 4/8] Fix for issue 226 of coursera/dataduct

---
 dataduct/etl/etl_pipeline.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/dataduct/etl/etl_pipeline.py b/dataduct/etl/etl_pipeline.py
index 9fbd170..66dd2ed 100644
--- a/dataduct/etl/etl_pipeline.py
+++ b/dataduct/etl/etl_pipeline.py
@@ -481,7 +481,9 @@ def create_steps(self, steps_params, is_bootstrap=False,
                 'input_path' not in step_param:
             step_param['input_node'] = input_node
 
-        if is_teardown:
+        # if is_teardown:
+        # Set sns_object on every step, not just teardown, so failure alerts carry the error stack trace
+        if hasattr(self.sns, 'fields'):
             step_param['sns_object'] = self.sns
 
         try:

From b2c92982adf08d86a58dbd23a4c2faf765db4e5f Mon Sep 17 00:00:00 2001
From: Vivek Gupta
Date: Wed, 25 May 2016 07:18:00 +0530
Subject: [PATCH 5/8] SNS all-step message toggle and error-subject changes

---
 dataduct/etl/etl_pipeline.py   | 10 ++++++----
 dataduct/pipeline/sns_alarm.py |  2 +-
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/dataduct/etl/etl_pipeline.py b/dataduct/etl/etl_pipeline.py
index 66dd2ed..c67907c 100644
--- a/dataduct/etl/etl_pipeline.py
+++ b/dataduct/etl/etl_pipeline.py
@@ -193,7 +193,7 @@ def create_base_objects(self):
         else:
             self.sns = self.create_pipeline_object(
                 object_class=SNSAlarm,
-                topic_arn=self.topic_arn,
+                topic_arn=self.topic_arn.replace('all:', ''),
                 pipeline_name=self.name,
             )
         if self.frequency == 'on-demand':
@@ -481,10 +481,12 @@ def create_steps(self, steps_params, is_bootstrap=False,
                 'input_path' not in step_param:
             step_param['input_node'] = input_node
 
-        # if is_teardown:
-        # Set sns_object on every step, not just teardown, so failure alerts carry the error stack trace
         if hasattr(self.sns, 'fields'):
-            step_param['sns_object'] = self.sns
+            if self.topic_arn.startswith('all:'):
+                # Set sns_object on every step, not just teardown, so failure alerts carry the error stack trace
+                step_param['sns_object'] = self.sns
+            elif is_teardown:
+                step_param['sns_object'] = self.sns
 
         try:
             step_class = step_param.pop('step_class')

diff --git a/dataduct/pipeline/sns_alarm.py b/dataduct/pipeline/sns_alarm.py
index e7dfa8a..95fe464 100644
--- a/dataduct/pipeline/sns_alarm.py
+++ b/dataduct/pipeline/sns_alarm.py
@@ -42,7 +42,7 @@ def __init__(self,
             'Error Stack Trace: #{node.errorStackTrace}'
         ])
 
-        subject = 'Data Pipeline %s failed' % pipeline_name
+        subject = 'Data Pipeline %s #{node.@status}' % pipeline_name
 
         if topic_arn is None:
             topic_arn = SNS_TOPIC_ARN_FAILURE

From 9953f634d0daec53dcd0f2056bd8201d16ab5ab5 Mon Sep 17 00:00:00 2001
From: Abhishek Mahawar
Date: Fri, 2 Dec 2016 15:53:22 +0530
Subject: [PATCH 6/8] Fix for SSL issue where the bucket name has dots in it

---
 dataduct/s3/utils.py | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/dataduct/s3/utils.py b/dataduct/s3/utils.py
index 4086a1d..dd4c400 100644
--- a/dataduct/s3/utils.py
+++ b/dataduct/s3/utils.py
@@ -12,6 +12,22 @@
 CHUNK_SIZE = 5242880
 PROGRESS_SECTIONS = 10
 
+# ----------------------------------------------------------------
+# Fix for the SSL issue where the bucket name has dots in it
+
+try:
+    import ssl
+    _old_match_hostname = ssl.match_hostname
+
+    def _new_match_hostname(cert, hostname):
+        if hostname.endswith('.s3.amazonaws.com'):
+            pos = hostname.find('.s3.amazonaws.com')
+            hostname = hostname[:pos].replace('.', '') + hostname[pos:]
+        return _old_match_hostname(cert, hostname)
+
+    ssl.match_hostname = _new_match_hostname
+except Exception:
+    pass
 
 def get_s3_bucket(bucket_name):
     """Returns an S3 bucket object from boto

From 3decf0780d759a139fcb548eb17c8dbac384bc5f Mon Sep 17 00:00:00 2001
From: Igor Dralyuk
Date: Tue, 13 Dec 2016 19:22:56 -0800
Subject: [PATCH 7/8] Updated circle.yml

---
 circle.yml | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 56 insertions(+)
 create mode 100644 circle.yml

diff --git a/circle.yml b/circle.yml
new file mode 100644
index 0000000..e831160
--- /dev/null
+++ b/circle.yml
@@ -0,0 +1,56 @@
+machine:
+  python:
+    version: 2.7.12
+
+dependencies:
+  pre:
+    - sudo apt-get update; sudo apt-get install -y graphviz
+    - pip install coveralls
+    - pip install -r requirements.txt
+    - mkdir ~/.dataduct
+    - |
+      echo "
+      etl:
+        ROLE: DataPipelineDefaultRole
+        RESOURCE_ROLE: DataPipelineDefaultResourceRole
+        S3_ETL_BUCKET: FILL_ME_IN
+      ec2:
+        CORE_INSTANCE_TYPE: m1.large
+      emr:
+        CLUSTER_AMI: 2.4.7
+      redshift:
+        DATABASE_NAME: FILL_ME_IN
+        CLUSTER_ID: FILL_ME_IN
+        USERNAME: FILL_ME_IN
+        PASSWORD: FILL_ME_IN
+      postgres:
+        DATABASE_NAME: FILL_ME_IN
+        RDS_INSTANCE_ID: FILL_ME_IN
+        USERNAME: FILL_ME_IN
+        PASSWORD: FILL_ME_IN
+        REGION: FILL_ME_IN
+      mysql:
+        DATABASE_KEY:
+          HOST: FILL_ME_IN
+          USERNAME: FILL_ME_IN
+          PASSWORD: FILL_ME_IN" > ~/.dataduct/dataduct.cfg
+    - |
+      echo "
+      [distutils]
+      index-servers = pypi
+      [pypi]
+      repository: $PYPI_REPOSITORY
+      username: $PYPI_USERNAME
+      password: $PYPI_PASSWORD" > ~/.pypirc
+
+test:
+  override:
+    - nosetests --with-coverage --cover-package=. --cover-erase
+  post:
+    - coveralls
+
+deployment:
+  pypi:
+    branch: /.*/
+    commands:
+      - python setup.py sdist upload -r pypi

From cfe5f7068d97d57b9d43d93c7a6682b0cb7a64a7 Mon Sep 17 00:00:00 2001
From: Igor Dralyuk
Date: Tue, 13 Dec 2016 19:32:19 -0800
Subject: [PATCH 8/8] Disabled coveralls in circle.yml

---
 circle.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/circle.yml b/circle.yml
index e831160..87b003f 100644
--- a/circle.yml
+++ b/circle.yml
@@ -46,8 +46,8 @@ dependencies:
 test:
   override:
     - nosetests --with-coverage --cover-package=. --cover-erase
-  post:
-    - coveralls
+#  post:
+#    - coveralls
 
 deployment:
   pypi:
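
Notes on using the series:

The frequency keys from patches 1-3 and the "all:" SNS toggle from patch 5 are
both driven from the pipeline definition YAML. A minimal sketch of a
definition that would exercise them; the pipeline name, step, and topic ARN
below are illustrative placeholders, not values taken from these patches:

    name: example_pipeline
    frequency: on-demand    # '15-min' and '30-min' also work after patch 1
    # The 'all:' prefix makes patch 5 attach the SNS alarm to every step;
    # the prefix is stripped off before the ARN is handed to AWS
    topic_arn: all:arn:aws:sns:us-east-1:123456789012:etl-failures

    steps:
    -   step_type: extract-local
        path: data/example.tsv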
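
With scheduleType set to ONDEMAND (patch 2), the deployed pipeline carries no
Schedule object and runs once per activation. A rough sketch of triggering
such a run with boto 2 (the library dataduct builds on); the region and
pipeline id are placeholders:

    import boto.datapipeline

    conn = boto.datapipeline.connect_to_region('us-east-1')
    # Each activate_pipeline call on an ONDEMAND pipeline starts one run
    conn.activate_pipeline('df-0123456789ABCDEFGHIJ')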
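
The shim in patch 6 only rewrites the hostname that gets checked against the
server certificate; the connection itself is untouched. A quick sketch of the
effect, using a hypothetical dotted bucket name, runnable once
dataduct.s3.utils has been imported:

    import dataduct.s3.utils  # importing installs the match_hostname shim
    import ssl

    cert = {'subjectAltName': (('DNS', '*.s3.amazonaws.com'),)}
    # The wildcard matches only a single label, so the unpatched check would
    # reject this name; the shim checks 'mydottedbucket.s3.amazonaws.com'
    # instead, which passes
    ssl.match_hostname(cert, 'my.dotted.bucket.s3.amazonaws.com')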