From cbfa5d34252d933799962777c0d0e70e2756888c Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Thu, 21 Mar 2024 13:08:53 -0700 Subject: [PATCH] Add channel invalidation error message --- .../ingest/streaming/internal/ChannelCache.java | 9 ++++++--- .../ingest/streaming/internal/FlushService.java | 7 ++++--- .../ingest/streaming/internal/RegisterService.java | 2 +- .../SnowflakeStreamingIngestChannelInternal.java | 9 +++++++-- .../internal/SnowflakeStreamingIngestClientInternal.java | 5 +++-- .../snowflake/ingest/ingest_error_messages.properties | 2 +- .../ingest/streaming/internal/ChannelCacheTest.java | 9 ++++++--- .../ingest/streaming/internal/FlushServiceTest.java | 2 +- .../internal/SnowflakeStreamingIngestChannelTest.java | 4 ++-- 9 files changed, 31 insertions(+), 18 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java index 3926832fb..1d1f03b5c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java @@ -37,7 +37,9 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) { channels.put(channel.getName(), channel); // Invalidate old channel if it exits to block new inserts and return error to users earlier if (oldChannel != null) { - oldChannel.invalidate("removed from cache"); + String errorMessage = + String.format("Old channel removed from cache, channelName=%s", channel.getName()); + oldChannel.invalidate("removed from cache", errorMessage); } } @@ -82,14 +84,15 @@ void invalidateChannelIfSequencersMatch( String schemaName, String tableName, String channelName, - Long channelSequencer) { + Long channelSequencer, + String invalidateCause) { String fullyQualifiedTableName = String.format("%s.%s.%s", dbName, schemaName, tableName); ConcurrentHashMap> channelsMapPerTable = cache.get(fullyQualifiedTableName); if (channelsMapPerTable != null) { SnowflakeStreamingIngestChannelInternal channel = channelsMapPerTable.get(channelName); if (channel != null && channel.getChannelSequencer().equals(channelSequencer)) { - channel.invalidate("invalidate with matched sequencer"); + channel.invalidate("invalidate with matched sequencer", invalidateCause); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 2324adda8..9ecaccb9f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -464,7 +464,7 @@ && shouldStopProcessing( } if (e instanceof IOException) { - invalidateAllChannelsInBlob(blobData); + invalidateAllChannelsInBlob(blobData, errorMessage); return null; } else if (e instanceof NoSuchAlgorithmException) { throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE); @@ -657,7 +657,7 @@ String getBlobPath(Calendar calendar, String clientPrefix) { * * @param blobData list of channels that belongs to the blob */ - void invalidateAllChannelsInBlob(List>> blobData) { + void invalidateAllChannelsInBlob(List>> blobData, String errorMessage) { blobData.forEach( chunkData -> chunkData.forEach( @@ -669,7 +669,8 @@ void invalidateAllChannelsInBlob(List>> blobData) { channelData.getChannelContext().getSchemaName(), channelData.getChannelContext().getTableName(), channelData.getChannelContext().getName(), - channelData.getChannelContext().getChannelSequencer()); + channelData.getChannelContext().getChannelSequencer(), + errorMessage); })); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RegisterService.java b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterService.java index 0be99b821..29913a2b5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/RegisterService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterService.java @@ -175,7 +175,7 @@ List> registerBlobs(Map latencyT } this.owningClient .getFlushService() - .invalidateAllChannelsInBlob(futureBlob.getKey().getData()); + .invalidateAllChannelsInBlob(futureBlob.getKey().getData(), errorMessage); errorBlobs.add(futureBlob.getKey()); retry = 0; idx++; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 578f8e6c1..8ef5c6cc2 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -58,6 +58,9 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn // Internal map of column name -> column properties private final Map tableColumns; + // The cause of channel invalidation + private String invalidateCause; + /** * Constructor for TESTING ONLY which allows us to set the test mode * @@ -217,8 +220,9 @@ public boolean isValid() { } /** Mark the channel as invalid, and release resources */ - void invalidate(String message) { + void invalidate(String message, String invalidateCause) { this.channelState.invalidate(); + this.invalidateCause = invalidateCause; this.rowBuffer.close("invalidate"); logger.logWarn( "Channel is invalidated, name={}, channel sequencer={}, row sequencer={}, message={}", @@ -500,7 +504,8 @@ private void checkValidation() { if (!isValid()) { this.owningClient.removeChannelIfSequencersMatch(this); this.rowBuffer.close("checkValidation"); - throw new SFException(ErrorCode.INVALID_CHANNEL, getFullyQualifiedName()); + throw new SFException( + ErrorCode.INVALID_CHANNEL, getFullyQualifiedName(), this.invalidateCause); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 62fc265ff..3c4896ee1 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -650,7 +650,7 @@ void registerBlobs(List blobs, final int executionCount) { String.format( "Channel has been invalidated because of failure" + " response, name=%s, channel_sequencer=%d," - + " status_code=%d, message=%s," + + " status_code=%d, message=%s," + " executionCount=%d", channelStatus.getChannelName(), channelStatus.getChannelSequencer(), @@ -668,7 +668,8 @@ void registerBlobs(List blobs, final int executionCount) { chunkStatus.getSchemaName(), chunkStatus.getTableName(), channelStatus.getChannelName(), - channelStatus.getChannelSequencer()); + channelStatus.getChannelSequencer(), + errorMessage); } } }))); diff --git a/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties b/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties index 268afd051..99b9f0c27 100644 --- a/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties +++ b/src/main/resources/net/snowflake/ingest/ingest_error_messages.properties @@ -15,7 +15,7 @@ 0010=Missing {0} in config file. 0011=Failed to upload blob. 0012=Failed to cleanup resources during {0}. -0013=Channel {0} is invalid and might contain uncommitted rows, please consider reopening the channel to restart. +0013=Channel {0} is invalid and might contain uncommitted rows, please consider reopening the channel to restart. Channel invalidated cause: "{1}". 0014=Channel {0} is closed, please reopen the channel to restart. 0015=Invalid Snowflake URL, URL format: 'https://..snowflakecomputing.com:443', 'https://' and ':443' are optional. 0016=Client is closed, please recreate to restart. diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index ea88d618c..947908ef9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -213,7 +213,8 @@ public void testInvalidateChannel() { channel1.getSchemaName(), channel1.getTableName(), channel1.getName(), - channel1.getChannelSequencer()); + channel1.getChannelSequencer(), + "Invalidated by test"); Assert.assertFalse(channel1.isValid()); cache.invalidateChannelIfSequencersMatch( @@ -221,7 +222,8 @@ public void testInvalidateChannel() { channel2.getSchemaName(), channel2.getTableName(), channel2.getName(), - channel2.getChannelSequencer()); + channel2.getChannelSequencer(), + "Invalidated by test"); Assert.assertFalse(channel2.isValid()); cache.invalidateChannelIfSequencersMatch( @@ -229,7 +231,8 @@ public void testInvalidateChannel() { channel3.getSchemaName(), channel3.getTableName(), channel3.getName(), - channel3.getChannelSequencer()); + channel3.getChannelSequencer(), + "Invalidated by test"); Assert.assertFalse(channel3.isValid()); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 28d1206a2..f200c7177 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -916,7 +916,7 @@ public void testInvalidateChannels() { Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix"); FlushService flushService = new FlushService<>(client, channelCache, stage, false); - flushService.invalidateAllChannelsInBlob(blobData); + flushService.invalidateAllChannelsInBlob(blobData, "Invalidated by test"); Assert.assertFalse(channel1.isValid()); Assert.assertTrue(channel2.isValid()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 780563d28..87e3f8f11 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -175,7 +175,7 @@ public void testChannelValid() { UTC); Assert.assertTrue(channel.isValid()); - channel.invalidate("from testChannelValid"); + channel.invalidate("from testChannelValid", "Invalidated by test"); Assert.assertFalse(channel.isValid()); // Can't insert rows to invalid channel @@ -881,7 +881,7 @@ public void testDropOnCloseInvalidChannel() throws Exception { Mockito.doReturn(response).when(client).getChannelsStatus(Mockito.any()); Assert.assertFalse(channel.isClosed()); - channel.invalidate("test"); + channel.invalidate("test", "Invalidated by test"); Mockito.doNothing().when(client).dropChannel(Mockito.any()); Assert.assertThrows(SFException.class, () -> channel.close(true).get()); Mockito.verify(client, Mockito.never()).dropChannel(Mockito.any());