transfer.py

"""Copy S3 objects that match a filename regex from an origin bucket into a target
bucket under a collection prefix, optionally assuming an IAM role first."""
import re

import boto3
from airflow.exceptions import AirflowException


def assume_role(role_arn, session_name="veda-data-airflow_s3-discovery"):
    """Assume the given IAM role and return its credentials as boto3 client kwargs."""
    sts = boto3.client("sts")
    print(f"Assuming role: {role_arn}")
    credentials = sts.assume_role(
        RoleArn=role_arn,
        RoleSessionName=session_name,
    )
    creds = credentials["Credentials"]
    return {
        "aws_access_key_id": creds["AccessKeyId"],
        "aws_secret_access_key": creds["SecretAccessKey"],
        "aws_session_token": creds["SessionToken"],
    }


def get_matching_files(s3_client, bucket, prefix, regex_pattern):
    """Return all object keys under the prefix whose key matches the regex pattern."""
    matching_files = []
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    while True:
        # "Contents" is absent from the response when the prefix matches no objects
        for obj in response.get("Contents", []):
            file_key = obj["Key"]
            if re.match(regex_pattern, file_key):
                matching_files.append(file_key)
        # Paginate until list_objects_v2 stops returning a continuation token
        if "NextContinuationToken" in response:
            response = s3_client.list_objects_v2(
                Bucket=bucket,
                Prefix=prefix,
                ContinuationToken=response["NextContinuationToken"],
            )
        else:
            break
    return matching_files


def transfer_files_within_s3(
    s3_client, origin_bucket, matching_files, destination_bucket, collection
):
    """Copy each matching object into the destination bucket under the collection prefix."""
    transfer_exceptions = False
    for file_key in matching_files:
        filename = file_key.split("/")[-1]
        target_key = f"{collection}/{filename}"
        copy_source = {"Bucket": origin_bucket, "Key": file_key}
        # Use the existing target's ETag with CopySourceIfNoneMatch so a file that has
        # already been copied and is unchanged is not copied again.
        try:
            target_metadata = s3_client.head_object(
                Bucket=destination_bucket, Key=target_key
            )
            target_etag = target_metadata["ETag"]
            # The target already exists: copy only if the source ETag differs
            s3_client.copy_object(
                CopySource=copy_source,
                Bucket=destination_bucket,
                Key=target_key,
                CopySourceIfNoneMatch=target_etag,
            )
        except s3_client.exceptions.ClientError as err:
            error_code = err.response["Error"]["Code"]
            if error_code == "404":
                # Target not found, OK to copy unconditionally
                s3_client.copy_object(
                    CopySource=copy_source,
                    Bucket=destination_bucket,
                    Key=target_key,
                )
            elif error_code == "PreconditionFailed":
                # ETags matched, so the target is already up to date
                print(f"Skipping unchanged file: {filename}")
            else:
                print(f"ClientError copying {filename=} {err=}")
                transfer_exceptions = True
        except Exception as e:
            print(f"Exception copying {filename=} {e=}")
            transfer_exceptions = True
    if transfer_exceptions:
        raise Exception("One or more files failed to copy; see errors above")


def data_transfer_handler(event, role_arn=None):
    """Find files matching the event's filename_regex under origin_prefix and copy
    them into the target bucket, unless the event requests a dry run."""
    origin_bucket = event.get("origin_bucket")
    origin_prefix = event.get("origin_prefix")
    filename_regex = event.get("filename_regex")
    target_bucket = event.get("target_bucket")
    collection = event.get("collection")

    kwargs = assume_role(role_arn=role_arn) if role_arn else {}
    s3client = boto3.client("s3", **kwargs)
    matching_files = get_matching_files(
        s3_client=s3client,
        bucket=origin_bucket,
        prefix=origin_prefix,
        regex_pattern=filename_regex,
    )
    if len(matching_files) == 0:
        raise AirflowException("No matching files found")

    if not event.get("dry_run"):
        transfer_files_within_s3(
            s3_client=s3client,
            origin_bucket=origin_bucket,
            matching_files=matching_files,
            destination_bucket=target_bucket,
            collection=collection,
        )
    else:
        print(
            f"Would have copied {len(matching_files)} files from {origin_bucket} to {target_bucket}"
        )
        print(f"Files matched: {matching_files}")
    return {**event}