Commit 3a3cbc8
no-snow refactor a few variables into ClientBufferParameters (#581)

sfc-gh-gdoci authored Sep 8, 2023
1 parent 257aeee

Showing 5 changed files with 109 additions and 42 deletions.
AbstractRowBuffer.java

@@ -162,8 +162,6 @@ public int getOrdinal() {
   // Metric callback to report size of inserted rows
   private final Consumer<Float> rowSizeMetric;

-  private final long maxChunkSizeInBytes;
-
   // State of the owning channel
   final ChannelRuntimeState channelState;

@@ -172,13 +170,16 @@ public int getOrdinal() {

   final ZoneId defaultTimezone;

+  // Buffer parameters that are set at the owning client level
+  final ClientBufferParameters clientBufferParameters;
+
   AbstractRowBuffer(
       OpenChannelRequest.OnErrorOption onErrorOption,
       ZoneId defaultTimezone,
       String fullyQualifiedChannelName,
       Consumer<Float> rowSizeMetric,
       ChannelRuntimeState channelRuntimeState,
-      long maxChunkSizeInBytes) {
+      ClientBufferParameters clientBufferParameters) {
     this.onErrorOption = onErrorOption;
     this.defaultTimezone = defaultTimezone;
     this.rowSizeMetric = rowSizeMetric;
@@ -188,7 +189,7 @@ public int getOrdinal() {
     this.flushLock = new ReentrantLock();
     this.bufferedRowCount = 0;
     this.bufferSize = 0F;
-    this.maxChunkSizeInBytes = maxChunkSizeInBytes;
+    this.clientBufferParameters = clientBufferParameters;

     // Initialize empty stats
     this.statsMap = new HashMap<>();
@@ -539,9 +540,7 @@ static <T> AbstractRowBuffer<T> createRowBuffer(
       String fullyQualifiedChannelName,
       Consumer<Float> rowSizeMetric,
       ChannelRuntimeState channelRuntimeState,
-      boolean enableParquetMemoryOptimization,
-      long maxChunkSizeInBytes,
-      long maxRowSizeInBytes) {
+      ClientBufferParameters clientBufferParameters) {
     switch (bdecVersion) {
       case THREE:
         //noinspection unchecked
@@ -552,20 +551,18 @@ static <T> AbstractRowBuffer<T> createRowBuffer(
             fullyQualifiedChannelName,
             rowSizeMetric,
             channelRuntimeState,
-            enableParquetMemoryOptimization,
-            maxChunkSizeInBytes,
-            maxRowSizeInBytes);
+            clientBufferParameters);
       default:
         throw new SFException(
             ErrorCode.INTERNAL_ERROR, "Unsupported BDEC format version: " + bdecVersion);
     }
   }

   private void checkBatchSizeEnforcedMaximum(float batchSizeInBytes) {
-    if (batchSizeInBytes > maxChunkSizeInBytes) {
+    if (batchSizeInBytes > clientBufferParameters.getMaxChunkSizeInBytes()) {
       throw new SFException(
           ErrorCode.MAX_BATCH_SIZE_EXCEEDED,
-          maxChunkSizeInBytes,
+          clientBufferParameters.getMaxChunkSizeInBytes(),
           INSERT_ROWS_RECOMMENDED_MAX_BATCH_SIZE_IN_BYTES);
     }
   }
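Taken together, the hunks above are a parameter-object refactor: the buffer stops holding its own copy of each limit and instead reads limits through the client-owned object. A minimal self-contained sketch of the shape, with BufferLimits and SketchBuffer as hypothetical stand-ins (not SDK classes):

  // Hypothetical stand-in for ClientBufferParameters: one object groups the
  // limits that were previously threaded through constructors one by one.
  final class BufferLimits {
    private final long maxChunkSizeInBytes;

    BufferLimits(long maxChunkSizeInBytes) {
      this.maxChunkSizeInBytes = maxChunkSizeInBytes;
    }

    long getMaxChunkSizeInBytes() {
      return maxChunkSizeInBytes;
    }
  }

  // Hypothetical stand-in for AbstractRowBuffer: stores the object once and
  // consults it at check time, mirroring checkBatchSizeEnforcedMaximum above.
  final class SketchBuffer {
    private final BufferLimits limits;

    SketchBuffer(BufferLimits limits) {
      this.limits = limits;
    }

    void checkBatchSize(float batchSizeInBytes) {
      if (batchSizeInBytes > limits.getMaxChunkSizeInBytes()) {
        throw new IllegalArgumentException("batch exceeds max chunk size");
      }
    }
  }

Adding a new client-level limit now means touching ClientBufferParameters rather than every constructor in the chain.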
ClientBufferParameters.java (new file)

@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2023 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import net.snowflake.ingest.utils.ParameterProvider;
+
+/** Channel's buffer relevant parameters that are set at the owning client level. */
+public class ClientBufferParameters {
+
+  private boolean enableParquetInternalBuffering;
+
+  private long maxChunkSizeInBytes;
+
+  private long maxAllowedRowSizeInBytes;
+
+  /**
+   * Private constructor used for test methods
+   *
+   * @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
+   *     enabled
+   * @param maxChunkSizeInBytes maximum chunk size in bytes
+   * @param maxAllowedRowSizeInBytes maximum row size in bytes
+   */
+  private ClientBufferParameters(
+      boolean enableParquetInternalBuffering,
+      long maxChunkSizeInBytes,
+      long maxAllowedRowSizeInBytes) {
+    this.enableParquetInternalBuffering = enableParquetInternalBuffering;
+    this.maxChunkSizeInBytes = maxChunkSizeInBytes;
+    this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
+  }
+
+  /** @param clientInternal reference to the client object where the relevant parameters are set */
+  public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) {
+    this.enableParquetInternalBuffering =
+        clientInternal != null
+            ? clientInternal.getParameterProvider().getEnableParquetInternalBuffering()
+            : ParameterProvider.ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT;
+    this.maxChunkSizeInBytes =
+        clientInternal != null
+            ? clientInternal.getParameterProvider().getMaxChunkSizeInBytes()
+            : ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT;
+    this.maxAllowedRowSizeInBytes =
+        clientInternal != null
+            ? clientInternal.getParameterProvider().getMaxAllowedRowSizeInBytes()
+            : ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT;
+  }
+
+  /**
+   * @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
+   *     enabled
+   * @param maxChunkSizeInBytes maximum chunk size in bytes
+   * @param maxAllowedRowSizeInBytes maximum row size in bytes
+   * @return ClientBufferParameters object
+   */
+  public static ClientBufferParameters test_createClientBufferParameters(
+      boolean enableParquetInternalBuffering,
+      long maxChunkSizeInBytes,
+      long maxAllowedRowSizeInBytes) {
+    return new ClientBufferParameters(
+        enableParquetInternalBuffering, maxChunkSizeInBytes, maxAllowedRowSizeInBytes);
+  }
+
+  public boolean getEnableParquetInternalBuffering() {
+    return enableParquetInternalBuffering;
+  }
+
+  public long getMaxChunkSizeInBytes() {
+    return maxChunkSizeInBytes;
+  }
+
+  public long getMaxAllowedRowSizeInBytes() {
+    return maxAllowedRowSizeInBytes;
+  }
+}
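For orientation, a short usage sketch of the new class using only the members defined above; the numeric values are illustrative, not the ParameterProvider defaults:

  // The test factory bypasses the client lookup and sets values directly.
  ClientBufferParameters params =
      ClientBufferParameters.test_createClientBufferParameters(
          false,               // enableParquetInternalBuffering
          128L * 1024 * 1024,  // maxChunkSizeInBytes (illustrative)
          64L * 1024 * 1024);  // maxAllowedRowSizeInBytes (illustrative)

  // Buffers read limits through the getters instead of holding their own copies.
  long chunkLimit = params.getMaxChunkSizeInBytes();               // 134217728
  boolean buffering = params.getEnableParquetInternalBuffering();  // false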
ParquetRowBuffer.java

@@ -51,9 +51,6 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
   private final String channelName;

   private MessageType schema;
-  private final boolean enableParquetInternalBuffering;
-  private final long maxChunkSizeInBytes;
-  private final long maxAllowedRowSizeInBytes;

   /** Construct a ParquetRowBuffer object. */
   ParquetRowBuffer(
@@ -62,24 +59,19 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
       String fullyQualifiedChannelName,
       Consumer<Float> rowSizeMetric,
       ChannelRuntimeState channelRuntimeState,
-      boolean enableParquetInternalBuffering,
-      long maxChunkSizeInBytes,
-      long maxAllowedRowSizeInBytes) {
+      ClientBufferParameters clientBufferParameters) {
     super(
         onErrorOption,
         defaultTimezone,
         fullyQualifiedChannelName,
         rowSizeMetric,
         channelRuntimeState,
-        maxChunkSizeInBytes);
+        clientBufferParameters);
     this.fieldIndex = new HashMap<>();
     this.metadata = new HashMap<>();
     this.data = new ArrayList<>();
     this.tempData = new ArrayList<>();
     this.channelName = fullyQualifiedChannelName;
-    this.enableParquetInternalBuffering = enableParquetInternalBuffering;
-    this.maxChunkSizeInBytes = maxChunkSizeInBytes;
-    this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
   }

   @Override
@@ -122,9 +114,14 @@ public void setupSchema(List<ColumnMetadata> columns) {
   private void createFileWriter() {
     fileOutput = new ByteArrayOutputStream();
     try {
-      if (enableParquetInternalBuffering) {
+      if (clientBufferParameters.getEnableParquetInternalBuffering()) {
         bdecParquetWriter =
-            new BdecParquetWriter(fileOutput, schema, metadata, channelName, maxChunkSizeInBytes);
+            new BdecParquetWriter(
+                fileOutput,
+                schema,
+                metadata,
+                channelName,
+                clientBufferParameters.getMaxChunkSizeInBytes());
       } else {
         this.bdecParquetWriter = null;
       }
@@ -150,7 +147,7 @@ float addRow(
   }

   void writeRow(List<Object> row) {
-    if (enableParquetInternalBuffering) {
+    if (clientBufferParameters.getEnableParquetInternalBuffering()) {
       bdecParquetWriter.writeRow(row);
     } else {
       data.add(row);
@@ -209,11 +206,12 @@ private float addRow(

     long rowSizeRoundedUp = Double.valueOf(Math.ceil(size)).longValue();

-    if (rowSizeRoundedUp > maxAllowedRowSizeInBytes) {
+    if (rowSizeRoundedUp > clientBufferParameters.getMaxAllowedRowSizeInBytes()) {
       throw new SFException(
           ErrorCode.MAX_ROW_SIZE_EXCEEDED,
           String.format(
-              "rowSizeInBytes=%.3f maxAllowedRowSizeInBytes=%d", size, maxAllowedRowSizeInBytes));
+              "rowSizeInBytes=%.3f maxAllowedRowSizeInBytes=%d",
+              size, clientBufferParameters.getMaxAllowedRowSizeInBytes()));
     }

     out.accept(Arrays.asList(indexedRow));
@@ -257,7 +255,7 @@ Optional<ParquetChunkData> getSnapshot(final String filePath) {
     metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));

     List<List<Object>> oldData = new ArrayList<>();
-    if (!enableParquetInternalBuffering) {
+    if (!clientBufferParameters.getEnableParquetInternalBuffering()) {
       data.forEach(r -> oldData.add(new ArrayList<>(r)));
     }
     return bufferedRowCount <= 0
@@ -321,7 +319,10 @@ void closeInternal() {

   @Override
   public Flusher<ParquetChunkData> createFlusher() {
-    return new ParquetFlusher(schema, enableParquetInternalBuffering, maxChunkSizeInBytes);
+    return new ParquetFlusher(
+        schema,
+        clientBufferParameters.getEnableParquetInternalBuffering(),
+        clientBufferParameters.getMaxChunkSizeInBytes());
   }

   private static class ParquetColumn {
SnowflakeStreamingIngestChannelInternal.java

@@ -24,7 +24,6 @@
 import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.Logging;
-import net.snowflake.ingest.utils.ParameterProvider;
 import net.snowflake.ingest.utils.SFException;
 import net.snowflake.ingest.utils.Utils;

@@ -122,15 +121,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
             getFullyQualifiedName(),
             this::collectRowSize,
             channelState,
-            owningClient != null
-                ? owningClient.getParameterProvider().getEnableParquetInternalBuffering()
-                : ParameterProvider.ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT,
-            owningClient != null
-                ? owningClient.getParameterProvider().getMaxChunkSizeInBytes()
-                : ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT,
-            owningClient != null
-                ? owningClient.getParameterProvider().getMaxAllowedRowSizeInBytes()
-                : ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT);
+            new ClientBufferParameters(owningClient));
     logger.logInfo(
         "Channel={} created for table={}",
         this.channelFlushContext.getName(),
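Note that the null-client fallback that used to live inline here moved into the ClientBufferParameters constructor, so it can now be exercised directly. A hedged sketch, assuming the ParameterProvider default constants are visible from the calling package:

  // A null client makes the constructor fall back to ParameterProvider defaults.
  ClientBufferParameters fallback = new ClientBufferParameters(null);
  assert fallback.getEnableParquetInternalBuffering()
      == ParameterProvider.ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT;
  assert fallback.getMaxChunkSizeInBytes()
      == ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT;
  assert fallback.getMaxAllowedRowSizeInBytes()
      == ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT;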
RowBufferTest.java

@@ -115,9 +115,10 @@ private AbstractRowBuffer<?> createTestBuffer(OpenChannelRequest.OnErrorOption o
         "test.buffer",
         rs -> {},
         initialState,
-        enableParquetMemoryOptimization,
-        MAX_CHUNK_SIZE_IN_BYTES_DEFAULT,
-        MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT);
+        ClientBufferParameters.test_createClientBufferParameters(
+            enableParquetMemoryOptimization,
+            MAX_CHUNK_SIZE_IN_BYTES_DEFAULT,
+            MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT));
   }

   @Test
