-
Notifications
You must be signed in to change notification settings - Fork 1
/
migrate.py
105 lines (89 loc) · 3.19 KB
/
migrate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import argparse
import boto3, botocore
import time
import sys
import json
from datetime import datetime
def to_epochms(dt):
epoch = datetime.utcfromtimestamp(0)
return int((dt - epoch).total_seconds() * 1000)
def get_log_events(client, log_group, log_stream, timestamp):
try:
response = client.get_log_events(
logGroupName=log_group,
logStreamName=log_stream,
startTime=timestamp,
startFromHead=True
)
except client.exceptions.ResourceNotFoundException:
return []
return response['events']
#
# Main
#
parser = argparse.ArgumentParser()
parser.add_argument('task_json', help='Task configuration json')
parser.add_argument('-i', '--interval', help='polling interval', type=int, default=20)
parser.add_argument('-t', '--timeout', help='end after TIMEOUT seconds', type=int, default=300)
args = parser.parse_args()
task_config = None
with open(args.task_json, 'r') as f:
task_config = json.loads(f.read())
ecs = boto3.client('ecs')
logs = boto3.client('logs')
start_time = datetime.utcnow()
# Get log info from task definition
response = ecs.describe_task_definition(
taskDefinition=task_config['taskDefinition']
)
container = response['taskDefinition']['containerDefinitions'][0]
container_name = container['name']
log_opts = container['logConfiguration']['options']
log_group = log_opts['awslogs-group']
log_region = log_opts['awslogs-region']
log_stream_prefix = log_opts['awslogs-stream-prefix']
print 'Running migration task.'
response = ecs.run_task(
cluster=task_config['cluster'],
taskDefinition=task_config['taskDefinition'],
count=task_config['count'],
launchType=task_config['launchType'],
networkConfiguration=task_config['networkConfiguration']
)
task_arn = response['tasks'][0]['taskArn']
task_id = task_arn.split('/')[-1]
log_stream = '/'.join([log_stream_prefix, container_name, task_id])
last_log_timestamp = to_epochms(start_time)
elapsed_time = None
task_data = None
while True:
print 'Sleeping {} seconds'.format(args.interval)
time.sleep(args.interval)
log_events = get_log_events(logs, log_group, log_stream, last_log_timestamp)
for e in log_events:
timestr = datetime.utcfromtimestamp(e['timestamp'] / 1000).isoformat()
print '[{}]: {}'.format(timestr, e['message'])
if len(log_events):
last_log_timestamp = log_events[-1]['timestamp'] + 1
# Check if task timed out
elapsed_time = datetime.utcnow() - start_time
if elapsed_time.seconds > args.timeout:
print 'Timed out after {} seconds'.format(elapsed_time.seconds)
sys.exit(1)
task_data = ecs.describe_tasks(
cluster=task_config['cluster'],
tasks=[ task_arn ]
)
container = task_data['tasks'][0]['containers'][0]
last_status = container['lastStatus']
exit_code = 0
if 'exitCode' in container:
exit_code = container['exitCode']
# Check if task is done.
if last_status == 'STOPPED':
if exit_code != 0:
print 'Migration failed.'
sys.exit(exit_code)
else:
break
print "Migration complete after {} seconds".format(elapsed_time.seconds)