Commit

Merge pull request #1 from rearc-data/adu-md5_compare
Adds md5 comparison to keep from re-publishing non-modified files
aduyko authored Sep 3, 2020

2 parents 6a39176 + 4609a0e commit d2b3725
Showing 5 changed files with 154 additions and 11 deletions.
4 changes: 2 additions & 2 deletions pre-processing/pre-processing-cfn.yaml
@@ -85,7 +85,7 @@ Resources:
     Type: AWS::Events::Rule
     Properties:
       Description: "ScheduledRule"
-      ScheduleExpression: "cron(0 10,22 * * ? *)"
+      ScheduleExpression: "cron(0 4,10,16,22 * * ? *)"
       State: "ENABLED"
       Targets:
         -
@@ -116,4 +116,4 @@ Outputs:
         - Arn
   LambdaFunctionName:
     Value:
-      Ref: LambdaFunction
+      Ref: LambdaFunction
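
For reference, the new ScheduleExpression "cron(0 4,10,16,22 * * ? *)" fires the rule at 04:00, 10:00, 16:00 and 22:00 UTC every day, doubling the old twice-daily (10:00/22:00) trigger. A minimal boto3 sketch of the same schedule, assuming a hypothetical rule name (the CloudFormation template above remains the source of truth):

import boto3

events = boto3.client('events')
events.put_rule(
    Name='ScheduledRule',  # hypothetical rule name, not taken from the template
    ScheduleExpression='cron(0 4,10,16,22 * * ? *)',
    State='ENABLED',
)
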
6 changes: 5 additions & 1 deletion pre-processing/pre-processing-code/lambda_function.py
@@ -140,4 +140,8 @@ def lambda_handler(event, context):
                 'body': json.dumps('Revision did not complete successfully')
             }
     else:
-        raise Exception('Something went wrong when uploading files to s3')
+        print("No changes found, no Revision created")
+        return {
+            'statusCode': 200,
+            'body': json.dumps('MD5 comparison found no changes, nothing to publish')
+        }
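
The else branch now returns a 200 "nothing to publish" response instead of raising. A hedged sketch of the handler's resulting control flow, with the revision-creation logic elided and the success message assumed, shows where this branch sits:

import json
from source_data import source_dataset

def lambda_handler(event, context):
    asset_list = source_dataset()
    if asset_list:
        # ... create and finalize an AWS Data Exchange revision from asset_list (elided) ...
        return {
            'statusCode': 200,
            'body': json.dumps('Revision updated successfully')  # assumed message
        }
    # New behavior from this commit: an empty asset list means nothing changed.
    print("No changes found, no Revision created")
    return {
        'statusCode': 200,
        'body': json.dumps('MD5 comparison found no changes, nothing to publish')
    }
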
46 changes: 46 additions & 0 deletions pre-processing/pre-processing-code/s3_md5_compare.py
@@ -0,0 +1,46 @@
# Compare the md5 of a file to the s3 etag md5
# Source: li xin on StackOverflow
# https://stackoverflow.com/questions/1775816/how-to-get-the-md5sum-of-a-file-on-amazons-s3

import hashlib
import botocore.exceptions

def md5_checksum(filename):
    m = hashlib.md5()
    with open(filename, 'rb') as f:
        for data in iter(lambda: f.read(1024 * 1024), b''):
            m.update(data)
    return m.hexdigest()

def etag_checksum(filename, chunk_size=8 * 1024 * 1024):
    md5s = []
    with open(filename, 'rb') as f:
        for data in iter(lambda: f.read(chunk_size), b''):
            md5s.append(hashlib.md5(data).digest())
    m = hashlib.md5(b"".join(md5s))
    return '{}-{}'.format(m.hexdigest(), len(md5s))

def etag_compare(filename, etag):
    et = etag[1:-1] # strip quotes
    if '-' in et and et == etag_checksum(filename):
        return False
    if '-' not in et and et == md5_checksum(filename):
        return False
    return True

def md5_compare(s3, bucket_name, s3_key, filename):
    #Get the file metadata from s3
    #If the file does not exist, return True for changes found
    try:
        obj_dict = s3.head_object(Bucket=bucket_name, Key=s3_key)
    except botocore.exceptions.ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == '404':
            return True

    etag = (obj_dict['ETag'])

    md5_matches = etag_compare(filename,etag)

    return md5_matches
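
The "-" check in etag_compare distinguishes multipart ETags, which S3 reports as "<md5 of the concatenated part MD5s>-<part count>" (computed here over 8 MiB parts), from plain single-part ETags, which are simply the object's MD5. A hedged usage sketch with illustrative bucket, key, and path values:

import boto3
from s3_md5_compare import md5_compare

s3 = boto3.client('s3')
# Returns True when the local file differs from the S3 object (or the key is missing),
# i.e. when an upload and a new revision are needed.
if md5_compare(s3, 'my-example-bucket', 'example-dataset/dataset/example_global.json', '/tmp/example_global.json'):
    print('changes found, upload and publish')
else:
    print('ETag matches the local MD5, skip re-publishing')
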

28 changes: 20 additions & 8 deletions pre-processing/pre-processing-code/source_data.py
@@ -5,6 +5,7 @@
 from urllib.request import urlopen
 from urllib.error import URLError, HTTPError
 from multiprocessing.dummy import Pool
+from s3_md5_compare import md5_compare
 
 def data_to_s3(endpoint):
 
@@ -24,8 +25,8 @@ def data_to_s3(endpoint):
 
     else:
         data_set_name = os.environ['DATA_SET_NAME']
-        new_s3_key = data_set_name + '/dataset/'
         filename = data_set_name + endpoint
+        new_s3_key = data_set_name + '/dataset/' + filename
         file_location = '/tmp/' + filename
 
         with open(file_location + '.gz', 'wb') as f:
@@ -42,15 +43,17 @@ def data_to_s3(endpoint):
         s3_bucket = os.environ['S3_BUCKET']
         s3 = boto3.client('s3')
 
-        s3.upload_file(file_location, s3_bucket, new_s3_key + filename)
-
-        print('Uploaded: ' + filename)
+        has_changes = md5_compare(s3, s3_bucket, new_s3_key, file_location)
+        if has_changes:
+            s3.upload_file(file_location, s3_bucket, new_s3_key)
+            print('Uploaded: ' + filename)
 
         # deletes to preserve limited space in aws lamdba
        os.remove(file_location)
 
         # dicts to be used to add assets to the dataset revision
-        return {'Bucket': s3_bucket, 'Key': new_s3_key + filename}
+        asset_source = {'Bucket': s3_bucket, 'Key': new_s3_key}
+        return {'has_changes': has_changes, 'asset_source': asset_source}
 
 def source_dataset():
 
@@ -63,10 +66,19 @@ def source_dataset():
         '_oceania.json',
         '_africa.json'
     ]
+    asset_list = []
 
     # multithreading speed up accessing data, making lambda run quicker
     with (Pool(6)) as p:
-        asset_list = p.map(data_to_s3, endpoints)
+        s3_uploads = p.map(data_to_s3, endpoints)
 
+    # If any of the data has changed, we need to republish the adx product
+    count_updated_data = sum(
+        upload['has_changes'] == True for upload in s3_uploads)
+    if count_updated_data > 0:
+        asset_list = list(
+            map(lambda upload: upload['asset_source'], s3_uploads))
+        if len(asset_list) == 0:
+            raise Exception('Something went wrong when uploading files to s3')
     # asset_list is returned to be used in lamdba_handler function
-    return asset_list
+    return asset_list
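
With these changes data_to_s3 returns a dict per endpoint rather than a bare Bucket/Key pair, and source_dataset republishes every asset as soon as any single endpoint changed. An illustration with made-up results:

s3_uploads = [
    {'has_changes': True,
     'asset_source': {'Bucket': 'example-bucket', 'Key': 'example-dataset/dataset/example_global.json'}},
    {'has_changes': False,
     'asset_source': {'Bucket': 'example-bucket', 'Key': 'example-dataset/dataset/example_africa.json'}},
]

count_updated_data = sum(upload['has_changes'] == True for upload in s3_uploads)  # 1
asset_list = list(map(lambda upload: upload['asset_source'], s3_uploads)) if count_updated_data > 0 else []
# One changed endpoint is enough: asset_list then contains all assets, changed and
# unchanged alike, so the ADX revision is rebuilt in full; otherwise it stays empty.
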
81 changes: 81 additions & 0 deletions update.sh
@@ -0,0 +1,81 @@
#!/usr/bin/env bash

# Exit on error. Append "|| true" if you expect an error.
set -o errexit
# Exit on error inside any functions or subshells.
set -o errtrace
# Do not allow use of undefined vars. Use ${VAR:-} to use an undefined VAR
#set -o nounset
# Catch the error in case mysqldump fails (but gzip succeeds) in `mysqldump |gzip`
set -o pipefail
# Turn on traces, useful while debugging but commented out by default
# set -o xtrace

# Sets profile variable to an empty value by default, reassigns in while loop below if it was included as a parameter
PROFILE=""

while [[ $# -gt 0 ]]; do
  opt="${1}"
  shift;
  current_arg="$1"
  case ${opt} in
    "-d"|"--dataset-name") export DATASET_NAME="$1"; shift;;
    "-r"|"--region") export REGION="$1"; shift;;
    "-f"|"--profile") PROFILE=" --profile $1"; shift;;
    *) echo "ERROR: Invalid option: \""$opt"\"" >&2; exit 1;;
  esac
done

while [[ ${#DATASET_NAME} -gt 53 ]]; do
  echo "dataset-name must be under 53 characters in length, enter a shorter name:"
  read -p "New dataset-name: " DATASET_NAME
  case ${#DATASET_NAME} in
    [1-9]|[1-4][0-9]|5[0-3]) break;;
    * ) echo "Enter in a shorter dataset-name";;
  esac
done

#get existing cloudformation stack
echo "getting existing CFN parameters"
CFN_STACK_NAME="producer-${DATASET_NAME}-preprocessing"
while read parameter_name parameter_value; do
  echo "$parameter_name: $parameter_value"
  case ${parameter_name} in
    "S3Bucket") export S3_BUCKET="$parameter_value";;
    "DataSetArn") export DATASET_ARN="$parameter_value";;
    "ProductId") export PRODUCT_ID="$parameter_value";;
    #Ignore these two because they were set manually already
    "Region");;
    "DataSetName");;
    *) echo "ERROR: Invalid parameter found: \""$parameter_name"\", please update manually" >&2; exit 1;;
  esac
done < <(aws cloudformation describe-stacks --stack-name $CFN_STACK_NAME --query 'Stacks[0].Parameters' --output text$PROFILE)

#creating a pre-processing zip package, these commands may need to be adjusted depending on folder structure and dependencies
(cd pre-processing/pre-processing-code && zip -r pre-processing-code.zip . -x "*.dist-info/*" -x "bin/*" -x "**/__pycache__/*")

#upload pre-preprocessing.zip to s3
echo "uploading pre-preprocessing.zip to s3"
aws s3 cp pre-processing/pre-processing-code/pre-processing-code.zip s3://$S3_BUCKET/$DATASET_NAME/automation/pre-processing-code.zip --region $REGION$PROFILE

#invoking the pre-processing lambda function to create first dataset revision
echo "updating the pre-processing lambda function code"
LAMBDA_FUNCTION_NAME="source-for-${DATASET_NAME}"
# AWS CLI version 2 changes require explicitly declairing `--cli-binary-format raw-in-base64-out` for the format of the `--payload`
aws lambda update-function-code --function-name $LAMBDA_FUNCTION_NAME --s3-bucket $S3_BUCKET --s3-key $DATASET_NAME/automation/pre-processing-code.zip$PROFILE
echo "updated lambda function code to use latest pre-processing.zip"

#updating pre-processing cloudformation stack
echo "updating pre-processing cloudformation stack"
CFN_STACK_NAME="producer-${DATASET_NAME}-preprocessing"
aws cloudformation update-stack --stack-name $CFN_STACK_NAME --template-body file://pre-processing/pre-processing-cfn.yaml --parameters ParameterKey=S3Bucket,ParameterValue=$S3_BUCKET ParameterKey=DataSetName,ParameterValue=$DATASET_NAME ParameterKey=DataSetArn,ParameterValue=$DATASET_ARN ParameterKey=ProductId,ParameterValue=$PRODUCT_ID ParameterKey=Region,ParameterValue=$REGION --region $REGION --capabilities "CAPABILITY_AUTO_EXPAND" "CAPABILITY_NAMED_IAM" "CAPABILITY_IAM"$PROFILE

echo "waiting for cloudformation stack update to complete"
aws cloudformation wait stack-update-complete --stack-name $CFN_STACK_NAME --region $REGION$PROFILE

if [[ $? -ne 0 ]]
then
  echo "Cloudformation stack update failed"
  break
fi
echo "cloudformation stack update completed"
