Skip to content

Commit

Permalink
Delete remove logic
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Jul 11, 2024
1 parent 4528682 commit 76aadbb
Show file tree
Hide file tree
Showing 9 changed files with 10 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public enum OnErrorOption {
private final ZoneId defaultTimezone;

private final String offsetToken;
private final boolean isOffsetTokenProvided;

private final OffsetTokenVerificationFunction offsetTokenVerificationFunction;

Expand All @@ -59,7 +58,6 @@ public static class OpenChannelRequestBuilder {
private ZoneId defaultTimezone;

private String offsetToken;
private boolean isOffsetTokenProvided = false;

private OffsetTokenVerificationFunction offsetTokenVerificationFunction;

Expand Down Expand Up @@ -95,7 +93,6 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) {

public OpenChannelRequestBuilder setOffsetToken(String offsetToken) {
this.offsetToken = offsetToken;
this.isOffsetTokenProvided = true;
return this;
}

Expand Down Expand Up @@ -125,7 +122,6 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) {
this.onErrorOption = builder.onErrorOption;
this.defaultTimezone = builder.defaultTimezone;
this.offsetToken = builder.offsetToken;
this.isOffsetTokenProvided = builder.isOffsetTokenProvided;
this.offsetTokenVerificationFunction = builder.offsetTokenVerificationFunction;
}

Expand Down Expand Up @@ -161,10 +157,6 @@ public String getOffsetToken() {
return this.offsetToken;
}

public boolean isOffsetTokenProvided() {
return this.isOffsetTokenProvided;
}

public OffsetTokenVerificationFunction getOffsetTokenVerificationFunction() {
return this.offsetTokenVerificationFunction;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,4 @@ void invalidateChannelIfSequencersMatch(
int getSize() {
return cache.size();
}

/** Get the number of channels for a given table */
int getSizePerTable(String fullyQualifiedTableName) {
ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> channelsMapPerTable =
cache.get(fullyQualifiedTableName);
return channelsMapPerTable == null ? 0 : channelsMapPerTable.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,6 @@ public void addStorage(
}
}

/**
* Remove the storage of a target table
*
* @param dbName the database name
* @param schemaName the schema name
* @param tableName the table name
*/
@Override
public void removeStorage(String dbName, String schemaName, String tableName) {
this.externalVolumeMap.remove(Utils.getFullyQualifiedTableName(dbName, schemaName, tableName));
}

/**
* Gets the latest file location info (with a renewed short-lived access token) for the specified
* location
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ List<List<ChannelData<T>>> getData() {
// Reference to the channel cache
private final ChannelCache<T> channelCache;

// Reference to the Stream Ingest storage manager
// Reference to the Streaming Ingest storage manager
private final StorageManager<T, ?> storageManager;

// Reference to register service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ public StreamingIngestStorage<T, InternalStageLocation> getStorage(
public void addStorage(
String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {}

/** Remove storage from the manager. Do nothing as there's only one stage in non-Iceberg mode. */
@Override
public void removeStorage(String dbName, String schemaName, String tableName) {}

/**
* Gets the latest file location info (with a renewed short-lived access token) for the specified
* location
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,16 +774,9 @@ void setNeedFlush() {
this.flushService.setNeedFlush();
}

/**
* Remove the channel in the channel cache if the channel sequencer matches. Update storage
* manager if needed.
*/
/** Remove the channel in the channel cache if the channel sequencer matches. Update storage */
void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal<T> channel) {
this.channelCache.removeChannelIfSequencersMatch(channel);
if (this.channelCache.getSizePerTable(channel.getFullyQualifiedName()) == 0) {
this.storageManager.removeStorage(
channel.getDBName(), channel.getSchemaName(), channel.getTableName());
}
}

/** Get whether we're running under test mode */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@ interface StorageManager<T, TLocation> {
void addStorage(
String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo);

/**
* Remove the storage of a target table
*
* @param dbName the database name
* @param schemaName the schema name
* @param tableName the table name
*/
void removeStorage(String dbName, String schemaName, String tableName);

/**
* Gets the latest file location info (with a renewed short-lived access token) for the specified
* location
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private abstract static class TestContext<T> implements AutoCloseable {
ChannelCache<T> channelCache;
final Map<String, SnowflakeStreamingIngestChannelInternal<T>> channels = new HashMap<>();
FlushService<T> flushService;
StorageManager<T, InternalStageLocation> storageManager;
StorageManager<T, ?> storageManager;
StreamingIngestStorage storage;
ParameterProvider parameterProvider;
InternalParameterProvider internalParameterProvider;
Expand All @@ -108,8 +108,12 @@ private abstract static class TestContext<T> implements AutoCloseable {
client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class);
Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider);
Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider);
storageManager = Mockito.spy(new InternalStageManager<>(true, "role", "client", null));
Mockito.when(storageManager.getStorage(ArgumentMatchers.any())).thenReturn(storage);
storageManager =
Mockito.spy(
isIcebergMode
? new ExternalVolumeManager<>(true, "role", "client", null)
: new InternalStageManager<>(true, "role", "client", null));
Mockito.doReturn(storage).when(storageManager).getStorage(ArgumentMatchers.any());
Mockito.when(storageManager.getClientPrefix()).thenReturn("client_prefix");
channelCache = new ChannelCache<>();
Mockito.when(client.getChannelCache()).thenReturn(channelCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public void testOpenChannelRequestCreationSuccess() {

Assert.assertEquals(
"STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName());
Assert.assertFalse(request.isOffsetTokenProvided());
Assert.assertNull(request.getOffsetToken());
}

@Test
Expand All @@ -295,7 +295,6 @@ public void testOpenChannelRequesCreationtWithOffsetToken() {
Assert.assertEquals(
"STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName());
Assert.assertEquals("TEST_TOKEN", request.getOffsetToken());
Assert.assertTrue(request.isOffsetTokenProvided());
}

@Test
Expand Down

0 comments on commit 76aadbb

Please sign in to comment.