init_sync.py
import pymongo
from pymongo.collection import Collection
import time
import os
import pandas as pd
import shutil
import logging
import glob
from bson import ObjectId
import pickle
from constants import (
TYPES_TO_CONVERT_TO_STR,
MONGODB_READING_BATCH_SIZE,
METADATA_FILE_NAME,
DATA_FILES_PATH,
# INIT_SYNC_CURRENT_SKIP_FILE_NAME,
# added the two new files to save the initial sync status and last parquet file number
INIT_SYNC_STATUS_FILE_NAME,
LAST_PARQUET_FILE_NUMBER,
INIT_SYNC_LAST_ID_FILE_NAME,
INIT_SYNC_MAX_ID_FILE_NAME,
)
import schema_utils
from utils import get_parquet_full_path_filename, to_string, get_table_dir
from push_file_to_lz import push_file_to_lz
# not required as now init_sync stat is stored in LZ
#from flags import set_init_flag, clear_init_flag
from file_utils import FileType, read_from_file, write_to_file, delete_file
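
# init_sync performs a resumable initial load of a MongoDB collection into numbered
# parquet files that are pushed to the LZ. Progress is checkpointed in the LZ as pickle
# files: the init sync status flag ("N" while in progress, "Y" once complete), the last
# synced _id, and the last parquet file number, so an interrupted run can resume from
# where it stopped instead of starting over.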


def init_sync(collection_name: str):
    logger = logging.getLogger(f"{__name__}[{collection_name}]")

    # detect if there's an init_sync_stat file in LZ, and get its value
    init_sync_stat_flag = read_from_file(
        collection_name, INIT_SYNC_STATUS_FILE_NAME, FileType.PICKLE
    )
    if init_sync_stat_flag == "Y":
        logger.info(
            f"init sync for collection {collection_name} has already finished previously. Skipping init sync this time."
        )
        return

    # detect if there's a last_id file, and restore last_id from it
    last_id = read_from_file(
        collection_name, INIT_SYNC_LAST_ID_FILE_NAME, FileType.PICKLE
    )
    if init_sync_stat_flag == "N" and last_id:
        logger.info(
            f"interrupted init sync detected, continuing with previous _id={last_id}"
        )
    # skip old logic with LZ file for init_sync_stat
    # skip init_sync if there are already parquet files and no current_skip/last_id file
    # table_dir = get_table_dir(collection_name)
    # current_skip_file_path = os.path.join(table_dir, INIT_SYNC_CURRENT_SKIP_FILE_NAME)
    # last_id_file_path = os.path.join(table_dir, INIT_SYNC_LAST_ID_FILE_NAME)
    # needs to exclude the situation where cache or temp parquet files exist but
    # no normal numbered parquet files, in which case we shouldn't skip init sync
    # if (
    #     not os.path.exists(last_id_file_path)
    #     and os.path.exists(table_dir)
    #     and any(
    #         file.endswith(".parquet") and os.path.splitext(file)[0].isnumeric()
    #         for file in os.listdir(table_dir)
    #     )
    # ):

    logger.info(f"begin init sync for {collection_name}")

    # begin by writing init_sync_stat file with "N" as value
    # set_init_flag(collection_name)
    if not init_sync_stat_flag:
        # writing init_sync_stat file with "N"
        init_sync_stat_flag = "N"
        logger.info(f"writing init sync stat file as 'N' for {collection_name}")
        write_to_file(
            init_sync_stat_flag, collection_name, INIT_SYNC_STATUS_FILE_NAME, FileType.PICKLE
        )
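
    # Connect to MongoDB and establish the upper bound (max_id) for this init sync run.
    # max_id is also persisted to LZ, so a resumed run keeps the same stopping point.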
    db_name = os.getenv("MONGO_DB_NAME")
    logger.debug(f"db_name={db_name}")
    logger.debug(f"collection={collection_name}")
    enable_perf_timer = os.getenv("DEBUG__ENABLE_PERF_TIMER")
    client = pymongo.MongoClient(os.getenv("MONGO_CONN_STR"))
    db = client[db_name]
    collection = db[collection_name]
    count = collection.estimated_document_count()

    # use max_id as the mechanism to set the stopping point of init sync
    max_id_from_file = read_from_file(
        collection_name, INIT_SYNC_MAX_ID_FILE_NAME, FileType.PICKLE
    )
    if max_id_from_file:
        max_id = max_id_from_file
        logger.info(f"resumed max_id={max_id}")
    else:
        max_id = __get_max_id(collection, logger)
        if max_id:
            logger.info(f"writing max_id into file: {max_id}")
            write_to_file(
                max_id, collection_name, INIT_SYNC_MAX_ID_FILE_NAME, FileType.PICKLE
            )

    batch_size = int(os.getenv("INIT_LOAD_BATCH_SIZE"))
    columns_to_convert_to_str = None

    # moved to the beginning to check if initial sync is completed
    # detect if there's a last_id file, and restore last_id from it
    # last_id = read_from_file(
    #     collection_name, INIT_SYNC_LAST_ID_FILE_NAME, FileType.PICKLE
    # )
    # if last_id:
    #     logger.info(
    #         f"interrupted init sync detected, continuing with previous _id={last_id}"
    #     )
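
    # Batching strategy: each pass reads the next batch_size documents in ascending _id
    # order within the window (last_id, max_id], writes them to a numbered parquet file,
    # pushes the file to LZ, and then checkpoints last_id and the parquet file number so
    # a crash mid-sync can resume from the last completed batch.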
    while last_id is None or last_id < max_id:
        # for debug only
        debug_env_var_sleep_sec = os.getenv("DEBUG__INIT_SYNC_SLEEP_SEC")
        if debug_env_var_sleep_sec and debug_env_var_sleep_sec.isnumeric():
            logger.info(f"sleep({debug_env_var_sleep_sec}) begin")
            time.sleep(int(debug_env_var_sleep_sec))
            logger.info(f"sleep({debug_env_var_sleep_sec}) ends")

        if not last_id:
            batch_cursor = collection.find().sort({"_id": 1}).limit(batch_size)
        else:
            batch_cursor = (
                collection.find({"_id": {"$gt": last_id, "$lte": max_id}})
                .sort({"_id": 1})
                .limit(batch_size)
            )
        read_start_time = time.time()
        batch_df = pd.DataFrame(list(batch_cursor))
        read_end_time = time.time()
        if enable_perf_timer:
            logger.info(f"TIME: read took {read_end_time - read_start_time:.2f} seconds")

        # quit the loop if no more data
        if batch_df.empty:
            break
        # get the last _id in its original data type (ObjectId), before it is converted to string later
        raw_last_id = batch_df["_id"].iloc[-1]
        first_id = batch_df["_id"].iloc[0]
        logger.info("starting a new batch.")
        logger.info(f"first _id of this batch: {first_id}")
        logger.info(f"last _id of this batch: {raw_last_id}")

        # process df according to internal schema
        schema_utils.process_dataframe(collection_name, batch_df)
        trans_end_time = time.time()
        if enable_perf_timer:
            logger.info(f"TIME: trans took {trans_end_time - read_end_time:.2f} seconds")

        logger.debug("creating parquet file...")
        # changed to get the last parquet file number from LZ for resilience
        # parquet_full_path_filename = get_parquet_full_path_filename(collection_name)
        last_parquet_file_num = read_from_file(
            collection_name, LAST_PARQUET_FILE_NUMBER, FileType.PICKLE
        )
        if not last_parquet_file_num:
            last_parquet_file_num = 0
        parquet_full_path_filename = get_parquet_full_path_filename(
            collection_name, last_parquet_file_num
        )
        logger.info(f"writing parquet file: {parquet_full_path_filename}")
        batch_df.to_parquet(parquet_full_path_filename, index=False)
        write_end_time = time.time()
        if enable_perf_timer:
            logger.info(f"TIME: write took {write_end_time - trans_end_time:.2f} seconds")

        if not last_id:
            # do not copy, but send the template file directly
            metadata_json_path = os.path.join(
                os.path.dirname(os.path.abspath(__file__)), METADATA_FILE_NAME
            )
            logger.info("writing metadata file to LZ")
            push_file_to_lz(metadata_json_path, collection_name)

        # write the current batch to LZ
        push_start_time = time.time()
        logger.info("writing parquet file to LZ")
        push_file_to_lz(parquet_full_path_filename, collection_name)
        push_end_time = time.time()
        if enable_perf_timer:
            logger.info(f"TIME: push took {push_end_time - push_start_time:.2f} seconds")

        last_id = raw_last_id
        logger.debug(f"DATA TYPE OF last_id IS: {type(last_id)}")

        # write the current last_id to file
        logger.info(f"writing last_id into file: {last_id}")
        write_to_file(
            last_id, collection_name, INIT_SYNC_LAST_ID_FILE_NAME, FileType.PICKLE
        )

        # write the last parquet file number to file
        last_parquet_file_num += 1
        logger.info(f"writing last parquet number into file: {last_parquet_file_num}")
        write_to_file(
            last_parquet_file_num,
            collection_name,
            LAST_PARQUET_FILE_NUMBER,
            FileType.PICKLE,
        )
    # delete the last_id file, as init sync is complete
    logger.info("removing the last_id file")
    delete_file(collection_name, INIT_SYNC_LAST_ID_FILE_NAME)

    # set the init_sync_stat flag to complete ("Y")
    logger.info("Setting init_sync_stat flag as Y")
    init_sync_stat_flag = "Y"
    write_to_file(
        init_sync_stat_flag, collection_name, INIT_SYNC_STATUS_FILE_NAME, FileType.PICKLE
    )
    logger.info(f"init sync completed for collection {collection_name}")
def __get_max_id(collection: Collection, logger: logging.Logger):
    pipeline = [{"$sort": {"_id": -1}}, {"$limit": 1}, {"$project": {"_id": 1}}]
    result = collection.aggregate(pipeline)
    try:
        doc = result.next()
        max_id = doc["_id"]
        return max_id
    except StopIteration:
        logger.warning("Can't get max _id: the collection is empty or does not exist.")
        return None
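

# A minimal usage sketch; the collection name and environment variable values below are
# assumptions for illustration, not the module's real configuration.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # init_sync() reads these environment variables; the values here are placeholders.
    os.environ.setdefault("MONGO_CONN_STR", "mongodb://localhost:27017")
    os.environ.setdefault("MONGO_DB_NAME", "test_db")
    os.environ.setdefault("INIT_LOAD_BATCH_SIZE", "1000")
    init_sync("my_collection")  # hypothetical collection name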