Skip to content

Commit

Permalink
Update FIFO scheduler to new API.
Browse files · Browse the repository at this point in the history
  • Loading branch information
sukritkalra committed Apr 10, 2024
1 parent 5596f9b commit a146e5a
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 27 deletions.
6 changes: 1 addition & 5 deletions schedulers/edf_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,7 @@ def schedule(
scheduler_runtime = EventTime(
int((end_time - start_time) * 1e6), EventTime.Unit.US
)
runtime = (
scheduler_runtime
if self.runtime == EventTime(-1, EventTime.Unit.US)
else self.runtime
)
runtime = scheduler_runtime if self.runtime.is_invalid() else self.runtime
return Placements(
runtime=runtime, true_runtime=scheduler_runtime, placements=placements
)
106 changes: 87 additions & 19 deletions schedulers/fifo_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import time
from copy import copy
from operator import attrgetter
Expand All @@ -23,56 +24,123 @@ def __init__(
self,
preemptive: bool = False,
runtime: EventTime = EventTime(-1, EventTime.Unit.US),
enforce_deadlines: bool = False,
_flags: Optional["absl.flags"] = None,
):
assert not preemptive, "FIFO scheduler is not preemptive"
super(FIFOScheduler, self).__init__(
preemptive=preemptive, runtime=runtime, _flags=_flags
preemptive=preemptive,
runtime=runtime,
enforce_deadlines=enforce_deadlines,
_flags=_flags,
)
if _flags is not None:
if _flags.release_taskgraphs:
raise ValueError("FIFOScheduler does not support taskgraphs.")

def schedule(
self, sim_time: EventTime, workload: Workload, worker_pools: WorkerPools
) -> Placements:
tasks = workload.get_schedulable_tasks(
time=sim_time, preemption=self.preemptive, worker_pools=worker_pools
)

# Create a virtual WorkerPool set to try scheduling decisions on.
schedulable_worker_pools = copy(worker_pools)
for worker_pool in schedulable_worker_pools.worker_pools:
self._logger.debug(
f"[{sim_time.time}] The state of {worker_pool} is: "
f"{os.linesep} {os.linesep.join(worker_pool.get_utilization())}"
)

start_time = time.time()

# Sort the tasks according to their release times, and place them on
# the worker pools.
ordered_tasks = list(sorted(tasks, key=attrgetter("release_time")))
task_descriptions = [
f"{task.unique_name} ({task.release_time})" for task in ordered_tasks
]
self._logger.debug(
f"[{sim_time.time}] The scheduler received {len(ordered_tasks)} tasks to "
f"be scheduled. The order of the tasks is {task_descriptions}."
)

# Run the scheduling loop.
placements = []
for task in ordered_tasks:
task_placed = False
for worker_pool in schedulable_worker_pools.worker_pools:
if worker_pool.can_accomodate_task(task):
worker_pool.place_task(task)
task_placed = True
placements.append(
Placement.create_task_placement(
task=task,
worker_pool_id=worker_pool.id,
placement_time=sim_time,
self._logger.debug(
f"[{sim_time.time}] Trying to schedule task {task.unique_name} with "
f"release time {task.release_time} and available execution strategies: "
f"{task.available_execution_strategies}."
)

# If we are enforcing deadlines, and the Task is past its deadline,
# then we should create a cancellation for it. This is only applicable if
# the user wants the tasks that cannot meet their deadline to be dropped.
if (
self.enforce_deadlines
and task.deadline
< sim_time
+ task.available_execution_strategies.get_fastest_strategy().runtime
):
placements.append(Placement.create_task_cancellation(task=task))
self._logger.debug(
"[%s] Task %s has a deadline of %s, which has been missed. "
"Cancelling the task.",
sim_time.time,
task,
task.deadline,
)
continue

# Try to place the task on the worker pools.
is_task_placed = False
for execution_strategy in task.available_execution_strategies:
for worker_pool in schedulable_worker_pools.worker_pools:
if worker_pool.can_accomodate_strategy(execution_strategy):
worker_pool.place_task(
task, execution_strategy=execution_strategy
)
)
is_task_placed = True
placements.append(
Placement.create_task_placement(
task=task,
placement_time=sim_time,
worker_pool_id=worker_pool.id,
execution_strategy=execution_strategy,
)
)
self._logger.debug(
f"[{sim_time.time}] Placed {task} on Worker Pool "
f"({worker_pool.id}) to be started at {sim_time} with the "
f"execution strategy: {execution_strategy}."
)
break
if is_task_placed:
break
if not task_placed:
placements.append(Placement.create_task_placement(task=task))

if is_task_placed:
for worker_pool in schedulable_worker_pools.worker_pools:
self._logger.debug(
f"[{sim_time.time}] The state of {worker_pool} is:{os.linesep}"
f"{os.linesep.join(worker_pool.get_utilization())}"
)
else:
self._logger.debug(
"[%s] Failed to place %s because no worker pool "
"could accomodate the resource requirements.",
sim_time.time,
task,
)
placements.append(Placement.create_task_placement(task=task))
end_time = time.time()

# Compute and return the Placements object.
scheduler_runtime = EventTime(
int((end_time - start_time) * 1e6), EventTime.Unit.US
)
runtime = (
scheduler_runtime
if self.runtime == EventTime(-1, EventTime.Unit.US)
else self.runtime
)
runtime = scheduler_runtime if self.runtime.is_invalid() else self.runtime
return Placements(
runtime=runtime, true_runtime=scheduler_runtime, placements=placements
)
14 changes: 11 additions & 3 deletions scripts/run_alibaba_final_experiments.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ fi
# being chosen from the trace, along with different arrival patterns.
#RANDOM_SEEDS=(420665456 6785649879 1232434 243243453453 3785432875 8984928429 4295429857 99854278957 32542345235 67676 1979879073895 1)
#RANDOM_SEEDS=(1232434 42066545 6785649879 32434234353 106432512)
RANDOM_SEEDS=(1232434 42066545 106432512)
#RANDOM_SEEDS=(1232434 42066545 106432512)
RANDOM_SEEDS=(1232434)

# Schedulers
# We use the following baseline schedulers to compare the performance of DAGSched with.
#SCHEDULERS=(EDF DAGSched_Dyn)
#SCHEDULERS=(EDF DAGSched_Dyn)
#SCHEDULERS=(DAGSched_Dyn TetriSched_1 TetriSched_5)
#SCHEDULERS=(EDF TetriSched_0 DAGSched_Dyn Graphene)
SCHEDULERS=(Graphene)
SCHEDULERS=(FIFO)

# Poisson arrival rates.
# We use the following arrival rates for the Poisson arrival process.
Expand Down Expand Up @@ -57,6 +58,8 @@ SCHEDULERS=(Graphene)

#MEDIUM_ARRIVAL_RATES=( 0.01 0.01 0.008 0.01 0.025 0.02 0.012 0.014 0.015 0.016 0.01 0.01 0.006 0.008 0.01 0.01)
#HARD_ARRIVAL_RATES=( 0.018 0.015 0.02 0.02 0.035 0.03 0.021 0.022 0.024 0.025 0.05 0.024 0.023 0.0225 0.0235 0.021)
#MEDIUM_ARRIVAL_RATES=( 0.01 0.01 0.01 0.01 0.01 0.015 0.012 0.0135 0.0135 0.013 0.014 0.0145 0.016 0.01 0.01 0.01)
#HARD_ARRIVAL_RATES=( 0.012 0.014 0.015 0.0165 0.0175 0.014 0.02 0.02 0.022 0.025 0.025 0.026 0.026 0.033 0.0345 0.0385)
MEDIUM_ARRIVAL_RATES=( 0.01)
HARD_ARRIVAL_RATES=( 0.012)
#MEDIUM_ARRIVAL_RATES=( 0.01 0.02 0.03)
Expand Down Expand Up @@ -93,7 +96,7 @@ execute_experiment () {

EXPERIMENT_CONF+="
# Worker configuration.
--worker_profile_path=profiles/workers/alibaba_cluster_15k.yaml
--worker_profile_path=profiles/workers/alibaba_cluster_25k.yaml
"

EXPERIMENT_CONF+="
Expand Down Expand Up @@ -126,6 +129,11 @@ execute_experiment () {
--scheduler=EDF
--enforce_deadlines
"
elif [[ ${SCHEDULER} == "FIFO" ]]; then
EXPERIMENT_CONF+="
# Scheduler configuration.
--scheduler=FIFO
"
elif [[ ${SCHEDULER} == "TetriSched" ]]; then
EXPERIMENT_CONF+="
# Scheduler configuration.
Expand Down

0 comments on commit a146e5a

Please sign in to comment.