Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-30781 Introduce format registry for pluggable file formats #18334

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions common/thorhelper/thorread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1300,7 +1300,7 @@ class CParquetActivityContext : public IThorActivityContext
/*
* Base class for reading a Parquet local file
*/
class ParquetDiskRowReader : public CInterfaceOf<IDiskRowStream>, implements IDiskRowReader
class ParquetDiskRowReader : public ExternalFormatDiskRowReader
{
public:
ParquetDiskRowReader(IDiskReadMapping * _mapping);
Expand All @@ -1325,23 +1325,13 @@ class ParquetDiskRowReader : public CInterfaceOf<IDiskRowStream>, implements IDi
virtual bool setInputFile(const CLogicalFileSlice & slice, const FieldFilterArray & expectedFilter, unsigned copy) override;

protected:
IDiskReadMapping * mapping = nullptr;
Owned<IEngineRowAllocator> outputAllocator;

MemoryBuffer tempOutputBuffer;
MemoryBufferBuilder bufferBuilder;

parquetembed::ParquetReader * parquetFileReader = nullptr;
CParquetActivityContext * parquetActivityCtx = nullptr;
StringAttr format;
RecordTranslationMode translationMode;
bool eogPending = false;
};

ParquetDiskRowReader::ParquetDiskRowReader(IDiskReadMapping * _mapping)
: mapping(_mapping), bufferBuilder(tempOutputBuffer, 0), parquetActivityCtx(new CParquetActivityContext(true, 1, 0))
: ExternalFormatDiskRowReader(_mapping), parquetActivityCtx(new CParquetActivityContext(true, 1, 0))
{
translationMode = mapping->queryTranslationMode();
}

ParquetDiskRowReader::~ParquetDiskRowReader()
Expand Down Expand Up @@ -1435,12 +1425,11 @@ void ParquetDiskRowReader::stop()

void ParquetDiskRowReader::clearInput()
{
eogPending = false;
}

bool ParquetDiskRowReader::matches(const char * _format, bool _streamRemote, IDiskReadMapping * _mapping)
{
if (!strieq(format, "parquet"))
if (!strieq(_format, PARQUET_FILE_TYPE_NAME))
return false;
return true; // TO DO add additional check
}
Expand Down Expand Up @@ -1708,17 +1697,18 @@ void RemoteDiskRowReader::stop()

///---------------------------------------------------------------------------------------------------------------------

// Lookup to map the names of file types/formats to their object constructors;
// map will be initialized within MODULE_INIT
static std::map<std::string, std::function<DiskRowReader*(IDiskReadMapping*)>> genericFileTypeMap;


// format is assumed to be lowercase
IDiskRowReader * doCreateLocalDiskReader(const char * format, IDiskReadMapping * _mapping)
{
#ifdef _USE_PARQUET
if (strieq(format, "parquet"))
return new ParquetDiskRowReader(_mapping);
#endif
if (strieq(format, "flat"))
return new BinaryDiskRowReader(_mapping);
if (strieq(format, "csv"))
return new CsvDiskRowReader(_mapping);
auto foundReader = genericFileTypeMap.find(format);

if (foundReader != genericFileTypeMap.end())
return foundReader->second(_mapping);

UNIMPLEMENTED;
}
Expand Down Expand Up @@ -1756,6 +1746,27 @@ IDiskRowReader * createDiskReader(const char * format, bool streamRemote, IDiskR
return createLocalDiskReader(format, _mapping);
}

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
// All pluggable file types that use the generic disk reader
// should be defined here; the key is the lowercase name of the format,
// as will be used in ECL, and the value should be a lambda
// that creates the appropriate disk row reader object
genericFileTypeMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowReader(_mapping); });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future I suspect this will change to registering factory class instances that have createReader()/writer methods, but this is good for now.

genericFileTypeMap.emplace("csv", [](IDiskReadMapping * _mapping) { return new CsvDiskRowReader(_mapping); });
#ifdef _USE_PARQUET
genericFileTypeMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return new ParquetDiskRowReader(_mapping); });
#endif

// Stuff the file type names that were just instantiated into a list;
// list will be accessed by the ECL compiler to validate the names
// at compile time
for (auto iter = genericFileTypeMap.begin(); iter != genericFileTypeMap.end(); iter++)
addAvailableGenericFileTypeName(iter->first.c_str());

return true;
}


/*

Expand Down
7 changes: 1 addition & 6 deletions ecl/hql/hqlgram2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8995,13 +8995,8 @@ bool HqlGram::convertAllToAttribute(attribute &atr)
void HqlGram::setPluggableModeExpr(attribute & targetAttr, attribute & pluginAttr, attribute & options)
{
const char * fileFormatStr = str(pluginAttr.getId());
bool isSupportedFileType = false;

// TODO: Hardcoded tests for known filetype plugins; should be
// an iteration through a list of available types, checking for names
isSupportedFileType = strisame(fileFormatStr, "parquet");

if (isSupportedFileType)
if (hasGenericFiletypeName(fileFormatStr))
{
// Do we just cite the name of the plugin, or perhaps something
// like a unique ID from the plugin's factory (assuming there is one)?
Expand Down
2 changes: 2 additions & 0 deletions plugins/parquet/parquetembed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ extern void UNSUPPORTED(const char *feature) __attribute__((noreturn));
extern void failx(const char *msg, ...) __attribute__((noreturn)) __attribute__((format(printf, 1, 2)));
extern void fail(const char *msg) __attribute__((noreturn));

#define PARQUET_FILE_TYPE_NAME "parquet"

#define reportIfFailure(st) \
if (!st.ok()) \
{ \
Expand Down
26 changes: 26 additions & 0 deletions system/jlib/jfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7757,6 +7757,32 @@ extern IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize)
return new CBlockedFileIO(base, blockSize);
}

///---------------------------------------------------------------------------------------------------------------------

// Module-level global that will contain a list of pluggable file type
// names (e.g. "parquet", "csv") that are supported through the
// generic disk reader
static StringArray genericFileTypeNameList;

/**
 * Record the name of a pluggable file type in the module-level list that
 * hasGenericFiletypeName() searches.
 *
 * @param name  File type name to register (e.g. "parquet", "csv").
 *              A null or empty name is ignored.
 *
 * NOTE(review): no synchronization is visible in this function; confirm that
 * callers only register names from module initialization code.
 */
void addAvailableGenericFileTypeName(const char * name)
{
    // A null or empty name can never match a real file type, and a null
    // pointer is presumably unsafe to hand to the string array (TODO confirm),
    // so drop it here rather than storing it.
    if (!name || !*name)
        return;

    genericFileTypeNameList.append(name);
}

/**
 * Determine whether a file type name has been registered through
 * addAvailableGenericFileTypeName(); used by the ECL parser.
 *
 * Each registered entry is compared against the name with strieq().
 *
 * @param name  File type name to look up; may be null.
 * @return      true if a registered name matches, false otherwise
 *              (including when name is null or empty).
 */
bool hasGenericFiletypeName(const char * name)
{
    // Nothing can match a missing name; bail out before passing a null
    // pointer to the string comparison.
    if (!name || !*name)
        return false;

    ForEachItemIn(idx, genericFileTypeNameList)
    {
        if (strieq(genericFileTypeNameList.item(idx), name))
            return true;
    }

    return false;
}

///---------------------------------------------------------------------------------------------------------------------

// Cache/update plane index blocked IO settings
static unsigned planeBlockIOMapCBId = 0;
static std::unordered_map<std::string, size32_t> planeBlockedIOMap;
Expand Down
5 changes: 5 additions & 0 deletions system/jlib/jfile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -788,4 +788,9 @@ extern jlib_decl IPropertyTreeIterator * getPlanesIterator(const char * category
extern jlib_decl IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize);
extern jlib_decl size32_t getBlockedFileIOSize(const char *planeName, size32_t defaultSize=0);

//---- Pluggable file type related functions ----------------------------------------------

extern jlib_decl void addAvailableGenericFileTypeName(const char * name);
extern jlib_decl bool hasGenericFiletypeName(const char * name);

#endif
Loading