Skip to content

Commit

Permalink
Add io.text APIs to pylibcudf (#17232)
Browse files Browse the repository at this point in the history
Contributes to #15162

Authors:
  - Matthew Roeschke (https://github.com/mroeschke)
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #17232
  • Loading branch information
mroeschke authored Nov 7, 2024
1 parent 08e4853 commit c209dae
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ I/O Functions
csv
json
parquet
text
timezone
6 changes: 6 additions & 0 deletions docs/cudf/source/user_guide/api_docs/pylibcudf/io/text.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
====
text
====

.. automodule:: pylibcudf.io.text
:members:
82 changes: 23 additions & 59 deletions python/cudf/cudf/_lib/text.pyx
Original file line number Diff line number Diff line change
@@ -1,89 +1,53 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from io import TextIOBase
from libcpp cimport bool

from cython.operator cimport dereference
from libc.stdint cimport uint64_t
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from io import TextIOBase

from pylibcudf.libcudf.column.column cimport column
from pylibcudf.libcudf.io.text cimport (
byte_range_info,
data_chunk_source,
make_source,
make_source_from_bgzip_file,
make_source_from_file,
multibyte_split,
parse_options,
)
import pylibcudf as plc

from cudf._lib.column cimport Column


def read_text(object filepaths_or_buffers,
object delimiter=None,
object byte_range=None,
object strip_delimiters=False,
object compression=None,
object compression_offsets=None):
str delimiter,
object byte_range,
bool strip_delimiters,
object compression,
object compression_offsets):
"""
Cython function to call into libcudf API, see `multibyte_split`.
See Also
--------
cudf.io.text.read_text
"""
cdef string delim = delimiter.encode()

cdef unique_ptr[data_chunk_source] datasource
cdef unique_ptr[column] c_col

cdef size_t c_byte_range_offset
cdef size_t c_byte_range_size
cdef uint64_t c_compression_begin_offset
cdef uint64_t c_compression_end_offset
cdef parse_options c_options

if compression is None:
if isinstance(filepaths_or_buffers, TextIOBase):
datasource = move(make_source(
filepaths_or_buffers.read().encode()))
datasource = plc.io.text.make_source(filepaths_or_buffers.read())
else:
datasource = move(make_source_from_file(
filepaths_or_buffers.encode()))
datasource = plc.io.text.make_source_from_file(filepaths_or_buffers)
elif compression == "bgzip":
if isinstance(filepaths_or_buffers, TextIOBase):
raise ValueError("bgzip compression requires a file path")
if compression_offsets is not None:
if len(compression_offsets) != 2:
raise ValueError(
"compression offsets need to consist of two elements")
c_compression_begin_offset = compression_offsets[0]
c_compression_end_offset = compression_offsets[1]
datasource = move(make_source_from_bgzip_file(
filepaths_or_buffers.encode(),
c_compression_begin_offset,
c_compression_end_offset))
datasource = plc.io.text.make_source_from_bgzip_file(
filepaths_or_buffers,
compression_offsets[0],
compression_offsets[1]
)
else:
datasource = move(make_source_from_bgzip_file(
filepaths_or_buffers.encode()))
datasource = plc.io.text.make_source_from_bgzip_file(
filepaths_or_buffers,
)
else:
raise ValueError("Only bgzip compression is supported at the moment")

c_options = parse_options()
if byte_range is not None:
c_byte_range_offset = byte_range[0]
c_byte_range_size = byte_range[1]
c_options.byte_range = byte_range_info(
c_byte_range_offset,
c_byte_range_size)
c_options.strip_delimiters = strip_delimiters
with nogil:
c_col = move(multibyte_split(
dereference(datasource),
delim,
c_options))

return Column.from_unique_ptr(move(c_col))
options = plc.io.text.ParseOptions(
byte_range=byte_range, strip_delimiters=strip_delimiters
)
plc_column = plc.io.text.multibyte_split(datasource, delimiter, options)
return Column.from_pylibcudf(plc_column)
2 changes: 1 addition & 1 deletion python/pylibcudf/pylibcudf/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# =============================================================================

set(cython_sources avro.pyx csv.pyx datasource.pyx json.pyx orc.pyx parquet.pyx timezone.pyx
types.pyx
text.pyx types.pyx
)

set(linked_libraries cudf::cudf)
Expand Down
2 changes: 1 addition & 1 deletion python/pylibcudf/pylibcudf/io/__init__.pxd
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

# CSV is removed since it is def not cpdef (to force kw-only arguments)
from . cimport avro, datasource, json, orc, parquet, timezone, types
from . cimport avro, datasource, json, orc, parquet, timezone, text, types
from .types cimport SourceInfo, TableWithMetadata
2 changes: 1 addition & 1 deletion python/pylibcudf/pylibcudf/io/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from . import avro, csv, datasource, json, orc, parquet, timezone, types
from . import avro, csv, datasource, json, orc, parquet, timezone, text, types
from .types import SinkInfo, SourceInfo, TableWithMetadata
30 changes: 30 additions & 0 deletions python/pylibcudf/pylibcudf/io/text.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from pylibcudf.column cimport Column
from pylibcudf.libcudf.io.text cimport parse_options, data_chunk_source

# Thin Cython wrapper around cudf::io::text::parse_options.
cdef class ParseOptions:
    # Owned by value; configured in __init__ on the Python side.
    cdef parse_options c_options

# Owns a libcudf data_chunk_source plus the host bytes it may reference.
cdef class DataChunkSource:
    cdef unique_ptr[data_chunk_source] c_source
    # Keeps the encoded host data alive for the lifetime of c_source
    # (make_source holds a view into it, not a copy).
    cdef string data_ref


cpdef Column multibyte_split(
    DataChunkSource source,
    str delimiter,
    ParseOptions options=*
)

cpdef DataChunkSource make_source(str data)

cpdef DataChunkSource make_source_from_file(str filename)

cpdef DataChunkSource make_source_from_bgzip_file(
    str filename,
    # -1 is the sentinel for "offset not provided"; both offsets must be
    # given together or omitted together.
    int virtual_begin=*,
    int virtual_end=*,
)
193 changes: 193 additions & 0 deletions python/pylibcudf/pylibcudf/io/text.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from cython.operator cimport dereference
from libc.stdint cimport uint64_t
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move

from pylibcudf.column cimport Column
from pylibcudf.libcudf.column.column cimport column
from pylibcudf.libcudf.io cimport text as cpp_text

cdef class ParseOptions:
    """
    Parsing options for `multibyte_split`

    Parameters
    ----------
    byte_range : list | tuple, default None
        Only rows starting inside this byte range will be
        part of the output column.
    strip_delimiters : bool, default False
        Whether delimiters at the end of rows should
        be stripped from the output column.
    """
    def __init__(
        self,
        *,
        byte_range=None,
        strip_delimiters=False,
    ):
        self.c_options = cpp_text.parse_options()
        if byte_range is not None:
            # byte_range is (offset, size) in bytes into the source.
            c_byte_range_offset = byte_range[0]
            c_byte_range_size = byte_range[1]
            self.c_options.byte_range = cpp_text.byte_range_info(
                c_byte_range_offset,
                c_byte_range_size
            )
        self.c_options.strip_delimiters = strip_delimiters


cdef class DataChunkSource:
    """
    Data source for `multibyte_split`

    Parameters
    ----------
    data : str
        Filename or data itself.
    """

    def __cinit__(self, str data):
        # Need to keep a reference alive for make_source: the C++ source
        # created later may view this buffer rather than copy it, so the
        # encoded bytes must outlive c_source.
        self.data_ref = data.encode()


cpdef DataChunkSource make_source(str data):
    """
    Creates a data source capable of producing device-buffered views
    of the given string.

    Parameters
    ----------
    data : str
        The host data to be exposed as a data chunk source.

    Returns
    -------
    DataChunkSource
        The data chunk source for the provided host data.
    """
    # dcs.data_ref (set in __cinit__) holds the encoded bytes the
    # C++ source views, keeping them alive alongside c_source.
    cdef DataChunkSource dcs = DataChunkSource(data)
    with nogil:
        dcs.c_source = move(cpp_text.make_source(dcs.data_ref))
    return dcs


cpdef DataChunkSource make_source_from_file(str filename):
    """
    Creates a data source capable of producing device-buffered views of the file.

    Parameters
    ----------
    filename : str
        The filename of the file to be exposed as a data chunk source.

    Returns
    -------
    DataChunkSource
        The data chunk source for the provided filename.
    """
    cdef DataChunkSource dcs = DataChunkSource(filename)
    with nogil:
        dcs.c_source = move(cpp_text.make_source_from_file(dcs.data_ref))
    return dcs

cpdef DataChunkSource make_source_from_bgzip_file(
    str filename,
    int virtual_begin=-1,
    int virtual_end=-1,
):
    """
    Creates a data source capable of producing device-buffered views of
    a BGZIP compressed file with virtual record offsets.

    Parameters
    ----------
    filename : str
        The filename of the BGZIP-compressed file to be exposed as a data chunk source.
    virtual_begin : int, default -1
        The virtual (Tabix) offset of the first byte to be read. Its upper 48 bits
        describe the offset into the compressed file, its lower 16 bits describe the
        block-local offset. The default of -1 means "not provided"; if given,
        ``virtual_end`` must be given too.
    virtual_end : int, default -1
        The virtual (Tabix) offset one past the last byte to be read.
        The default of -1 means "not provided"; if given, ``virtual_begin``
        must be given too.

    Returns
    -------
    DataChunkSource
        The data chunk source for the provided filename.
    """
    cdef uint64_t c_virtual_begin
    cdef uint64_t c_virtual_end
    cdef DataChunkSource dcs = DataChunkSource(filename)

    if virtual_begin == -1 and virtual_end == -1:
        # No offsets provided: read the whole file.
        with nogil:
            dcs.c_source = move(cpp_text.make_source_from_bgzip_file(dcs.data_ref))
    elif virtual_begin != -1 and virtual_end != -1:
        c_virtual_begin = virtual_begin
        c_virtual_end = virtual_end
        with nogil:
            dcs.c_source = move(
                cpp_text.make_source_from_bgzip_file(
                    dcs.data_ref,
                    c_virtual_begin,
                    c_virtual_end,
                )
            )
    else:
        # The parameters are C ints with a -1 sentinel, so None can never
        # reach here; the message must talk about providing/omitting them.
        raise ValueError(
            "virtual_begin and virtual_end must both be provided "
            "or both be omitted"
        )
    return dcs

cpdef Column multibyte_split(
    DataChunkSource source,
    str delimiter,
    ParseOptions options=None
):
    """
    Splits the source text into a strings column using a multiple byte delimiter.

    For details, see :cpp:func:`cudf::io::text::multibyte_split`

    Parameters
    ----------
    source :
        The source string. Note: the underlying C++ source is moved out of
        this object, so ``source`` cannot be reused after this call.
    delimiter : str
        UTF-8 encoded string for which to find offsets in the source.
    options : ParseOptions
        The parsing options to use (including byte range).

    Returns
    -------
    Column
        The strings found by splitting the source by the delimiter
        within the relevant byte range.
    """
    cdef unique_ptr[column] c_result
    # Take ownership of the chunk source; `source` is consumed here.
    cdef unique_ptr[data_chunk_source] c_source = move(source.c_source)
    cdef string c_delimiter = delimiter.encode()

    if options is None:
        # Default options: no byte range, delimiters kept in the output.
        options = ParseOptions()

    cdef cpp_text.parse_options c_options = options.c_options

    with nogil:
        c_result = cpp_text.multibyte_split(
            dereference(c_source),
            c_delimiter,
            c_options
        )

    return Column.from_libcudf(move(c_result))
Loading

0 comments on commit c209dae

Please sign in to comment.