From 5e1a16158e4cd94fe7b1a3fc48ad443f237ada5d Mon Sep 17 00:00:00 2001 From: Zihan Li <63482288+sfc-gh-zli@users.noreply.github.com> Date: Thu, 11 Jun 2020 10:50:20 -0700 Subject: [PATCH] Minor changes to add logging time and empty list handling in batch delete (#170) --- .../kafka/connector/SnowflakeSinkTask.java | 26 +++++++++++++++++-- .../SnowflakeConnectionServiceV1.java | 4 +++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java index d94633ad1..23c9a6033 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java @@ -101,6 +101,7 @@ private SnowflakeSinkService getSink() @Override public void start(final Map parsedConfig) { + long startTime = System.currentTimeMillis(); this.id = parsedConfig.getOrDefault(Utils.TASK_ID, "-1"); LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:start", this.id)); @@ -141,6 +142,9 @@ public void start(final Map parsedConfig) .setTopic2TableMap(topic2table) .setMetadataConfig(metadataConfig) .build(); + + LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:start. Time: {} seconds", this.id, + (System.currentTimeMillis() - startTime) / 1000)); } /** @@ -151,11 +155,15 @@ public void start(final Map parsedConfig) @Override public void stop() { + long startTime = System.currentTimeMillis(); LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:stop", this.id)); if (sink != null) { this.sink.closeAll(); } + + LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:stop. Time: {} seconds", this.id, + (System.currentTimeMillis() - startTime) / 1000)); } /** @@ -167,11 +175,15 @@ public void stop() @Override public void open(final Collection partitions) { + long startTime = System.currentTimeMillis(); LOGGER.info(Logging.logMessage( - "SnowflakeSinkTask[ID:{}]:open, TopicPartitions: {}", this.id, partitions + "SnowflakeSinkTask[ID:{}]:open, TopicPartition number: {}", this.id, partitions.size() )); partitions.forEach(tp -> this.sink.startTask(Utils.tableName(tp.topic(), this.topic2table), tp.topic(), tp.partition())); + + LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:open. Time: {} seconds", this.id, + (System.currentTimeMillis() - startTime) / 1000)); } @@ -186,11 +198,15 @@ public void open(final Collection partitions) @Override public void close(final Collection partitions) { + long startTime = System.currentTimeMillis(); LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:close", this.id)); if (this.sink != null) { this.sink.close(partitions); } + + LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:close. Time: {} seconds", + this.id, (System.currentTimeMillis() - startTime) / 1000)); } /** @@ -202,10 +218,14 @@ public void close(final Collection partitions) @Override public void put(final Collection records) { + long startTime = System.currentTimeMillis(); LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:put {} records", this.id, records.size())); //log more info may impact performance getSink().insert(records); + + LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:put {} records. Time: {} seconds", + this.id, records.size(), (System.currentTimeMillis() - startTime) / 1000)); } /** @@ -220,6 +240,7 @@ public Map preCommit( Map offsets) throws RetriableException { + long startTime = System.currentTimeMillis(); LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:preCommit", this.id)); if (sink == null || sink.isClosed()) @@ -252,7 +273,8 @@ public Map preCommit( "while preCommit: {} ", this.id, e.getMessage())); return offsets; } - + LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:preCommit. Time: {} seconds", this.id, + (System.currentTimeMillis() - startTime) / 1000)); return committedOffsets; } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 85ac12e87..569cf908e 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -778,6 +778,10 @@ private String pipeDefinition(String tableName, String stageName) */ private void removeFile(String stageName, List fileList) { + if (fileList.size() == 0) + { + return; + } InternalUtils.assertNotEmpty("stageName", stageName); try {