diff --git a/pom.xml b/pom.xml
index cb4b2ed0b..b7763631a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -352,6 +352,30 @@
       <version>1.14.9</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>jakarta.xml.bind</groupId>
+          <artifactId>jakarta.xml.bind-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.xml.bind</groupId>
+          <artifactId>jaxb-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-reload4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
@@ -478,6 +502,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
@@ -756,8 +784,8 @@
true
+ to workaround https://issues.apache.org/jira/browse/MNG-7982. Now the dependency analyzer complains that
+ the dependency is unused, so we ignore it here-->
                 <ignoredUnusedDeclaredDependency>org.apache.commons:commons-compress</ignoredUnusedDeclaredDependency>
                 <ignoredUnusedDeclaredDependency>org.apache.commons:commons-configuration2</ignoredUnusedDeclaredDependency>
@@ -852,9 +880,9 @@
failFast
+              <!-- The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses used, please
+                   check your dependencies", verify the conditions of the license and add the reference to it here.
+              -->
               Apache License 2.0
               BSD 2-Clause License
@@ -1167,9 +1195,9 @@
+      <!-- This plugin executes the license processing Python script, which copies third-party license files into the
+           directory target/generated-licenses-info/META-INF/third-party-licenses; that directory is then included in
+           the shaded JAR.
+      -->
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>exec-maven-plugin</artifactId>
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
index 71a9d501e..9172c4328 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
@@ -496,11 +496,10 @@ public InsertValidationResponse insertRows(
* Flush the data in the row buffer by taking the ownership of the old vectors and pass all the
* required info back to the flush service to build the blob
*
- * @param filePath the name of the file the data will be written in
* @return A ChannelData object that contains the info needed by the flush service to build a blob
*/
@Override
- public ChannelData<T> flush(final String filePath) {
+ public ChannelData<T> flush() {
logger.logDebug("Start get data for channel={}", channelFullyQualifiedName);
if (this.bufferedRowCount > 0) {
Optional<T> oldData = Optional.empty();
@@ -518,7 +517,7 @@ public ChannelData<T> flush(final String filePath) {
try {
if (this.bufferedRowCount > 0) {
// Transfer the ownership of the vectors
- oldData = getSnapshot(filePath);
+ oldData = getSnapshot();
oldRowCount = this.bufferedRowCount;
oldBufferSize = this.bufferSize;
oldRowSequencer = this.channelState.incrementAndGetRowSequencer();
@@ -615,12 +614,8 @@ void reset() {
this.statsMap.replaceAll((key, value) -> value.forkEmpty());
}
- /**
- * Get buffered data snapshot for later flushing.
- *
- * @param filePath the name of the file the data will be written in
- */
- abstract Optional<T> getSnapshot(final String filePath);
+ /** Get buffered data snapshot for later flushing. */
+ abstract Optional<T> getSnapshot();
@VisibleForTesting
abstract Object getVectorValueAt(String column, int index);
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
index 6f5296209..2e7c6507c 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
@@ -430,7 +430,7 @@ void distributeFlushTasks(Set<String> tablesToFlush) {
.forEach(
channel -> {
if (channel.isValid()) {
- ChannelData<T> data = channel.getData(blobPath);
+ ChannelData<T> data = channel.getData();
if (data != null) {
channelsDataPerTable.add(data);
}
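
The net effect of this change is that the blob file path is no longer threaded through the channel and the row buffer; only the flusher sees it, at serialization time. A minimal sketch of the resulting shape, using hypothetical stand-in interfaces rather than the SDK's internal classes:

import java.util.List;

// Hypothetical stand-ins (not SDK types) showing where the file path enters the
// pipeline after this change.
interface RowBufferSketch<T> {
  T flush(); // snapshots buffered data; no longer needs to know the target file
}

interface FlusherSketch<T> {
  // The path is supplied only when the blob is serialized; ParquetFlusher then
  // stamps it into the Parquet footer metadata (see the next hunk).
  void serialize(List<T> channelsDataPerTable, String filePath);
}
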
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index 3ad3db5f4..d338a6a7b 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -212,6 +212,10 @@ private SerializationResult serializeFromJavaObjects(
}
Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
+ // We insert the filename in the file itself as metadata so that streams can work on replicated
+ // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
+ // http://go/streams-on-replicated-mixed-tables
+ metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
parquetWriter =
new BdecParquetWriter(
mergedData,
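
As a rough illustration (not part of this PR), the stamped key can be read back from the serialized BDEC bytes via parquet-mr's footer API. The class and method below are hypothetical; Constants.PRIMARY_FILE_ID_KEY and BdecInputFile come from this codebase, and BdecInputFile is assumed to be reachable from the org.apache.parquet.hadoop package:

package org.apache.parquet.hadoop;

import java.io.IOException;
import java.util.Map;
import net.snowflake.ingest.utils.Constants;
import org.apache.parquet.ParquetReadOptions;

final class PrimaryFileIdCheck {
  // Hypothetical helper: opens the in-memory BDEC file and returns the value
  // written under PRIMARY_FILE_ID_KEY in the key/value footer metadata.
  static String readPrimaryFileId(byte[] bdecBytes) throws IOException {
    ParquetReadOptions options = ParquetReadOptions.builder().build();
    try (ParquetFileReader fileReader =
        ParquetFileReader.open(new BdecInputFile(bdecBytes), options)) {
      Map<String, String> keyValueMetadata =
          fileReader.getFooter().getFileMetaData().getKeyValueMetaData();
      return keyValueMetadata.get(Constants.PRIMARY_FILE_ID_KEY);
    }
  }
}
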
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 627478bca..30851c274 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -22,7 +22,6 @@
import net.snowflake.ingest.connection.TelemetryService;
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
import net.snowflake.ingest.streaming.OpenChannelRequest;
-import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.hadoop.BdecParquetWriter;
@@ -262,12 +261,7 @@ boolean hasColumns() {
}
@Override
- Optional<ParquetChunkData> getSnapshot(final String filePath) {
- // We insert the filename in the file itself as metadata so that streams can work on replicated
- // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
- // http://go/streams-on-replicated-mixed-tables
- metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
-
+ Optional<ParquetChunkData> getSnapshot() {
List<List<Object>> oldData = new ArrayList<>();
if (!clientBufferParameters.getEnableParquetInternalBuffering()) {
data.forEach(r -> oldData.add(new ArrayList<>(r)));
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java
index 02905c02e..6bb2f43b9 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java
@@ -37,10 +37,9 @@ InsertValidationResponse insertRows(
* Flush the data in the row buffer by taking the ownership of the old vectors and pass all the
* required info back to the flush service to build the blob
*
- * @param filePath the name of the file the data will be written in
* @return A ChannelData object that contains the info needed by the flush service to build a blob
*/
- ChannelData<T> flush(final String filePath);
+ ChannelData<T> flush();
/**
* Close the row buffer and release resources. Note that the caller needs to handle
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
index ca0bbe782..4e884387b 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
@@ -220,11 +220,10 @@ public String getFullyQualifiedTableName() {
/**
* Get all the data needed to build the blob during flush
*
- * @param filePath the name of the file the data will be written in
* @return a ChannelData object
*/
- ChannelData<T> getData(final String filePath) {
- ChannelData<T> data = this.rowBuffer.flush(filePath);
+ ChannelData<T> getData() {
+ ChannelData<T> data = this.rowBuffer.flush();
if (data != null) {
data.setChannelContext(channelFlushContext);
}
diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetReader.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetReader.java
index 1a92a8cd4..ef95fab14 100644
--- a/src/main/java/org/apache/parquet/hadoop/BdecParquetReader.java
+++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetReader.java
@@ -34,6 +34,7 @@
*/
public class BdecParquetReader implements AutoCloseable {
private final InternalParquetRecordReader<List<Object>> reader;
+ private final ParquetFileReader fileReader;
/**
* @param data buffer where the data that has to be read resides.
@@ -41,7 +42,7 @@ public class BdecParquetReader implements AutoCloseable {
*/
public BdecParquetReader(byte[] data) throws IOException {
ParquetReadOptions options = ParquetReadOptions.builder().build();
- ParquetFileReader fileReader = ParquetFileReader.open(new BdecInputFile(data), options);
+ fileReader = ParquetFileReader.open(new BdecInputFile(data), options);
reader = new InternalParquetRecordReader<>(new BdecReadSupport(), options.getRecordFilter());
reader.initialize(fileReader, options);
}
@@ -60,6 +61,11 @@ public List