Skip to content

Commit

Permalink
Merge branch 'master' into lthiede-SNOW-983635-ZSTD-compression
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-lthiede committed Jan 4, 2024
2 parents 1b13e17 + 24cef7a commit d06af36
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 12 deletions.
42 changes: 42 additions & 0 deletions Jenkinsfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Jenkins declarative pipeline: reads the git tag of the repository under
// ${WORKSPACE}/snowflake-ingest-java and triggers the
// "SFPerf-Other-Jobs/TPCDS_BDEC_Setup" job once per deployment, in parallel.
pipeline {
agent { label 'parallelizable-c7' }
options { timestamps() }
environment {
// Path under the workspace in which the git command below is run.
ingest_sdk_dir = "${WORKSPACE}/snowflake-ingest-java"
// Trimmed stdout of `git describe --tags` executed in ingest_sdk_dir.
ingest_sdk_tag = sh(returnStdout: true, script: "cd $ingest_sdk_dir && git describe --tags").trim()

}
stages {
stage('TriggerJobs') {
steps {
script {
// Replace every '.' in the tag with '_' before embedding it in the database name below.
def valid_db_name_tag = ingest_sdk_tag.split('\\.').join('_')
// Map of branch name -> closure handed to `parallel`. The two entries are identical
// except for the value of the 'deployment' parameter.
def deployments = [
"qa3": {
build job: "SFPerf-Other-Jobs/TPCDS_BDEC_Setup",
parameters: [
string(name: 'ingest_sdk_github_branch', value: ingest_sdk_tag),
string(name: 'database', value: "BENCHMARK_DB_BDEC_PERFORMANCE_SIGNOFF_${valid_db_name_tag}"),
string(name: 'deployment', value: 'qa3.us-west-2.aws'),
string(name: 'tpcds_scale_factor', value: 'sf1000')
],
propagate: true
},
"preprod12": {
build job: "SFPerf-Other-Jobs/TPCDS_BDEC_Setup",
parameters: [
string(name: 'ingest_sdk_github_branch', value: ingest_sdk_tag),
string(name: 'database', value: "BENCHMARK_DB_BDEC_PERFORMANCE_SIGNOFF_${valid_db_name_tag}"),
string(name: 'deployment', value: 'preprod12.us-west-2.aws'),
string(name: 'tpcds_scale_factor', value: 'sf1000')
],
propagate: true
}
]

// Start the downstream build for every entry of the map concurrently.
parallel deployments
}
}
}
}
}
81 changes: 81 additions & 0 deletions src/main/java/net/snowflake/ingest/connection/RequestBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public class RequestBuilder {
// whatever the actual host is
private final String host;

private final String accountName;

private final boolean addAccountNameInRequest;

private final String userAgentSuffix;

// Reference to the telemetry service
Expand Down Expand Up @@ -123,6 +127,8 @@ public class RequestBuilder {

public static final String HTTP_HEADER_CONTENT_TYPE_JSON = "application/json";

private static final String SF_HEADER_ACCOUNT_NAME = "Snowflake-Account";

/**
* RequestBuilder - general usage constructor
*
Expand Down Expand Up @@ -206,6 +212,36 @@ public RequestBuilder(
null);
}

/**
* RequestBuilder - constructor used by streaming ingest
*
* <p>Delegates to the full constructor, taking the account name, scheme, host and port from
* {@code url} and passing {@code null} for both the user agent suffix and the security manager.
*
* @param url - the Snowflake account to which we're connecting
* @param userName - the username of the entity loading files
* @param credential - the credential we'll use to authenticate
* @param httpClient - reference to the http client
* @param clientName - name of the client, used to uniquely identify a client if used
* @param addAccountNameInRequest - if true, add account name in request header
*/
public RequestBuilder(
SnowflakeURL url,
String userName,
Object credential,
CloseableHttpClient httpClient,
String clientName,
boolean addAccountNameInRequest) {
this(
url.getAccount(),
userName,
credential,
url.getScheme(),
url.getUrlWithoutPort(),
url.getPort(),
null, // userAgentSuffix
null, // securityManager
httpClient,
clientName,
addAccountNameInRequest);
}

/**
* RequestBuilder - constructor used by streaming ingest
*
Expand Down Expand Up @@ -259,6 +295,46 @@ public RequestBuilder(
SecurityManager securityManager,
CloseableHttpClient httpClient,
String clientName) {
this(accountName,
userName,
credential,
schemeName,
hostName,
portNum,
userAgentSuffix,
securityManager,
httpClient,
clientName,
false);
}

/**
* RequestBuilder - this constructor is for testing purposes only
*
* @param accountName - the account name to which we're connecting
* @param userName - for whom are we connecting?
* @param credential - our auth credentials, either JWT key pair or OAuth credential
* @param schemeName - are we HTTP or HTTPS?
* @param hostName - the host for this snowflake instance
* @param portNum - the port number
* @param userAgentSuffix - The suffix part of HTTP Header User-Agent
* @param securityManager - The security manager for authentication
* @param httpClient - reference to the http client
* @param clientName - name of the client, used to uniquely identify a client if used
* @param addAccountNameInRequest - if true, add account name in request header
*/
public RequestBuilder(
String accountName,
String userName,
Object credential,
String schemeName,
String hostName,
int portNum,
String userAgentSuffix,
SecurityManager securityManager,
CloseableHttpClient httpClient,
String clientName,
boolean addAccountNameInRequest) {
// none of these arguments should be null
if (accountName == null || userName == null || credential == null) {
throw new IllegalArgumentException();
Expand All @@ -281,6 +357,8 @@ public RequestBuilder(
this.scheme = schemeName;
this.host = hostName;
this.userAgentSuffix = userAgentSuffix;
this.accountName = accountName;
this.addAccountNameInRequest = addAccountNameInRequest;

// create our security/token manager
if (securityManager == null) {
Expand Down Expand Up @@ -570,6 +648,9 @@ private static void addUserAgent(HttpUriRequest request, String userAgentSuffix)
public void addToken(HttpUriRequest request) {
  // Authorization header: the bearer prefix followed by the security manager's current token.
  final String authorizationValue = BEARER_PARAMETER + this.securityManager.getToken();
  request.setHeader(HttpHeaders.AUTHORIZATION, authorizationValue);

  // Tell the server which kind of token was sent.
  request.setHeader(SF_HEADER_AUTHORIZATION_TOKEN_TYPE, this.securityManager.getTokenType());

  // The account-name header is opt-in; leave the request untouched when the flag is off.
  if (!this.addAccountNameInRequest) {
    return;
  }
  request.setHeader(SF_HEADER_ACCOUNT_NAME, this.accountName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.util.Map;
import java.util.Properties;

import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.SnowflakeURL;
Expand All @@ -28,6 +29,12 @@ public static class Builder {
// Allows client to override some default parameter values
private Map<String, Object> parameterOverrides;

// preset SnowflakeURL instance
private SnowflakeURL snowflakeURL;

// flag to specify if we need to add account name in the request header
private boolean addAccountNameInRequest;

/**
* Creates a builder for a client with the given name.
*
* @param name the client name; it is passed on as the first argument of the client constructor
*     in {@code build()}
*/
private Builder(String name) {
this.name = name;
}
Expand All @@ -37,6 +44,16 @@ public Builder setProperties(Properties prop) {
return this;
}

/**
* Sets a preset {@link SnowflakeURL} instance for the client being built.
*
* @param snowflakeURL the preset account URL; stored as-is, without a null check
* @return this builder, to allow call chaining
*/
public Builder setSnowflakeURL(SnowflakeURL snowflakeURL) {
this.snowflakeURL = snowflakeURL;
return this;
}

/**
* Sets the flag that specifies whether the account name needs to be added in the request
* header. The flag is {@code false} unless this method is called.
*
* @param addAccountNameInRequest if true, add account name in request header
* @return this builder, to allow call chaining
*/
public Builder setAddAccountNameInRequest(boolean addAccountNameInRequest) {
this.addAccountNameInRequest = addAccountNameInRequest;
return this;
}

public Builder setParameterOverrides(Map<String, Object> parameterOverrides) {
this.parameterOverrides = parameterOverrides;
return this;
Expand All @@ -47,10 +64,17 @@ public SnowflakeStreamingIngestClient build() {
Utils.assertNotNull("connection properties", this.prop);

Properties prop = Utils.createProperties(this.prop);
SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));
SnowflakeURL accountURL = this.snowflakeURL;
if (accountURL == null) {
accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));
}

if (addAccountNameInRequest) {
return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides, addAccountNameInRequest);
}
return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides);
this.name, accountURL, prop, this.parameterOverrides);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,33 @@ class BlobMetadata {
private final Constants.BdecVersion bdecVersion;
private final List<ChunkMetadata> chunks;
private final BlobStats blobStats;
private final boolean spansMixedTables;

// used for testing only
@VisibleForTesting
BlobMetadata(String path, String md5, List<ChunkMetadata> chunks, BlobStats blobStats) {
this(path, md5, ParameterProvider.BLOB_FORMAT_VERSION_DEFAULT, chunks, blobStats);
this(
path,
md5,
ParameterProvider.BLOB_FORMAT_VERSION_DEFAULT,
chunks,
blobStats,
chunks == null ? false : chunks.size() > 1);
}

BlobMetadata(
String path,
String md5,
Constants.BdecVersion bdecVersion,
List<ChunkMetadata> chunks,
BlobStats blobStats) {
BlobStats blobStats,
boolean spansMixedTables) {
this.path = path;
this.md5 = md5;
this.bdecVersion = bdecVersion;
this.chunks = chunks;
this.blobStats = blobStats;
this.spansMixedTables = spansMixedTables;
}

@JsonIgnore
Expand Down Expand Up @@ -68,13 +77,19 @@ BlobStats getBlobStats() {
return this.blobStats;
}

/**
* Returns the {@code spansMixedTables} flag given to this object at construction time; it is
* serialized under the JSON property {@code spans_mixed_tables}.
*
* @return the stored flag, unchanged
*/
@JsonProperty("spans_mixed_tables")
boolean getSpansMixedTables() {
return this.spansMixedTables;
}

/** Create {@link BlobMetadata}. */
static BlobMetadata createBlobMetadata(
String path,
String md5,
Constants.BdecVersion bdecVersion,
List<ChunkMetadata> chunks,
BlobStats blobStats) {
return new BlobMetadata(path, md5, bdecVersion, chunks, blobStats);
BlobStats blobStats,
boolean spansMixedTables) {
return new BlobMetadata(path, md5, bdecVersion, chunks, blobStats, spansMixedTables);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,15 @@ BlobMetadata upload(
blob.length,
System.currentTimeMillis() - startTime);

// at this point we know for sure if the BDEC file has data for more than one chunk, i.e.
// spans mixed tables or not
return BlobMetadata.createBlobMetadata(
blobPath, BlobBuilder.computeMD5(blob), bdecVersion, metadata, blobStats);
blobPath,
BlobBuilder.computeMD5(blob),
bdecVersion,
metadata,
blobStats,
metadata == null ? false : metadata.size() > 1);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,30 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
boolean isTestMode,
RequestBuilder requestBuilder,
Map<String, Object> parameterOverrides) {
this(name, accountURL, prop, httpClient, isTestMode, requestBuilder, parameterOverrides, false);
}

/**
* Constructor
*
* @param name the name of the client
* @param accountURL Snowflake account url
* @param prop connection properties
* @param httpClient http client for sending request
* @param isTestMode whether we're under test mode
* @param requestBuilder http request builder
* @param parameterOverrides parameters we override in case we want to set different values
* @param addAccountNameInRequest if true, will add account name in request header
*/
SnowflakeStreamingIngestClientInternal(
String name,
SnowflakeURL accountURL,
Properties prop,
CloseableHttpClient httpClient,
boolean isTestMode,
RequestBuilder requestBuilder,
Map<String, Object> parameterOverrides,
boolean addAccountNameInRequest) {
this.parameterProvider = new ParameterProvider(parameterOverrides, prop);

this.name = name;
Expand Down Expand Up @@ -190,13 +214,13 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
prop.getProperty(Constants.OAUTH_CLIENT_SECRET),
prop.getProperty(Constants.OAUTH_REFRESH_TOKEN));
}
this.requestBuilder =
new RequestBuilder(
this.requestBuilder = new RequestBuilder(
accountURL,
prop.get(USER).toString(),
credential,
this.httpClient,
String.format("%s_%s", this.name, System.currentTimeMillis()));
String.format("%s_%s", this.name, System.currentTimeMillis()),
addAccountNameInRequest);

logger.logInfo("Using {} for authorization", this.requestBuilder.getAuthType());

Expand Down Expand Up @@ -236,6 +260,31 @@ public SnowflakeStreamingIngestClientInternal(
this(name, accountURL, prop, null, false, null, parameterOverrides);
}

/**
* Default Constructor
*
* <p>Delegates to the full constructor with no preset http client, test mode disabled and no
* preset request builder.
*
* @param name the name of the client
* @param accountURL Snowflake account url
* @param prop connection properties
* @param parameterOverrides map of parameters to override for this client
* @param addAccountNameInRequest if true, add account name in request header
*/
public SnowflakeStreamingIngestClientInternal(
String name,
SnowflakeURL accountURL,
Properties prop,
Map<String, Object> parameterOverrides,
boolean addAccountNameInRequest) {
this(name,
accountURL,
prop,
null, // httpClient
false, // isTestMode
null, // requestBuilder
parameterOverrides,
addAccountNameInRequest);
}

/**
* Constructor for TEST ONLY
*
Expand Down Expand Up @@ -670,7 +719,10 @@ List<BlobMetadata> getRetryBlobs(
blobMetadata.getMD5(),
blobMetadata.getVersion(),
relevantChunks,
blobMetadata.getBlobStats()));
blobMetadata.getBlobStats(),
// Important to not change the spansMixedTables value in case of retries. The
// correct value is the value that the already uploaded blob has.
blobMetadata.getSpansMixedTables()));
}
});

Expand Down
Loading

0 comments on commit d06af36

Please sign in to comment.