From 2cda91db5af844ed6bb538088771b93ad27f0c0b Mon Sep 17 00:00:00 2001 From: Hitesh Madan Date: Mon, 2 Dec 2024 08:33:12 +0000 Subject: [PATCH] send clientname and clientkey with every registerblobs request, to be used for emitting iceberg billing info --- .../ingest/streaming/internal/InternalStage.java | 2 +- .../streaming/internal/RegisterBlobRequest.java | 15 ++++++++++++++- .../SnowflakeStreamingIngestClientInternal.java | 8 ++++++-- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java index 65dc97070..c7df36d65 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java @@ -181,8 +181,8 @@ private Optional putRemote(String fullFilePath, byte[] data, int retryCo fileTransferMetadataCopy, inStream, proxyProperties, - clientPrefix, clientName, + clientPrefix, fullFilePath)); } else { SnowflakeFileTransferAgent.uploadWithoutConnection( diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java index 43c42dffc..0fc2518ec 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java @@ -20,16 +20,29 @@ class RegisterBlobRequest implements IStreamingIngestRequest { @JsonProperty("blobs") private List blobs; + @JsonProperty("client_name") + private String clientName; + + @JsonProperty("client_key") + private String clientKey; + @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty("is_iceberg") private boolean enableIcebergStreaming; RegisterBlobRequest( - String requestId, String role, List blobs, boolean enableIcebergStreaming) { + String requestId, + String role, + List blobs, + boolean enableIcebergStreaming, + String clientName, + String clientKey) { this.requestId = requestId; this.role = role; this.blobs = blobs; this.enableIcebergStreaming = enableIcebergStreaming; + this.clientName = clientName; + this.clientKey = clientKey; } String getRequestId() { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index f10f683c3..8eb5cc066 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -585,9 +585,11 @@ List> partitionBlobListForRegistrationRequest(List blobs, final int executionCount) { logger.logInfo( - "Register blob request preparing for blob={}, client={}, executionCount={}", + "Register blob request preparing for blob={}, clientName={}, clientKey={}," + + " executionCount={}", blobs.stream().map(BlobMetadata::getPath).collect(Collectors.toList()), this.name, + this.storageManager.getClientPrefix(), executionCount); RegisterBlobResponse response = null; @@ -597,7 +599,9 @@ void registerBlobs(List blobs, final int executionCount) { this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement(), this.role, blobs, - this.parameterProvider.isEnableIcebergStreaming()); + this.parameterProvider.isEnableIcebergStreaming(), + this.getName(), + this.storageManager.getClientPrefix()); response = snowflakeServiceClient.registerBlob(request, executionCount); } catch (IOException | IngestResponseException e) { throw new SFException(e, ErrorCode.REGISTER_BLOB_FAILURE, e.getMessage());