Use non-ns units on timestamps declared the old way #874

Merged · 5 commits · Aug 17, 2023
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
```diff
@@ -122,7 +122,7 @@ jobs:
       run: |
         pip install Cython
         pip install hypothesis
-        pip install pytest-httpserver pytest-localserver pytest-xdist pytest-asyncio
+        pip install pytest-localserver pytest-xdist pytest-asyncio
         pip install -e . --no-deps # Install fastparquet
         git clone https://github.com/pandas-dev/pandas
         cd pandas
```
6 changes: 6 additions & 0 deletions docs/source/api.rst
```diff
@@ -23,3 +23,9 @@ API
 .. autofunction:: write

 .. autofunction:: update_file_custom_metadata
+
+
+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+            async src="//gc.zgo.at/count.js"></script>
```
6 changes: 6 additions & 0 deletions docs/source/details.rst
```diff
@@ -256,3 +256,9 @@ Dask and Pandas fully support calling ``fastparquet`` directly
 Please see their relevant docstrings. Remote filesystems are supported by using
 a URL with a "protocol://" specifier and any ``storage_options`` to be passed to
 the file system implementation.
+
+
+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+            async src="//gc.zgo.at/count.js"></script>
```
6 changes: 6 additions & 0 deletions docs/source/filesystems.rst
```diff
@@ -32,3 +32,9 @@ Similarly, providing an open function and another to make any necessary directories
     row_group_offsets=[0, 500], open_with=myopen, mkdirs=noop)

 (In the case of s3, no intermediate directories need to be created)
+
+
+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+            async src="//gc.zgo.at/count.js"></script>
```
6 changes: 6 additions & 0 deletions docs/source/index.rst
```diff
@@ -97,3 +97,9 @@ Index
 1. :ref:`genindex`
 1. :ref:`modindex`
 1. :ref:`search`
+
+
+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+            async src="//gc.zgo.at/count.js"></script>
```
6 changes: 6 additions & 0 deletions docs/source/install.rst
```diff
@@ -71,3 +71,9 @@ This will produce a ``build/html/`` subdirectory, where the entry point is
 ``index.html``.


+
+
+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+            async src="//gc.zgo.at/count.js"></script>
```
6 changes: 6 additions & 0 deletions docs/source/quickstart.rst
```diff
@@ -87,3 +87,9 @@ Further options that may be of interest are:
     write('outdir.parq', df, row_group_offsets=[0, 10000, 20000],
           compression='GZIP', file_scheme='hive')

+
+
+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+            async src="//gc.zgo.at/count.js"></script>
```
6 changes: 6 additions & 0 deletions docs/source/releasenotes.rst
```diff
@@ -188,3 +188,9 @@ NB: minor versions up to 0.6.3 fix build issues
 consolidate of many data files.

 .. _cramjam: https://github.com/milesgranger/pyrus-cramjam
+
+
+.. raw:: html
+
+    <script data-goatcounter="https://distdatacats.goatcounter.com/count"
+            async src="//gc.zgo.at/count.js"></script>
```
33 changes: 16 additions & 17 deletions fastparquet/converted_types.py
```diff
@@ -56,11 +56,11 @@ def tobson(x):
     parquet_thrift.ConvertedType.INT_16: np.dtype("int16"),
     parquet_thrift.ConvertedType.INT_32: np.dtype('int32'),
     parquet_thrift.ConvertedType.INT_64: np.dtype('int64'),
-    parquet_thrift.ConvertedType.TIME_MILLIS: np.dtype('<m8[ns]'),
+    parquet_thrift.ConvertedType.TIME_MILLIS: np.dtype('<m8[ms]'),
     parquet_thrift.ConvertedType.DATE: np.dtype('<M8[ns]'),
-    parquet_thrift.ConvertedType.TIMESTAMP_MILLIS: np.dtype('<M8[ns]'),
-    parquet_thrift.ConvertedType.TIME_MICROS: np.dtype('<m8[ns]'),
-    parquet_thrift.ConvertedType.TIMESTAMP_MICROS: np.dtype('<M8[ns]')
+    parquet_thrift.ConvertedType.TIMESTAMP_MILLIS: np.dtype('<M8[ms]'),
+    parquet_thrift.ConvertedType.TIME_MICROS: np.dtype('<m8[us]'),
+    parquet_thrift.ConvertedType.TIMESTAMP_MICROS: np.dtype('<M8[us]')
 }
 nullable = {
     np.dtype('int8'): pd.Int8Dtype(),
```
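
The mapping change above is the heart of the PR: columns declared with the legacy converted types now map to the numpy unit the file actually stores, so values no longer need rescaling to nanoseconds on read. A minimal sketch of the consequence (illustrative numpy only, not fastparquet API):

```python
import numpy as np

# int64 milliseconds since the epoch, as a TIMESTAMP_MILLIS column stores them
raw = np.array([0, 1_000, 86_400_000], dtype="int64")

old = (raw * 1_000_000).view("M8[ns]")  # old path: rescale everything to ns
new = raw.view("M8[ms]")                # new path: zero-copy view in ms

assert (old == new).all()               # same instants, no multiplication
```
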
```diff
@@ -115,6 +115,8 @@ def typemap(se, md=None):
             return simple[se.type]
         else:
            return np.dtype("S%i" % se.type_length)
+    if md and "time" in md.get("numpy_type", ""):
+        return np.dtype(md["numpy_type"])
    if se.converted_type in complex:
        return complex[se.converted_type]
    return np.dtype("O")
```
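
The new `typemap` branch lets the pandas metadata stored in the file pick the time unit directly. A hypothetical metadata dict (the real one is read from the file's pandas footer; this one is made up for illustration) would flow through it like so:

```python
import numpy as np

# Hypothetical pandas column metadata; the real dict comes from the footer.
md = {"name": "ts", "numpy_type": "datetime64[s]", "pandas_type": "datetime"}

if md and "time" in md.get("numpy_type", ""):
    dtype = np.dtype(md["numpy_type"])  # -> dtype('<M8[s]')
```
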
```diff
@@ -145,7 +147,7 @@ def converts_inplace(se):
     return False


-def convert(data, se, timestamp96=True):
+def convert(data, se, timestamp96=True, dtype=None):
     """Convert known types from primitive to rich.

     Parameters
```
```diff
@@ -157,6 +159,7 @@ def convert(data, se, timestamp96=True):
     ctype = se.converted_type
     if se.type == parquet_thrift.Type.INT96 and timestamp96:
         data2 = data.view([('ns', 'i8'), ('day', 'i4')])
+        # TODO: this should be ms unit, now that we can?
         return ((data2['day'] - 2440588) * 86400000000000 +
                 data2['ns']).view('M8[ns]')
     if se.logicalType is not None and se.logicalType.TIMESTAMP is not None:
```
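
For reference, the INT96 path still targets ns: 2440588 is the Julian day number of the Unix epoch, so the arithmetic is (days since 1970-01-01) × ns-per-day + nanoseconds within the day. A worked instance of the formula as written:

```python
import numpy as np

day, ns_in_day = 2440589, 1  # one day after the epoch, 1 ns past midnight
ts = np.array([(day - 2440588) * 86_400_000_000_000 + ns_in_day]).view("M8[ns]")
print(ts)  # ['1970-01-02T00:00:00.000000001']
```
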
```diff
@@ -179,6 +182,7 @@ def convert(data, se, timestamp96=True):
         # NB: general but slow method
         # could optimize when data.dtype.itemsize <= 8
         # TODO: easy cythonize (but rare)
+        # TODO: extension point for pandas-decimal (no conversion needed)
         return np.array([
             int.from_bytes(
                 data.data[i:i + 1], byteorder='big', signed=True
```
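
This DECIMAL branch reconstructs the unscaled value as a big-endian two's-complement integer, which the new TODO suggests a decimal extension dtype could skip entirely. A standalone sketch of the byte-level decoding step, assuming a scale of 2:

```python
raw = b"\xff\xff\xfb"  # -5 as a 3-byte big-endian two's-complement integer
unscaled = int.from_bytes(raw, byteorder="big", signed=True)
value = unscaled * 10.0 ** -2  # DECIMAL with scale=2 -> -0.05
```
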
```diff
@@ -189,21 +193,16 @@ def convert(data, se, timestamp96=True):
         data = data * DAYS_TO_MILLIS
         return data.view('datetime64[ns]')
     elif ctype == parquet_thrift.ConvertedType.TIME_MILLIS:
-        out = data.astype('int64', copy=False)
-        time_shift(out.view("int64"), 1000000)
-        return out.view('timedelta64[ns]')
+        # this was not covered by new pandas time units
+        data = data.astype('int64', copy=False)
+        time_shift(data, 1000000)
+        return data.view('timedelta64[ns]')
     elif ctype == parquet_thrift.ConvertedType.TIMESTAMP_MILLIS:
-        out = data
-        time_shift(data.view("int64"), 1000000)
-        return out.view('datetime64[ns]')
+        return data.view('datetime64[ms]')
     elif ctype == parquet_thrift.ConvertedType.TIME_MICROS:
-        out = data
-        time_shift(data.view("int64"))
-        return out.view('timedelta64[ns]')
+        return data.view('timedelta64[us]')
     elif ctype == parquet_thrift.ConvertedType.TIMESTAMP_MICROS:
-        out = data
-        time_shift(data.view("int64"))
-        return out.view('datetime64[ns]')
+        return data.view('datetime64[us]')
     elif ctype == parquet_thrift.ConvertedType.UINT_8:
         # TODO: return strided views?
         # data.view('uint8')[::data.itemsize].view(out_dtype)
```
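
The pattern in this hunk: 64-bit MILLIS/MICROS values become zero-copy views in their native unit, while TIME_MILLIS (stored as INT32 in parquet) still has to be widened and shifted to ns, since it "was not covered by new pandas time units". Roughly, with plain numpy standing in for the internal `time_shift` helper (its NaT handling ignored):

```python
import numpy as np

millis = np.array([1_500], dtype="int32")            # TIME_MILLIS is INT32
as_ns = (millis.astype("int64") * 1_000_000).view("m8[ns]")

micros = np.array([1_500_000], dtype="int64")        # TIME_MICROS is INT64
as_us = micros.view("m8[us]")                        # zero-copy, new behaviour

assert as_ns[0] == as_us[0]                          # both 1.5 s
```
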
9 changes: 5 additions & 4 deletions fastparquet/core.py
```diff
@@ -264,7 +264,8 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
     # can read-into
     into0 = ((use_cat or converts_inplace(se) and see)
              and data_header2.num_nulls == 0
-             and max_rep == 0 and assign.dtype.kind != "O" and row_filter is None)
+             and max_rep == 0 and assign.dtype.kind != "O" and row_filter is None
+             and assign.dtype.kind not in "Mm")  # TODO: this can be done in place but is complex
     if row_filter is None:
         row_filter = Ellipsis
     # can decompress-into
```
```diff
@@ -548,7 +549,7 @@ def read_col(column, schema_helper, infile, use_cat=False,
                 if d and not use_cat:
                     part[defi == max_defi] = dic[val]
                 elif not use_cat:
-                    part[defi == max_defi] = convert(val, se)
+                    part[defi == max_defi] = convert(val, se, dtype=assign.dtype)
                 else:
                     part[defi == max_defi] = val
             else:
```
```diff
@@ -557,7 +558,7 @@ def read_col(column, schema_helper, infile, use_cat=False,
                 piece = piece._data
             if use_cat and not d:
                 # only possible for multi-index
-                val = convert(val, se)
+                val = convert(val, se, dtype=assign.dtype)
                 try:
                     i = pd.Categorical(val)
                 except:
```
```diff
@@ -567,7 +568,7 @@ def read_col(column, schema_helper, infile, use_cat=False,
             elif d and not use_cat:
                 piece[:] = dic[val]
             elif not use_cat:
-                piece[:] = convert(val, se)
+                piece[:] = convert(val, se, dtype=assign.dtype)
             else:
                 piece[:] = val

```
3 changes: 2 additions & 1 deletion fastparquet/test/test_output.py
```diff
@@ -561,7 +561,7 @@ def test_auto_null_object(tempdir):
     df['bb'] = df['b'].astype('object')
     df['aaa'] = df['a'].astype('object')
     object_cols = ['d', 'ff', 'bb', 'aaa', 'aa']
-    test_cols = list(set(df) - set(object_cols)) + ['d']
+    test_cols = list(set(df) - set(object_cols) - {"c"}) + ['d']
     fn = os.path.join(tmp, "test.parq")

     with pytest.raises(ValueError):
```
```diff
@@ -573,6 +573,7 @@ def test_auto_null_object(tempdir):
         assert col.repetition_type == parquet_thrift.FieldRepetitionType.OPTIONAL
     df2 = pf.to_pandas(categories=['e'])

+    assert df2.c.equals(df.c)
     tm.assert_frame_equal(df[test_cols], df2[test_cols], check_categorical=False,
                           check_dtype=False)
     tm.assert_frame_equal(df[['bb']].astype('float64'), df2[['bb']])
```
8 changes: 6 additions & 2 deletions fastparquet/writer.py
```diff
@@ -303,8 +303,12 @@ def convert(data, se):

     elif converted_type == parquet_thrift.ConvertedType.TIME_MICROS:
         # TODO: shift inplace
-        out = np.empty(len(data), 'int64')
-        time_shift(data.values.view('int64'), out)
+        if data.dtype == "m8[ns]":
+            out = np.empty(len(data), 'int64')
+            time_shift(data.values.view('int64'), out)
+        else:
+            # assuming ms or us
+            out = data.values
 elif type == parquet_thrift.Type.INT96 and dtype.kind == 'M':
     ns_per_day = (24 * 3600 * 1000000000)
     day = data.values.view('int64') // ns_per_day + 2440588
```
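
On the write side the branch mirrors the reader: ns-unit input still needs the rescale down to microseconds, while a frame already holding ms/us-unit timedeltas passes its integer values straight through. An illustrative take on the two branches, assuming `time_shift` here amounts to an integer ns→us rescale (NaT handling aside):

```python
import numpy as np
import pandas as pd

td_ns = pd.Series(pd.to_timedelta([1, 2], unit="s"))  # dtype m8[ns]
out_ns = td_ns.values.view("int64") // 1_000          # ns -> us

td_us = td_ns.astype("timedelta64[us]")               # m8[us] (pandas >= 2.0)
out_us = td_us.values.view("int64")                   # already microseconds

assert (out_ns == out_us).all()
```
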
10 changes: 0 additions & 10 deletions readthedocs.yml

This file was deleted.
