From 5fbfd5ab4c169011a8d418580e8c580605390750 Mon Sep 17 00:00:00 2001 From: Hitesh Madan Date: Fri, 25 Oct 2024 17:34:48 +0000 Subject: [PATCH] pr comments --- .../SubscopedTokenExternalVolumeManager.java | 80 ++++++------------- 1 file changed, 25 insertions(+), 55 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java index 5977403c0..8f606b3dd 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SubscopedTokenExternalVolumeManager.java @@ -20,7 +20,7 @@ class SubscopedTokenExternalVolumeManager implements IStorageManager { private static final Logging logger = new Logging(SubscopedTokenExternalVolumeManager.class); // Reference to the external volume per table - private final Map externalVolumeMap; + private final ConcurrentHashMap externalVolumeMap; /** Increasing counter to generate a unique blob name */ private final AtomicLong counter; @@ -36,14 +36,6 @@ class SubscopedTokenExternalVolumeManager implements IStorageManager { // Client prefix generated by the Snowflake server private final String clientPrefix; - // Deployment ID returned by the Snowflake server - private final Long deploymentId; - - // concurrency control to avoid creating multiple ExternalVolume objects for the same table (if - // openChannel is called - // multiple times concurrently) - private final Object registerTableLock = new Object(); - /** * Constructor for ExternalVolumeManager * @@ -62,7 +54,6 @@ class SubscopedTokenExternalVolumeManager implements IStorageManager { ClientConfigureResponse response = this.serviceClient.clientConfigure(new ClientConfigureRequest(role)); this.clientPrefix = response.getClientPrefix(); - this.deploymentId = response.getDeploymentId(); } catch (IngestResponseException | IOException e) { throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); } @@ -86,53 +77,32 @@ public InternalStage getStorage(String fullyQualifiedTableName) { /** Informs the storage manager about a new table that's being ingested into by the client. */ @Override public void registerTable(TableRef tableRef) { - if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) { - logger.logInfo( - "Skip registering table since its already been registered with the VolumeManager." - + " tableRef=%s", - tableRef); - return; - } + this.externalVolumeMap.computeIfAbsent( + tableRef.fullyQualifiedName, fqn -> createStageForTable(tableRef)); + } - // future enhancement - per table locks instead of per-client lock - synchronized (registerTableLock) { - if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) { - logger.logInfo( - "Skip registering table since its already been registered with the VolumeManager." - + " tableRef=%s", - tableRef); - return; - } + private InternalStage createStageForTable(TableRef tableRef) { + // Get the locationInfo when we know this is the first register call for a given table. This is + // done to reduce the + // unnecessary overload on token generation if a client opens up a hundred channels at the same + // time. + FileLocationInfo locationInfo = getRefreshedLocation(tableRef, Optional.empty()); - // get the locationInfo when we know this is the first register call for a given table. - // This is done to reduce the unnecessary overload on token generation if a client opens up a - // hundred channels at - // the same time. - FileLocationInfo locationInfo = getRefreshedLocation(tableRef, Optional.empty()); - - try { - InternalStage externalVolume = - new InternalStage( - this, - clientName, - getClientPrefix(), - tableRef, - locationInfo, - DEFAULT_MAX_UPLOAD_RETRIES); - this.externalVolumeMap.put(tableRef.fullyQualifiedName, externalVolume); - } catch (SFException ex) { - logger.logError( - "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, ex); - // allow external volume ctor's SFExceptions to bubble up directly - throw ex; - } catch (Exception err) { - logger.logError( - "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err); - throw new SFException( - err, - ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, - String.format("fullyQualifiedTableName=%s", tableRef)); - } + try { + return new InternalStage( + this, clientName, getClientPrefix(), tableRef, locationInfo, DEFAULT_MAX_UPLOAD_RETRIES); + } catch (SFException ex) { + logger.logError( + "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, ex); + // allow external volume ctor's SFExceptions to bubble up directly + throw ex; + } catch (Exception err) { + logger.logError( + "ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err); + throw new SFException( + err, + ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, + String.format("fullyQualifiedTableName=%s", tableRef)); } }