diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java index d94633ad1..23c9a6033 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java @@ -101,6 +101,7 @@ private SnowflakeSinkService getSink() @Override public void start(final Map parsedConfig) { + long startTime = System.currentTimeMillis(); this.id = parsedConfig.getOrDefault(Utils.TASK_ID, "-1"); LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:start", this.id)); @@ -141,6 +142,9 @@ public void start(final Map parsedConfig) .setTopic2TableMap(topic2table) .setMetadataConfig(metadataConfig) .build(); + + LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:start. Time: {} seconds", this.id, + (System.currentTimeMillis() - startTime) / 1000)); } /** @@ -151,11 +155,15 @@ public void start(final Map parsedConfig) @Override public void stop() { + long startTime = System.currentTimeMillis(); LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:stop", this.id)); if (sink != null) { this.sink.closeAll(); } + + LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:stop. Time: {} seconds", this.id, + (System.currentTimeMillis() - startTime) / 1000)); } /** @@ -167,11 +175,15 @@ public void stop() @Override public void open(final Collection partitions) { + long startTime = System.currentTimeMillis(); LOGGER.info(Logging.logMessage( - "SnowflakeSinkTask[ID:{}]:open, TopicPartitions: {}", this.id, partitions + "SnowflakeSinkTask[ID:{}]:open, TopicPartition number: {}", this.id, partitions.size() )); partitions.forEach(tp -> this.sink.startTask(Utils.tableName(tp.topic(), this.topic2table), tp.topic(), tp.partition())); + + LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:open. Time: {} seconds", this.id, + (System.currentTimeMillis() - startTime) / 1000)); } @@ -186,11 +198,15 @@ public void open(final Collection partitions) @Override public void close(final Collection partitions) { + long startTime = System.currentTimeMillis(); LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:close", this.id)); if (this.sink != null) { this.sink.close(partitions); } + + LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:close. Time: {} seconds", + this.id, (System.currentTimeMillis() - startTime) / 1000)); } /** @@ -202,10 +218,14 @@ public void close(final Collection partitions) @Override public void put(final Collection records) { + long startTime = System.currentTimeMillis(); LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:put {} records", this.id, records.size())); //log more info may impact performance getSink().insert(records); + + LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:put {} records. Time: {} seconds", + this.id, records.size(), (System.currentTimeMillis() - startTime) / 1000)); } /** @@ -220,6 +240,7 @@ public Map preCommit( Map offsets) throws RetriableException { + long startTime = System.currentTimeMillis(); LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:preCommit", this.id)); if (sink == null || sink.isClosed()) @@ -252,7 +273,8 @@ public Map preCommit( "while preCommit: {} ", this.id, e.getMessage())); return offsets; } - + LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:preCommit. Time: {} seconds", this.id, + (System.currentTimeMillis() - startTime) / 1000)); return committedOffsets; } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 85ac12e87..569cf908e 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -778,6 +778,10 @@ private String pipeDefinition(String tableName, String stageName) */ private void removeFile(String stageName, List fileList) { + if (fileList.size() == 0) + { + return; + } InternalUtils.assertNotEmpty("stageName", stageName); try {