Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rcheng doublebuff2 #735

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ public class MetricsUtil {
* description
*/
public static final String OFFSET_PERSISTED_IN_SNOWFLAKE = "persisted-in-snowflake-offset";

public static final String LATEST_CONSUMER_OFFSET = "latest-consumer-offset";
// ********** ^ Streaming Constants ^ **********//

public enum EventType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ private void createStreamingChannelForTopicPartition(
partitionChannelKey, // Streaming channel name
tableName,
hasSchemaEvolutionPermission,
new StreamingBufferThreshold(this.flushTimeSeconds, this.fileSizeBytes, this.recordNum),
this.connectorConfig,
this.kafkaRecordErrorReporter,
this.sinkTaskContext,
Expand Down Expand Up @@ -300,12 +299,6 @@ public void insert(Collection<SinkRecord> records) {
// threshold.
insert(record);
}

// check all partitions to see if they need to be flushed based on time
for (TopicPartitionChannel partitionChannel : partitionsToChannel.values()) {
// Time based flushing
partitionChannel.insertBufferedRecordsIfFlushTimeThresholdReached();
}
}

/**
Expand Down Expand Up @@ -335,7 +328,7 @@ public void insert(SinkRecord record) {
}

TopicPartitionChannel channelPartition = partitionsToChannel.get(partitionChannelKey);
channelPartition.insertRecordToBuffer(record);
channelPartition.insertRecord(record);
}

@Override
Expand All @@ -348,7 +341,6 @@ public long getOffset(TopicPartition topicPartition) {
this.shouldUseConnectorNameInChannelName);
if (partitionsToChannel.containsKey(partitionChannelKey)) {
long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
partitionsToChannel.get(partitionChannelKey).setLatestConsumerOffset(offset);
return offset;
} else {
LOGGER.warn(
Expand Down

This file was deleted.

Loading
Loading