From 4b045de4b4792d6f460ed02d197ea8ffd61827aa Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 29 Aug 2024 08:49:10 -0700
Subject: [PATCH] rename file and generalize
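
Renames the `remote_parquet` benchmark to `local_read_parquet` and
generalizes it: the dataset path, column selection, file count, and S3
credentials become CLI options instead of hard-coded constants,
`--blocksize` is now parsed with `parse_bytes`, and the benchmark
reports on-disk size and row count rather than in-memory output size.

A typical invocation might look like the following (the path here is
illustrative; any flat directory of parquet files, local or remote,
should work):

    python dask_cuda/benchmarks/local_read_parquet.py \
        --path /datasets/parquet-data \
        --blocksize 512MB \
        --runs 3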
---
 dask_cuda/benchmarks/local_read_parquet.py | 279 ++++++++++++++++++++++
 dask_cuda/benchmarks/remote_parquet.py     | 179 ---------------
 2 files changed, 279 insertions(+), 179 deletions(-)
 create mode 100644 dask_cuda/benchmarks/local_read_parquet.py
 delete mode 100644 dask_cuda/benchmarks/remote_parquet.py

diff --git a/dask_cuda/benchmarks/local_read_parquet.py b/dask_cuda/benchmarks/local_read_parquet.py
new file mode 100644
index 00000000..7dcf0343
--- /dev/null
+++ b/dask_cuda/benchmarks/local_read_parquet.py
@@ -0,0 +1,279 @@
+import contextlib
+from collections import ChainMap
+from time import perf_counter as clock
+
+import fsspec
+import pandas as pd
+
+import dask
+import dask.dataframe as dd
+from dask.base import tokenize
+from dask.distributed import performance_report
+from dask.utils import format_bytes, parse_bytes
+
+from dask_cuda.benchmarks.common import Config, execute_benchmark
+from dask_cuda.benchmarks.utils import (
+    parse_benchmark_args,
+    print_key_value,
+    print_separator,
+    print_throughput_bandwidth,
+)
+
+DISK_SIZE_CACHE = {}
+OPTIONS_CACHE = {}
+
+
+def _noop(df):
+    return df
+
+
+def read_data(paths, columns, backend, **kwargs):
+    with dask.config.set({"dataframe.backend": backend}):
+        return dd.read_parquet(
+            paths,
+            columns=columns,
+            **kwargs,
+        )
+
+
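+# Translate the benchmark CLI options into dd.read_parquet kwargs. For
+# filesystem="arrow", S3 credentials are remapped to pyarrow's
+# access_key/secret_key naming, blocksize is only forwarded for the
+# cudf backend, and aggregate_files is rejected as unsupported.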
+def get_fs_paths_kwargs(args):
+    kwargs = {}
+
+    storage_options = {}
+    if args.key:
+        storage_options["key"] = args.key
+    if args.secret:
+        storage_options["secret"] = args.secret
+
+    if args.filesystem == "arrow":
+        import pyarrow.fs as pa_fs
+        from fsspec.implementations.arrow import ArrowFSWrapper
+
+        _mapping = {
+            "key": "access_key",
+            "secret": "secret_key",
+        }  # See: pyarrow.fs.S3FileSystem docs
+        s3_args = {}
+        for k, v in storage_options.items():
+            s3_args[_mapping[k]] = v
+
+        fs = pa_fs.FileSystem.from_uri(args.path)[0]
+        kwargs["filesystem"] = type(fs)(**s3_args)
+        fsspec_fs = ArrowFSWrapper(kwargs["filesystem"])
+        paths = fsspec_fs.glob(f"{args.path}/*.parquet")
+
+        if args.type == "gpu":
+            kwargs["blocksize"] = args.blocksize
+
+        if args.aggregate_files:
+            raise NotImplementedError(
+                "aggregate-files is not supported for filesystem='arrow'"
+            )
+    else:
+        fsspec_fs = fsspec.core.get_fs_token_paths(
+            args.path, mode="rb", storage_options=storage_options
+        )[0]
+        kwargs["filesystem"] = fsspec_fs
+        paths = fsspec_fs.glob(f"{args.path}/*.parquet")
+
+        kwargs["blocksize"] = args.blocksize
+        kwargs["aggregate_files"] = args.aggregate_files
+
+    if args.file_count:
+        paths = paths[: args.file_count]
+
+    return fsspec_fs, paths, kwargs
+
+
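+# bench_once may be called several times (see --runs). The tokenize-keyed
+# caches above let repeated runs reuse the filesystem object, globbed
+# paths, and on-disk sizes instead of recomputing them for every run.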
["cpu", "gpu"], - "default": "gpu", - "type": str, - "help": "Use GPU or CPU dataframes (default 'gpu')", - }, - { - "name": "--filesystem", - "choices": ["arrow", "fsspec"], - "default": "fsspec", - "type": str, - "help": "Filesystem backend", - }, - { - "name": "--runs", - "default": 3, - "type": int, - "help": "Number of runs", - }, - # NOTE: The following args are not relevant to this benchmark - { - "name": "--ignore-size", - "default": "1 MiB", - "metavar": "nbytes", - "type": parse_bytes, - "help": "Ignore messages smaller than this (default '1 MB')", - }, - ] - - return parse_benchmark_args( - description="Remote Parquet (dask/cudf) benchmark", - args_list=special_args, - check_explicit_comms=False, - ) - - -if __name__ == "__main__": - execute_benchmark( - Config( - args=parse_args(), - bench_once=bench_once, - create_tidy_results=create_tidy_results, - pretty_print_results=pretty_print_results, - ) - )