Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-30458 Parquet plugin needs timestamp support #17890

Merged
merged 1 commit into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions plugins/parquet/examples/taxi_data.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
IMPORT PARQUET;

EXPORT Layout := RECORD
STRING VendorID;
STRING tpep_pickup_datetime;
STRING tpep_dropoff_datetime;
STRING passenger_count;
STRING trip_distance;
STRING RatecodeID;
STRING store_and_fwd_flag;
STRING PULocationID;
STRING DOLocationID;
STRING payment_type;
STRING fare_amount;
STRING extra;
STRING mta_tax;
STRING tip_amount;
STRING tolls_amount;
STRING improvement_surcharge;
STRING total_amount;
END;

tripData := '/datadrive/dev/test_data/yellow_tripdata_2017-01.parquet';
read_in := ParquetIO.Read(Layout, tripData);
COUNT(read_in);
OUTPUT(CHOOSEN(read_in, 100));
272 changes: 161 additions & 111 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,130 @@ int64_t ParquetRowBuilder::currArrayIndex()
return !m_pathStack.empty() && m_pathStack.back().nodeType == CPNTSet ? m_pathStack.back().childrenProcessed++ : currentRow;
}

__int64 getSigned(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int index)
{
switch ((*array_visitor)->size)
{
case 8:
return (*array_visitor)->int8_arr->Value(index);
case 16:
return (*array_visitor)->int16_arr->Value(index);
case 32:
return (*array_visitor)->int32_arr->Value(index);
case 64:
return (*array_visitor)->int64_arr->Value(index);
default:
failx("getSigned: Invalid size %i", (*array_visitor)->size);
}
}

unsigned __int64 getUnsigned(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int index)
{
switch ((*array_visitor)->size)
{
case 8:
return (*array_visitor)->uint8_arr->Value(index);
case 16:
return (*array_visitor)->uint16_arr->Value(index);
case 32:
return (*array_visitor)->uint32_arr->Value(index);
case 64:
return (*array_visitor)->uint64_arr->Value(index);
default:
failx("getUnsigned: Invalid size %i", (*array_visitor)->size);
}
}

double getReal(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int index)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: These functions could be static (to avoid the function names being exported globally). Don't change for this PR though.

{
switch ((*array_visitor)->size)
{
case 2:
return (*array_visitor)->half_float_arr->Value(index);
case 4:
return (*array_visitor)->float_arr->Value(index);
case 8:
return (*array_visitor)->double_arr->Value(index);
default:
failx("getReal: Invalid size %i", (*array_visitor)->size);
}
}

std::string_view ParquetRowBuilder::getCurrView(const RtlFieldInfo *field)
{
serialized.clear();

switch((*array_visitor)->type)
{
case BoolType:
tokenSerializer.serialize((*array_visitor)->bool_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case BinaryType:
return (*array_visitor)->bin_arr->GetView(currArrayIndex());
case LargeBinaryType:
return (*array_visitor)->large_bin_arr->GetView(currArrayIndex());
case RealType:
tokenSerializer.serialize(getReal(array_visitor, currArrayIndex()), serialized);
return serialized.str();
case IntType:
tokenSerializer.serialize(getSigned(array_visitor, currArrayIndex()), serialized);
return serialized.str();
case UIntType:
tokenSerializer.serialize(getUnsigned(array_visitor, currArrayIndex()), serialized);
return serialized.str();
case DateType:
tokenSerializer.serialize((*array_visitor)->size == 32 ? (__int32) (*array_visitor)->date32_arr->Value(currArrayIndex()) : (__int64) (*array_visitor)->date64_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case TimestampType:
tokenSerializer.serialize((__int64) (*array_visitor)->timestamp_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case TimeType:
tokenSerializer.serialize((*array_visitor)->size == 32 ? (__int32) (*array_visitor)->time32_arr->Value(currArrayIndex()) : (__int64) (*array_visitor)->time64_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case DurationType:
tokenSerializer.serialize((__int64) (*array_visitor)->duration_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case StringType:
return (*array_visitor)->string_arr->GetView(currArrayIndex());
case LargeStringType:
return (*array_visitor)->large_string_arr->GetView(currArrayIndex());
case DecimalType:
return (*array_visitor)->size == 128 ? (*array_visitor)->dec_arr->GetView(currArrayIndex()) : (*array_visitor)->large_dec_arr->GetView(currArrayIndex());
default:
failx("Unimplemented Parquet type for field with name %s.", field->name);
}
}

__int64 ParquetRowBuilder::getCurrIntValue(const RtlFieldInfo *field)
{
switch ((*array_visitor)->type)
{
case BoolType:
return (*array_visitor)->bool_arr->Value(currArrayIndex());
case IntType:
return getSigned(array_visitor, currArrayIndex());
case UIntType:
return getUnsigned(array_visitor, currArrayIndex());
case RealType:
return getReal(array_visitor, currArrayIndex());
case DateType:
return (*array_visitor)->size == 32 ? (*array_visitor)->date32_arr->Value(currArrayIndex()) : (*array_visitor)->date64_arr->Value(currArrayIndex());
case TimestampType:
return (*array_visitor)->timestamp_arr->Value(currArrayIndex());
case TimeType:
return (*array_visitor)->size == 32 ? (*array_visitor)->time32_arr->Value(currArrayIndex()) : (*array_visitor)->time64_arr->Value(currArrayIndex());
case DurationType:
return (*array_visitor)->duration_arr->Value(currArrayIndex());
default:
{
__int64 myint64 = 0;
auto scalar = getCurrView(field);
handleDeserializeOutcome(tokenDeserializer.deserialize(scalar.data(), myint64), "signed", scalar.data());
return myint64;
}
}
}

/**
* @brief Gets a Boolean result for an ECL Row
*
Expand All @@ -853,11 +977,8 @@ bool ParquetRowBuilder::getBooleanResult(const RtlFieldInfo *field)
NullFieldProcessor p(field);
return p.boolResult;
}
if ((*array_visitor)->type != BoolType)
{
failx("Incorrect type for field %s.", field->name);
}
return (*array_visitor)->bool_arr->Value(currArrayIndex());

return getCurrIntValue(field);
}

/**
Expand All @@ -877,16 +998,10 @@ void ParquetRowBuilder::getDataResult(const RtlFieldInfo *field, size32_t &len,
rtlUtf8ToDataX(len, result, p.resultChars, p.stringResult);
return;
}
if ((*array_visitor)->type == BinaryType)
{
auto view = (*array_visitor)->large_bin_arr->GetView(currArrayIndex());
rtlStrToDataX(len, result, view.size(), view.data());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}

auto view = getCurrView(field);
rtlStrToDataX(len, result, view.size(), view.data());
return;
}

/**
Expand All @@ -904,45 +1019,11 @@ double ParquetRowBuilder::getRealResult(const RtlFieldInfo *field)
NullFieldProcessor p(field);
return p.doubleResult;
}
if ((*array_visitor)->type != DoubleType)
{
failx("Incorrect type for field %s.", field->name);
}
return (*array_visitor)->double_arr->Value(currArrayIndex());
}

__int64 getSigned(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int index)
{
switch ((*array_visitor)->size)
{
case 8:
return (*array_visitor)->int8_arr->Value(index);
case 16:
return (*array_visitor)->int16_arr->Value(index);
case 32:
return (*array_visitor)->int32_arr->Value(index);
case 64:
return (*array_visitor)->int64_arr->Value(index);
default:
failx("getSigned: Invalid size %i", (*array_visitor)->size);
}
}

unsigned __int64 getUnsigned(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int index)
{
switch ((*array_visitor)->size)
{
case 8:
return (*array_visitor)->uint8_arr->Value(index);
case 16:
return (*array_visitor)->uint16_arr->Value(index);
case 32:
return (*array_visitor)->uint32_arr->Value(index);
case 64:
return (*array_visitor)->uint64_arr->Value(index);
default:
failx("getUnsigned: Invalid size %i", (*array_visitor)->size);
}
if ((*array_visitor)->type == RealType)
return getReal(array_visitor, currArrayIndex());
else
return getCurrIntValue(field);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strings will not be converted to real values. However I would be tempted to create a separate PR to fix that and merge this one as-is.

}

/**
Expand All @@ -958,13 +1039,10 @@ __int64 ParquetRowBuilder::getSignedResult(const RtlFieldInfo *field)
if ((*array_visitor)->type == NullType)
{
NullFieldProcessor p(field);
return p.uintResult;
}
if ((*array_visitor)->type != IntType)
{
failx("Incorrect type for field %s.", field->name);
return p.intResult;
}
return getSigned(array_visitor, currArrayIndex());

return getCurrIntValue(field);
}

/**
Expand All @@ -979,15 +1057,14 @@ unsigned __int64 ParquetRowBuilder::getUnsignedResult(const RtlFieldInfo *field)

if ((*array_visitor)->type == NullType)
{

NullFieldProcessor p(field);
return p.uintResult;
}
if ((*array_visitor)->type != UIntType)
{
failx("Incorrect type for field %s.", field->name);
}
return getUnsigned(array_visitor, currArrayIndex());

if ((*array_visitor)->type == UIntType)
return getUnsigned(array_visitor, currArrayIndex());
else
return getCurrIntValue(field);
}

/**
Expand All @@ -1007,17 +1084,10 @@ void ParquetRowBuilder::getStringResult(const RtlFieldInfo *field, size32_t &cha
rtlUtf8ToStrX(chars, result, p.resultChars, p.stringResult);
return;
}
if ((*array_visitor)->type == StringType)
{
auto view = (*array_visitor)->string_arr->GetView(currArrayIndex());
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToStrX(chars, result, numchars, view.data());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}
auto view = getCurrView(field);
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToStrX(chars, result, numchars, view.data());
return;
}

/**
Expand All @@ -1037,17 +1107,10 @@ void ParquetRowBuilder::getUTF8Result(const RtlFieldInfo *field, size32_t &chars
rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult);
return;
}
if ((*array_visitor)->type == StringType)
{
auto view = (*array_visitor)->string_arr->GetView(currArrayIndex());
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToUtf8X(chars, result, numchars, view.data());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}
auto view = getCurrView(field);
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToUtf8X(chars, result, numchars, view.data());
return;
}

/**
Expand All @@ -1067,17 +1130,10 @@ void ParquetRowBuilder::getUnicodeResult(const RtlFieldInfo *field, size32_t &ch
rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
return;
}
if ((*array_visitor)->type == StringType)
{
auto view = (*array_visitor)->string_arr->GetView(currArrayIndex());
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToUnicodeX(chars, result, numchars, view.data());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}
auto view = getCurrView(field);
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToUnicodeX(chars, result, numchars, view.data());
return;
}

/**
Expand All @@ -1096,18 +1152,12 @@ void ParquetRowBuilder::getDecimalResult(const RtlFieldInfo *field, Decimal &val
value.set(p.decimalResult);
return;
}
if ((*array_visitor)->type == StringType)
{
auto dvalue = (*array_visitor)->string_arr->GetView(currArrayIndex());
value.setString(dvalue.size(), dvalue.data());
RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *)field->type;
value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}

auto dvalue = getCurrView(field);
value.setString(dvalue.size(), dvalue.data());
RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *)field->type;
value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
return;
}

/**
Expand All @@ -1129,7 +1179,7 @@ void ParquetRowBuilder::processBeginSet(const RtlFieldInfo *field, bool &isAll)
}
else
{
failx("Incorrect type for field %s.", field->name);
failx("Error reading nested set with name %s.", field->name);
}
}

Expand Down
Loading
Loading