Skip to content

Commit

Permalink
Merge branch 'master' into psaha-SNOW-965885-drop-channel-client
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-psaha authored Jan 2, 2024
2 parents 7972b1b + 24cef7a commit 809e6c1
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 14 deletions.
4 changes: 2 additions & 2 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pipeline {
build job: "SFPerf-Other-Jobs/TPCDS_BDEC_Setup",
parameters: [
string(name: 'ingest_sdk_github_branch', value: ingest_sdk_tag),
string(name: 'database', value: "STREAMING_INGEST_BENCHMARK_DB_${valid_db_name_tag}"),
string(name: 'database', value: "BENCHMARK_DB_BDEC_PERFORMANCE_SIGNOFF_${valid_db_name_tag}"),
string(name: 'deployment', value: 'qa3.us-west-2.aws'),
string(name: 'tpcds_scale_factor', value: 'sf1000')
],
Expand All @@ -26,7 +26,7 @@ pipeline {
build job: "SFPerf-Other-Jobs/TPCDS_BDEC_Setup",
parameters: [
string(name: 'ingest_sdk_github_branch', value: ingest_sdk_tag),
string(name: 'database', value: "STREAMING_INGEST_BENCHMARK_DB_${valid_db_name_tag}"),
string(name: 'database', value: "BENCHMARK_DB_BDEC_PERFORMANCE_SIGNOFF_${valid_db_name_tag}"),
string(name: 'deployment', value: 'preprod12.us-west-2.aws'),
string(name: 'tpcds_scale_factor', value: 'sf1000')
],
Expand Down
81 changes: 81 additions & 0 deletions src/main/java/net/snowflake/ingest/connection/RequestBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public class RequestBuilder {
// whatever the actual host is
private final String host;

private final String accountName;

private final boolean addAccountNameInRequest;

private final String userAgentSuffix;

// Reference to the telemetry service
Expand Down Expand Up @@ -123,6 +127,8 @@ public class RequestBuilder {

public static final String HTTP_HEADER_CONTENT_TYPE_JSON = "application/json";

private static final String SF_HEADER_ACCOUNT_NAME = "Snowflake-Account";

/**
* RequestBuilder - general usage constructor
*
Expand Down Expand Up @@ -206,6 +212,36 @@ public RequestBuilder(
null);
}

/**
* RequestBuilder - constructor used by streaming ingest
*
* @param url - the Snowflake account to which we're connecting
* @param userName - the username of the entity loading files
* @param credential - the credential we'll use to authenticate
* @param httpClient - reference to the http client
* @param clientName - name of the client, used to uniquely identify a client if used
*/
public RequestBuilder(
SnowflakeURL url,
String userName,
Object credential,
CloseableHttpClient httpClient,
String clientName,
boolean addAccountNameInRequest) {
this(
url.getAccount(),
userName,
credential,
url.getScheme(),
url.getUrlWithoutPort(),
url.getPort(),
null,
null,
httpClient,
clientName,
addAccountNameInRequest);
}

/**
* RequestBuilder - constructor used by streaming ingest
*
Expand Down Expand Up @@ -259,6 +295,46 @@ public RequestBuilder(
SecurityManager securityManager,
CloseableHttpClient httpClient,
String clientName) {
this(accountName,
userName,
credential,
schemeName,
hostName,
portNum,
userAgentSuffix,
securityManager,
httpClient,
clientName,
false);
}

/**
* RequestBuilder - this constructor is for testing purposes only
*
* @param accountName - the account name to which we're connecting
* @param userName - for whom are we connecting?
* @param credential - our auth credentials, either JWT key pair or OAuth credential
* @param schemeName - are we HTTP or HTTPS?
* @param hostName - the host for this snowflake instance
* @param portNum - the port number
* @param userAgentSuffix - The suffix part of HTTP Header User-Agent
* @param securityManager - The security manager for authentication
* @param httpClient - reference to the http client
* @param clientName - name of the client, used to uniquely identify a client if used
* @param addAccountNameInRequest if ture, add account name in request header
*/
public RequestBuilder(
String accountName,
String userName,
Object credential,
String schemeName,
String hostName,
int portNum,
String userAgentSuffix,
SecurityManager securityManager,
CloseableHttpClient httpClient,
String clientName,
boolean addAccountNameInRequest) {
// none of these arguments should be null
if (accountName == null || userName == null || credential == null) {
throw new IllegalArgumentException();
Expand All @@ -281,6 +357,8 @@ public RequestBuilder(
this.scheme = schemeName;
this.host = hostName;
this.userAgentSuffix = userAgentSuffix;
this.accountName = accountName;
this.addAccountNameInRequest = addAccountNameInRequest;

// create our security/token manager
if (securityManager == null) {
Expand Down Expand Up @@ -570,6 +648,9 @@ private static void addUserAgent(HttpUriRequest request, String userAgentSuffix)
public void addToken(HttpUriRequest request) {
request.setHeader(HttpHeaders.AUTHORIZATION, BEARER_PARAMETER + securityManager.getToken());
request.setHeader(SF_HEADER_AUTHORIZATION_TOKEN_TYPE, this.securityManager.getTokenType());
if(addAccountNameInRequest) {
request.setHeader(SF_HEADER_ACCOUNT_NAME, accountName);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.util.Map;
import java.util.Properties;

import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.SnowflakeURL;
Expand All @@ -28,6 +29,12 @@ public static class Builder {
// Allows client to override some default parameter values
private Map<String, Object> parameterOverrides;

// preset SnowflakeURL instance
private SnowflakeURL snowflakeURL;

// flag to specify if we need to add account name in the request header
private boolean addAccountNameInRequest;

private Builder(String name) {
this.name = name;
}
Expand All @@ -37,6 +44,16 @@ public Builder setProperties(Properties prop) {
return this;
}

public Builder setSnowflakeURL(SnowflakeURL snowflakeURL) {
this.snowflakeURL = snowflakeURL;
return this;
}

public Builder setAddAccountNameInRequest(boolean addAccountNameInRequest) {
this.addAccountNameInRequest = addAccountNameInRequest;
return this;
}

public Builder setParameterOverrides(Map<String, Object> parameterOverrides) {
this.parameterOverrides = parameterOverrides;
return this;
Expand All @@ -47,10 +64,17 @@ public SnowflakeStreamingIngestClient build() {
Utils.assertNotNull("connection properties", this.prop);

Properties prop = Utils.createProperties(this.prop);
SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));
SnowflakeURL accountURL = this.snowflakeURL;
if (accountURL == null) {
accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));
}

if (addAccountNameInRequest) {
return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides, addAccountNameInRequest);
}
return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides);
this.name, accountURL, prop, this.parameterOverrides);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,33 @@ class BlobMetadata {
private final Constants.BdecVersion bdecVersion;
private final List<ChunkMetadata> chunks;
private final BlobStats blobStats;
private final boolean spansMixedTables;

// used for testing only
@VisibleForTesting
BlobMetadata(String path, String md5, List<ChunkMetadata> chunks, BlobStats blobStats) {
this(path, md5, ParameterProvider.BLOB_FORMAT_VERSION_DEFAULT, chunks, blobStats);
this(
path,
md5,
ParameterProvider.BLOB_FORMAT_VERSION_DEFAULT,
chunks,
blobStats,
chunks == null ? false : chunks.size() > 1);
}

BlobMetadata(
String path,
String md5,
Constants.BdecVersion bdecVersion,
List<ChunkMetadata> chunks,
BlobStats blobStats) {
BlobStats blobStats,
boolean spansMixedTables) {
this.path = path;
this.md5 = md5;
this.bdecVersion = bdecVersion;
this.chunks = chunks;
this.blobStats = blobStats;
this.spansMixedTables = spansMixedTables;
}

@JsonIgnore
Expand Down Expand Up @@ -68,13 +77,19 @@ BlobStats getBlobStats() {
return this.blobStats;
}

@JsonProperty("spans_mixed_tables")
boolean getSpansMixedTables() {
return this.spansMixedTables;
}

/** Create {@link BlobMetadata}. */
static BlobMetadata createBlobMetadata(
String path,
String md5,
Constants.BdecVersion bdecVersion,
List<ChunkMetadata> chunks,
BlobStats blobStats) {
return new BlobMetadata(path, md5, bdecVersion, chunks, blobStats);
BlobStats blobStats,
boolean spansMixedTables) {
return new BlobMetadata(path, md5, bdecVersion, chunks, blobStats, spansMixedTables);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,15 @@ BlobMetadata upload(
blob.length,
System.currentTimeMillis() - startTime);

// at this point we know for sure if the BDEC file has data for more than one chunk, i.e.
// spans mixed tables or not
return BlobMetadata.createBlobMetadata(
blobPath, BlobBuilder.computeMD5(blob), bdecVersion, metadata, blobStats);
blobPath,
BlobBuilder.computeMD5(blob),
bdecVersion,
metadata,
blobStats,
metadata == null ? false : metadata.size() > 1);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,30 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
boolean isTestMode,
RequestBuilder requestBuilder,
Map<String, Object> parameterOverrides) {
this(name, accountURL, prop, httpClient, isTestMode, requestBuilder, parameterOverrides, false);
}

/**
* Constructor
*
* @param name the name of the client
* @param accountURL Snowflake account url
* @param prop connection properties
* @param httpClient http client for sending request
* @param isTestMode whether we're under test mode
* @param requestBuilder http request builder
* @param parameterOverrides parameters we override in case we want to set different values
* @param addAccountNameInRequest if true, will add account name in request header
*/
SnowflakeStreamingIngestClientInternal(
String name,
SnowflakeURL accountURL,
Properties prop,
CloseableHttpClient httpClient,
boolean isTestMode,
RequestBuilder requestBuilder,
Map<String, Object> parameterOverrides,
boolean addAccountNameInRequest) {
this.parameterProvider = new ParameterProvider(parameterOverrides, prop);

this.name = name;
Expand Down Expand Up @@ -193,13 +217,13 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
prop.getProperty(Constants.OAUTH_CLIENT_SECRET),
prop.getProperty(Constants.OAUTH_REFRESH_TOKEN));
}
this.requestBuilder =
new RequestBuilder(
this.requestBuilder = new RequestBuilder(
accountURL,
prop.get(USER).toString(),
credential,
this.httpClient,
String.format("%s_%s", this.name, System.currentTimeMillis()));
String.format("%s_%s", this.name, System.currentTimeMillis()),
addAccountNameInRequest);

logger.logInfo("Using {} for authorization", this.requestBuilder.getAuthType());

Expand Down Expand Up @@ -239,6 +263,31 @@ public SnowflakeStreamingIngestClientInternal(
this(name, accountURL, prop, null, false, null, parameterOverrides);
}

/**
* Default Constructor
*
* @param name the name of the client
* @param accountURL Snowflake account url
* @param prop connection properties
* @param parameterOverrides map of parameters to override for this client
* @param addAccountNameInRequest if true, add account name in request header
*/
public SnowflakeStreamingIngestClientInternal(
String name,
SnowflakeURL accountURL,
Properties prop,
Map<String, Object> parameterOverrides,
boolean addAccountNameInRequest) {
this(name,
accountURL,
prop,
null,
false,
null,
parameterOverrides,
addAccountNameInRequest);
}

/**
* Constructor for TEST ONLY
*
Expand Down Expand Up @@ -732,7 +781,10 @@ List<BlobMetadata> getRetryBlobs(
blobMetadata.getMD5(),
blobMetadata.getVersion(),
relevantChunks,
blobMetadata.getBlobStats()));
blobMetadata.getBlobStats(),
// Important to not change the spansMixedTables value in case of retries. The
// correct value is the value that the already uploaded blob has.
blobMetadata.getSpansMixedTables()));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.snowflake.ingest.utils.Pair;
Expand All @@ -17,7 +18,7 @@
public class RegisterServiceTest {

@Test
public void testRegisterService() {
public void testRegisterService() throws ExecutionException, InterruptedException {
RegisterService<StubChunkData> rs = new RegisterService<>(null, true);

Pair<FlushService.BlobData<StubChunkData>, CompletableFuture<BlobMetadata>> blobFuture =
Expand All @@ -26,6 +27,7 @@ public void testRegisterService() {
CompletableFuture.completedFuture(new BlobMetadata("path", "md5", null, null)));
rs.addBlobs(Collections.singletonList(blobFuture));
Assert.assertEquals(1, rs.getBlobsList().size());
Assert.assertEquals(false, blobFuture.getValue().get().getSpansMixedTables());
List<FlushService.BlobData<StubChunkData>> errorBlobs = rs.registerBlobs(null);
Assert.assertEquals(0, rs.getBlobsList().size());
Assert.assertEquals(0, errorBlobs.size());
Expand Down

0 comments on commit 809e6c1

Please sign in to comment.