diff --git a/Jenkinsfile b/Jenkinsfile index f1a340a80..eedd90a7b 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -16,7 +16,7 @@ pipeline { build job: "SFPerf-Other-Jobs/TPCDS_BDEC_Setup", parameters: [ string(name: 'ingest_sdk_github_branch', value: ingest_sdk_tag), - string(name: 'database', value: "STREAMING_INGEST_BENCHMARK_DB_${valid_db_name_tag}"), + string(name: 'database', value: "BENCHMARK_DB_BDEC_PERFORMANCE_SIGNOFF_${valid_db_name_tag}"), string(name: 'deployment', value: 'qa3.us-west-2.aws'), string(name: 'tpcds_scale_factor', value: 'sf1000') ], @@ -26,7 +26,7 @@ pipeline { build job: "SFPerf-Other-Jobs/TPCDS_BDEC_Setup", parameters: [ string(name: 'ingest_sdk_github_branch', value: ingest_sdk_tag), - string(name: 'database', value: "STREAMING_INGEST_BENCHMARK_DB_${valid_db_name_tag}"), + string(name: 'database', value: "BENCHMARK_DB_BDEC_PERFORMANCE_SIGNOFF_${valid_db_name_tag}"), string(name: 'deployment', value: 'preprod12.us-west-2.aws'), string(name: 'tpcds_scale_factor', value: 'sf1000') ], diff --git a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java index 605655c88..9456ca2db 100644 --- a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java +++ b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java @@ -54,6 +54,10 @@ public class RequestBuilder { // whatever the actual host is private final String host; + private final String accountName; + + private final boolean addAccountNameInRequest; + private final String userAgentSuffix; // Reference to the telemetry service @@ -123,6 +127,8 @@ public class RequestBuilder { public static final String HTTP_HEADER_CONTENT_TYPE_JSON = "application/json"; + private static final String SF_HEADER_ACCOUNT_NAME = "Snowflake-Account"; + /** * RequestBuilder - general usage constructor * @@ -206,6 +212,36 @@ public RequestBuilder( null); } + /** + * RequestBuilder - constructor used by streaming ingest + * + * @param url - the Snowflake account to which we're connecting + * @param userName - the username of the entity loading files + * @param credential - the credential we'll use to authenticate + * @param httpClient - reference to the http client + * @param clientName - name of the client, used to uniquely identify a client if used + */ + public RequestBuilder( + SnowflakeURL url, + String userName, + Object credential, + CloseableHttpClient httpClient, + String clientName, + boolean addAccountNameInRequest) { + this( + url.getAccount(), + userName, + credential, + url.getScheme(), + url.getUrlWithoutPort(), + url.getPort(), + null, + null, + httpClient, + clientName, + addAccountNameInRequest); + } + /** * RequestBuilder - constructor used by streaming ingest * @@ -259,6 +295,46 @@ public RequestBuilder( SecurityManager securityManager, CloseableHttpClient httpClient, String clientName) { + this(accountName, + userName, + credential, + schemeName, + hostName, + portNum, + userAgentSuffix, + securityManager, + httpClient, + clientName, + false); + } + + /** + * RequestBuilder - this constructor is for testing purposes only + * + * @param accountName - the account name to which we're connecting + * @param userName - for whom are we connecting? + * @param credential - our auth credentials, either JWT key pair or OAuth credential + * @param schemeName - are we HTTP or HTTPS? + * @param hostName - the host for this snowflake instance + * @param portNum - the port number + * @param userAgentSuffix - The suffix part of HTTP Header User-Agent + * @param securityManager - The security manager for authentication + * @param httpClient - reference to the http client + * @param clientName - name of the client, used to uniquely identify a client if used + * @param addAccountNameInRequest if ture, add account name in request header + */ + public RequestBuilder( + String accountName, + String userName, + Object credential, + String schemeName, + String hostName, + int portNum, + String userAgentSuffix, + SecurityManager securityManager, + CloseableHttpClient httpClient, + String clientName, + boolean addAccountNameInRequest) { // none of these arguments should be null if (accountName == null || userName == null || credential == null) { throw new IllegalArgumentException(); @@ -281,6 +357,8 @@ public RequestBuilder( this.scheme = schemeName; this.host = hostName; this.userAgentSuffix = userAgentSuffix; + this.accountName = accountName; + this.addAccountNameInRequest = addAccountNameInRequest; // create our security/token manager if (securityManager == null) { @@ -570,6 +648,9 @@ private static void addUserAgent(HttpUriRequest request, String userAgentSuffix) public void addToken(HttpUriRequest request) { request.setHeader(HttpHeaders.AUTHORIZATION, BEARER_PARAMETER + securityManager.getToken()); request.setHeader(SF_HEADER_AUTHORIZATION_TOKEN_TYPE, this.securityManager.getTokenType()); + if(addAccountNameInRequest) { + request.setHeader(SF_HEADER_ACCOUNT_NAME, accountName); + } } /** diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java index 2af794cd2..9ba87cffb 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java @@ -6,6 +6,7 @@ import java.util.Map; import java.util.Properties; + import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.SnowflakeURL; @@ -28,6 +29,12 @@ public static class Builder { // Allows client to override some default parameter values private Map parameterOverrides; + // preset SnowflakeURL instance + private SnowflakeURL snowflakeURL; + + // flag to specify if we need to add account name in the request header + private boolean addAccountNameInRequest; + private Builder(String name) { this.name = name; } @@ -37,6 +44,16 @@ public Builder setProperties(Properties prop) { return this; } + public Builder setSnowflakeURL(SnowflakeURL snowflakeURL) { + this.snowflakeURL = snowflakeURL; + return this; + } + + public Builder setAddAccountNameInRequest(boolean addAccountNameInRequest) { + this.addAccountNameInRequest = addAccountNameInRequest; + return this; + } + public Builder setParameterOverrides(Map parameterOverrides) { this.parameterOverrides = parameterOverrides; return this; @@ -47,10 +64,17 @@ public SnowflakeStreamingIngestClient build() { Utils.assertNotNull("connection properties", this.prop); Properties prop = Utils.createProperties(this.prop); - SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); + SnowflakeURL accountURL = this.snowflakeURL; + if (accountURL == null) { + accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); + } + if (addAccountNameInRequest) { + return new SnowflakeStreamingIngestClientInternal<>( + this.name, accountURL, prop, this.parameterOverrides, addAccountNameInRequest); + } return new SnowflakeStreamingIngestClientInternal<>( - this.name, accountURL, prop, this.parameterOverrides); + this.name, accountURL, prop, this.parameterOverrides); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobMetadata.java index d2cf31a6c..fb366db9c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobMetadata.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobMetadata.java @@ -18,11 +18,18 @@ class BlobMetadata { private final Constants.BdecVersion bdecVersion; private final List chunks; private final BlobStats blobStats; + private final boolean spansMixedTables; // used for testing only @VisibleForTesting BlobMetadata(String path, String md5, List chunks, BlobStats blobStats) { - this(path, md5, ParameterProvider.BLOB_FORMAT_VERSION_DEFAULT, chunks, blobStats); + this( + path, + md5, + ParameterProvider.BLOB_FORMAT_VERSION_DEFAULT, + chunks, + blobStats, + chunks == null ? false : chunks.size() > 1); } BlobMetadata( @@ -30,12 +37,14 @@ class BlobMetadata { String md5, Constants.BdecVersion bdecVersion, List chunks, - BlobStats blobStats) { + BlobStats blobStats, + boolean spansMixedTables) { this.path = path; this.md5 = md5; this.bdecVersion = bdecVersion; this.chunks = chunks; this.blobStats = blobStats; + this.spansMixedTables = spansMixedTables; } @JsonIgnore @@ -68,13 +77,19 @@ BlobStats getBlobStats() { return this.blobStats; } + @JsonProperty("spans_mixed_tables") + boolean getSpansMixedTables() { + return this.spansMixedTables; + } + /** Create {@link BlobMetadata}. */ static BlobMetadata createBlobMetadata( String path, String md5, Constants.BdecVersion bdecVersion, List chunks, - BlobStats blobStats) { - return new BlobMetadata(path, md5, bdecVersion, chunks, blobStats); + BlobStats blobStats, + boolean spansMixedTables) { + return new BlobMetadata(path, md5, bdecVersion, chunks, blobStats, spansMixedTables); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index cb1ef7810..f47580feb 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -568,8 +568,15 @@ BlobMetadata upload( blob.length, System.currentTimeMillis() - startTime); + // at this point we know for sure if the BDEC file has data for more than one chunk, i.e. + // spans mixed tables or not return BlobMetadata.createBlobMetadata( - blobPath, BlobBuilder.computeMD5(blob), bdecVersion, metadata, blobStats); + blobPath, + BlobBuilder.computeMD5(blob), + bdecVersion, + metadata, + blobStats, + metadata == null ? false : metadata.size() > 1); } /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 058668651..dd9d24e36 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -161,6 +161,30 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea boolean isTestMode, RequestBuilder requestBuilder, Map parameterOverrides) { + this(name, accountURL, prop, httpClient, isTestMode, requestBuilder, parameterOverrides, false); + } + + /** + * Constructor + * + * @param name the name of the client + * @param accountURL Snowflake account url + * @param prop connection properties + * @param httpClient http client for sending request + * @param isTestMode whether we're under test mode + * @param requestBuilder http request builder + * @param parameterOverrides parameters we override in case we want to set different values + * @param addAccountNameInRequest if true, will add account name in request header + */ + SnowflakeStreamingIngestClientInternal( + String name, + SnowflakeURL accountURL, + Properties prop, + CloseableHttpClient httpClient, + boolean isTestMode, + RequestBuilder requestBuilder, + Map parameterOverrides, + boolean addAccountNameInRequest) { this.parameterProvider = new ParameterProvider(parameterOverrides, prop); this.name = name; @@ -193,13 +217,13 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea prop.getProperty(Constants.OAUTH_CLIENT_SECRET), prop.getProperty(Constants.OAUTH_REFRESH_TOKEN)); } - this.requestBuilder = - new RequestBuilder( + this.requestBuilder = new RequestBuilder( accountURL, prop.get(USER).toString(), credential, this.httpClient, - String.format("%s_%s", this.name, System.currentTimeMillis())); + String.format("%s_%s", this.name, System.currentTimeMillis()), + addAccountNameInRequest); logger.logInfo("Using {} for authorization", this.requestBuilder.getAuthType()); @@ -239,6 +263,31 @@ public SnowflakeStreamingIngestClientInternal( this(name, accountURL, prop, null, false, null, parameterOverrides); } + /** + * Default Constructor + * + * @param name the name of the client + * @param accountURL Snowflake account url + * @param prop connection properties + * @param parameterOverrides map of parameters to override for this client + * @param addAccountNameInRequest if true, add account name in request header + */ + public SnowflakeStreamingIngestClientInternal( + String name, + SnowflakeURL accountURL, + Properties prop, + Map parameterOverrides, + boolean addAccountNameInRequest) { + this(name, + accountURL, + prop, + null, + false, + null, + parameterOverrides, + addAccountNameInRequest); + } + /** * Constructor for TEST ONLY * @@ -732,7 +781,10 @@ List getRetryBlobs( blobMetadata.getMD5(), blobMetadata.getVersion(), relevantChunks, - blobMetadata.getBlobStats())); + blobMetadata.getBlobStats(), + // Important to not change the spansMixedTables value in case of retries. The + // correct value is the value that the already uploaded blob has. + blobMetadata.getSpansMixedTables())); } }); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java index 000b948e9..37eb5f96e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java @@ -7,6 +7,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import net.snowflake.ingest.utils.Pair; @@ -17,7 +18,7 @@ public class RegisterServiceTest { @Test - public void testRegisterService() { + public void testRegisterService() throws ExecutionException, InterruptedException { RegisterService rs = new RegisterService<>(null, true); Pair, CompletableFuture> blobFuture = @@ -26,6 +27,7 @@ public void testRegisterService() { CompletableFuture.completedFuture(new BlobMetadata("path", "md5", null, null))); rs.addBlobs(Collections.singletonList(blobFuture)); Assert.assertEquals(1, rs.getBlobsList().size()); + Assert.assertEquals(false, blobFuture.getValue().get().getSpansMixedTables()); List> errorBlobs = rs.registerBlobs(null); Assert.assertEquals(0, rs.getBlobsList().size()); Assert.assertEquals(0, errorBlobs.size());