diff --git a/plugins/parquet/examples/taxi_data.ecl b/plugins/parquet/examples/taxi_data.ecl new file mode 100644 index 00000000000..b2ed9deef06 --- /dev/null +++ b/plugins/parquet/examples/taxi_data.ecl @@ -0,0 +1,26 @@ +IMPORT PARQUET; + +EXPORT Layout := RECORD + STRING VendorID; + STRING tpep_pickup_datetime; + STRING tpep_dropoff_datetime; + STRING passenger_count; + STRING trip_distance; + STRING RatecodeID; + STRING store_and_fwd_flag; + STRING PULocationID; + STRING DOLocationID; + STRING payment_type; + STRING fare_amount; + STRING extra; + STRING mta_tax; + STRING tip_amount; + STRING tolls_amount; + STRING improvement_surcharge; + STRING total_amount; +END; + +tripData := '/datadrive/dev/test_data/yellow_tripdata_2017-01.parquet'; +read_in := ParquetIO.Read(Layout, tripData); +COUNT(read_in); +OUTPUT(CHOOSEN(read_in, 100)); \ No newline at end of file diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp index 366b76279c3..749b14d35b6 100644 --- a/plugins/parquet/parquetembed.cpp +++ b/plugins/parquet/parquetembed.cpp @@ -838,6 +838,130 @@ int64_t ParquetRowBuilder::currArrayIndex() return !m_pathStack.empty() && m_pathStack.back().nodeType == CPNTSet ? m_pathStack.back().childrenProcessed++ : currentRow; } +__int64 getSigned(std::shared_ptr *array_visitor, int index) +{ + switch ((*array_visitor)->size) + { + case 8: + return (*array_visitor)->int8_arr->Value(index); + case 16: + return (*array_visitor)->int16_arr->Value(index); + case 32: + return (*array_visitor)->int32_arr->Value(index); + case 64: + return (*array_visitor)->int64_arr->Value(index); + default: + failx("getSigned: Invalid size %i", (*array_visitor)->size); + } +} + +unsigned __int64 getUnsigned(std::shared_ptr *array_visitor, int index) +{ + switch ((*array_visitor)->size) + { + case 8: + return (*array_visitor)->uint8_arr->Value(index); + case 16: + return (*array_visitor)->uint16_arr->Value(index); + case 32: + return (*array_visitor)->uint32_arr->Value(index); + case 64: + return (*array_visitor)->uint64_arr->Value(index); + default: + failx("getUnsigned: Invalid size %i", (*array_visitor)->size); + } +} + +double getReal(std::shared_ptr *array_visitor, int index) +{ + switch ((*array_visitor)->size) + { + case 2: + return (*array_visitor)->half_float_arr->Value(index); + case 4: + return (*array_visitor)->float_arr->Value(index); + case 8: + return (*array_visitor)->double_arr->Value(index); + default: + failx("getReal: Invalid size %i", (*array_visitor)->size); + } +} + +std::string_view ParquetRowBuilder::getCurrView(const RtlFieldInfo *field) +{ + serialized.clear(); + + switch((*array_visitor)->type) + { + case BoolType: + tokenSerializer.serialize((*array_visitor)->bool_arr->Value(currArrayIndex()), serialized); + return serialized.str(); + case BinaryType: + return (*array_visitor)->bin_arr->GetView(currArrayIndex()); + case LargeBinaryType: + return (*array_visitor)->large_bin_arr->GetView(currArrayIndex()); + case RealType: + tokenSerializer.serialize(getReal(array_visitor, currArrayIndex()), serialized); + return serialized.str(); + case IntType: + tokenSerializer.serialize(getSigned(array_visitor, currArrayIndex()), serialized); + return serialized.str(); + case UIntType: + tokenSerializer.serialize(getUnsigned(array_visitor, currArrayIndex()), serialized); + return serialized.str(); + case DateType: + tokenSerializer.serialize((*array_visitor)->size == 32 ? (__int32) (*array_visitor)->date32_arr->Value(currArrayIndex()) : (__int64) (*array_visitor)->date64_arr->Value(currArrayIndex()), serialized); + return serialized.str(); + case TimestampType: + tokenSerializer.serialize((__int64) (*array_visitor)->timestamp_arr->Value(currArrayIndex()), serialized); + return serialized.str(); + case TimeType: + tokenSerializer.serialize((*array_visitor)->size == 32 ? (__int32) (*array_visitor)->time32_arr->Value(currArrayIndex()) : (__int64) (*array_visitor)->time64_arr->Value(currArrayIndex()), serialized); + return serialized.str(); + case DurationType: + tokenSerializer.serialize((__int64) (*array_visitor)->duration_arr->Value(currArrayIndex()), serialized); + return serialized.str(); + case StringType: + return (*array_visitor)->string_arr->GetView(currArrayIndex()); + case LargeStringType: + return (*array_visitor)->large_string_arr->GetView(currArrayIndex()); + case DecimalType: + return (*array_visitor)->size == 128 ? (*array_visitor)->dec_arr->GetView(currArrayIndex()) : (*array_visitor)->large_dec_arr->GetView(currArrayIndex()); + default: + failx("Unimplemented Parquet type for field with name %s.", field->name); + } +} + +__int64 ParquetRowBuilder::getCurrIntValue(const RtlFieldInfo *field) +{ + switch ((*array_visitor)->type) + { + case BoolType: + return (*array_visitor)->bool_arr->Value(currArrayIndex()); + case IntType: + return getSigned(array_visitor, currArrayIndex()); + case UIntType: + return getUnsigned(array_visitor, currArrayIndex()); + case RealType: + return getReal(array_visitor, currArrayIndex()); + case DateType: + return (*array_visitor)->size == 32 ? (*array_visitor)->date32_arr->Value(currArrayIndex()) : (*array_visitor)->date64_arr->Value(currArrayIndex()); + case TimestampType: + return (*array_visitor)->timestamp_arr->Value(currArrayIndex()); + case TimeType: + return (*array_visitor)->size == 32 ? (*array_visitor)->time32_arr->Value(currArrayIndex()) : (*array_visitor)->time64_arr->Value(currArrayIndex()); + case DurationType: + return (*array_visitor)->duration_arr->Value(currArrayIndex()); + default: + { + __int64 myint64 = 0; + auto scalar = getCurrView(field); + handleDeserializeOutcome(tokenDeserializer.deserialize(scalar.data(), myint64), "signed", scalar.data()); + return myint64; + } + } +} + /** * @brief Gets a Boolean result for an ECL Row * @@ -853,11 +977,8 @@ bool ParquetRowBuilder::getBooleanResult(const RtlFieldInfo *field) NullFieldProcessor p(field); return p.boolResult; } - if ((*array_visitor)->type != BoolType) - { - failx("Incorrect type for field %s.", field->name); - } - return (*array_visitor)->bool_arr->Value(currArrayIndex()); + + return getCurrIntValue(field); } /** @@ -877,16 +998,10 @@ void ParquetRowBuilder::getDataResult(const RtlFieldInfo *field, size32_t &len, rtlUtf8ToDataX(len, result, p.resultChars, p.stringResult); return; } - if ((*array_visitor)->type == BinaryType) - { - auto view = (*array_visitor)->large_bin_arr->GetView(currArrayIndex()); - rtlStrToDataX(len, result, view.size(), view.data()); - return; - } - else - { - failx("Incorrect type for field %s.", field->name); - } + + auto view = getCurrView(field); + rtlStrToDataX(len, result, view.size(), view.data()); + return; } /** @@ -904,45 +1019,11 @@ double ParquetRowBuilder::getRealResult(const RtlFieldInfo *field) NullFieldProcessor p(field); return p.doubleResult; } - if ((*array_visitor)->type != DoubleType) - { - failx("Incorrect type for field %s.", field->name); - } - return (*array_visitor)->double_arr->Value(currArrayIndex()); -} - -__int64 getSigned(std::shared_ptr *array_visitor, int index) -{ - switch ((*array_visitor)->size) - { - case 8: - return (*array_visitor)->int8_arr->Value(index); - case 16: - return (*array_visitor)->int16_arr->Value(index); - case 32: - return (*array_visitor)->int32_arr->Value(index); - case 64: - return (*array_visitor)->int64_arr->Value(index); - default: - failx("getSigned: Invalid size %i", (*array_visitor)->size); - } -} -unsigned __int64 getUnsigned(std::shared_ptr *array_visitor, int index) -{ - switch ((*array_visitor)->size) - { - case 8: - return (*array_visitor)->uint8_arr->Value(index); - case 16: - return (*array_visitor)->uint16_arr->Value(index); - case 32: - return (*array_visitor)->uint32_arr->Value(index); - case 64: - return (*array_visitor)->uint64_arr->Value(index); - default: - failx("getUnsigned: Invalid size %i", (*array_visitor)->size); - } + if ((*array_visitor)->type == RealType) + return getReal(array_visitor, currArrayIndex()); + else + return getCurrIntValue(field); } /** @@ -958,13 +1039,10 @@ __int64 ParquetRowBuilder::getSignedResult(const RtlFieldInfo *field) if ((*array_visitor)->type == NullType) { NullFieldProcessor p(field); - return p.uintResult; - } - if ((*array_visitor)->type != IntType) - { - failx("Incorrect type for field %s.", field->name); + return p.intResult; } - return getSigned(array_visitor, currArrayIndex()); + + return getCurrIntValue(field); } /** @@ -979,15 +1057,14 @@ unsigned __int64 ParquetRowBuilder::getUnsignedResult(const RtlFieldInfo *field) if ((*array_visitor)->type == NullType) { - NullFieldProcessor p(field); return p.uintResult; } - if ((*array_visitor)->type != UIntType) - { - failx("Incorrect type for field %s.", field->name); - } - return getUnsigned(array_visitor, currArrayIndex()); + + if ((*array_visitor)->type == UIntType) + return getUnsigned(array_visitor, currArrayIndex()); + else + return getCurrIntValue(field); } /** @@ -1007,17 +1084,10 @@ void ParquetRowBuilder::getStringResult(const RtlFieldInfo *field, size32_t &cha rtlUtf8ToStrX(chars, result, p.resultChars, p.stringResult); return; } - if ((*array_visitor)->type == StringType) - { - auto view = (*array_visitor)->string_arr->GetView(currArrayIndex()); - unsigned numchars = rtlUtf8Length(view.size(), view.data()); - rtlUtf8ToStrX(chars, result, numchars, view.data()); - return; - } - else - { - failx("Incorrect type for field %s.", field->name); - } + auto view = getCurrView(field); + unsigned numchars = rtlUtf8Length(view.size(), view.data()); + rtlUtf8ToStrX(chars, result, numchars, view.data()); + return; } /** @@ -1037,17 +1107,10 @@ void ParquetRowBuilder::getUTF8Result(const RtlFieldInfo *field, size32_t &chars rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult); return; } - if ((*array_visitor)->type == StringType) - { - auto view = (*array_visitor)->string_arr->GetView(currArrayIndex()); - unsigned numchars = rtlUtf8Length(view.size(), view.data()); - rtlUtf8ToUtf8X(chars, result, numchars, view.data()); - return; - } - else - { - failx("Incorrect type for field %s.", field->name); - } + auto view = getCurrView(field); + unsigned numchars = rtlUtf8Length(view.size(), view.data()); + rtlUtf8ToUtf8X(chars, result, numchars, view.data()); + return; } /** @@ -1067,17 +1130,10 @@ void ParquetRowBuilder::getUnicodeResult(const RtlFieldInfo *field, size32_t &ch rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult); return; } - if ((*array_visitor)->type == StringType) - { - auto view = (*array_visitor)->string_arr->GetView(currArrayIndex()); - unsigned numchars = rtlUtf8Length(view.size(), view.data()); - rtlUtf8ToUnicodeX(chars, result, numchars, view.data()); - return; - } - else - { - failx("Incorrect type for field %s.", field->name); - } + auto view = getCurrView(field); + unsigned numchars = rtlUtf8Length(view.size(), view.data()); + rtlUtf8ToUnicodeX(chars, result, numchars, view.data()); + return; } /** @@ -1096,18 +1152,12 @@ void ParquetRowBuilder::getDecimalResult(const RtlFieldInfo *field, Decimal &val value.set(p.decimalResult); return; } - if ((*array_visitor)->type == StringType) - { - auto dvalue = (*array_visitor)->string_arr->GetView(currArrayIndex()); - value.setString(dvalue.size(), dvalue.data()); - RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *)field->type; - value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision()); - return; - } - else - { - failx("Incorrect type for field %s.", field->name); - } + + auto dvalue = getCurrView(field); + value.setString(dvalue.size(), dvalue.data()); + RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *)field->type; + value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision()); + return; } /** @@ -1129,7 +1179,7 @@ void ParquetRowBuilder::processBeginSet(const RtlFieldInfo *field, bool &isAll) } else { - failx("Incorrect type for field %s.", field->name); + failx("Error reading nested set with name %s.", field->name); } } diff --git a/plugins/parquet/parquetembed.hpp b/plugins/parquet/parquetembed.hpp index 8da6377116b..9f1bb2015e0 100644 --- a/plugins/parquet/parquetembed.hpp +++ b/plugins/parquet/parquetembed.hpp @@ -132,13 +132,18 @@ enum ParquetArrayType BoolType, IntType, UIntType, - FloatType, + DateType, + TimestampType, + TimeType, + DurationType, StringType, + LargeStringType, BinaryType, - Decimal128Type, + LargeBinaryType, + DecimalType, ListType, StructType, - DoubleType + RealType }; class ParquetArrayVisitor : public arrow::ArrayVisitor @@ -211,16 +216,65 @@ class ParquetArrayVisitor : public arrow::ArrayVisitor size = 64; return arrow::Status::OK(); } + arrow::Status Visit(const arrow::Date32Array &array) + { + date32_arr = &array; + type = DateType; + size = 32; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::Date64Array &array) + { + date64_arr = &array; + type = DateType; + size = 64; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::TimestampArray &array) + { + timestamp_arr = &array; + type = TimestampType; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::Time32Array &array) + { + time32_arr = &array; + type = TimeType; + size = 32; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::Time64Array &array) + { + time64_arr = &array; + type = TimeType; + size = 64; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::DurationArray &array) + { + duration_arr = &array; + type = DurationType; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::HalfFloatArray &array) + { + half_float_arr = &array; + type = RealType; + size = 2; + return arrow::Status::OK(); + } arrow::Status Visit(const arrow::FloatArray &array) { float_arr = &array; - type = FloatType; + type = RealType; + size = 4; return arrow::Status::OK(); } arrow::Status Visit(const arrow::DoubleArray &array) { double_arr = &array; - type = DoubleType; + type = RealType; + size = 8; return arrow::Status::OK(); } arrow::Status Visit(const arrow::StringArray &array) @@ -229,6 +283,12 @@ class ParquetArrayVisitor : public arrow::ArrayVisitor type = StringType; return arrow::Status::OK(); } + arrow::Status Visit(const arrow::LargeStringArray &array) + { + large_string_arr = &array; + type = LargeStringType; + return arrow::Status::OK(); + } arrow::Status Visit(const arrow::BinaryArray &array) { bin_arr = &array; @@ -238,13 +298,21 @@ class ParquetArrayVisitor : public arrow::ArrayVisitor arrow::Status Visit(const arrow::LargeBinaryArray &array) { large_bin_arr = &array; - type = BinaryType; + type = LargeBinaryType; return arrow::Status::OK(); } arrow::Status Visit(const arrow::Decimal128Array &array) { dec_arr = &array; - type = Decimal128Type; + type = DecimalType; + size = 128; + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::Decimal256Array &array) + { + large_dec_arr = &array; + type = DecimalType; + size = 256; return arrow::Status::OK(); } arrow::Status Visit(const arrow::ListArray &array) @@ -271,12 +339,21 @@ class ParquetArrayVisitor : public arrow::ArrayVisitor const arrow::UInt16Array *uint16_arr = nullptr; const arrow::UInt32Array *uint32_arr = nullptr; const arrow::UInt64Array *uint64_arr = nullptr; + const arrow::Date32Array *date32_arr = nullptr; + const arrow::Date64Array *date64_arr = nullptr; + const arrow::TimestampArray *timestamp_arr = nullptr; + const arrow::Time32Array *time32_arr = nullptr; + const arrow::Time64Array *time64_arr = nullptr; + const arrow::DurationArray *duration_arr = nullptr; + const arrow::HalfFloatArray *half_float_arr = nullptr; const arrow::FloatArray *float_arr = nullptr; const arrow::DoubleArray *double_arr = nullptr; const arrow::StringArray *string_arr = nullptr; + const arrow::LargeStringArray *large_string_arr = nullptr; const arrow::BinaryArray *bin_arr = nullptr; const arrow::LargeBinaryArray *large_bin_arr = nullptr; const arrow::Decimal128Array *dec_arr = nullptr; + const arrow::Decimal256Array *large_dec_arr = nullptr; const arrow::ListArray *list_arr = nullptr; const arrow::StructArray *struct_arr = nullptr; }; @@ -820,6 +897,8 @@ class ParquetRowBuilder : public CInterfaceOf protected: const std::shared_ptr &getChunk(std::shared_ptr *column); + std::string_view getCurrView(const RtlFieldInfo *field); + __int64 getCurrIntValue(const RtlFieldInfo *field); void nextField(const RtlFieldInfo *field); void nextFromStruct(const RtlFieldInfo *field); void xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field) const; @@ -827,7 +906,9 @@ class ParquetRowBuilder : public CInterfaceOf private: __int64 currentRow; - TokenDeserializer m_tokenDeserializer; + TokenDeserializer tokenDeserializer; + TokenSerializer tokenSerializer; + StringBuffer serialized; std::unordered_map> *result_rows; std::vector m_pathStack; std::shared_ptr *array_visitor; @@ -1022,7 +1103,7 @@ class ParquetEmbedFunctionContext : public CInterfaceOf Owned m_oInputStream; //! Input Stream used for building a dataset. - TokenDeserializer m_tokenDeserializer; + TokenDeserializer tokenDeserializer; TokenSerializer m_tokenSerializer; unsigned m_nextParam = 0; //! Index of the next parameter to process. unsigned m_numParams = 0; //! Number of parameters in the function definition.