
Commit

update batching logic
Signed-off-by: Praneeth Bedapudi <[email protected]>
bedapudi6788 committed Nov 7, 2024
1 parent f3be219 commit 992958f
Showing 3 changed files with 41 additions and 19 deletions.
56 changes: 39 additions & 17 deletions fastdeploy/_loop.py
@@ -75,44 +75,65 @@ def process_batch(predictor, input_batch, optimal_batch_size):
     return results, last_predictor_success, received_at, predicted_at
 
 
+to_process = {}
+current_sum_of_to_process = 0
+
+
 def fetch_batch(
     main_index,
     predictor_sequence,
     optimal_batch_size,
     max_wait_time_for_batch_collection,
 ):
+    global to_process
+    global current_sum_of_to_process
+
     unique_id_wise_input_count = {}
     input_batch = []
     current_batch_length = 0
     batch_collection_started_at = time.time()
     last_input_received_at = time.time()
 
     while current_batch_length < optimal_batch_size:
-        to_process = main_index.search(
-            query={
-                "-1.predicted_at": 0,  # prediction not yet done
-                "last_predictor_success": True,  # last predictor success
-                "last_predictor_sequence": predictor_sequence
-                - 1,  # last predictor sequence
-                "timedout_in_queue": {"$ne": True},  # not timedout in queue
-            },
-            n=optimal_batch_size,
-            select_keys=[f"{predictor_sequence - 1}.outputs"],
-            update={
-                "last_predictor_sequence": predictor_sequence,  # set last predictor sequence to current predictor sequence
-                "last_predictor_success": None,  # reset last predictor success
-                f"{predictor_sequence}.received_at": time.time(),  # set received at to current time
-            },
-        )
+        if current_sum_of_to_process < optimal_batch_size:
+            to_process.update(
+                main_index.search(
+                    query={
+                        "-1.predicted_at": 0,  # prediction not yet done
+                        "last_predictor_success": True,  # last predictor success
+                        "last_predictor_sequence": predictor_sequence - 1,  # last predictor sequence
+                        "timedout_in_queue": {"$ne": True},  # not timedout in queue
+                    },
+                    n=optimal_batch_size,
+                    select_keys=[f"{predictor_sequence - 1}.outputs"],
+                    update={
+                        "last_predictor_sequence": predictor_sequence,  # set last predictor sequence to current predictor sequence
+                        "last_predictor_success": None,  # reset last predictor success
+                        f"{predictor_sequence}.received_at": time.time(),  # set received at to current time
+                    },
+                )
+            )
 
         for unique_id, data in to_process.items():
+            if current_batch_length > optimal_batch_size * 0.8:
+                break
             outputs = data[f"{predictor_sequence - 1}.outputs"]
             input_count = len(outputs)
             unique_id_wise_input_count[unique_id] = input_count
             input_batch.extend(outputs)
             current_batch_length += input_count
             last_input_received_at = time.time()
 
+        for unique_id in unique_id_wise_input_count.keys():
+            try:
+                del to_process[unique_id]
+            except:
+                pass
+
+        current_sum_of_to_process = sum(
+            len(v[f"{predictor_sequence - 1}.outputs"]) for v in to_process.values()
+        )
+
         if current_batch_length == 0:
             if time.time() - last_input_received_at > 5:
                 time.sleep(0.05)
@@ -133,8 +154,9 @@ def fetch_batch(
                 break
 
     _utils.logger.info(
-        f"Fetched batch {[v for v in unique_id_wise_input_count.values()]}"
+        f"Fetched batch {unique_id_wise_input_count} with {current_sum_of_to_process} remaining in memory, to_process: {len(to_process)}"
     )
 
     return unique_id_wise_input_count, input_batch


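The substantive change above is a carry-over buffer: fetch_batch no longer discards unconsumed search results on every loop iteration. Results now accumulate in a module-level to_process dict, the index is queried only while the buffered input count (current_sum_of_to_process) sits below optimal_batch_size, batch filling stops once it passes 80% of the target, and consumed entries are deleted afterwards. A minimal, self-contained sketch of this pattern follows; the deque-backed fake_search and the item sizes are hypothetical stand-ins for main_index.search, not fastdeploy code:

    from collections import deque

    to_process = {}                # unconsumed entries carried over between calls
    current_sum_of_to_process = 0

    # Hypothetical pending work: unique_id -> list of outputs of varying size.
    _queue = deque((f"id{i}", list(range(i % 3 + 1))) for i in range(10))

    def fake_search(n):
        # Hypothetical stand-in for main_index.search: pops up to n entries.
        return dict(_queue.popleft() for _ in range(min(n, len(_queue))))

    def fetch_batch(optimal_batch_size):
        global current_sum_of_to_process
        unique_id_wise_input_count, input_batch, current_batch_length = {}, [], 0

        # Only hit the index when the carried-over buffer is running low.
        if current_sum_of_to_process < optimal_batch_size:
            to_process.update(fake_search(optimal_batch_size))

        for unique_id, outputs in to_process.items():
            if current_batch_length > optimal_batch_size * 0.8:
                break  # close enough to the target batch size
            unique_id_wise_input_count[unique_id] = len(outputs)
            input_batch.extend(outputs)
            current_batch_length += len(outputs)

        for unique_id in unique_id_wise_input_count:
            to_process.pop(unique_id, None)  # drop consumed entries

        current_sum_of_to_process = sum(len(v) for v in to_process.values())
        return unique_id_wise_input_count, input_batch

    print(fetch_batch(4))  # fills the buffer, consumes part of it
    print(fetch_batch(4))  # leftovers are served before querying again

Under this scheme the first call pays one index query and later calls drain the buffer first, so under steady load the index is hit roughly once per optimal_batch_size inputs instead of once per loop iteration.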
2 changes: 1 addition & 1 deletion setup.py
@@ -18,7 +18,7 @@
 EMAIL = "[email protected]"
 AUTHOR = "BEDAPUDI PRANEETH"
 REQUIRES_PYTHON = ">=3.6.0"
-VERSION = "3.0.30"
+VERSION = "3.0.31"
 
 # What packages are required for this module to be executed?
 REQUIRED = ["falcon", "liteindex==0.0.3.2.dev6", "zstandard", "gunicorn[gevent]", "msgpack"]
2 changes: 1 addition & 1 deletion testing/benchmark.py
@@ -75,7 +75,7 @@ def generate_payload(self):
         else:  # function
             if self._loaded_function is None:
                 self._load_function()
-            return self._loaded_function()
+            return self._loaded_function()[:self.request_batch_size]
 
     def run(self):
         # Handle Ctrl+C gracefully
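The benchmark fix caps function-generated payloads at the configured request batch size, so a payload function that returns more samples than requested no longer inflates each request. A small illustration with hypothetical values:

    loaded = list(range(10))                   # payload function returned 10 samples
    request_batch_size = 4
    payload = loaded[:request_batch_size]      # [0, 1, 2, 3]
    assert len(payload) <= request_batch_size  # also safe when fewer are returned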
