Skip to content

Commit

Permalink
push for compare
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-rcheng committed Nov 20, 2023
1 parent 15953d5 commit 0ce288a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ public static boolean isClientValid(SnowflakeStreamingIngestClient client) {
* @return A string with the loggable properties
*/
public static String getLoggableClientProperties(Properties properties) {
return properties.entrySet().stream()
.filter(
propKvp ->
LOGGABLE_STREAMING_CONFIG_PROPERTIES.stream()
.anyMatch(propKvp.getKey().toString()::equalsIgnoreCase))
.collect(Collectors.toList())
.toString();
return properties == null
? ""
: properties.entrySet().stream()
.filter(
propKvp ->
LOGGABLE_STREAMING_CONFIG_PROPERTIES.stream()
.anyMatch(propKvp.getKey().toString()::equalsIgnoreCase))
.collect(Collectors.toList())
.toString();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.internal.KCLogger;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.Properties;

import net.snowflake.ingest.internal.com.github.benmanes.caffeine.cache.Caffeine;
import net.snowflake.ingest.internal.com.github.benmanes.caffeine.cache.LoadingCache;
import net.snowflake.ingest.internal.com.github.benmanes.caffeine.cache.RemovalCause;
Expand Down Expand Up @@ -64,7 +65,7 @@ public static StreamingClientProvider getStreamingClientProviderForTests(

/** ONLY FOR TESTING - private constructor to inject properties for testing */
private StreamingClientProvider(
LoadingCache<Map<String, String>, SnowflakeStreamingIngestClient> registeredClients,
LoadingCache<Properties, SnowflakeStreamingIngestClient> registeredClients,
StreamingClientHandler streamingClientHandler) {
this();
this.registeredClients = registeredClients;
Expand All @@ -73,13 +74,10 @@ private StreamingClientProvider(

private static final KCLogger LOGGER = new KCLogger(StreamingClientProvider.class.getName());
private StreamingClientHandler streamingClientHandler;
private Lock providerLock;

/**
* Maps the client's properties to the created SnowflakeStreamingIngestClient with the connectors
* configs. See {@link StreamingUtils#convertConfigForStreamingClient(Map)}
*/
private LoadingCache<Map<String, String>, SnowflakeStreamingIngestClient> registeredClients;
// if the one client optimization is enabled, we cache the created clients based on corresponding
// Streaming properties
private LoadingCache<Properties, SnowflakeStreamingIngestClient> registeredClients;

// private constructor for singleton
private StreamingClientProvider() {
Expand All @@ -91,13 +89,12 @@ private StreamingClientProvider() {
(Map<String, String> key,
SnowflakeStreamingIngestClient client,
RemovalCause removalCause) -> {
this.streamingClientHandler.closeClient(client);
LOGGER.info(
"Removed registered client {} due to {}",
client.getName(),
removalCause.toString());
})
.build(this.streamingClientHandler::createClient);
.build();
}

/**
Expand Down Expand Up @@ -147,13 +144,12 @@ public void closeClient(
SnowflakeStreamingIngestClient registeredClient =
this.registeredClients.getIfPresent(connectorConfig);
if (registeredClient != null) {
// invalidations are processed on the next get or in the background, so we still need to close the client here
this.registeredClients.invalidate(connectorConfig);
}

this.streamingClientHandler.closeClient(client);
this.streamingClientHandler.closeClient(
registeredClient); // in case the registered client is somehow different from the given
// client
this.streamingClientHandler.closeClient(registeredClient); // in case the given client is different for some reason
}

public Map<Map<String, String>, SnowflakeStreamingIngestClient> getRegisteredClients() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,10 @@ public void testGetLoggableClientProperties() {
}
}
}

@Test
public void testGetLoggableClientInvalidProperties() {
assert StreamingClientHandler.getLoggableClientProperties(null).equals("");
assert StreamingClientHandler.getLoggableClientProperties(new Properties()).equals("[]");
}
}

0 comments on commit 0ce288a

Please sign in to comment.