Skip to content

Commit

Permalink
Merge branch 'master' into sfc-gh-xhuang-patch-2
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-xhuang authored May 7, 2024
2 parents 790cd31 + 83f4287 commit 57704dc
Show file tree
Hide file tree
Showing 12 changed files with 373 additions and 267 deletions.
13 changes: 8 additions & 5 deletions .github/workflows/End2EndTest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,24 @@ jobs:
continue-on-error: false
run: mvn -DghActionsIT verify --batch-mode
build-e2e-jar-test:
name: E2E JAR Test - ${{ matrix.java }}, Cloud ${{ matrix.snowflake_cloud }}
name: E2E JAR Test - Cloud ${{ matrix.snowflake_cloud }}
runs-on: ubuntu-20.04
strategy:
fail-fast: false
matrix:
java: [ 8 ]
snowflake_cloud: [ 'AWS', 'AZURE', 'GCP' ]
steps:
- name: Checkout Code
uses: actions/checkout@v2
- name: Install Java ${{ matrix.java }}
uses: actions/setup-java@v2
- name: Install Java
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: ${{ matrix.java }}
java-version: | # Install all LTS java versions, the last mentioned one here will be the default
21
17
11
8
cache: maven
- name: Decrypt profile.json for Cloud ${{ matrix.snowflake_cloud }}
env:
Expand Down
63 changes: 32 additions & 31 deletions e2e-jar-test/core/pom.xml
Original file line number Diff line number Diff line change
@@ -1,38 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>net.snowflake.snowflake-ingest-java-e2e-jar-test</groupId>
<artifactId>parent</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>net.snowflake.snowflake-ingest-java-e2e-jar-test</groupId>
<artifactId>parent</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

<artifactId>core</artifactId>
<name>core</name>
<packaging>jar</packaging>
<artifactId>core</artifactId>
<packaging>jar</packaging>
<name>core</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<!-- Provided because we let submodules define which way they pull in the SDK (e.g. with/without snowflake-jdbc-fips, etc) -->
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<scope>provided</scope>
</dependency>
<dependencies>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</dependency>
<!-- Provided because we let submodules define which way they pull in the SDK (e.g. with/without snowflake-jdbc-fips, etc) -->
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
108 changes: 67 additions & 41 deletions e2e-jar-test/core/src/main/java/net/snowflake/IngestTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyFactory;
Expand All @@ -22,9 +23,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;

import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
Expand All @@ -38,7 +37,6 @@ public class IngestTestUtils {
private final Connection connection;

private final String database;
private final String schema;
private final String table;

private final String testId;
Expand All @@ -51,38 +49,51 @@ public class IngestTestUtils {

private final ObjectMapper objectMapper = new ObjectMapper();

private final Random random = new Random();

private final Base64.Decoder base64Decoder = Base64.getDecoder();

public IngestTestUtils(String testName)
throws SQLException,
IOException,
ClassNotFoundException,
NoSuchAlgorithmException,
throws SQLException, IOException, ClassNotFoundException, NoSuchAlgorithmException,
InvalidKeySpecException {
testId = String.format("%s_%s", testName, UUID.randomUUID().toString().replace("-", "_"));
connection = getConnection();
database = String.format("database_%s", testId);
schema = String.format("schema_%s", testId);
table = String.format("table_%s", testId);
database = String.format("ingest_sdk_e2e_jar_database_%s", testId);
table = String.format("ingest_sdk_e2e_jar_table_%s", testId);

connection.createStatement().execute(String.format("create database %s", database));
connection.createStatement().execute(String.format("create schema %s", schema));
connection.createStatement().execute(String.format("create table %s (c1 int, c2 varchar, c3 binary)", table));

connection
.createStatement()
.execute(
String.format(
"create table %s ("
+ "boolean_col boolean,"
+ "int_col int,"
+ "number_col number(10, 5),"
+ "float_col float,"
+ "text_col text,"
+ "binary_col binary,"
+ "variant_col variant,"
+ "array_col array,"
+ "object_col object,"
+ "time_col time,"
+ "date_col date,"
+ "timestamp_ntz_col timestamp_ntz,"
+ "timestamp_ltz_col timestamp_ltz,"
+ "timestamp_tz_col timestamp_tz"
+ ");",
table));
client =
SnowflakeStreamingIngestClientFactory.builder("TestClient01")
.setProperties(loadProperties())
.build();

channel = client.openChannel(
channel =
client.openChannel(
OpenChannelRequest.builder(String.format("channel_%s", this.testId))
.setDBName(database)
.setSchemaName(schema)
.setTableName(table)
.setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
.build());
.setDBName(database)
.setSchemaName("PUBLIC")
.setTableName(table)
.setOnErrorOption(OpenChannelRequest.OnErrorOption.ABORT)
.build());
}

private Properties loadProperties() throws IOException {
Expand All @@ -97,7 +108,8 @@ private Properties loadProperties() throws IOException {
}

private Connection getConnection()
throws IOException, ClassNotFoundException, SQLException, NoSuchAlgorithmException, InvalidKeySpecException {
throws IOException, ClassNotFoundException, SQLException, NoSuchAlgorithmException,
InvalidKeySpecException {
Class.forName("net.snowflake.client.jdbc.SnowflakeDriver");

Properties loadedProps = loadProperties();
Expand All @@ -118,14 +130,20 @@ private Connection getConnection()

private Map<String, Object> createRow() {
Map<String, Object> row = new HashMap<>();

byte[] bytes = new byte[1024];
random.nextBytes(bytes);

row.put("c1", random.nextInt());
row.put("c2", String.valueOf(random.nextInt()));
row.put("c3", bytes);

row.put("boolean_col", false);
row.put("int_col", 1);
row.put("number_col", new BigDecimal("11111.11111"));
row.put("float_col", 1.234);
row.put("text_col", "test");
row.put("binary_col", new byte[] {1, 2, 3, 4, 5});
row.put("variant_col", "\"Hello, World!\"");
row.put("array_col", "[{\"k1\": \"v1\"}]");
row.put("object_col", "{\"k1\": \"v1\"}");
row.put("time_col", "00:00");
row.put("date_col", "2000-01-01");
row.put("timestamp_ntz_col", "2000-01-01T11:00");
row.put("timestamp_ltz_col", "2000-01-01T11:00");
row.put("timestamp_tz_col", "2000-01-01T11:00-07:00");
return row;
}

Expand All @@ -134,44 +152,44 @@ private Map<String, Object> createRow() {
* committed offset is equal to the passed offset
*/
private void waitForOffset(SnowflakeStreamingIngestChannel channel, String expectedOffset)
throws InterruptedException {
throws InterruptedException {
int counter = 0;
String lastCommittedOffset = null;
while (counter < 600) {
String currentOffset = channel.getLatestCommittedOffsetToken();
if (expectedOffset.equals(currentOffset)) {
return;
}
System.out.printf("Waiting for offset expected=%s actual=%s%n", expectedOffset, currentOffset);
System.out.printf(
"Waiting for offset expected=%s actual=%s%n", expectedOffset, currentOffset);
lastCommittedOffset = currentOffset;
counter++;
Thread.sleep(100);
}
throw new RuntimeException(
String.format(
"Timeout exceeded while waiting for offset %s. Last committed offset: %s",
expectedOffset, lastCommittedOffset));
String.format(
"Timeout exceeded while waiting for offset %s. Last committed offset: %s",
expectedOffset, lastCommittedOffset));
}

public void runBasicTest() throws InterruptedException {
// Insert few rows one by one
for (int offset = 2; offset < 1000; offset++) {
for (int offset = 0; offset < 1000; offset++) {
offset++;
channel.insertRow(createRow(), String.valueOf(offset));
}

// Insert a batch of rows
String offset = "final-offset";
channel.insertRows(
Arrays.asList(createRow(), createRow(), createRow(), createRow(), createRow()), offset);

Arrays.asList(createRow(), createRow(), createRow(), createRow(), createRow()), offset);
waitForOffset(channel, offset);
}

public void runLongRunningTest(Duration testDuration) throws InterruptedException {
final Instant testStart = Instant.now();
int counter = 0;
while(true) {
while (true) {
counter++;

channel.insertRow(createRow(), String.valueOf(counter));
Expand All @@ -183,7 +201,12 @@ public void runLongRunningTest(Duration testDuration) throws InterruptedExceptio

final Duration elapsed = Duration.between(testStart, Instant.now());

logger.info("Test loop_nr={} duration={}s/{}s committed_offset={}", counter, elapsed.get(ChronoUnit.SECONDS), testDuration.get(ChronoUnit.SECONDS), channel.getLatestCommittedOffsetToken());
logger.info(
"Test loop_nr={} duration={}s/{}s committed_offset={}",
counter,
elapsed.get(ChronoUnit.SECONDS),
testDuration.get(ChronoUnit.SECONDS),
channel.getLatestCommittedOffsetToken());

if (elapsed.compareTo(testDuration) > 0) {
break;
Expand All @@ -193,8 +216,11 @@ public void runLongRunningTest(Duration testDuration) throws InterruptedExceptio
}

public void close() throws Exception {
connection.close();
channel.close().get();
client.close();
connection
.createStatement()
.execute(String.format(String.format("drop database %s", database)));
connection.close();
}
}
Loading

0 comments on commit 57704dc

Please sign in to comment.