diff --git a/common/thorhelper/thorread.cpp b/common/thorhelper/thorread.cpp index 1f92b7ea31b..a5c2e672182 100644 --- a/common/thorhelper/thorread.cpp +++ b/common/thorhelper/thorread.cpp @@ -1300,7 +1300,7 @@ class CParquetActivityContext : public IThorActivityContext /* * Base class for reading a Parquet local file */ -class ParquetDiskRowReader : public CInterfaceOf, implements IDiskRowReader +class ParquetDiskRowReader : public ExternalFormatDiskRowReader { public: ParquetDiskRowReader(IDiskReadMapping * _mapping); @@ -1325,23 +1325,13 @@ class ParquetDiskRowReader : public CInterfaceOf, implements IDi virtual bool setInputFile(const CLogicalFileSlice & slice, const FieldFilterArray & expectedFilter, unsigned copy) override; protected: - IDiskReadMapping * mapping = nullptr; - Owned outputAllocator; - - MemoryBuffer tempOutputBuffer; - MemoryBufferBuilder bufferBuilder; - parquetembed::ParquetReader * parquetFileReader = nullptr; CParquetActivityContext * parquetActivityCtx = nullptr; - StringAttr format; - RecordTranslationMode translationMode; - bool eogPending = false; }; ParquetDiskRowReader::ParquetDiskRowReader(IDiskReadMapping * _mapping) - : mapping(_mapping), bufferBuilder(tempOutputBuffer, 0), parquetActivityCtx(new CParquetActivityContext(true, 1, 0)) + : ExternalFormatDiskRowReader(_mapping), parquetActivityCtx(new CParquetActivityContext(true, 1, 0)) { - translationMode = mapping->queryTranslationMode(); } ParquetDiskRowReader::~ParquetDiskRowReader() @@ -1435,12 +1425,11 @@ void ParquetDiskRowReader::stop() void ParquetDiskRowReader::clearInput() { - eogPending = false; } bool ParquetDiskRowReader::matches(const char * _format, bool _streamRemote, IDiskReadMapping * _mapping) { - if (!strieq(format, "parquet")) + if (!strieq(_format, PARQUET_FILE_TYPE_NAME)) return false; return true; // TO DO add additional check } @@ -1708,17 +1697,18 @@ void RemoteDiskRowReader::stop() ///--------------------------------------------------------------------------------------------------------------------- +// Lookup to map the names of file types/formats to their object constructors; +// map will be initialized within MODULE_INIT +static std::map> genericFileTypeMap; + +// format is assumed to be lowercase IDiskRowReader * doCreateLocalDiskReader(const char * format, IDiskReadMapping * _mapping) { -#ifdef _USE_PARQUET - if (strieq(format, "parquet")) - return new ParquetDiskRowReader(_mapping); -#endif - if (strieq(format, "flat")) - return new BinaryDiskRowReader(_mapping); - if (strieq(format, "csv")) - return new CsvDiskRowReader(_mapping); + auto foundReader = genericFileTypeMap.find(format); + + if (foundReader != genericFileTypeMap.end()) + return foundReader->second(_mapping); UNIMPLEMENTED; } @@ -1756,6 +1746,27 @@ IDiskRowReader * createDiskReader(const char * format, bool streamRemote, IDiskR return createLocalDiskReader(format, _mapping); } +MODULE_INIT(INIT_PRIORITY_STANDARD) +{ + // All pluggable file types that use the generic disk reader + // should be defined here; the key is the lowecase name of the format, + // as will be used in ECL, and the value should be a lambda + // that creates the appropriate disk row reader object + genericFileTypeMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowReader(_mapping); }); + genericFileTypeMap.emplace("csv", [](IDiskReadMapping * _mapping) { return new CsvDiskRowReader(_mapping); }); +#ifdef _USE_PARQUET + genericFileTypeMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return new ParquetDiskRowReader(_mapping); }); +#endif + + // Stuff the file type names that were just instantiated into a list; + // list will be accessed by the ECL compiler to validate the names + // at compile time + for (auto iter = genericFileTypeMap.begin(); iter != genericFileTypeMap.end(); iter++) + addAvailableGenericFileTypeName(iter->first.c_str()); + + return true; +} + /* diff --git a/ecl/hql/hqlgram2.cpp b/ecl/hql/hqlgram2.cpp index 754437f0698..46d13859e32 100644 --- a/ecl/hql/hqlgram2.cpp +++ b/ecl/hql/hqlgram2.cpp @@ -8995,13 +8995,8 @@ bool HqlGram::convertAllToAttribute(attribute &atr) void HqlGram::setPluggableModeExpr(attribute & targetAttr, attribute & pluginAttr, attribute & options) { const char * fileFormatStr = str(pluginAttr.getId()); - bool isSupportedFileType = false; - // TODO: Hardcoded tests for known filetype plugins; should be - // an iteration through a list of available types, checking for names - isSupportedFileType = strisame(fileFormatStr, "parquet"); - - if (isSupportedFileType) + if (hasGenericFiletypeName(fileFormatStr)) { // Do we just cite the name of the plugin, or perhaps something // like a unique ID from the plugin's factory (assuming there is one)? diff --git a/plugins/parquet/parquetembed.hpp b/plugins/parquet/parquetembed.hpp index 2c65e660371..2135b82ab20 100644 --- a/plugins/parquet/parquetembed.hpp +++ b/plugins/parquet/parquetembed.hpp @@ -50,6 +50,8 @@ extern void UNSUPPORTED(const char *feature) __attribute__((noreturn)); extern void failx(const char *msg, ...) __attribute__((noreturn)) __attribute__((format(printf, 1, 2))); extern void fail(const char *msg) __attribute__((noreturn)); +#define PARQUET_FILE_TYPE_NAME "parquet" + #define reportIfFailure(st) \ if (!st.ok()) \ { \ diff --git a/system/jlib/jfile.cpp b/system/jlib/jfile.cpp index d0496dc84df..f9fd0521570 100644 --- a/system/jlib/jfile.cpp +++ b/system/jlib/jfile.cpp @@ -7757,6 +7757,32 @@ extern IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize) return new CBlockedFileIO(base, blockSize); } +///--------------------------------------------------------------------------------------------------------------------- + +// Module-level global that will contain a list of pluggable file type +// names (e.g. "parquet", "csv") that are supported through the +// generic disk reader +static StringArray genericFileTypeNameList; + +void addAvailableGenericFileTypeName(const char * name) +{ + genericFileTypeNameList.append(name); +} + +// Determine if file type is defined; used by the ECL parser +bool hasGenericFiletypeName(const char * name) +{ + ForEachItemIn(idx, genericFileTypeNameList) + { + if (strieq(genericFileTypeNameList.item(idx), name)) + return true; + } + + return false; +} + +///--------------------------------------------------------------------------------------------------------------------- + // Cache/update plane index blocked IO settings static unsigned planeBlockIOMapCBId = 0; static std::unordered_map planeBlockedIOMap; diff --git a/system/jlib/jfile.hpp b/system/jlib/jfile.hpp index 509200dd1c4..9600a9eea7b 100644 --- a/system/jlib/jfile.hpp +++ b/system/jlib/jfile.hpp @@ -788,4 +788,9 @@ extern jlib_decl IPropertyTreeIterator * getPlanesIterator(const char * category extern jlib_decl IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize); extern jlib_decl size32_t getBlockedFileIOSize(const char *planeName, size32_t defaultSize=0); +//---- Pluggable file type related functions ---------------------------------------------- + +extern jlib_decl void addAvailableGenericFileTypeName(const char * name); +extern jlib_decl bool hasGenericFiletypeName(const char * name); + #endif