SNOW-1654124: Write file name to metadata at the place where we create the file #824

Merged: 10 commits, Sep 10, 2024

Changes from 8 commits
26 changes: 18 additions & 8 deletions pom.xml
@@ -352,6 +352,12 @@
<version>1.14.9</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
Contributor Author: This is needed for ParquetFileReader to work.

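A minimal sketch of the kind of footer inspection this test-scope dependency enables. readFooterMetadata is a hypothetical helper, not code from this PR; the InputFile argument can be any in-memory implementation, such as the BdecInputFile used by BdecParquetReader further down in this diff:

```java
import java.io.IOException;
import java.util.Map;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.InputFile;

// Hypothetical test helper: open a Parquet file and return the key/value metadata
// stored in its footer, which is where PRIMARY_FILE_ID_KEY ends up.
static Map<String, String> readFooterMetadata(InputFile file) throws IOException {
  ParquetReadOptions options = ParquetReadOptions.builder().build();
  try (ParquetFileReader fileReader = ParquetFileReader.open(file, options)) {
    return fileReader.getFileMetaData().getKeyValueMetaData();
  }
}
```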
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -478,6 +484,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop -->
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -756,8 +766,8 @@
<ignoreNonCompile>true</ignoreNonCompile>
<ignoredDependencies>
<!-- We defined these as direct dependencies (as opposed to just declaring it in dependencyManagement)
to workaround https://issues.apache.org/jira/browse/MNG-7982. Now the dependency analyzer complains that
the dependency is unused, so we ignore it here-->
<ignoredDependency>org.apache.commons:commons-compress</ignoredDependency>
<ignoredDependency>org.apache.commons:commons-configuration2</ignoredDependency>
</ignoredDependencies>
@@ -852,9 +862,9 @@
<configuration>
<errorRemedy>failFast</errorRemedy>
<!--
The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses used, please
check your dependencies", verify the conditions of the license and add the reference to it here.
-->
<includedLicenses>
<includedLicense>Apache License 2.0</includedLicense>
<includedLicense>BSD 2-Clause License</includedLicense>
@@ -1167,9 +1177,9 @@
</executions>
</plugin>
<!--
Plugin executes license processing Python script, which copies third party license files into the directory
target/generated-licenses-info/META-INF/third-party-licenses, which is then included in the shaded JAR.
-->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
@@ -496,11 +496,10 @@ public InsertValidationResponse insertRows(
* Flush the data in the row buffer by taking the ownership of the old vectors and pass all the
* required info back to the flush service to build the blob
*
* @param filePath the name of the file the data will be written in
* @return A ChannelData object that contains the info needed by the flush service to build a blob
*/
@Override
public ChannelData<T> flush(final String filePath) {
public ChannelData<T> flush() {
logger.logDebug("Start get data for channel={}", channelFullyQualifiedName);
if (this.bufferedRowCount > 0) {
Optional<T> oldData = Optional.empty();
@@ -518,7 +517,7 @@ public ChannelData<T> flush(final String filePath) {
try {
if (this.bufferedRowCount > 0) {
// Transfer the ownership of the vectors
oldData = getSnapshot(filePath);
oldData = getSnapshot();
oldRowCount = this.bufferedRowCount;
oldBufferSize = this.bufferSize;
oldRowSequencer = this.channelState.incrementAndGetRowSequencer();
@@ -615,12 +614,8 @@ void reset() {
this.statsMap.replaceAll((key, value) -> value.forkEmpty());
}

/**
* Get buffered data snapshot for later flushing.
*
* @param filePath the name of the file the data will be written in
*/
abstract Optional<T> getSnapshot(final String filePath);
/** Get buffered data snapshot for later flushing. */
abstract Optional<T> getSnapshot();

@VisibleForTesting
abstract Object getVectorValueAt(String column, int index);
@@ -430,7 +430,7 @@ void distributeFlushTasks(Set<String> tablesToFlush) {
.forEach(
channel -> {
if (channel.isValid()) {
ChannelData<T> data = channel.getData(blobPath);
ChannelData<T> data = channel.getData();
if (data != null) {
channelsDataPerTable.add(data);
}
@@ -212,6 +212,10 @@ private SerializationResult serializeFromJavaObjects(
}

Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
// We insert the filename in the file itself as metadata so that streams can work on replicated
// mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
// http://go/streams-on-replicated-mixed-tables
metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
Comment on lines +215 to +218 (Contributor Author): This is the major change: I moved the place where we put the file name.

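A hedged end-to-end check of this change, assuming bdecBytes holds the serialized output the flusher produced for filePath: assertPrimaryFileIdStamped is a hypothetical test helper (placed so that StreamingIngestUtils resolves alongside the SDK-internal classes), and getKeyValueMetadata is the reader accessor added later in this PR.

```java
import java.util.Map;
import net.snowflake.ingest.utils.Constants;
import org.apache.parquet.hadoop.BdecParquetReader;
import org.junit.Assert;

// Hypothetical verification: every serialized file should now carry exactly one
// PRIMARY_FILE_ID_KEY entry naming the file it was written into.
static void assertPrimaryFileIdStamped(byte[] bdecBytes, String filePath) throws Exception {
  try (BdecParquetReader reader = new BdecParquetReader(bdecBytes)) {
    Map<String, String> footer = reader.getKeyValueMetadata();
    Assert.assertEquals(
        StreamingIngestUtils.getShortname(filePath), footer.get(Constants.PRIMARY_FILE_ID_KEY));
  }
}
```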
parquetWriter =
new BdecParquetWriter(
mergedData,
@@ -22,7 +22,6 @@
import net.snowflake.ingest.connection.TelemetryService;
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.hadoop.BdecParquetWriter;
@@ -262,12 +261,7 @@ boolean hasColumns() {
}

@Override
Optional<ParquetChunkData> getSnapshot(final String filePath) {
// We insert the filename in the file itself as metadata so that streams can work on replicated
// mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
// http://go/streams-on-replicated-mixed-tables
metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));

Optional<ParquetChunkData> getSnapshot() {
List<List<Object>> oldData = new ArrayList<>();
if (!clientBufferParameters.getEnableParquetInternalBuffering()) {
data.forEach(r -> oldData.add(new ArrayList<>(r)));
@@ -37,10 +37,9 @@ InsertValidationResponse insertRows(
* Flush the data in the row buffer by taking the ownership of the old vectors and pass all the
* required info back to the flush service to build the blob
*
* @param filePath the name of the file the data will be written in
* @return A ChannelData object that contains the info needed by the flush service to build a blob
*/
ChannelData<T> flush(final String filePath);
ChannelData<T> flush();

/**
* Close the row buffer and release resources. Note that the caller needs to handle
@@ -220,11 +220,10 @@ public String getFullyQualifiedTableName() {
/**
* Get all the data needed to build the blob during flush
*
* @param filePath the name of the file the data will be written in
* @return a ChannelData object
*/
ChannelData<T> getData(final String filePath) {
ChannelData<T> data = this.rowBuffer.flush(filePath);
ChannelData<T> getData() {
ChannelData<T> data = this.rowBuffer.flush();
if (data != null) {
data.setChannelContext(channelFlushContext);
}
@@ -34,14 +34,15 @@
*/
public class BdecParquetReader implements AutoCloseable {
private final InternalParquetRecordReader<List<Object>> reader;
private final ParquetFileReader fileReader;

/**
* @param data buffer where the data that has to be read resides.
* @throws IOException
*/
public BdecParquetReader(byte[] data) throws IOException {
ParquetReadOptions options = ParquetReadOptions.builder().build();
ParquetFileReader fileReader = ParquetFileReader.open(new BdecInputFile(data), options);
fileReader = ParquetFileReader.open(new BdecInputFile(data), options);
reader = new InternalParquetRecordReader<>(new BdecReadSupport(), options.getRecordFilter());
reader.initialize(fileReader, options);
}
@@ -60,6 +61,11 @@ public List<Object> read() throws IOException {
}
}

/** Get the key value metadata in the file */
public Map<String, String> getKeyValueMetadata() {
return fileReader.getFileMetaData().getKeyValueMetaData();
}

/**
* Close the reader.
*
@@ -131,7 +131,7 @@ void setParameterOverride(Map<String, Object> parameterOverride) {

ChannelData<T> flushChannel(String name) {
SnowflakeStreamingIngestChannelInternal<T> channel = channels.get(name);
ChannelData<T> channelData = channel.getRowBuffer().flush(name + "_snowpipe_streaming.bdec");
ChannelData<T> channelData = channel.getRowBuffer().flush();
channelData.setChannelContext(channel.getChannelContext());
this.channelData.add(channelData);
return channelData;