Skip to content

Commit

Permalink
Merge branch 'master' into tzhang-si-release
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-xhuang authored Jan 17, 2024
2 parents 54ad136 + 1c85dda commit 15e5592
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 187 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/End2EndTest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:
fail-fast: false
matrix:
java: [ 8 ]
snowflake_cloud: [ 'AWS' ]
snowflake_cloud: [ 'AWS', 'AZURE', 'GCP' ]
steps:
- name: Checkout Code
uses: actions/checkout@v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
Expand All @@ -26,6 +29,8 @@
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IngestTestUtils {
private static final String PROFILE_PATH = "profile.json";
Expand All @@ -38,6 +43,8 @@ public class IngestTestUtils {

private final String testId;

private static final Logger logger = LoggerFactory.getLogger(IngestTestUtils.class);

private final SnowflakeStreamingIngestClient client;

private final SnowflakeStreamingIngestChannel channel;
Expand Down Expand Up @@ -146,7 +153,7 @@ private void waitForOffset(SnowflakeStreamingIngestChannel channel, String expec
expectedOffset, lastCommittedOffset));
}

public void test() throws InterruptedException {
public void runBasicTest() throws InterruptedException {
// Insert few rows one by one
for (int offset = 2; offset < 1000; offset++) {
offset++;
Expand All @@ -161,6 +168,30 @@ public void test() throws InterruptedException {
waitForOffset(channel, offset);
}

public void runLongRunningTest(Duration testDuration) throws InterruptedException {
final Instant testStart = Instant.now();
int counter = 0;
while(true) {
counter++;

channel.insertRow(createRow(), String.valueOf(counter));

if (!channel.isValid()) {
throw new IllegalStateException("Channel has been invalidated");
}
Thread.sleep(60000);

final Duration elapsed = Duration.between(testStart, Instant.now());

logger.info("Test loop_nr={} duration={}s/{}s committed_offset={}", counter, elapsed.get(ChronoUnit.SECONDS), testDuration.get(ChronoUnit.SECONDS), channel.getLatestCommittedOffsetToken());

if (elapsed.compareTo(testDuration) > 0) {
break;
}
}
waitForOffset(channel, String.valueOf(counter));
}

public void close() throws Exception {
connection.close();
channel.close().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import org.bouncycastle.jcajce.provider.BouncyCastleFipsProvider;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import java.security.Security;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

public class FipsIngestE2ETest {

Expand All @@ -25,7 +28,13 @@ public void tearDown() throws Exception {
}

@Test
public void name() throws InterruptedException {
ingestTestUtils.test();
public void basicTest() throws InterruptedException {
ingestTestUtils.runBasicTest();
}

@Test
@Ignore("Takes too long to run")
public void longRunningTest() throws InterruptedException {
ingestTestUtils.runLongRunningTest(Duration.of(80, ChronoUnit.MINUTES));
}
}
2 changes: 1 addition & 1 deletion e2e-jar-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc-fips</artifactId>
<version>3.13.30</version>
<version>3.13.34</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

public class StandardIngestE2ETest {

private IngestTestUtils ingestTestUtils;
Expand All @@ -19,7 +23,13 @@ public void tearDown() throws Exception {
}

@Test
public void name() throws InterruptedException {
ingestTestUtils.test();
public void basicTest() throws InterruptedException {
ingestTestUtils.runBasicTest();
}

@Test
@Ignore("Takes too long to run")
public void longRunningTest() throws InterruptedException {
ingestTestUtils.runLongRunningTest(Duration.of(80, ChronoUnit.MINUTES));
}
}
82 changes: 0 additions & 82 deletions src/main/java/net/snowflake/ingest/connection/RequestBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ public class RequestBuilder {
// whatever the actual host is
private final String host;

private final String accountName;

private final boolean addAccountNameInRequest;

private final String userAgentSuffix;

// Reference to the telemetry service
Expand Down Expand Up @@ -127,8 +123,6 @@ public class RequestBuilder {

public static final String HTTP_HEADER_CONTENT_TYPE_JSON = "application/json";

private static final String SF_HEADER_ACCOUNT_NAME = "Snowflake-Account";

/**
* RequestBuilder - general usage constructor
*
Expand Down Expand Up @@ -212,36 +206,6 @@ public RequestBuilder(
null);
}

/**
* RequestBuilder - constructor used by streaming ingest
*
* @param url - the Snowflake account to which we're connecting
* @param userName - the username of the entity loading files
* @param credential - the credential we'll use to authenticate
* @param httpClient - reference to the http client
* @param clientName - name of the client, used to uniquely identify a client if used
*/
public RequestBuilder(
SnowflakeURL url,
String userName,
Object credential,
CloseableHttpClient httpClient,
String clientName,
boolean addAccountNameInRequest) {
this(
url.getAccount(),
userName,
credential,
url.getScheme(),
url.getUrlWithoutPort(),
url.getPort(),
null,
null,
httpClient,
clientName,
addAccountNameInRequest);
}

/**
* RequestBuilder - constructor used by streaming ingest
*
Expand Down Expand Up @@ -295,47 +259,6 @@ public RequestBuilder(
SecurityManager securityManager,
CloseableHttpClient httpClient,
String clientName) {
this(
accountName,
userName,
credential,
schemeName,
hostName,
portNum,
userAgentSuffix,
securityManager,
httpClient,
clientName,
false);
}

/**
* RequestBuilder - this constructor is for testing purposes only
*
* @param accountName - the account name to which we're connecting
* @param userName - for whom are we connecting?
* @param credential - our auth credentials, either JWT key pair or OAuth credential
* @param schemeName - are we HTTP or HTTPS?
* @param hostName - the host for this snowflake instance
* @param portNum - the port number
* @param userAgentSuffix - The suffix part of HTTP Header User-Agent
* @param securityManager - The security manager for authentication
* @param httpClient - reference to the http client
* @param clientName - name of the client, used to uniquely identify a client if used
* @param addAccountNameInRequest if ture, add account name in request header
*/
public RequestBuilder(
String accountName,
String userName,
Object credential,
String schemeName,
String hostName,
int portNum,
String userAgentSuffix,
SecurityManager securityManager,
CloseableHttpClient httpClient,
String clientName,
boolean addAccountNameInRequest) {
// none of these arguments should be null
if (accountName == null || userName == null || credential == null) {
throw new IllegalArgumentException();
Expand All @@ -358,8 +281,6 @@ public RequestBuilder(
this.scheme = schemeName;
this.host = hostName;
this.userAgentSuffix = userAgentSuffix;
this.accountName = accountName;
this.addAccountNameInRequest = addAccountNameInRequest;

// create our security/token manager
if (securityManager == null) {
Expand Down Expand Up @@ -649,9 +570,6 @@ private static void addUserAgent(HttpUriRequest request, String userAgentSuffix)
public void addToken(HttpUriRequest request) {
request.setHeader(HttpHeaders.AUTHORIZATION, BEARER_PARAMETER + securityManager.getToken());
request.setHeader(SF_HEADER_AUTHORIZATION_TOKEN_TYPE, this.securityManager.getTokenType());
if (addAccountNameInRequest) {
request.setHeader(SF_HEADER_ACCOUNT_NAME, accountName);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@ public static class Builder {
// Allows client to override some default parameter values
private Map<String, Object> parameterOverrides;

// preset SnowflakeURL instance
private SnowflakeURL snowflakeURL;

// flag to specify if we need to add account name in the request header
private boolean addAccountNameInRequest;
// Indicates whether it's under test mode
private boolean isTestMode;

private Builder(String name) {
this.name = name;
Expand All @@ -43,18 +40,13 @@ public Builder setProperties(Properties prop) {
return this;
}

public Builder setSnowflakeURL(SnowflakeURL snowflakeURL) {
this.snowflakeURL = snowflakeURL;
return this;
}

public Builder setAddAccountNameInRequest(boolean addAccountNameInRequest) {
this.addAccountNameInRequest = addAccountNameInRequest;
public Builder setParameterOverrides(Map<String, Object> parameterOverrides) {
this.parameterOverrides = parameterOverrides;
return this;
}

public Builder setParameterOverrides(Map<String, Object> parameterOverrides) {
this.parameterOverrides = parameterOverrides;
public Builder setIsTestMode(boolean isTestMode) {
this.isTestMode = isTestMode;
return this;
}

Expand All @@ -63,17 +55,10 @@ public SnowflakeStreamingIngestClient build() {
Utils.assertNotNull("connection properties", this.prop);

Properties prop = Utils.createProperties(this.prop);
SnowflakeURL accountURL = this.snowflakeURL;
if (accountURL == null) {
accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));
}
SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));

if (addAccountNameInRequest) {
return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides, addAccountNameInRequest);
}
return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides);
this.name, accountURL, prop, this.parameterOverrides, this.isTestMode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
*/
class FlushService<T> {

// The max number of upload retry attempts to the stage
private static final int DEFAULT_MAX_UPLOAD_RETRIES = 5;

// Static class to save the list of channels that are used to build a blob, which is mainly used
// to invalidate all the channels when there is a failure
static class BlobData<T> {
Expand Down Expand Up @@ -163,7 +166,8 @@ List<List<ChannelData<T>>> getData() {
client.getRole(),
client.getHttpClient(),
client.getRequestBuilder(),
client.getName());
client.getName(),
DEFAULT_MAX_UPLOAD_RETRIES);
} catch (SnowflakeSQLException | IOException err) {
throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE);
}
Expand Down
Loading

0 comments on commit 15e5592

Please sign in to comment.