Make _metadata optional on writing #809

Open
martindurant opened this issue Sep 28, 2022 · 29 comments

@martindurant
Member

Following discussion in #807

I would do as follows:

  • add a new option to write() on whether we create _metadata or not; we always create _common_metadata. We already implicitly don't make a _metadata for file_scheme="simple".
  • for append:
    • if _metadata exists, we update it as now
    • if _metadata does not exist, but _common_metadata does, we append by writing a new file only, but check that the schema is consistent with _common_metadata. We will still need to find out what the new part's sequential number should be.
    • if neither exists, we append by writing a new file, check against the schema of one of the other data files, and create _common_metadata

A user who has built a dataset using append without a _metadata can create one as a separate step with the existing merge() function, if they want.
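For illustration, a minimal sketch of that separate step (assuming a hive-style dataset under mydata/ with fastparquet's default part.X.parquet names; the path is hypothetical):

import glob
from fastparquet.writer import merge

# Gather the existing data files and write a consolidated _metadata next to them.
# Note: plain sorted() is lexical; see the ordering discussion further down.
parts = sorted(glob.glob("mydata/part.*.parquet"))
merge(parts)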

@yohplala

Hi @martindurant

1/

we always create _common_metadata.

I am sorry, I don't have a clear picture of the content of _metadata vs _common_metadata.
Please, what do we gain by keeping _common_metadata but not _metadata?
If we start removing _metadata, why not remove _common_metadata as well?
To have ParquetFile.statistics recorded in a separate file?
Does dask take the same approach?

2/

add a new option to write() on whether we create _metadata or not

What would you call this new parameter?
It seems it "only" needs to be forwarded to writer.write_multi(), to this part of the code (starting at line 985):

    if write_fmd:
        # here, if-condition to be added?
        write_common_metadata(join_path(dn, '_metadata'), fmd, open_with,
                              no_row_groups=False)
        write_common_metadata(join_path(dn, '_common_metadata'), fmd,
                              open_with)

3/

for append: if _metadata exists, we update it as now

We could check this via the ParquetFile.fn attribute, but I can see that it is set at line 125 even when _metadata does not exist. We would need to correct that, I think.

            self.fn = join_path(basepath, '_metadata') if basepath else '_metadata'

4/

for append

  • if _metadata does not exist, but _common_metadata does, we append by writing a new file only, but check that the schema is consistent with _common_metadata. We will still need to find out what the new part's sequential number should be.
  • if neither exist, we append by writing a new file, check against the schema of one of the other data files, and create _common_metadata

OK. Do you see a specific attribute of the ParquetFile object that would help us determine whether a _common_metadata file exists?

5/
This is a "separate issue / feature proposal", but when calling ParquetFile.__init__() with a directory path containing parquet files but no _metadata file, we ought to check whether filenames are fastparquet-like (part.X.parquet) and, if so, order the list of files in ascending order of X before calling metadata_from_many(), line 162.
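For illustration, a minimal sketch of such a sort key (the helper name is hypothetical):

import re

def part_sort_key(path):
    # Sort fastparquet-style "part.X.parquet" names by the integer X;
    # fall back to plain lexical order for any other naming scheme.
    name = path.rsplit("/", 1)[-1]
    m = re.fullmatch(r"part\.(\d+)\.parquet", name)
    return (0, int(m.group(1)), "") if m else (1, 0, name)

print(sorted(["part.0.parquet", "part.11.parquet", "part.2.parquet"],
             key=part_sort_key))
# ['part.0.parquet', 'part.2.parquet', 'part.11.parquet']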

@yohplala

yohplala commented Sep 29, 2022

Code to reproduce result mentioned in 3/

import os
import pandas as pd
import fastparquet as fp

path = os.path.expanduser('~/Documents/code/data/fastparquet/')
df1 = pd.DataFrame({'val': [0,1]})
path1 = f"{path}file.1.parquet"
df2 = pd.DataFrame({'val': [-1,0]})
path2 = f"{path}file.2.parquet"

fp.write(path1, df1, file_scheme="simple")
fp.write(path2, df2, file_scheme="simple")

pf = fp.ParquetFile(path)

pf.fn
Out[28]: '/home/yoh/Documents/code/data/fastparquet/_metadata'

But there is no _metadata file in directory /home/yoh/Documents/code/data/fastparquet/

@martindurant
Member Author

I am sorry, I don't have a clear picture of the content of _metadata vs _common_metadata.
Please, what do we gain by keeping _common_metadata but not _metadata?

_common_metadata is the same as _metadata, but with an empty list for the row_groups. This means it contains the schema and key-value metadata, but no chunk-specific information. It is much smaller, easier to make, and doesn't require updating when appending to the dataset.

@martindurant
Member Author

But there is no _metadata file in directory /home/yoh/Documents/code/data/fastparquet/

We made an effective in-memory one by grabbing the row-group definitions in the two files. That's the location the file would be in, if it existed.

@yohplala

Hi,
To illustrate 5/, here is a code snippet showing the trouble with lexicographic file ordering when _metadata is removed.

import os
import pandas as pd
import fastparquet as fp

path = os.path.expanduser('~/Documents/code/data/fastparquet/')
append = False
for i in range(11):
    # trouble starts at the 11th file.
    df = pd.DataFrame({'val': [i]})
    fp.write(path, df, file_scheme="hive", append=append)
    append=True

# if there is no '_metadata' to keep track of file order,
os.remove(f"{path}_metadata")

# then a file named part.10.parquet is read before part.2.parquet.
pf = fp.ParquetFile(path)
pf.to_pandas()
Out[13]: 
    val
0     0
1     1
2    10
3     2
4     3
5     4
6     5
7     6
8     7
9     8
10    9

@yohplala

Expected is:

Out[11]: 
    val
0     0
1     1
2     2
3     3
4     4
5     5
6     6
7     7
8     8
9     9
10   10

@yohplala

yohplala commented Sep 30, 2022

I am sorry, I don't have a clear picture of the content of _metadata vs _common_metadata.
Please, what do we gain by keeping _common_metadata but not _metadata?

_common_metadata is the same as _metadata, but with an empty list for the row_groups. This means it contains the schema and key-value metadata, but no chunk-specific information. It is much smaller, easier to make, and doesn't require updating when appending to the dataset.

Regarding key-value metadata:

  • one can update it now, but let's say this is a specific use case
  • but if I update with a different pandas version, then the pandas version that is kept is 'false' (stale):
pf.key_value_metadata
Out[14]:
 {'pandas': '{"column_indexes": [{"field_name": null,
                                  "metadata": null,
                                  "name": null,
                                  "numpy_type": "object",
                                  "pandas_type": "mixed-integer"}],
              "columns": [{"field_name": "val",
                           "metadata": null,
                           "name": "val",
                           "numpy_type": "int64",
                           "pandas_type": "int64"}],
              "creator": {"library": "fastparquet",
                          "version": "0.8.3"},
              "index_columns": [{"kind": "range",
                                 "name": null,
                                 "start": 0,
                                 "step": 1,
                                 "stop": 1}],
              "pandas_version": "1.4.2",
              "partition_columns": []}'}

Should we add a mechanism to check this file on append and update it if it differs from the original and the row group is successfully written?
The rationale would be that this file is supposed to reflect the state at the last writing operation.

@martindurant
Member Author

Correct, without a _metadata, there is no absolute way to guarantee that appended data will appear at the end of the dataset. We must attempt to generate filenames that come after previous ones in both numerical and lexical sorting.
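As an aside (not fastparquet's current scheme, just an illustration), zero-padded part numbers are one naming pattern where numerical and lexical order coincide:

names = [f"part.{i:04d}.parquet" for i in (0, 2, 11)]
print(names)
# ['part.0000.parquet', 'part.0002.parquet', 'part.0011.parquet']
assert sorted(names) == names  # lexical order matches numeric order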

@martindurant
Member Author

Should we add a mechanism to check this file on append and update it if different than original and if row group is successfully written?

The rationale would be that this file is supposed to reflect the state at the last writing operation.

There is no such convention. The parquet schemas must match, that is all. So we are free to decide whether to rewrite or not. I would vote for no, unless specifically updating the key-value metadata.

@yohplala

in both numerical and lexical sorting.

My proposal would be:

Fastparquet already has a "naming convention" that I find OK. I would check whether the filename follows this naming convention; if so, extract the int and assume that row groups are ordered as per that int.

For other naming schemes, we take lexicographic ordering.

But I have little knowledge of possible naming conventions. If one exists that satisfies both, why not. I have been liking the "short" file names produced by fastparquet, as opposed to those of pyarrow.

@yohplala

Should we add a mechanism to check this file on append and update it if different than original and if row group is successfully written?

The rationale would be that this file is supposed to reflect the state at the last writing operation.

There is no such convention. The parquet schemas must match, that is all. So we are free to decide whether to rewrite or not. I would vote for no, unless specifically updating the key-value metadata.

OK for me as well (I am aware I am being "over meticulous" here).

@yohplala

Do you think we could define a convention here about its value?

Currently,

  • if reading a single parquet file, pf.fn is the parquet file name test.parquet
  • if reading from a folder, pf.fn is _metadata.
  • in api.py, it can also be set to None if the ParquetFile object is created from a 'file-like':
        elif hasattr(fn, 'read'):
            # file-like
            self.fn = None

Could we set it the following way:

  • if _metadata exists, it is set to _metadata
  • if _metadata does not exist, but common_metadata does, it is set to _common_metadata
  • if neither exists, it is set to the filename of the last row group (depending on how the file list is sorted). This way, the case of a single file (leading to file_scheme='simple') becomes a subcase of this one.
  • and if 'file-like', keep the current behavior and set it to None.

This way, when appending, a ParquetFile gets created and we can tell whether _metadata and/or _common_metadata files exist (a small sketch follows below).
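A minimal sketch of this convention (the helper name and the flags are hypothetical; join_path is the existing fastparquet.util helper):

from fastparquet.util import join_path

def choose_fn(basepath, file_list, has_metadata, has_common_metadata, is_filelike):
    # Pick the value of ParquetFile.fn following the convention above.
    if is_filelike:
        return None                                   # keep current behaviour
    if has_metadata:
        return join_path(basepath, '_metadata')
    if has_common_metadata:
        return join_path(basepath, '_common_metadata')
    return file_list[-1]                              # last data file (or first, see below)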

@martindurant
Member Author

Yes, I'd be happy with that convention. I think it's more typical to use the first found parquet file rather than the last.

Note that some file-like objects expose a .path attribute which we could use, but it's probably not worth doing. In practice, I don't think file objects are passed often.

@yohplala

yohplala commented Oct 3, 2022

Yes, I'd be happy with that convention. I think it's more typical to use the first found parquet file rather than the last.

So far, I was thinking that key-value metadata + statistics were recorded in every .parquet file.
As the last files are (usually) the most recently written, with this logic (which I now understand to be wrong) the latest / most up-to-date key-value metadata would have been available in the last .parquet file.
Hence the specific role / value of the last .parquet file.

I now understand that I was wrong, at least for key-value metadata, which is in _common_metadata as you mentioned.

Hence the logic I proposed of tracking the filename of the last .parquet file.
If this file does not hold a copy of the latest key-value metadata, there is indeed no reason to keep track of it with the fn attribute.
So I do agree with your proposal to track the filename of the 1st .parquet file (1st row group) with the fn attribute.
The advantage of your proposal is that, whether we append or not, the 1st file ought to remain the same.

Side question:
are statistics rebuilt every time ParquetFile is instantiated, from the individual row-group stats in each .parquet file?

@yohplala

yohplala commented Oct 3, 2022

in both numerical and lexical sorting.

My proposal would be:

Fastparquet already has a "naming convention" that I find OK. I would check whether the filename follows this naming convention; if so, extract the int and assume that row groups are ordered as per that int.

For other naming schemes, we take lexicographic ordering.

I see a pro in keeping the current fastparquet naming scheme:
we already have some utils using it / "based" on it. If we could avoid reworking them, this would be a plus.
Well, they are mostly based on this root function:
api.PART_ID: get the ID (int) of a fastparquet part file.

From this root function then stem other 'utils':

  • api.part_ids(row_groups): return the IDs of fastparquet's part files
  • api.ParquetFile._sort_part_names(): rename fastparquet's part files so that their IDs match those of the embedded row groups (used most notably by the overwrite() function)
  • writer.find_max_part(): get the largest fastparquet part-file ID.

It would save us time I think to avoid reworking them.

@martindurant
Member Author

are statistics rebuilt every time ParquetFile is instantiated, from the individual row-group stats in each .parquet file?

Yes, fastparquet builds a virtual ParquetFile from the row groups found in all of the data files, if there is no _metadata. This could reasonably be skipped in some instances, and dask indeed does not use this mode by default any more. Fastparquet's use case is a little different, though, since the main operation is to load the whole dataset into memory, so we will need that information anyway (and in the main thread) to be able to build the empty dataframe that is to be filled in.

@martindurant
Member Author

I'm not sure if you asked this question: each parquet data file contains statistics relevant to its own row-groups. All the files contain key-value metadata, and this is supposed to be the same across all files (and the same as _common_metadata). If it has changed as a consequence of append operations, then you are right that the last file should be the most up-to-date.

@yohplala

yohplala commented Oct 3, 2022

If it has changed as a consequence of append operations, then you are right that the last file should be the most up to date.

OK, this is why I think keeping track of the last file could be of interest.
But indeed, having fn set to this filename is not what matters.

What matters is that, if there is no _common_metadata file, key-value metadata should be read from the last part file, to ensure, in case of an append, that we likely have the most up-to-date key-value metadata.

@martindurant
Member Author

What matters is that, if there is no _common_metadata file, key-value metadata should be read from the last part file, to ensure, in case of an append, that we likely have the most up-to-date key-value metadata.

Yes, you have persuaded me.

@yohplala

yohplala commented Oct 3, 2022

@martindurant, I think here is a summary of all our discussions so far (a small sketch of the appending decision tree follows the list):

  • adjust fastparquet behavior when instantiating a ParquetFile:

    • fix '5/': when instantiating from a folder path, if there is no _metadata, recognize fastparquet's naming scheme and, if it is used, sort files by part-file ID.

    • if there is no _common_metadata, read key-value metadata from the last part file

    • set the fn attribute depending on which metadata files are available:

      • if _metadata exists, it is set to _metadata
      • if _metadata does not exist, but _common_metadata does, it is set to _common_metadata
      • if neither exists, it is set to the filename of the first row group
      • and if 'file-like', keep the current behavior and set it to None.
  • allow writing without a _metadata file:

    • new parameter in write_multi() to control whether _metadata is written

    • in write() (copy of your initial message) this parameter is

      • either set by the user when creating the dataset

      • or, in case of appending, set depending on whether _metadata and/or _common_metadata already exist:

        • if _metadata already exists, it is updated as now (rewritten)

        • if _metadata does not exist, but _common_metadata does,

          • we append by writing new part files only,
          • and check that the schema is consistent with _common_metadata.
          • @martindurant: We will still need to find out what the new part's sequential number should be
            @yohplala: Why not increment it as now?
        • if neither exists,

          • we append by writing a new file,
          • check against the schema of one of the other data files,
          • and create _common_metadata
          • here, key-value metadata is read from the last existing part file
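A minimal sketch of the appending decision tree above (the flag names are hypothetical placeholders, not existing fastparquet parameters):

def append_plan(has_metadata, has_common_metadata):
    # What to check and (re)write when appending, depending on which files exist.
    if has_metadata:
        return {"check_schema_against": "_metadata",
                "rewrite": ["_metadata", "_common_metadata"]}
    if has_common_metadata:
        return {"check_schema_against": "_common_metadata",
                "rewrite": []}
    return {"check_schema_against": "an existing part file",
            "rewrite": ["_common_metadata"]}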

@martindurant
Member Author

A couple of small notes follow.

The only thing we don't consider in all this is a ParquetFile made from a list of paths. We could simply refuse to append in that case.

if no _metadata, recognize fastparquet's naming scheme, and if used,

Attempt a numerical sort that would match our scheme and maybe others; fall back to lexical. "Others" here might be arrow and spark.

read key value metadata from last part file

and schema? It now seems odd to read key-values from one file and the schema from another. The value of .fn doesn't really matter that much...

We will still need to find out what the new part's sequential number should be

@yohplala : Why not incrementing it as now?

Yes. That means we need to analyse the existing file names and potentially cope with file lists that don't follow a good numbering scheme. I'm not sure if we already do that.

@yohplala

yohplala commented Oct 4, 2022

The only thing we don't consider in all this is a ParquetFile made from a list of paths. We could simply refuse to append in that case.

I am ok with that.

read key value metadata from last part file

and schema?

Yes, I am OK with that as well; we are in the case where there is neither _metadata nor _common_metadata.

Yes. That means we need to analyse the existing file names and potentially cope with file lists that don't follow a good numbering scheme. I'm not sure if we already do that.

I don't think we do. And at the moment, I have no clear idea what would be the appropriate way of naming the new files.

@yohplala

yohplala commented Oct 4, 2022

About the name of the parameter controlling whether the _metadata file is written:
currently, in write_multi, and in all the "new" ParquetFile methods I created to make it "more mutable" (create row-group files, remove row-group files, rename part files...), there is a boolean write_fmd parameter.

Could it be this parameter?

This parameter could take a str value in addition to being True or False.
We could then just check whether it is set to a specific value in write_multi to decide NOT to write _metadata.
For instance, this value could be 'common' and we would have the following cases (for write_multi):

  • write_fmd = True: write both _metadata and _common_metadata (already OK)
  • write_fmd = 'common': write only _common_metadata (new case)
  • write_fmd = False: write neither _metadata nor _common_metadata (already OK)

Or it could be None / True / False?
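A minimal sketch of how the corresponding branch in write_multi() could look with the extended write_fmd (reusing the two write_common_metadata calls quoted earlier; the 'common' value is a proposal, not existing behaviour):

    if write_fmd is True:
        write_common_metadata(join_path(dn, '_metadata'), fmd, open_with,
                              no_row_groups=False)
        write_common_metadata(join_path(dn, '_common_metadata'), fmd,
                              open_with)
    elif write_fmd == 'common':
        write_common_metadata(join_path(dn, '_common_metadata'), fmd,
                              open_with)
    # write_fmd is False: write neither file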

@yohplala

yohplala commented Oct 4, 2022

Note: I modified the previous comment / removed what I think was not adequate.

The logic for setting write_fmd from the ParquetFile.fn attribute could be in a separate function, like:

def write_fmd_auto_set(pf: ParquetFile):
    # sketch: derive the write_fmd mode from pf.fn (the exact mapping is illustrative)
    if pf.fn and pf.fn.endswith('_common_metadata'):
        return 'common'
    elif pf.fn and pf.fn.endswith('_metadata'):
        return True
    else:
        return False

This function would then be called from write(), to set write_fmd parameter when calling pf.write_row_groups(write_fmd=...).

In overwrite(), this would be when calling pf.remove_row_groups(write_fmd=...).

@martindurant
Member Author

Rather than None, True, False, "common"..., you could have an enum

import enum

class MetadataWriteMode(enum.Enum):
    WRITE_META = 1
    WRITE_COMMON_META = 2
    NO_META = 3

which is more verbose, but at least explicit. It would require function signatures

write(..., meta_mode=MetadataWriteMode.WRITE_META)

I can understand if maybe you think that's too many characters.

@yohplala

yohplala commented Oct 5, 2022

Hi, yes, we can have an enum, I don't see any trouble with that. I would maybe rename/reword it this way?

import enum

class MDWriteMode(enum.Enum):
    ALL_META = 1
    ONLY_COMMON = 2
    NO_META = 3

@yohplala

yohplala commented Oct 5, 2022

The "only" point I see that needs some discussion is what to do when appending a hive dataset when naming does not follow fastparquet's convention (as you also identified).

I think that what could be appropriate is:

  • when reading, if the naming does not follow fastparquet's convention, we retain lexical ordering
  • to name new parquet files to be appended, we take the name of the last already-existing file and use a routine to "increment it by one". I have not spotted a library doing this.

If we consider the last character of the filename, we can check whether it falls outside the ranges [49, 57] or [97, 122]. These ranges come from the int representation of alphanumeric characters.

ord('1')
Out[11]: 49
ord('9')
Out[12]: 57
ord('a')
Out[13]: 97
ord('z')
Out[14]: 122

Then, we increment this last character, or if needed the last-but-one character as well, to name the new file. Something like the following (but handling of the out-of-range case would need to be implemented in addition):

the_string = the_string[:-1] + chr(ord(the_string[-1]) + 1)

(comes from this SO answer)

This way, we don't break lexical ordering with newly appended files.

@martindurant
Member Author

dask.utils has

def natural_sort_key(s: str) -> list[str | int]:
    """
    Sorting `key` function for performing a natural sort on a collection of
    strings

    See https://en.wikipedia.org/wiki/Natural_sort_order

    Parameters
    ----------
    s : str
        A string that is an element of the collection being sorted

    Returns
    -------
    tuple[str or int]
        Tuple of the parts of the input string where each part is either a
        string or an integer

    Examples
    --------
    >>> a = ['f0', 'f1', 'f2', 'f8', 'f9', 'f10', 'f11', 'f19', 'f20', 'f21']
    >>> sorted(a)
    ['f0', 'f1', 'f10', 'f11', 'f19', 'f2', 'f20', 'f21', 'f8', 'f9']
    >>> sorted(a, key=natural_sort_key)
    ['f0', 'f1', 'f2', 'f8', 'f9', 'f10', 'f11', 'f19', 'f20', 'f21']
    """
    return [int(part) if part.isdigit() else part for part in re.split(r"(\d+)", s)]

so that you can do sorted(filenames, key=natural_sort_key) to get numerical order so long as the filenames are consistent with their number placement; and the same function natural_sort_key(last_fn) will tell you where the integers are, so you can increment one. I suppose there had better be only one int group. I bet this works across fastparquet, arrow, spark.
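A minimal sketch of that increment step (next_filename is a hypothetical helper; natural_sort_key is the function quoted above):

import re

def natural_sort_key(s):
    return [int(part) if part.isdigit() else part for part in re.split(r"(\d+)", s)]

def next_filename(last_fn):
    # Split the last name into string/int parts, bump the single integer
    # group, and re-join to build the next part's name.
    parts = natural_sort_key(last_fn)
    int_positions = [i for i, p in enumerate(parts) if isinstance(p, int)]
    assert len(int_positions) == 1, "expects exactly one integer group in the name"
    parts[int_positions[0]] += 1
    return "".join(str(p) for p in parts)

print(next_filename("part.11.parquet"))  # part.12.parquet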

@yohplala

yohplala commented Oct 6, 2022

Hi Martin,
Regarding your last comment, being not at all familiar with pyarrow, I made a test.
I could not figure out how to tell it to write several files in a single directory.
But the following code writes at least one file, with an automatically generated filename that I understand is representative of their naming scheme.

import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

path=os.path.expanduser('~/Documents/code/data/fastparquet')
df = pd.DataFrame({'a':list(range(4))})
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, root_path=path)

I get a filename like:

b2444bb993764d81992980b385cf7d85-0.parquet

This is consistent with the documentation of write_to_dataset().

basename_template : str, optional

  A template string used to generate basenames of written data files.
  The token ‘{i}’ will be replaced with an automatically incremented integer.
  If not specified, it defaults to “guid-{i}.parquet”.
  This option is only supported for use_legacy_dataset=False.

So the int to increment would be the last one, similar to fastparquet's naming scheme. As an answer to your comment ("I suppose there had better be only one int group"), we could retain the logic of incrementing that last one.
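For reference, a hedged sketch of steering those names via basename_template, following the documentation quoted above (the template string here is just an example):

pq.write_to_dataset(table, root_path=path,
                    basename_template="part-{i}.parquet",
                    use_legacy_dataset=False)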
