Skip to content

Commit

Permalink
Send clientname and clientkey with every registerblobs request for me…
Browse files Browse the repository at this point in the history
…tering views (#922)

send clientname and clientkey with every registerblobs request, to be used for emitting iceberg metering info
  • Loading branch information
sfc-gh-hmadan authored Dec 2, 2024
1 parent 52ac348 commit 5d11892
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ private Optional<String> putRemote(String fullFilePath, byte[] data, int retryCo
fileTransferMetadataCopy,
inStream,
proxyProperties,
clientPrefix,
clientName,
clientPrefix,
fullFilePath));
} else {
SnowflakeFileTransferAgent.uploadWithoutConnection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,29 @@ class RegisterBlobRequest implements IStreamingIngestRequest {
@JsonProperty("blobs")
private List<BlobMetadata> blobs;

@JsonProperty("client_name")
private String clientName;

@JsonProperty("client_key")
private String clientKey;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("is_iceberg")
private boolean enableIcebergStreaming;

RegisterBlobRequest(
String requestId, String role, List<BlobMetadata> blobs, boolean enableIcebergStreaming) {
String requestId,
String role,
List<BlobMetadata> blobs,
boolean enableIcebergStreaming,
String clientName,
String clientKey) {
this.requestId = requestId;
this.role = role;
this.blobs = blobs;
this.enableIcebergStreaming = enableIcebergStreaming;
this.clientName = clientName;
this.clientKey = clientKey;
}

String getRequestId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,9 +585,11 @@ List<List<BlobMetadata>> partitionBlobListForRegistrationRequest(List<BlobMetada
*/
void registerBlobs(List<BlobMetadata> blobs, final int executionCount) {
logger.logInfo(
"Register blob request preparing for blob={}, client={}, executionCount={}",
"Register blob request preparing for blob={}, clientName={}, clientKey={},"
+ " executionCount={}",
blobs.stream().map(BlobMetadata::getPath).collect(Collectors.toList()),
this.name,
this.storageManager.getClientPrefix(),
executionCount);

RegisterBlobResponse response = null;
Expand All @@ -597,7 +599,9 @@ void registerBlobs(List<BlobMetadata> blobs, final int executionCount) {
this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement(),
this.role,
blobs,
this.parameterProvider.isEnableIcebergStreaming());
this.parameterProvider.isEnableIcebergStreaming(),
this.getName(),
this.storageManager.getClientPrefix());
response = snowflakeServiceClient.registerBlob(request, executionCount);
} catch (IOException | IngestResponseException e) {
throw new SFException(e, ErrorCode.REGISTER_BLOB_FAILURE, e.getMessage());
Expand Down

0 comments on commit 5d11892

Please sign in to comment.