Skip to content

Commit

Permalink
Minor changes to add logging time and empty list handling in batch de…
Browse files Browse the repository at this point in the history
…lete (#170)
  • Loading branch information
sfc-gh-zli authored Jun 11, 2020
1 parent a9ab61f commit 5e1a161
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
26 changes: 24 additions & 2 deletions src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private SnowflakeSinkService getSink()
@Override
public void start(final Map<String, String> parsedConfig)
{
long startTime = System.currentTimeMillis();
this.id = parsedConfig.getOrDefault(Utils.TASK_ID, "-1");

LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:start", this.id));
Expand Down Expand Up @@ -141,6 +142,9 @@ public void start(final Map<String, String> parsedConfig)
.setTopic2TableMap(topic2table)
.setMetadataConfig(metadataConfig)
.build();

LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:start. Time: {} seconds", this.id,
(System.currentTimeMillis() - startTime) / 1000));
}

/**
Expand All @@ -151,11 +155,15 @@ public void start(final Map<String, String> parsedConfig)
@Override
public void stop()
{
long startTime = System.currentTimeMillis();
LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:stop", this.id));
if (sink != null)
{
this.sink.closeAll();
}

LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:stop. Time: {} seconds", this.id,
(System.currentTimeMillis() - startTime) / 1000));
}

/**
Expand All @@ -167,11 +175,15 @@ public void stop()
@Override
public void open(final Collection<TopicPartition> partitions)
{
long startTime = System.currentTimeMillis();
LOGGER.info(Logging.logMessage(
"SnowflakeSinkTask[ID:{}]:open, TopicPartitions: {}", this.id, partitions
"SnowflakeSinkTask[ID:{}]:open, TopicPartition number: {}", this.id, partitions.size()
));
partitions.forEach(tp -> this.sink.startTask(Utils.tableName(tp.topic(),
this.topic2table), tp.topic(), tp.partition()));

LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:open. Time: {} seconds", this.id,
(System.currentTimeMillis() - startTime) / 1000));
}


Expand All @@ -186,11 +198,15 @@ public void open(final Collection<TopicPartition> partitions)
@Override
public void close(final Collection<TopicPartition> partitions)
{
long startTime = System.currentTimeMillis();
LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:close", this.id));
if (this.sink != null)
{
this.sink.close(partitions);
}

LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:close. Time: {} seconds",
this.id, (System.currentTimeMillis() - startTime) / 1000));
}

/**
Expand All @@ -202,10 +218,14 @@ public void close(final Collection<TopicPartition> partitions)
@Override
public void put(final Collection<SinkRecord> records)
{
long startTime = System.currentTimeMillis();
LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:put {} records",
this.id, records.size()));
//log more info may impact performance
getSink().insert(records);

LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:put {} records. Time: {} seconds",
this.id, records.size(), (System.currentTimeMillis() - startTime) / 1000));
}

/**
Expand All @@ -220,6 +240,7 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
Map<TopicPartition, OffsetAndMetadata> offsets)
throws RetriableException
{
long startTime = System.currentTimeMillis();
LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:preCommit", this.id));

if (sink == null || sink.isClosed())
Expand Down Expand Up @@ -252,7 +273,8 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
"while preCommit: {} ", this.id, e.getMessage()));
return offsets;
}

LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:preCommit. Time: {} seconds", this.id,
(System.currentTimeMillis() - startTime) / 1000));
return committedOffsets;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,10 @@ private String pipeDefinition(String tableName, String stageName)
*/
private void removeFile(String stageName, List<String> fileList)
{
if (fileList.size() == 0)
{
return;
}
InternalUtils.assertNotEmpty("stageName", stageName);
try
{
Expand Down

0 comments on commit 5e1a161

Please sign in to comment.