[SPARK-45891][SQL] Rebuild variant binary from shredded data. #48851

Open · wants to merge 1 commit into master

Conversation

chenhao-db (Contributor)

What changes were proposed in this pull request?

It implements the variant rebuild functionality according to the current shredding spec in apache/parquet-format#461, so that the Parquet reader is able to read shredded variant data.

Why are the changes needed?

It gives Spark the basic ability to read shredded variant data. It can be improved in the future to read only requested fields.
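
For context, here is a minimal sketch of what "rebuilding" means, using plain Java maps in place of the Parquet columns and the variant binary; the PR itself operates on ShreddedRow, VariantSchema, and VariantBuilder, so everything below is illustrative only.

import java.util.LinkedHashMap;
import java.util.Map;

// Conceptual sketch only, not the PR's implementation.
// Shredded layout per the spec (roughly): a variant column becomes a struct of
//   metadata     -- binary, the variant metadata (dictionary of field names)
//   value        -- binary, the residual, unshredded part of the value
//   typed_value  -- strongly typed column(s) for the shredded part
public class RebuildSketch {
  public static void main(String[] args) {
    // Original variant value: {"a": 1, "b": "hello"}.
    // After shredding, "a" lives in a typed_value column and "b" stays in the
    // residual value blob.
    Map<String, Object> typedValue = Map.of("a", 1);
    Map<String, Object> residualValue = Map.of("b", "hello");

    // Rebuilding recombines the typed fields with the residual fields into one value.
    Map<String, Object> rebuilt = new LinkedHashMap<>(residualValue);
    rebuilt.putAll(typedValue);
    System.out.println(rebuilt); // prints {b=hello, a=1}
  }
}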

Does this PR introduce any user-facing change?

Yes, the Parquet reader will be able to read shredded variant data.

How was this patch tested?

Unit tests.

Was this patch authored or co-authored using generative AI tooling?

No.

chenhao-db (Contributor, Author):

@gene-db @cashmand @cloud-fan could you help review? Thanks!

cashmand (Contributor) left a comment:

Thanks, change LGTM!

int numElements();
}

public static Variant rebuild(ShreddedRow row, VariantSchema schema) {
Contributor:

Can you mention that this rebuild function should only be called on the top-level schema, and that the other one can be called on any recursively shredded sub-schema?
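
One possible wording for that doc comment, as a sketch only (the exact contract is for the author to confirm; the "recursive overload" referred to below is the one that takes a metadata and a VariantBuilder argument):

/**
 * Rebuilds the variant binary from a shredded row.
 *
 * This overload must only be called with the top-level shredding schema, since it
 * reads the metadata column and produces the final Variant. The recursive overload,
 * which appends into an existing VariantBuilder, can be called on any recursively
 * shredded sub-schema (object fields or array elements).
 */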

} else if (variantIdx >= 0 && !row.isNullAt(variantIdx)) {
builder.appendVariant(new Variant(row.getBinary(variantIdx), metadata));
} else {
builder.appendNull();
Contributor:

This is the case that shouldn't really be valid, right? For objects, we shouldn't be calling rebuild recursively, and for arrays or top-level values, we should be storing it in value? Might be worth a comment.

Row(metadata(Seq("b", "d")), null, Row(Row(1, null), Row(null, value("null")))),
Row(metadata(Seq("a", "b", "c", "d")),
shreddedValue("""{"a": 1, "c": 3}""", Seq("a", "b", "c", "d")),
Row(Row(2, null), Row(null, value("4")))),
Contributor:

Just to make sure I understand, would the result here be any different if the value "4" was put into the typed_value field of d? Is this an example where shredding made a suboptimal but valid decision?

gene-db (Contributor) left a comment:

@chenhao-db Thanks for this read feature! I left a few questions.

builder.appendDate(row.getInt(typedIdx));
} else if (scalar instanceof VariantSchema.TimestampType) {
builder.appendTimestamp(row.getLong(typedIdx));
} else {
Contributor:

Can this be

} else if (scalar instanceof VariantSchema.TimestampNTZType) {
  builder.appendTimestampNtz(row.getLong(typedIdx));
} else {
  // error handling
}

Should the error handling ultimately throw a malformed variant exception? Right now, it just crashes?

rebuild(array.getStruct(i, elementSchema.numFields), metadata, elementSchema, builder);
}
builder.finishWritingArray(start, offsets);
} else {
Contributor:

Is this the object case? Should we explicitly check for

} else if (schema.objectSchema != null) {
...
} else {
  // error handling
}

And the error handling should be malformed variant, right?

for (int i = 0; i < v.objectSize(); ++i) {
Variant.ObjectField field = v.getFieldAtIndex(i);
int id = builder.addKey(field.key);
fields.add(new VariantBuilder.FieldEntry(field.key, id, builder.getWritePos() - start));
Contributor:

Don't we have to check whether the variant blob contains fields that duplicate the shredded fields? The spec says the shredded field must overwrite the encoded blob field.
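
A sketch of the overwrite rule the spec describes, using plain Java collections instead of the VariantBuilder/FieldEntry machinery (names and structure are illustrative only): write the shredded fields first, remember their keys, and skip any field from the residual variant blob whose key was already written.

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: shows the precedence rule, not the PR's implementation.
class DuplicateFieldSketch {
  static Map<String, Object> mergeObject(Map<String, Object> shreddedFields,
                                         Map<String, Object> blobFields) {
    Map<String, Object> result = new LinkedHashMap<>();
    result.putAll(shreddedFields); // shredded fields always win
    blobFields.forEach((key, value) -> {
      // copy a blob field only if shredding did not already produce that key
      if (!result.containsKey(key)) {
        result.put(key, value);
      }
    });
    return result;
  }

  public static void main(String[] args) {
    Map<String, Object> shredded = Map.of("a", 1, "c", 3);
    Map<String, Object> blob = Map.of("a", 99, "d", 4); // "a" duplicates a shredded field
    System.out.println(mergeObject(shredded, blob));    // "a" keeps the shredded value 1
  }
}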

builder.finishWritingObject(start, fields);
}
} else if (variantIdx >= 0 && !row.isNullAt(variantIdx)) {
builder.appendVariant(new Variant(row.getBinary(variantIdx), metadata));
Contributor:

We should add a comment here. This is when there is no typed_value, and only the value parquet column?
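
If that reading is right, the comment could be something along these lines (wording is just a sketch, to be confirmed by the author):

// Only the `value` Parquet column is populated here (no typed_value, and no
// object/array schema applies), so it already holds the complete variant value
// for this position and is copied into the result as-is.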

SparkShreddedRow(row.getStruct(ordinal, numFields))
override def getArray(ordinal: Int): SparkShreddedRow =
SparkShreddedRow(row.getArray(ordinal))
override def numElements(): Int = row.asInstanceOf[ArrayData].numElements()
Contributor:

How is row guaranteed to be ArrayData?
