diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java index 8d62f274e..d94633ad1 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java @@ -205,7 +205,7 @@ public void put(final Collection records) LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:put {} records", this.id, records.size())); //log more info may impact performance - records.forEach(getSink()::insert); + getSink().insert(records); } /** @@ -220,6 +220,7 @@ public Map preCommit( Map offsets) throws RetriableException { + LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:preCommit", this.id)); if (sink == null || sink.isClosed()) { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java index b1c503a46..95e49976a 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java @@ -30,6 +30,12 @@ static String fileName(String appName, String table, int partition, return fileName(filePrefix(appName, table, partition), start, end); } + // Used for testing only + static String fileName(String appName, String table, int partition, + long start, long end, long time) + { + return filePrefix(appName, table, partition) + start + "_" + end + "_" + time + ".json.gz"; + } /** * generate file name diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 6e0ae1d45..85ac12e87 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -640,10 +640,15 @@ public void put(final String stageName, final String fileName, InputStream input = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)); try { - sfconn.uploadStream(stageName, - FileNameUtils.getPrefixFromFileName(fileName), input, - FileNameUtils.removePrefixAndGZFromFileName(fileName), true); - } catch (SQLException e) + InternalUtils.backoffAndRetry(telemetry, + () -> + { + sfconn.uploadStream(stageName, + FileNameUtils.getPrefixFromFileName(fileName), input, + FileNameUtils.removePrefixAndGZFromFileName(fileName), true); + return true; + }); + } catch (Exception e) { throw SnowflakeErrors.ERROR_2003.getException(e); } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeIngestionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeIngestionServiceV1.java index fcc5c79bc..6c0e7ea61 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeIngestionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeIngestionServiceV1.java @@ -73,6 +73,10 @@ public void ingestFile(final String fileName) @Override public void ingestFiles(final Set fileNames) { + if (fileNames.isEmpty()) + { + return; + } try { InternalUtils.backoffAndRetry(telemetry, diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkService.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkService.java index 87649066b..a231670b4 100644 --- 
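import java.util.concurrent.Callable;

// A minimal sketch of the retry-with-backoff pattern that the hunk above applies to the
// stage upload (and that ingestFiles already uses via InternalUtils.backoffAndRetry), so a
// transient failure is retried before being surfaced as ERROR_2003. The class name, method
// signature, and retry limits below are illustrative assumptions, not the connector's
// actual InternalUtils API.
final class BackoffRetrySketch
{
  static <T> T backoffAndRetry(Callable<T> action) throws Exception
  {
    final int maxAttempts = 3;
    long backoffMs = 1000L;
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++)
    {
      try
      {
        return action.call();
      }
      catch (Exception e)
      {
        last = e;                 // remember the most recent failure
        if (attempt < maxAttempts)
        {
          Thread.sleep(backoffMs);
          backoffMs *= 2;         // exponential backoff between attempts
        }
      }
    }
    throw last;                   // the caller maps this to ERROR_2003
  }

  // usage, mirroring the shape of the call in SnowflakeConnectionServiceV1.put:
  //   backoffAndRetry(() -> { uploadToStage(); return true; });
}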
a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkService.java @@ -23,11 +23,19 @@ public interface SnowflakeSinkService */ void startTask(String tableName, String topic, int partition); + /** + * call pipe to insert a collections of JSON records + * will trigger time based flush + * @param records record content + */ + void insert(final Collection records); + /** * call pipe to insert a JSON record + * will not trigger time based flush * @param record record content */ - void insert(SinkRecord record); + void insert(final SinkRecord record); /** * retrieve offset of last loaded record for given pipe name @@ -36,6 +44,11 @@ public interface SnowflakeSinkService */ long getOffset(TopicPartition topicPartition); + /** + * used for testing only + */ + void callAllGetOffset(); + /** * terminate all tasks and close this service instance */ diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java index aa51c789f..a144cf048 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java @@ -9,17 +9,10 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -79,19 +72,35 @@ public void startTask(final String tableName, final String topic, } @Override - public void insert(final SinkRecord record) + public void insert(final Collection records) + { + // note that records can be empty + for (SinkRecord record : records) + { + insert(record); + } + // check all sink context to see if they need to be flushed + for (ServiceContext pipe : pipes.values()) { + if (pipe.shouldFlush()) + { + pipe.flushBuffer(); + } + } + } + + @Override + public void insert(SinkRecord record) { String nameIndex = getNameIndex(record.topic(), record.kafkaPartition()); //init a new topic partition if (!pipes.containsKey(nameIndex)) { logWarn("Topic: {} Partition: {} hasn't been initialized by OPEN " + - "function", record.topic(), record.kafkaPartition()); + "function", record.topic(), record.kafkaPartition()); startTask(Utils.tableName(record.topic(), this.topic2TableMap), - record.topic(), record.kafkaPartition()); + record.topic(), record.kafkaPartition()); } pipes.get(nameIndex).insert(record); - } @Override @@ -112,6 +121,15 @@ public long getOffset(final TopicPartition topicPartition) } } + // used for testing only + @Override + public void callAllGetOffset() + { + for (ServiceContext pipe : pipes.values()) { + pipe.getOffset(); + } + } + @Override public void close(Collection partitions) { @@ -249,14 +267,16 @@ private class ServiceContext private final SnowflakeConnectionService conn; private final SnowflakeIngestionService ingestionService; private List fileNames; + private List cleanerFileNames; private PartitionBuffer buffer; private final String prefix; - 
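import java.util.Collection;
import java.util.List;

// A sketch of the new batch-level insert(Collection) contract declared above: each record is
// buffered exactly as the single-record insert did, and afterwards every pipe is offered a
// time-based flush on the task thread (this patch removes the dedicated flusher thread).
// The Pipe interface and String record type are simplifications of the real ServiceContext.
final class BatchInsertSketch
{
  interface Pipe
  {
    boolean accepts(String record);   // does this record's topic/partition belong to this pipe?
    void insert(String record);       // buffer one record
    boolean shouldFlush();            // has the flush-time threshold elapsed?
    void flushBuffer();               // swap the buffer out and upload it to the stage
  }

  static void insert(List<Pipe> pipes, Collection<String> records)
  {
    for (String record : records)
    {
      for (Pipe pipe : pipes)
      {
        if (pipe.accepts(record))
        {
          pipe.insert(record);        // the real service keys pipes by topic + partition
          break;
        }
      }
    }
    // an empty batch still reaches this point, which is why the tests can call
    // service.insert(new ArrayList<>()) just to trigger the time-based flush
    for (Pipe pipe : pipes)
    {
      if (pipe.shouldFlush())
      {
        pipe.flushBuffer();
      }
    }
  }
}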
private long committedOffset; // loaded offset + 1 - private long processedOffset; // processed offset + private final AtomicLong committedOffset; // loaded offset + 1 + private final AtomicLong flushedOffset; // flushed offset (file on stage) + private final AtomicLong processedOffset; // processed offset + private long previousFlushTimeStamp; //threads private final ExecutorService cleanerExecutor; - private final ExecutorService flusherExecutor; private final Lock bufferLock; private final Lock fileListLock; private final Lock usageDataLock; @@ -279,12 +299,15 @@ private ServiceContext(String tableName, String stageName, this.stageName = stageName; this.conn = conn; this.fileNames = new LinkedList<>(); + this.cleanerFileNames = new LinkedList<>(); this.buffer = new PartitionBuffer(); this.ingestionService = conn.buildIngestService(stageName, pipeName); - this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), - tableName, partition); - this.processedOffset = -1; - this.committedOffset = 0; + this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, partition); + this.processedOffset = new AtomicLong(-1); + this.flushedOffset = new AtomicLong(-1); + this.committedOffset = new AtomicLong(0); + this.previousFlushTimeStamp = System.currentTimeMillis(); + this.bufferLock = new ReentrantLock(); this.fileListLock = new ReentrantLock(); this.usageDataLock = new ReentrantLock(); @@ -294,7 +317,6 @@ private ServiceContext(String tableName, String stageName, this.startTime = System.currentTimeMillis(); this.cleanerExecutor = Executors.newSingleThreadExecutor(); - this.flusherExecutor = Executors.newSingleThreadExecutor(); logInfo("pipe: {} - service started", pipeName); } @@ -302,14 +324,14 @@ private ServiceContext(String tableName, String stageName, private void init() { logInfo("init pipe: {}", pipeName); - //wait sinkConnector start + // wait for sinkConnector to start createTableAndStage(); + // recover will only check pipe status and create pipe if it does not exist. 
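import java.util.concurrent.atomic.AtomicLong;

// A sketch of the three offset counters the patch turns into AtomicLongs, since they are now
// written on the task thread and read from preCommit/getOffset and the cleaner thread.
//   processed : highest Kafka offset buffered so far
//   flushed   : end offset + 1 of the last file written to the internal stage
//   committed : the value reported back to Kafka Connect (loaded offset + 1)
// The patch advances flushedOffset with set(Math.max(...)); updateAndGet below is just a
// compact variant used for this sketch.
final class OffsetBookkeepingSketch
{
  private final AtomicLong processedOffset = new AtomicLong(-1);
  private final AtomicLong flushedOffset   = new AtomicLong(-1);
  private final AtomicLong committedOffset = new AtomicLong(0);

  void onRecordBuffered(long kafkaOffset)
  {
    processedOffset.set(kafkaOffset);
  }

  void onFileFlushed(long lastOffsetInFile)
  {
    // never move backwards, even if flushes land out of order
    flushedOffset.updateAndGet(prev -> Math.max(prev, lastOffsetInFile + 1));
  }

  long onPreCommit()
  {
    // once the pending file names have been handed to Snowpipe, the committed offset
    // can safely catch up to the flushed offset
    committedOffset.set(flushedOffset.get());
    return committedOffset.get();
  }
}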
recover(); try { startCleaner(); - startFlusher(); } catch (Exception e) { logWarn("Cleaner and Flusher threads shut down before initialization"); @@ -319,6 +341,17 @@ private void init() private void startCleaner() { + // when cleaner start, scan stage for all files of this pipe + List tmpFileNames = conn.listStage(stageName, prefix); + fileListLock.lock(); + try + { + cleanerFileNames.addAll(tmpFileNames); + } finally + { + fileListLock.unlock(); + } + cleanerExecutor.submit( () -> { @@ -335,7 +368,7 @@ private void startCleaner() } } catch (InterruptedException e) { - logInfo("Flusher terminated by an interrupt:\n{}", e.getMessage()); + logInfo("Cleaner terminated by an interrupt:\n{}", e.getMessage()); break; } } @@ -349,47 +382,6 @@ private void stopCleaner() logInfo("pipe {}: cleaner terminated", pipeName); } - private void startFlusher() - { - flusherExecutor.submit( - () -> - { - logInfo("pipe {}: flusher started", pipeName); - while (!isStopped) - { - try - { - Thread.sleep(getFlushTime() * 1000); - PartitionBuffer tmpBuff; - - bufferLock.lock(); - try - { - tmpBuff = buffer; - buffer = new PartitionBuffer(); - } finally - { - bufferLock.unlock(); - } - - flush(tmpBuff); - logDebug("pipe {}: flusher flushed", pipeName); - } catch (InterruptedException e) - { - logInfo("Flusher terminated by an interrupt:\n{}", e.getMessage()); - break; - } - } - } - ); - } - - private void stopFlusher() - { - flusherExecutor.shutdownNow(); - logInfo("pipe {}: flusher terminated", pipeName); - } - private void insert(final SinkRecord record) { //init pipe @@ -400,7 +392,7 @@ private void insert(final SinkRecord record) } //ignore ingested files - if (record.kafkaOffset() > processedOffset) + if (record.kafkaOffset() > processedOffset.get()) { SinkRecord snowflakeRecord; if (!(record.value() instanceof SnowflakeRecordContent)) @@ -436,10 +428,10 @@ private void insert(final SinkRecord record) bufferLock.lock(); try { - processedOffset = snowflakeRecord.kafkaOffset(); + processedOffset.set(snowflakeRecord.kafkaOffset()); buffer.insert(snowflakeRecord); if (buffer.getBufferSize() >= getFileSize() || - (getRecordNumber() != 0 && buffer.getNumOfRecord() >= getRecordNumber())) + (getRecordNumber() != 0 && buffer.getNumOfRecord() >= getRecordNumber())) { tmpBuff = buffer; this.buffer = new PartitionBuffer(); @@ -458,6 +450,31 @@ private void insert(final SinkRecord record) } + private boolean shouldFlush() + { + return (System.currentTimeMillis() - this.previousFlushTimeStamp) >= (getFlushTime() * 1000); + } + + private void flushBuffer() + { + // Just checking buffer size, no atomic operation required + if (buffer.isEmpty()) + { + return; + } + PartitionBuffer tmpBuff; + bufferLock.lock(); + try + { + tmpBuff = buffer; + this.buffer = new PartitionBuffer(); + } finally + { + bufferLock.unlock(); + } + flush(tmpBuff); + } + private void writeBrokenDataToTableStage(SinkRecord record) { String fileName = FileNameUtils.brokenRecordFileName(prefix, @@ -468,26 +485,48 @@ private void writeBrokenDataToTableStage(SinkRecord record) private long getOffset() { - return committedOffset; + if (fileNames.isEmpty()) + { + return committedOffset.get(); + } + + Set fileNamesCopy = new HashSet<>(); + fileListLock.lock(); + try + { + fileNamesCopy.addAll(fileNames); + fileNames = new LinkedList<>(); + } finally + { + fileListLock.unlock(); + } + // This api should throw exception if backoff failed + ingestionService.ingestFiles(fileNamesCopy); + committedOffset.set(flushedOffset.get()); + return committedOffset.get(); } - 
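import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// A sketch of the flush discipline shown above: size/record-count thresholds are checked on
// every insert, the elapsed-time check is driven by insert(Collection), and in both cases the
// buffer is swapped out under bufferLock so the task thread immediately continues on a fresh
// buffer while the old one is uploaded. The limits below are illustrative, not the connector's
// configured defaults.
final class FlushDisciplineSketch
{
  private static final int  RECORD_LIMIT  = 10_000;
  private static final long FLUSH_TIME_MS = 120_000L;

  private final Lock bufferLock = new ReentrantLock();
  private StringBuilder buffer = new StringBuilder();
  private int recordCount = 0;
  private long previousFlushTimeStamp = System.currentTimeMillis();

  boolean shouldFlush()
  {
    return System.currentTimeMillis() - previousFlushTimeStamp >= FLUSH_TIME_MS;
  }

  void insert(String json)
  {
    String toUpload = null;
    bufferLock.lock();
    try
    {
      buffer.append(json).append('\n');
      if (++recordCount >= RECORD_LIMIT)      // the real code also checks a byte-size limit
      {
        toUpload = swapLocked();
      }
    }
    finally
    {
      bufferLock.unlock();
    }
    if (toUpload != null)
    {
      upload(toUpload);                       // conn.put(...) happens outside the lock
    }
  }

  void flushBuffer()
  {
    if (buffer.length() == 0)                 // cheap emptiness check, no lock required
    {
      return;
    }
    String toUpload;
    bufferLock.lock();
    try
    {
      toUpload = swapLocked();
    }
    finally
    {
      bufferLock.unlock();
    }
    upload(toUpload);
  }

  private String swapLocked()
  {
    String data = buffer.toString();
    buffer = new StringBuilder();
    recordCount = 0;
    return data;
  }

  private void upload(String data)
  {
    previousFlushTimeStamp = System.currentTimeMillis();
    // stage upload and flushedOffset bookkeeping live here in the real ServiceContext
  }
}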
private void flush(PartitionBuffer buff) + private void flush(final PartitionBuffer buff) { if (buff == null || buff.isEmpty()) { return; } + this.previousFlushTimeStamp = System.currentTimeMillis(); + // If we failed to submit/put, throw an runtime exception that kills the connector. + // SnowflakeThreadPoolUtils.flusherThreadPool.submit( String fileName = FileNameUtils.fileName(prefix, buff.getFirstOffset(), - buff.getLastOffset()); + buff.getLastOffset()); String content = buff.getData(); conn.put(stageName, fileName, content); - ingestionService.ingestFile(fileName); + flushedOffset.set(Math.max(buff.getLastOffset() + 1, flushedOffset.get())); fileListLock.lock(); try { fileNames.add(fileName); + cleanerFileNames.add(fileName); } finally { fileListLock.unlock(); @@ -503,8 +542,8 @@ private void checkStatus() fileListLock.lock(); try { - tmpFileNames = fileNames; - fileNames = new LinkedList<>(); + tmpFileNames = cleanerFileNames; + cleanerFileNames = new LinkedList<>(); } finally { fileListLock.unlock(); @@ -545,63 +584,17 @@ else if (time < currentTime - TEN_MINUTES) currentTime - ONE_HOUR), tmpFileNames, loadedFiles, failedFiles); } - updateOffset(tmpFileNames, loadedFiles, failedFiles); purge(loadedFiles); moveToTableStage(failedFiles); fileListLock.lock(); try { - fileNames.addAll(tmpFileNames); + cleanerFileNames.addAll(tmpFileNames); } finally { fileListLock.unlock(); } - - } - - private void updateOffset(List allFiles, - List loadedFiles, - List failedFiles) - { - if (allFiles.isEmpty()) - { - if (loadedFiles.isEmpty() && failedFiles.isEmpty()) - { - return; - } - long result = 0; - for (String name : loadedFiles) - { - long endOffset = FileNameUtils.fileNameToEndOffset(name) + 1; - if (endOffset > result) - { - result = endOffset; - } - } - for (String name : failedFiles) - { - long endOffset = FileNameUtils.fileNameToEndOffset(name) + 1; - if (endOffset > result) - { - result = endOffset; - } - } - committedOffset = result; - } - else - { - long result = Long.MAX_VALUE; - for (String name : allFiles) - { - long startOffset = FileNameUtils.fileNameToStartOffset(name); - if (startOffset < result) - { - result = startOffset; - } - } - committedOffset = result; - } } private void filterResult(Map fileStatus, @@ -656,16 +649,6 @@ private void recover() throw SnowflakeErrors.ERROR_5005.getException("pipe name: " + pipeName, conn.getTelemetryClient()); } - fileListLock.lock(); - try - { - recoverFileStatues().forEach( - (name, status) -> fileNames.add(name) - ); - } finally - { - fileListLock.unlock(); - } logInfo("pipe {}, recovered from existing pipe", pipeName); } else @@ -674,89 +657,11 @@ private void recover() } } - private Map recoverFileStatues() - { - List files = conn.listStage(stageName, prefix); - if (files.isEmpty()) - { - return new HashMap<>(); //no file on stage - } - Map result = new HashMap<>(); - - List loadedFiles = new LinkedList<>(); - List failedFiles = new LinkedList<>(); - - //sort by time - //may be an issue when continuously recovering - // because this time is time when file uploaded. - // if files ingested again, this time will not be - // updated. So the real ingestion time maybe different - // in the second time recovery. 
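// A sketch of the stage file naming that flush() relies on above: the name encodes the
// partition prefix, the first and last Kafka offsets in the file, and an upload timestamp,
// so the cleaner and the recover test can reason from file names alone. The layout mirrors
// the test-only FileNameUtils.fileName overload added in this patch; the prefix shape used
// in main() is an assumption for illustration.
final class StageFileNameSketch
{
  static String fileName(String prefix, long startOffset, long endOffset, long timeMs)
  {
    return prefix + startOffset + "_" + endOffset + "_" + timeMs + ".json.gz";
  }

  static long endOffset(String fileName)
  {
    String[] parts = fileName.replace(".json.gz", "").split("_");
    return Long.parseLong(parts[parts.length - 2]);
  }

  public static void main(String[] args)
  {
    String prefix = "my_connector/my_table/0/";   // assumed filePrefix(appName, table, partition) shape
    String name = fileName(prefix, 100, 199, System.currentTimeMillis());
    // after conn.put succeeds, flushedOffset is advanced to the file's end offset + 1
    long flushedOffset = endOffset(name) + 1;     // 200
    System.out.println(name + " -> flushedOffset " + flushedOffset);
  }
}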
- files.sort(Comparator.comparingLong(FileNameUtils::fileNameToTimeIngested)); - - long startTime = FileNameUtils.fileNameToTimeIngested(files.get(0)); - - committedOffset = Long.MAX_VALUE; - processedOffset = -1; - - Set filesForIngestion = new HashSet<>(); - - ingestionService.readOneHourHistory(files, startTime).forEach( - (name, status) -> - { - long startOffset = FileNameUtils.fileNameToStartOffset(name); - long endOffset = FileNameUtils.fileNameToEndOffset(name); - if (processedOffset < endOffset) - { - processedOffset = endOffset; - } - switch (status) - { - case NOT_FOUND: - //re ingest - filesForIngestion.add(name); - result.put(name, status); - if (committedOffset > startOffset) - { - committedOffset = startOffset; - } - break; - case LOAD_IN_PROGRESS: - result.put(name, status); - if (committedOffset > startOffset) - { - committedOffset = startOffset; - } - break; - case LOADED: - loadedFiles.add(name); - break; - default: - failedFiles.add(name); - } - } - ); - // batch call Snowpipe to ingest file - ingestionService.ingestFiles(filesForIngestion); - - if (!loadedFiles.isEmpty()) - { - purge(loadedFiles); - } - if (!failedFiles.isEmpty()) - { - moveToTableStage(failedFiles); - } - logInfo("pipe {} : Recovered {} files", pipeName, files.size()); - return result; - } - private void close() { try { stopCleaner(); - stopFlusher(); } catch (Exception e) { logWarn("Failed to terminate Cleaner or Flusher"); @@ -852,7 +757,7 @@ private void updateUsageData(long numOfRecord, long sizeOfData) private class PartitionBuffer { - private StringBuilder stringBuilder; + private final StringBuilder stringBuilder; private int numOfRecord; private int bufferSize; private long firstOffset; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/MetaColumnIT.java b/src/test/java/com/snowflake/kafka/connector/internal/MetaColumnIT.java index cd735a308..c264274f3 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/MetaColumnIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/MetaColumnIT.java @@ -70,8 +70,8 @@ record = service.insert(record); TestUtils.assertWithRetry(() -> { - ResultSet resultSet = TestUtils.executeQuery("select RECORD_METADATA from" + - " " + tableName); + service.callAllGetOffset(); + ResultSet resultSet = TestUtils.executeQuery("select RECORD_METADATA from " + tableName); boolean hasKey1 = false; boolean hasKey3 = false; @@ -100,7 +100,7 @@ else if (node.get("key").asText().equals("key3")) return false; } return true; - }, 30, 8); + }, 30, 10); service.closeAll(); } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/SinkServiceIT.java b/src/test/java/com/snowflake/kafka/connector/internal/SinkServiceIT.java index 326354663..47623eea3 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/SinkServiceIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/SinkServiceIT.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.sql.SQLException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.concurrent.ExecutionException; @@ -111,11 +112,12 @@ public void testIngestion() throws Exception , "test", input.schema(), input.value(), offset); service.insert(record1); - List files = conn.listStage(stage, - FileNameUtils.filePrefix(TestUtils.TEST_CONNECTOR_NAME, table, - partition)); - - assert files.size() == 1; + TestUtils.assertWithRetry(() -> conn.listStage(stage, FileNameUtils.filePrefix(TestUtils.TEST_CONNECTOR_NAME, 
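import java.util.concurrent.Callable;

// A sketch of the polling assertion the integration tests lean on above: because files are now
// handed to Snowpipe from getOffset()/preCommit rather than by a background flusher, the tests
// call service.callAllGetOffset() and then poll until the stage or table reaches the expected
// state. The parameter order of the real TestUtils.assertWithRetry is not shown in this diff,
// so sleepSeconds/maxRetries below are an assumption.
final class AssertWithRetrySketch
{
  static void assertWithRetry(Callable<Boolean> condition, int sleepSeconds, int maxRetries)
      throws Exception
  {
    for (int attempt = 0; attempt < maxRetries; attempt++)
    {
      if (Boolean.TRUE.equals(condition.call()))
      {
        return;                                   // condition met
      }
      Thread.sleep(sleepSeconds * 1000L);         // give async ingestion time to catch up
    }
    throw new AssertionError("condition not met after " + maxRetries + " retries");
  }
}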
+ table, partition)).size() == 1, + 5, 4); + service.callAllGetOffset(); + List files = conn.listStage(stage, FileNameUtils.filePrefix(TestUtils.TEST_CONNECTOR_NAME, + table, partition)); String fileName = files.get(0); assert FileNameUtils.fileNameToTimeIngested(fileName) < System.currentTimeMillis(); @@ -124,10 +126,10 @@ public void testIngestion() throws Exception assert FileNameUtils.fileNameToEndOffset(fileName) == offset; //wait for ingest - TestUtils.assertWithRetry(() -> TestUtils.tableSize(table) == 1, 30, 6); + TestUtils.assertWithRetry(() -> TestUtils.tableSize(table) == 1, 30, 10); //change cleaner - TestUtils.assertWithRetry(() -> getStageSize(stage, table, partition) == 0,30, 6); + TestUtils.assertWithRetry(() -> getStageSize(stage, table, partition) == 0,30, 10); assert service.getOffset(new TopicPartition(topic, partition)) == offset + 1; @@ -198,9 +200,13 @@ public void testNativeInputIngestion() throws Exception service.insert(noSchemaRecord); service.insert(schemaRecord); - List files = conn.listStage(stage, - FileNameUtils.filePrefix(TestUtils.TEST_CONNECTOR_NAME, table, partition)); - assert files.size() == 1; + TestUtils.assertWithRetry(() -> + conn.listStage(stage, FileNameUtils.filePrefix(TestUtils.TEST_CONNECTOR_NAME, + table, partition)).size() == 1, + 5, 4); + service.callAllGetOffset(); + List files = conn.listStage(stage, FileNameUtils.filePrefix(TestUtils.TEST_CONNECTOR_NAME, + table, partition)); String fileName = files.get(0); assert FileNameUtils.fileNameToTimeIngested(fileName) < System.currentTimeMillis(); @@ -209,10 +215,10 @@ public void testNativeInputIngestion() throws Exception assert FileNameUtils.fileNameToEndOffset(fileName) == endOffset; //wait for ingest - TestUtils.assertWithRetry(() -> TestUtils.tableSize(table) == recordCount, 30, 8); + TestUtils.assertWithRetry(() -> TestUtils.tableSize(table) == recordCount, 30, 10); //change cleaner - TestUtils.assertWithRetry(() -> getStageSize(stage, table, partition) == 0,30, 8); + TestUtils.assertWithRetry(() -> getStageSize(stage, table, partition) == 0,30, 10); assert service.getOffset(new TopicPartition(topic, partition)) == recordCount; @@ -239,24 +245,29 @@ public void testRecordNumber() throws Exception .addTask(table, topic, partition1) .build(); - Future result = insert(service, partition, numOfRecord); - Future result1 = insert(service, partition1, numOfRecord1); + insert(service, partition, numOfRecord); + insert(service, partition1, numOfRecord1); - assert result.get() == numOfRecord / numLimit; - assert result1.get() == numOfRecord1 / numLimit; + TestUtils.assertWithRetry( + () -> getStageSize(stage, table, partition) == numOfRecord / numLimit, 5, 4); + TestUtils.assertWithRetry( + () -> getStageSize(stage, table, partition1) == numOfRecord1 / numLimit, 5, 4); - TestUtils.assertWithRetry(() -> - TestUtils.tableSize(table) == numOfRecord + numOfRecord1, 30, 6); + TestUtils.assertWithRetry(() -> { + service.insert(new ArrayList<>()); // trigger time based flush + service.callAllGetOffset(); + return TestUtils.tableSize(table) == numOfRecord + numOfRecord1; + }, 30, 10); service.closeAll(); } - private Future insert(SnowflakeSinkService sink, int partition, + private void insert(SnowflakeSinkService sink, int partition, int numOfRecord) { ExecutorService executorService = Executors.newSingleThreadExecutor(); - return executorService.submit( + executorService.submit( () -> { for (int i = 0; i < numOfRecord; i++) @@ -268,14 +279,12 @@ private Future insert(SnowflakeSinkService sink, int 
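import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// A sketch of the fire-and-forget insert helper the tests switch to above: records are produced
// on a single background thread and the test no longer blocks on a Future for a stage file
// count; it polls with assertWithRetry instead. The Sink interface and String records are
// simplifications of SnowflakeSinkService and SinkRecord.
final class AsyncInsertSketch
{
  interface Sink
  {
    void insert(String record);
  }

  static void insertAsync(Sink sink, int partition, int numOfRecords)
  {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(() ->
    {
      for (int i = 0; i < numOfRecords; i++)
      {
        sink.insert("{\"partition\":" + partition + ",\"offset\":" + i + "}");
      }
    });
    executor.shutdown();   // finish the submitted task, then release the thread
  }
}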
partition, new SinkRecord(topic, partition, Schema.STRING_SCHEMA , "test", input.schema(), input.value(), i)); } - - return getStageSize(stage, table, partition); } ); } @Test - public void testFileSize() throws ExecutionException, InterruptedException + public void testFileSize() throws Exception { conn.createTable(table); conn.createStage(stage); @@ -290,9 +299,10 @@ public void testFileSize() throws ExecutionException, InterruptedException .addTask(table, topic, partition) .build(); - Future result = insert(service, partition, numOfRecord); + insert(service, partition, numOfRecord); - assert result.get() == numOfRecord / (size / 152 + 1); + TestUtils.assertWithRetry( + () -> getStageSize(stage, table, partition) == numOfRecord / (size / 152 + 1), 5, 4); service.closeAll(); } @@ -312,9 +322,16 @@ public void testFlushTime() throws Exception .addTask(table, topic, partition) .build(); - assert insert(service, partition, numOfRecord).get() == 0; + insert(service, partition, numOfRecord); + + TestUtils.assertWithRetry( + () -> getStageSize(stage, table, partition) == 0, 5, 4); - TestUtils.assertWithRetry(() -> getStageSize(stage, table, partition) == 1, 15, 4); + TestUtils.assertWithRetry(() -> { + service.insert(new ArrayList<>()); // trigger time based flush + service.callAllGetOffset(); + return getStageSize(stage, table, partition) == 1; + }, 15, 4); service.closeAll(); } @@ -325,11 +342,14 @@ public void testRecover() throws Exception String data = "{\"content\":{\"name\":\"test\"},\"meta\":{\"offset\":0," + "\"topic\":\"test\",\"partition\":0}}"; + // Two hours ago + long time = System.currentTimeMillis() - 120 * 60 * 1000L; + String fileName1 = FileNameUtils.fileName(TestUtils.TEST_CONNECTOR_NAME, - table, 0, 0, 0); + table, 0, 0, 0, time); String fileName2 = FileNameUtils.fileName(TestUtils.TEST_CONNECTOR_NAME, - table, 0, 1, 1); + table, 0, 1, 1, time); conn.createStage(stage); conn.createTable(table); @@ -347,16 +367,21 @@ public void testRecover() throws Exception SnowflakeSinkService service = SnowflakeSinkServiceFactory.builder(conn) .addTask(table, topic, partition) + .setRecordNumber(1) // immediate flush .build(); SnowflakeConverter converter = new SnowflakeJsonConverter(); SchemaAndValue result = converter.toConnectData(topic, "12321".getBytes(StandardCharsets.UTF_8)); SinkRecord record = new SinkRecord(topic, partition, Schema.STRING_SCHEMA , "test", result.schema(), result.value(), 1); - //lazy init and recovery function + // lazy init and recovery function service.insert(record); - - TestUtils.assertWithRetry(() -> getStageSize(stage, table, 0) == 0, 30, 6); + // wait for async put + TestUtils.assertWithRetry(() -> getStageSize(stage, table, 0) == 3, 5, 10); + // call snow pipe + service.callAllGetOffset(); + // cleaner will remove previous files and ingested new file + TestUtils.assertWithRetry(() -> getStageSize(stage, table, 0) == 0, 30, 10); service.closeAll(); } diff --git a/test/run_test.sh b/test/run_test.sh index 7a3bd741d..f51bcafb3 100755 --- a/test/run_test.sh +++ b/test/run_test.sh @@ -132,6 +132,8 @@ set +e # Send test data and verify DB result from Python python3 test_verify.py $K_IP:$SNOWFLAKE_KAFKA_PORT http://$K_IP:$SC_PORT $TEST_SET $NAME_SALT testError=$? 
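// A sketch of the age-based handling the recover test above depends on: the two files it stages
// are stamped two hours in the past, so when the cleaner reconciles them against Snowpipe's load
// history, files that are old and never reported LOADED end up moved off the internal stage,
// while loaded files are purged. The one-hour threshold and the exact action mapping below are
// illustrative; the real cleaner also distinguishes LOAD_IN_PROGRESS and not-found files.
final class StaleFileSketch
{
  private static final long ONE_HOUR_MS = 60 * 60 * 1000L;

  enum Action { KEEP_WAITING, MOVE_TO_TABLE_STAGE, PURGE }

  static Action classify(long fileUploadTimeMs, boolean loaded, long nowMs)
  {
    if (loaded)
    {
      return Action.PURGE;                    // data is in the table, drop the staged file
    }
    if (nowMs - fileUploadTimeMs > ONE_HOUR_MS)
    {
      return Action.MOVE_TO_TABLE_STAGE;      // stale and never loaded: park it for inspection
    }
    return Action.KEEP_WAITING;               // give Snowpipe more time
  }
}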
+delete_connectors_with_salt $NAME_SALT $K_IP $KC_PORT +python3 test_verify.py $K_IP:$SNOWFLAKE_KAFKA_PORT http://$K_IP:$SC_PORT clean $NAME_SALT if [ $testError -ne 0 ]; then RED='\033[0;31m' diff --git a/test/run_test_apache.sh b/test/run_test_apache.sh index 12548b009..0c0254006 100755 --- a/test/run_test_apache.sh +++ b/test/run_test_apache.sh @@ -113,6 +113,8 @@ set +e # Send test data and verify DB result from Python python3 test_verify.py $LOCAL_IP:$SNOWFLAKE_KAFKA_PORT http://$LOCAL_IP:$SC_PORT $TEST_SET $NAME_SALT $PRESSURE testError=$? +delete_connectors_with_salt $NAME_SALT $LOCAL_IP $KC_PORT +python3 test_verify.py $LOCAL_IP:$SNOWFLAKE_KAFKA_PORT http://$LOCAL_IP:$SC_PORT clean $NAME_SALT $PRESSURE if [ $testError -ne 0 ]; then RED='\033[0;31m' diff --git a/test/run_test_confluent.sh b/test/run_test_confluent.sh index 5c5e79afe..9847a817f 100755 --- a/test/run_test_confluent.sh +++ b/test/run_test_confluent.sh @@ -128,6 +128,8 @@ create_connectors_with_salt $SNOWFLAKE_CREDENTIAL_FILE $NAME_SALT $LOCAL_IP $KC_ # Send test data and verify DB result from Python python3 test_verify.py $LOCAL_IP:$SNOWFLAKE_KAFKA_PORT http://$LOCAL_IP:$SC_PORT $TEST_SET $NAME_SALT $PRESSURE testError=$? +delete_connectors_with_salt $NAME_SALT $LOCAL_IP $KC_PORT +python3 test_verify.py $LOCAL_IP:$SNOWFLAKE_KAFKA_PORT http://$LOCAL_IP:$SC_PORT clean $NAME_SALT $PRESSURE ##### Following commented code is used to track thread leak #sleep 100 diff --git a/test/test_suit/test_pressure.py b/test/test_suit/test_pressure.py index 237658780..52305936f 100644 --- a/test/test_suit/test_pressure.py +++ b/test/test_suit/test_pressure.py @@ -6,9 +6,11 @@ class TestPressure: def __init__(self, driver, nameSalt): self.driver = driver self.topics = [] - self.topicNum = 20 - self.partitionNum = 10 - self.recordNum = 100 + self.topicNum = 200 + self.partitionNum = 12 + self.recordNum = 1000 + self.round = 10 + self.sleepTime = 10 self.curTest = 0 for i in range(self.topicNum): self.topics.append("travis_pressure_string_json" + str(i) + nameSalt) @@ -17,26 +19,22 @@ def send(self): for t in range(self.topicNum): self.driver.createTopics(self.topics[t], self.partitionNum, 1) - for p in range(self.partitionNum): - for t in range(self.topicNum): - value = [] - for e in range(self.recordNum): - value.append(json.dumps({'number': str(e)}).encode('utf-8')) - self.driver.sendBytesData(self.topics[t], value, partition=p) - - sleep(120) - for p in range(self.partitionNum): - for t in range(self.topicNum): - value = [] - for e in range(self.recordNum): - value.append(json.dumps({'number': str(e)}).encode('utf-8')) - self.driver.sendBytesData(self.topics[t], value, partition=p) + for r in range(self.round): + for p in range(self.partitionNum): + for t in range(self.topicNum): + value = [] + for e in range(self.recordNum): + value.append(json.dumps( + {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(e)} + ).encode('utf-8')) + self.driver.sendBytesData(self.topics[t], value, partition=p) + sleep(self.sleepTime) def verify(self): for t in range(self.curTest, self.topicNum): res = self.driver.snowflake_conn.cursor().execute( "SELECT count(*) FROM {}".format(self.topics[t])).fetchone()[0] - if res != self.partitionNum * self.recordNum * 2: + if res != self.partitionNum * self.recordNum * self.round: raise RetryableError() if self.curTest <= t: diff --git a/test/test_verify.py b/test/test_verify.py index c8f85a052..5412be7fd 100644 --- a/test/test_verify.py +++ b/test/test_verify.py @@ -179,46 +179,37 @@ 
def runTestSet(driver, testSet, nameSalt, pressure): testSuitEnableList = [True, True, True, True, True, True, True, True, pressure] elif testSet == "apache": testSuitEnableList = [True, True, True, True, False, False, False, True, pressure] - elif testSet == "clean": - testSuitEnableList = [False, False, False, False, False, False, False, False, False] - else: - errorExit( - "Unknown testSet option {}, please input confluent, apache or clean".format(testSet)) + elif testSet != "clean": + errorExit("Unknown testSet option {}, please input confluent, apache or clean".format(testSet)) testCleanEnableList = [True, True, True, True, True, True, True, True, pressure] - failedFlag = False - try: - for i, test in enumerate(testSuitList): - if testSuitEnableList[i]: - print("\n=== Sending " + test.__class__.__name__ + " data ===") - test.send() - print("=== Done ===", flush=True) - - if testSet != "clean": - driver.verifyWaitTime() - - for i, test in enumerate(testSuitList): - if testSuitEnableList[i]: - print("\n=== Verify " + test.__class__.__name__ + " ===") - driver.verifyWithRetry(test.verify) - print("=== Passed ===", flush=True) - except Exception as e: - print(e) - print("Error: ", sys.exc_info()[0]) - failedFlag = True - finally: + if testSet == "clean": for i, test in enumerate(testSuitList): if testCleanEnableList[i]: test.clean() - - if failedFlag: - exit(1) - - if testSet == "clean": print("\n=== All clean done ===") else: - print("\n=== All test passed ===") + try: + for i, test in enumerate(testSuitList): + if testSuitEnableList[i]: + print("\n=== Sending " + test.__class__.__name__ + " data ===") + test.send() + print("=== Done ===", flush=True) + + driver.verifyWaitTime() + + for i, test in enumerate(testSuitList): + if testSuitEnableList[i]: + print("\n=== Verify " + test.__class__.__name__ + " ===") + driver.verifyWithRetry(test.verify) + print("=== Passed ===", flush=True) + + print("\n=== All test passed ===") + except Exception as e: + print(e) + print("Error: ", sys.exc_info()[0]) + exit(1) if __name__ == "__main__":