Skip to content

Commit

Permalink
Merge pull request #17890 from jackdelv/HPCC-30458
Browse files Browse the repository at this point in the history
HPCC-30458 Parquet plugin needs timestamp support

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Oct 13, 2023
2 parents 25612cd + 6cf2540 commit 4944f9c
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 120 deletions.
26 changes: 26 additions & 0 deletions plugins/parquet/examples/taxi_data.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
IMPORT PARQUET;

// Record layout used to read the taxi trip Parquet file.
// Every column is declared as STRING.
EXPORT Layout := RECORD
    STRING VendorID;
    STRING tpep_pickup_datetime;
    STRING tpep_dropoff_datetime;
    STRING passenger_count;
    STRING trip_distance;
    STRING RatecodeID;
    STRING store_and_fwd_flag;
    STRING PULocationID;
    STRING DOLocationID;
    STRING payment_type;
    STRING fare_amount;
    STRING extra;
    STRING mta_tax;
    STRING tip_amount;
    STRING tolls_amount;
    STRING improvement_surcharge;
    STRING total_amount;
END;

// Path of the Parquet file to read.
tripDataPath := '/datadrive/dev/test_data/yellow_tripdata_2017-01.parquet';

// Read the file through the Parquet plugin using the layout above.
trips := ParquetIO.Read(Layout, tripDataPath);

// Report how many rows were read, then output the first 100 of them.
COUNT(trips);
OUTPUT(CHOOSEN(trips, 100));
272 changes: 161 additions & 111 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,130 @@ int64_t ParquetRowBuilder::currArrayIndex()
return !m_pathStack.empty() && m_pathStack.back().nodeType == CPNTSet ? m_pathStack.back().childrenProcessed++ : currentRow;
}

/**
 * @brief Reads one signed integer element from the visited array.
 *
 * The typed array to read from is selected by the size recorded in the
 * visitor (8, 16, 32 or 64). Any other size is reported through failx.
 *
 * NOTE(review): callers in this file pass currArrayIndex(), which returns
 * int64_t, so the value is narrowed to int here - confirm that is acceptable.
 *
 * @param array_visitor Pointer to the shared visitor holding the typed arrays and the size.
 * @param index Position of the element to read.
 * @return __int64 The element value widened to __int64.
 */
__int64 getSigned(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int index)
{
    const auto &visitor = **array_visitor;

    if (visitor.size == 8)
        return visitor.int8_arr->Value(index);
    if (visitor.size == 16)
        return visitor.int16_arr->Value(index);
    if (visitor.size == 32)
        return visitor.int32_arr->Value(index);
    if (visitor.size == 64)
        return visitor.int64_arr->Value(index);

    failx("getSigned: Invalid size %i", visitor.size);
}

/**
 * @brief Reads one unsigned integer element from the visited array.
 *
 * The typed array to read from is selected by the size recorded in the
 * visitor (8, 16, 32 or 64). Any other size is reported through failx.
 *
 * NOTE(review): callers in this file pass currArrayIndex(), which returns
 * int64_t, so the value is narrowed to int here - confirm that is acceptable.
 *
 * @param array_visitor Pointer to the shared visitor holding the typed arrays and the size.
 * @param index Position of the element to read.
 * @return unsigned __int64 The element value widened to unsigned __int64.
 */
unsigned __int64 getUnsigned(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int index)
{
    const auto &visitor = **array_visitor;

    if (visitor.size == 8)
        return visitor.uint8_arr->Value(index);
    if (visitor.size == 16)
        return visitor.uint16_arr->Value(index);
    if (visitor.size == 32)
        return visitor.uint32_arr->Value(index);
    if (visitor.size == 64)
        return visitor.uint64_arr->Value(index);

    failx("getUnsigned: Invalid size %i", visitor.size);
}

double getReal(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int index)
{
switch ((*array_visitor)->size)
{
case 2:
return (*array_visitor)->half_float_arr->Value(index);
case 4:
return (*array_visitor)->float_arr->Value(index);
case 8:
return (*array_visitor)->double_arr->Value(index);
default:
failx("getReal: Invalid size %i", (*array_visitor)->size);
}
}

std::string_view ParquetRowBuilder::getCurrView(const RtlFieldInfo *field)
{
serialized.clear();

switch((*array_visitor)->type)
{
case BoolType:
tokenSerializer.serialize((*array_visitor)->bool_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case BinaryType:
return (*array_visitor)->bin_arr->GetView(currArrayIndex());
case LargeBinaryType:
return (*array_visitor)->large_bin_arr->GetView(currArrayIndex());
case RealType:
tokenSerializer.serialize(getReal(array_visitor, currArrayIndex()), serialized);
return serialized.str();
case IntType:
tokenSerializer.serialize(getSigned(array_visitor, currArrayIndex()), serialized);
return serialized.str();
case UIntType:
tokenSerializer.serialize(getUnsigned(array_visitor, currArrayIndex()), serialized);
return serialized.str();
case DateType:
tokenSerializer.serialize((*array_visitor)->size == 32 ? (__int32) (*array_visitor)->date32_arr->Value(currArrayIndex()) : (__int64) (*array_visitor)->date64_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case TimestampType:
tokenSerializer.serialize((__int64) (*array_visitor)->timestamp_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case TimeType:
tokenSerializer.serialize((*array_visitor)->size == 32 ? (__int32) (*array_visitor)->time32_arr->Value(currArrayIndex()) : (__int64) (*array_visitor)->time64_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case DurationType:
tokenSerializer.serialize((__int64) (*array_visitor)->duration_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case StringType:
return (*array_visitor)->string_arr->GetView(currArrayIndex());
case LargeStringType:
return (*array_visitor)->large_string_arr->GetView(currArrayIndex());
case DecimalType:
return (*array_visitor)->size == 128 ? (*array_visitor)->dec_arr->GetView(currArrayIndex()) : (*array_visitor)->large_dec_arr->GetView(currArrayIndex());
default:
failx("Unimplemented Parquet type for field with name %s.", field->name);
}
}

/**
 * @brief Returns the current array element as a signed 64-bit integer.
 *
 * Boolean, integer, real, date, time, timestamp and duration columns are
 * read directly from their typed arrays and converted to __int64. Any other
 * column type is fetched as text through getCurrView and parsed.
 *
 * @param field Field being read; forwarded to getCurrView for the text fallback.
 * @return __int64 The element value as a signed 64-bit integer.
 */
__int64 ParquetRowBuilder::getCurrIntValue(const RtlFieldInfo *field)
{
    switch ((*array_visitor)->type)
    {
        case BoolType:
            return (*array_visitor)->bool_arr->Value(currArrayIndex());
        case IntType:
            return getSigned(array_visitor, currArrayIndex());
        case UIntType:
            return getUnsigned(array_visitor, currArrayIndex());
        case RealType:
            return getReal(array_visitor, currArrayIndex());
        case DateType:
            return (*array_visitor)->size == 32 ? (*array_visitor)->date32_arr->Value(currArrayIndex()) : (*array_visitor)->date64_arr->Value(currArrayIndex());
        case TimestampType:
            return (*array_visitor)->timestamp_arr->Value(currArrayIndex());
        case TimeType:
            return (*array_visitor)->size == 32 ? (*array_visitor)->time32_arr->Value(currArrayIndex()) : (*array_visitor)->time64_arr->Value(currArrayIndex());
        case DurationType:
            return (*array_visitor)->duration_arr->Value(currArrayIndex());
        default:
        {
            __int64 myint64 = 0;
            // The view may point straight into an array's value buffer, which is
            // not NUL-terminated. Copy it so the C-string based deserializer and
            // the error reporter cannot read past the end of this value.
            const std::string scalar(getCurrView(field));
            handleDeserializeOutcome(tokenDeserializer.deserialize(scalar.c_str(), myint64), "signed", scalar.c_str());
            return myint64;
        }
    }
}

/**
* @brief Gets a Boolean result for an ECL Row
*
Expand All @@ -853,11 +977,8 @@ bool ParquetRowBuilder::getBooleanResult(const RtlFieldInfo *field)
NullFieldProcessor p(field);
return p.boolResult;
}
if ((*array_visitor)->type != BoolType)
{
failx("Incorrect type for field %s.", field->name);
}
return (*array_visitor)->bool_arr->Value(currArrayIndex());

return getCurrIntValue(field);
}

/**
Expand All @@ -877,16 +998,10 @@ void ParquetRowBuilder::getDataResult(const RtlFieldInfo *field, size32_t &len,
rtlUtf8ToDataX(len, result, p.resultChars, p.stringResult);
return;
}
if ((*array_visitor)->type == BinaryType)
{
auto view = (*array_visitor)->large_bin_arr->GetView(currArrayIndex());
rtlStrToDataX(len, result, view.size(), view.data());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}

auto view = getCurrView(field);
rtlStrToDataX(len, result, view.size(), view.data());
return;
}

/**
Expand All @@ -904,45 +1019,11 @@ double ParquetRowBuilder::getRealResult(const RtlFieldInfo *field)
NullFieldProcessor p(field);
return p.doubleResult;
}
if ((*array_visitor)->type != DoubleType)
{
failx("Incorrect type for field %s.", field->name);
}
return (*array_visitor)->double_arr->Value(currArrayIndex());
}

/**
 * @brief Reads one signed integer element from the visited array.
 *
 * The typed array to read from is selected by the size recorded in the
 * visitor (8, 16, 32 or 64); any other size is reported through failx.
 *
 * @param array_visitor Pointer to the shared visitor holding the typed arrays and the size.
 * @param index Position of the element to read.
 * @return __int64 The element value widened to __int64.
 */
__int64 getSigned(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int index)
{
switch ((*array_visitor)->size)
{
case 8:
return (*array_visitor)->int8_arr->Value(index);
case 16:
return (*array_visitor)->int16_arr->Value(index);
case 32:
return (*array_visitor)->int32_arr->Value(index);
case 64:
return (*array_visitor)->int64_arr->Value(index);
default:
failx("getSigned: Invalid size %i", (*array_visitor)->size);
}
}

unsigned __int64 getUnsigned(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int index)
{
switch ((*array_visitor)->size)
{
case 8:
return (*array_visitor)->uint8_arr->Value(index);
case 16:
return (*array_visitor)->uint16_arr->Value(index);
case 32:
return (*array_visitor)->uint32_arr->Value(index);
case 64:
return (*array_visitor)->uint64_arr->Value(index);
default:
failx("getUnsigned: Invalid size %i", (*array_visitor)->size);
}
if ((*array_visitor)->type == RealType)
return getReal(array_visitor, currArrayIndex());
else
return getCurrIntValue(field);
}

/**
Expand All @@ -958,13 +1039,10 @@ __int64 ParquetRowBuilder::getSignedResult(const RtlFieldInfo *field)
if ((*array_visitor)->type == NullType)
{
NullFieldProcessor p(field);
return p.uintResult;
}
if ((*array_visitor)->type != IntType)
{
failx("Incorrect type for field %s.", field->name);
return p.intResult;
}
return getSigned(array_visitor, currArrayIndex());

return getCurrIntValue(field);
}

/**
Expand All @@ -979,15 +1057,14 @@ unsigned __int64 ParquetRowBuilder::getUnsignedResult(const RtlFieldInfo *field)

if ((*array_visitor)->type == NullType)
{

NullFieldProcessor p(field);
return p.uintResult;
}
if ((*array_visitor)->type != UIntType)
{
failx("Incorrect type for field %s.", field->name);
}
return getUnsigned(array_visitor, currArrayIndex());

if ((*array_visitor)->type == UIntType)
return getUnsigned(array_visitor, currArrayIndex());
else
return getCurrIntValue(field);
}

/**
Expand All @@ -1007,17 +1084,10 @@ void ParquetRowBuilder::getStringResult(const RtlFieldInfo *field, size32_t &cha
rtlUtf8ToStrX(chars, result, p.resultChars, p.stringResult);
return;
}
if ((*array_visitor)->type == StringType)
{
auto view = (*array_visitor)->string_arr->GetView(currArrayIndex());
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToStrX(chars, result, numchars, view.data());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}
auto view = getCurrView(field);
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToStrX(chars, result, numchars, view.data());
return;
}

/**
Expand All @@ -1037,17 +1107,10 @@ void ParquetRowBuilder::getUTF8Result(const RtlFieldInfo *field, size32_t &chars
rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult);
return;
}
if ((*array_visitor)->type == StringType)
{
auto view = (*array_visitor)->string_arr->GetView(currArrayIndex());
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToUtf8X(chars, result, numchars, view.data());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}
auto view = getCurrView(field);
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToUtf8X(chars, result, numchars, view.data());
return;
}

/**
Expand All @@ -1067,17 +1130,10 @@ void ParquetRowBuilder::getUnicodeResult(const RtlFieldInfo *field, size32_t &ch
rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
return;
}
if ((*array_visitor)->type == StringType)
{
auto view = (*array_visitor)->string_arr->GetView(currArrayIndex());
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToUnicodeX(chars, result, numchars, view.data());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}
auto view = getCurrView(field);
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToUnicodeX(chars, result, numchars, view.data());
return;
}

/**
Expand All @@ -1096,18 +1152,12 @@ void ParquetRowBuilder::getDecimalResult(const RtlFieldInfo *field, Decimal &val
value.set(p.decimalResult);
return;
}
if ((*array_visitor)->type == StringType)
{
auto dvalue = (*array_visitor)->string_arr->GetView(currArrayIndex());
value.setString(dvalue.size(), dvalue.data());
RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *)field->type;
value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}

auto dvalue = getCurrView(field);
value.setString(dvalue.size(), dvalue.data());
RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *)field->type;
value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
return;
}

/**
Expand All @@ -1129,7 +1179,7 @@ void ParquetRowBuilder::processBeginSet(const RtlFieldInfo *field, bool &isAll)
}
else
{
failx("Incorrect type for field %s.", field->name);
failx("Error reading nested set with name %s.", field->name);
}
}

Expand Down
Loading

0 comments on commit 4944f9c

Please sign in to comment.