Skip to content

Commit

Permalink
pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-hmadan committed Oct 25, 2024
1 parent 3e75af2 commit 5fbfd5a
Showing 1 changed file with 25 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
class SubscopedTokenExternalVolumeManager implements IStorageManager {
private static final Logging logger = new Logging(SubscopedTokenExternalVolumeManager.class);
// Reference to the external volume per table
private final Map<String, InternalStage> externalVolumeMap;
private final ConcurrentHashMap<String, InternalStage> externalVolumeMap;

/** Increasing counter to generate a unique blob name */
private final AtomicLong counter;
Expand All @@ -36,14 +36,6 @@ class SubscopedTokenExternalVolumeManager implements IStorageManager {
// Client prefix generated by the Snowflake server
private final String clientPrefix;

// Deployment ID returned by the Snowflake server
private final Long deploymentId;

// concurrency control to avoid creating multiple ExternalVolume objects for the same table (if
// openChannel is called
// multiple times concurrently)
private final Object registerTableLock = new Object();

/**
* Constructor for ExternalVolumeManager
*
Expand All @@ -62,7 +54,6 @@ class SubscopedTokenExternalVolumeManager implements IStorageManager {
ClientConfigureResponse response =
this.serviceClient.clientConfigure(new ClientConfigureRequest(role));
this.clientPrefix = response.getClientPrefix();
this.deploymentId = response.getDeploymentId();
} catch (IngestResponseException | IOException e) {
throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
}
Expand All @@ -86,53 +77,32 @@ public InternalStage getStorage(String fullyQualifiedTableName) {
/** Informs the storage manager about a new table that's being ingested into by the client. */
@Override
public void registerTable(TableRef tableRef) {
if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) {
logger.logInfo(
"Skip registering table since its already been registered with the VolumeManager."
+ " tableRef=%s",
tableRef);
return;
}
this.externalVolumeMap.computeIfAbsent(
tableRef.fullyQualifiedName, fqn -> createStageForTable(tableRef));
}

// future enhancement - per table locks instead of per-client lock
synchronized (registerTableLock) {
if (this.externalVolumeMap.containsKey(tableRef.fullyQualifiedName)) {
logger.logInfo(
"Skip registering table since its already been registered with the VolumeManager."
+ " tableRef=%s",
tableRef);
return;
}
private InternalStage createStageForTable(TableRef tableRef) {
// Get the locationInfo when we know this is the first register call for a given table. This is
// done to reduce the
// unnecessary overload on token generation if a client opens up a hundred channels at the same
// time.
FileLocationInfo locationInfo = getRefreshedLocation(tableRef, Optional.empty());

// get the locationInfo when we know this is the first register call for a given table.
// This is done to reduce the unnecessary overload on token generation if a client opens up a
// hundred channels at
// the same time.
FileLocationInfo locationInfo = getRefreshedLocation(tableRef, Optional.empty());

try {
InternalStage externalVolume =
new InternalStage(
this,
clientName,
getClientPrefix(),
tableRef,
locationInfo,
DEFAULT_MAX_UPLOAD_RETRIES);
this.externalVolumeMap.put(tableRef.fullyQualifiedName, externalVolume);
} catch (SFException ex) {
logger.logError(
"ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, ex);
// allow external volume ctor's SFExceptions to bubble up directly
throw ex;
} catch (Exception err) {
logger.logError(
"ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err);
throw new SFException(
err,
ErrorCode.UNABLE_TO_CONNECT_TO_STAGE,
String.format("fullyQualifiedTableName=%s", tableRef));
}
try {
return new InternalStage(
this, clientName, getClientPrefix(), tableRef, locationInfo, DEFAULT_MAX_UPLOAD_RETRIES);
} catch (SFException ex) {
logger.logError(
"ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, ex);
// allow external volume ctor's SFExceptions to bubble up directly
throw ex;
} catch (Exception err) {
logger.logError(
"ExtVolManager.registerTable for tableRef=% failed with exception=%s", tableRef, err);
throw new SFException(
err,
ErrorCode.UNABLE_TO_CONNECT_TO_STAGE,
String.format("fullyQualifiedTableName=%s", tableRef));
}
}

Expand Down

0 comments on commit 5fbfd5a

Please sign in to comment.