Skip to content

Commit

Permalink
Allow use of benchmark scheduler within benchmarking pipeline
Browse files Browse the repository at this point in the history
This patch adds support for using the newly added benchmark scheduler
classes within the benchmarking pipeline to assign a core to be used for
benchmarking.

Pull Request: google#273
  • Loading branch information
boomanaiden154 committed Dec 30, 2024
1 parent 015b6e5 commit cec81d2
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 8 deletions.
3 changes: 3 additions & 0 deletions gematria/datasets/pipelines/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ gematria_py_binary(
name = "benchmark_bbs_lib",
srcs = ["benchmark_bbs_lib.py"],
deps = [
":benchmark_cpu_scheduler",
"//gematria/datasets/python:exegesis_benchmark",
"//gematria/proto:execution_annotation_py_pb2",
],
Expand All @@ -63,6 +64,7 @@ gematria_py_binary(
srcs = ["benchmark_bbs.py"],
deps = [
":benchmark_bbs_lib",
":benchmark_cpu_scheduler",
],
)

Expand All @@ -77,6 +79,7 @@ gematria_py_test(
],
deps = [
":benchmark_bbs_lib",
":benchmark_cpu_scheduler",
"//gematria/io/python:tfrecord",
"//gematria/proto:execution_annotation_py_pb2",
],
Expand Down
16 changes: 15 additions & 1 deletion gematria/datasets/pipelines/benchmark_bbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from apache_beam.options import pipeline_options

from gematria.datasets.pipelines import benchmark_bbs_lib
from gematria.datasets.pipelines import benchmark_cpu_scheduler

_INPUT_FILE_PATTERN = flags.DEFINE_string(
'input_file_pattern',
Expand All @@ -30,6 +31,15 @@
_OUTPUT_FILE_PATTERN = flags.DEFINE_string(
'output_file_pattern', None, 'The output file path/pattern.', required=True
)
_BENCHMARK_SCHEDULER = flags.DEFINE_enum(
'benchmark_scheduler',
'NoScheduling',
[
scheduler_type.name
for scheduler_type in benchmark_cpu_scheduler.BenchmarkSchedulerImplementations
],
'The scheduler to use for choosing a core for running benchmarks.',
)


def main(argv) -> None:
Expand All @@ -39,7 +49,11 @@ def main(argv) -> None:
beam_options = pipeline_options.PipelineOptions()

pipeline_constructor = benchmark_bbs_lib.benchmark_bbs(
_INPUT_FILE_PATTERN.value, _OUTPUT_FILE_PATTERN.value
_INPUT_FILE_PATTERN.value,
_OUTPUT_FILE_PATTERN.value,
benchmark_cpu_scheduler.BenchmarkSchedulerImplementations[
_BENCHMARK_SCHEDULER.value
],
)

with beam.Pipeline(options=beam_options) as pipeline:
Expand Down
29 changes: 24 additions & 5 deletions gematria/datasets/pipelines/benchmark_bbs_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,37 @@

from gematria.proto import execution_annotation_pb2
from gematria.datasets.python import exegesis_benchmark
from gematria.datasets.pipelines import benchmark_cpu_scheduler

_BEAM_METRIC_NAMESPACE_NAME = 'benchmark_bbs'


class BenchmarkBasicBlock(beam.DoFn):
"""A Beam function that benchmarks basic blocks."""

def setup(self):
self._exegesis_benchmark = exegesis_benchmark.ExegesisBenchmark.create()
def __init__(
self,
benchmark_scheduler_type: benchmark_cpu_scheduler.BenchmarkSchedulerImplementations,
):
self._benchmark_scheduler_type = benchmark_scheduler_type
self._benchmark_success_blocks = metrics.Metrics.counter(
_BEAM_METRIC_NAMESPACE_NAME, 'benchmark_bbs_success'
)
self._benchmark_failed_blocks = metrics.Metrics.counter(
_BEAM_METRIC_NAMESPACE_NAME, 'benchmark_blocks_failed'
)

def setup(self):
self._exegesis_benchmark = exegesis_benchmark.ExegesisBenchmark.create()
self._benchmark_scheduler = (
benchmark_cpu_scheduler.construct_benchmark_scheduler(
self._benchmark_scheduler_type
)
)
self._benchmarking_core = (
self._benchmark_scheduler.setup_and_get_benchmark_core()
)

def process(
self,
block_with_annotations: execution_annotation_pb2.BlockWithExecutionAnnotations,
Expand All @@ -44,8 +59,10 @@ def process(
benchmark_code = self._exegesis_benchmark.process_annotated_block(
block_with_annotations
)

self._benchmark_scheduler.verify()
benchmark_value = self._exegesis_benchmark.benchmark_basic_block(
benchmark_code
benchmark_code, self._benchmarking_core
)
self._benchmark_success_blocks.inc()
yield (block_with_annotations.block_hex, benchmark_value)
Expand All @@ -65,7 +82,9 @@ def process(


def benchmark_bbs(
input_file_pattern: str, output_file_pattern: str
input_file_pattern: str,
output_file_pattern: str,
benchmark_scheduler_type: benchmark_cpu_scheduler.BenchmarkSchedulerImplementations,
) -> Callable[[beam.Pipeline], None]:
"""Creates a pipeline to benchmark BBs."""

Expand All @@ -78,7 +97,7 @@ def pipeline(root: beam.Pipeline) -> None:
)
annotated_bbs_shuffled = annotated_bbs | 'Shuffle' >> beam.Reshuffle()
benchmarked_blocks = annotated_bbs_shuffled | 'Benchmarking' >> beam.ParDo(
BenchmarkBasicBlock()
BenchmarkBasicBlock(benchmark_scheduler_type)
)
formatted_output = benchmarked_blocks | 'Formatting' >> beam.ParDo(
FormatBBsForOutput()
Expand Down
9 changes: 7 additions & 2 deletions gematria/datasets/pipelines/benchmark_bbs_lib_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from gematria.datasets.pipelines import benchmark_bbs_lib
from gematria.proto import execution_annotation_pb2
from gematria.io.python import tfrecord
from gematria.datasets.pipelines import benchmark_cpu_scheduler

BLOCK_FOR_TESTING = execution_annotation_pb2.BlockWithExecutionAnnotations(
execution_annotations=execution_annotation_pb2.ExecutionAnnotations(
Expand All @@ -45,7 +46,9 @@
class BenchmarkBBsTests(absltest.TestCase):

def test_benchmark_basic_block(self):
benchmark_transform = benchmark_bbs_lib.BenchmarkBasicBlock()
benchmark_transform = benchmark_bbs_lib.BenchmarkBasicBlock(
benchmark_cpu_scheduler.BenchmarkSchedulerImplementations.NoScheduling
)
benchmark_transform.setup()

block_outputs = list(benchmark_transform.process(BLOCK_FOR_TESTING))
Expand Down Expand Up @@ -74,7 +77,9 @@ def test_benchmark_bbs(self):
output_file_pattern = os.path.join(output_folder, 'bhive-output')

pipeline_constructor = benchmark_bbs_lib.benchmark_bbs(
test_tfrecord.full_path, output_file_pattern
test_tfrecord.full_path,
output_file_pattern,
benchmark_cpu_scheduler.BenchmarkSchedulerImplementations.NoScheduling,
)

with test_pipeline.TestPipeline() as pipeline_under_test:
Expand Down
17 changes: 17 additions & 0 deletions gematria/datasets/pipelines/benchmark_cpu_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from collections.abc import Iterable
import os
import re
from enum import Enum


class BenchmarkScheduler(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -128,3 +129,19 @@ def verify(self):
cpu_mask = list(os.sched_getaffinity(0))
if self._cpu_mask != cpu_mask:
raise ValueError('Expected the CPU mask to not change.')


class BenchmarkSchedulerImplementations(Enum):
NoScheduling = 1
Default = 2


def construct_benchmark_scheduler(
scheduler_type: BenchmarkSchedulerImplementations,
) -> BenchmarkScheduler:
if scheduler_type == BenchmarkSchedulerImplementations.NoScheduling:
return NoSchedulingBenchmarkScheduler()
elif scheduler_type == BenchmarkSchedulerImplementations.Default:
return DefaultBenchmarkScheduler()
else:
raise ValueError('Unexpected Benchmark Scheduler Type.')

0 comments on commit cec81d2

Please sign in to comment.