[WIP] Working on monitoring total throughput
ccanel committed Feb 1, 2024
1 parent 6c55062 commit bdfdc52
Showing 1 changed file with 37 additions and 17 deletions.
54 changes: 37 additions & 17 deletions unfair/runtime/inference.py
@@ -19,6 +19,11 @@
from unfair.runtime import flow_utils, reaction_strategy
from unfair.runtime.reaction_strategy import ReactionStrategy

# Fully resolved (windowed) name of the throughput feature, computed once at import time.
TPUT_FET_RESOLVED = features.make_win_metric(
features.TPUT_FET, models.MathisFairness.win_size
)
# Highest total throughput (bps) observed across all flows so far (updated in batch_inference).
LAST_TOTAL_TPUT_BPS = 0


def predict(net, in_fets, debug=False):
"""Run inference on a flow's packets.
@@ -337,9 +342,7 @@ def apply_decision(flowkey, new_decision, flow_to_decisions, flow_to_rwnd):
flow_to_decisions[flowkey] = new_decision


def make_decision(
args, flowkeys, min_rtt_us, fets, label, flow_to_decisions, flow_to_rwnd
):
def make_decision(args, flowkeys, min_rtt_us, fets, label, flow_to_decisions):
"""Make a flow unfairness mitigation decision.
Base the decision on the flow's label and existing decision. Use the flow's features
@@ -355,9 +358,7 @@ def make_decision(
args, flowkeys[0], min_rtt_us, fets, label, flow_to_decisions
)

if new_decision is not None:
for flowkey in flowkeys:
apply_decision(flowkey, new_decision, flow_to_decisions, flow_to_rwnd)
return new_decision


def packets_to_ndarray(pkts, dtype, packets_lost, win_to_loss_event_rate):
@@ -475,7 +476,7 @@ def configure_ebpf(args):
ifindex,
parent=handle,
prio=10,
protocol=protocols.ETH_P_ALL, # Every packet
protocol=protocols.ETH_P_IP, # IP packets
target=0x10020,
keys=["0x0/0x0+0"],
action=action,
@@ -1005,22 +1006,48 @@ def batch_inference(
# added as dependencies for the requested features).
labels = predict(net, batch_fets[list(net.in_spc)], args.debug)

# TODO: Check if total throughput has decreased since the last inference run.
# If so, consider reducing the policing. (A hypothetical sketch of one such
# check appears below.)

# Update the maximum total throughput seen across all flows, using each flow's
# most recent windowed throughput estimate.
current_total_tput_bps = sum(
all_fets[TPUT_FET_RESOLVED][-1] for _, _, _, all_fets, _ in batch
)
global LAST_TOTAL_TPUT_BPS
LAST_TOTAL_TPUT_BPS = max(LAST_TOTAL_TPUT_BPS, current_total_tput_bps)

# Reasoning about the maximum bandwidth implies that we assume all flows share
# the same bottleneck link.
# Under what conditions is it safe to ramp back up when we notice the total
# throughput decrease?
# If we assume a shared bottleneck, then we can always ensure full utilization.
# But if each flow has a separate bottleneck, then what can we do?
# In that case we have no way of knowing the actual utilization, or whether
# other flows have ramped up to take the available bandwidth.
# Do we need some way to determine whether a flow is application-limited?
# One option is to compare the offered bandwidth to the actual throughput:
# 1) Measure the flow's throughput.
# 2) Offer less throughput (tighten the RWND policing).
# 3) Wait.
# 4) Offer the original throughput again.
# 5) Did the flow ramp back up? If not, it is likely application-limited.
# (A hypothetical sketch of this probe also appears below.)


# For each flow, make and apply a policing decision.
for fourtuple, (_, flowkeys, min_rtt_us, all_fets, in_fets) in zip(
merged_fourtuples, batch
):
start, end = flow_to_range[fourtuple]
flw_labels = labels[start:end]
# Make a decision for this flow.
make_decision(
new_decision = make_decision(
args,
flowkeys,
min_rtt_us,
all_fets,
smooth(flw_labels),
flow_to_decisions,
flow_to_rwnd,
)

if new_decision is not None:
for flowkey in flowkeys:
apply_decision(flowkey, new_decision, flow_to_decisions, flow_to_rwnd)


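The TODO above is not acted on in this commit. As a rough illustration only, one way to use LAST_TOTAL_TPUT_BPS would be to loosen policing whenever the current total throughput falls well below the recorded maximum. The helper name, the 10% tolerance, and the "double the RWND cap" policy are all assumptions, and flow_to_rwnd is treated as a plain dict even though the runtime may use a different mapping type.

def maybe_relax_policing(
    current_total_tput_bps, max_total_tput_bps, flow_to_rwnd, tolerance=0.1
):
    """Grow every flow's RWND cap if total throughput has dropped noticeably.

    Hypothetical sketch, not part of this commit. Could be called as, e.g.,
    maybe_relax_policing(current_total_tput_bps, LAST_TOTAL_TPUT_BPS, flow_to_rwnd).
    """
    if max_total_tput_bps <= 0:
        return False
    drop = 1 - current_total_tput_bps / max_total_tput_bps
    if drop <= tolerance:
        return False
    for flowkey, rwnd in flow_to_rwnd.items():
        # Double each advertised-window cap, bounded by the unscaled RWND limit.
        flow_to_rwnd[flowkey] = min(rwnd * 2, 2**16 - 1)
    return True

The five-step probe described in the comment block could look roughly like the following. This is a sketch under stated assumptions, not the project's implementation: measure_tput and set_offered_tput are hypothetical callbacks, and the reduction factor, settle time, and ramp tolerance are illustrative values.

import time


def probe_app_limited(
    flowkey,
    measure_tput,
    set_offered_tput,
    reduce_factor=0.8,
    settle_s=2.0,
    ramp_tolerance=0.9,
):
    """Return True if the flow appears to be application-limited (sketch)."""
    # 1) Measure the flow's current throughput.
    baseline_bps = measure_tput(flowkey)
    # 2) Offer less throughput.
    set_offered_tput(flowkey, baseline_bps * reduce_factor)
    # 3) Wait for the flow to settle at the reduced offer.
    time.sleep(settle_s)
    # 4) Offer the original throughput again.
    set_offered_tput(flowkey, baseline_bps)
    time.sleep(settle_s)
    # 5) Did the flow ramp back up? If not, it likely was not using the
    #    bandwidth we took away, i.e., it is application-limited.
    return measure_tput(flowkey) < baseline_bps * ramp_tolerance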
def merge_sender_flows(net, sender_flows):
"""
@@ -1118,14 +1145,7 @@ def merge_sender_flows(net, sender_flows):
),
# Sum the throughput across flows (see the illustration below).
np.sum(
[
in_fets[pkt_idx][
features.make_win_metric(
features.TPUT_FET, models.MathisFairness.win_size
)
]
for in_fets in sender_flows_interp
]
[in_fets[pkt_idx][TPUT_FET_RESOLVED] for in_fets in sender_flows_interp]
),
)

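The indexing above suggests that each flow's features live in a NumPy structured array with one row per packet, which is why the resolved feature name can be reused both here and in batch_inference as a plain column label. A minimal illustration, with a made-up column name standing in for the real TPUT_FET_RESOLVED string:

import numpy as np

TPUT_COL = "tput_bps_windowed"  # placeholder for the real TPUT_FET_RESOLVED
dtype = [("seq", "u4"), (TPUT_COL, "f8")]

# Two interpolated sender flows, aligned on packet index.
flow_a = np.array([(1, 4e6), (2, 5e6)], dtype=dtype)
flow_b = np.array([(1, 2e6), (2, 3e6)], dtype=dtype)

pkt_idx = 1
# Sum the throughput across flows at this packet index, as in the diff above.
total_bps = np.sum([fets[pkt_idx][TPUT_COL] for fets in (flow_a, flow_b)])
print(total_bps)  # 8000000.0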