Skip to content

Commit

Permalink
SNOW-1000828 Expose specific error messages as part of the channel in…
Browse files Browse the repository at this point in the history
…validation exception (#721)

* Add channel invalidation error message
  • Loading branch information
sfc-gh-alhuang authored Mar 25, 2024
1 parent b4e04b2 commit 4d5f072
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
channels.put(channel.getName(), channel);
// Invalidate old channel if it exits to block new inserts and return error to users earlier
if (oldChannel != null) {
oldChannel.invalidate("removed from cache");
String invalidationCause =
String.format("Old channel removed from cache, channelName=%s", channel.getName());
oldChannel.invalidate("removed from cache", invalidationCause);
}
}

Expand Down Expand Up @@ -82,14 +84,15 @@ void invalidateChannelIfSequencersMatch(
String schemaName,
String tableName,
String channelName,
Long channelSequencer) {
Long channelSequencer,
String invalidationCause) {
String fullyQualifiedTableName = String.format("%s.%s.%s", dbName, schemaName, tableName);
ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> channelsMapPerTable =
cache.get(fullyQualifiedTableName);
if (channelsMapPerTable != null) {
SnowflakeStreamingIngestChannelInternal<T> channel = channelsMapPerTable.get(channelName);
if (channel != null && channel.getChannelSequencer().equals(channelSequencer)) {
channel.invalidate("invalidate with matched sequencer");
channel.invalidate("invalidate with matched sequencer", invalidationCause);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ && shouldStopProcessing(
}

if (e instanceof IOException) {
invalidateAllChannelsInBlob(blobData);
invalidateAllChannelsInBlob(blobData, errorMessage);
return null;
} else if (e instanceof NoSuchAlgorithmException) {
throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE);
Expand Down Expand Up @@ -657,7 +657,8 @@ String getBlobPath(Calendar calendar, String clientPrefix) {
*
* @param blobData list of channels that belongs to the blob
*/
<CD> void invalidateAllChannelsInBlob(List<List<ChannelData<CD>>> blobData) {
<CD> void invalidateAllChannelsInBlob(
List<List<ChannelData<CD>>> blobData, String invalidationCause) {
blobData.forEach(
chunkData ->
chunkData.forEach(
Expand All @@ -669,7 +670,8 @@ <CD> void invalidateAllChannelsInBlob(List<List<ChannelData<CD>>> blobData) {
channelData.getChannelContext().getSchemaName(),
channelData.getChannelContext().getTableName(),
channelData.getChannelContext().getName(),
channelData.getChannelContext().getChannelSequencer());
channelData.getChannelContext().getChannelSequencer(),
invalidationCause);
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ List<FlushService.BlobData<T>> registerBlobs(Map<String, Timer.Context> latencyT
}
this.owningClient
.getFlushService()
.invalidateAllChannelsInBlob(futureBlob.getKey().getData());
.invalidateAllChannelsInBlob(futureBlob.getKey().getData(), errorMessage);
errorBlobs.add(futureBlob.getKey());
retry = 0;
idx++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
// Internal map of column name -> column properties
private final Map<String, ColumnProperties> tableColumns;

// The latest cause of channel invalidation
private String invalidationCause;

/**
* Constructor for TESTING ONLY which allows us to set the test mode
*
Expand Down Expand Up @@ -217,8 +220,9 @@ public boolean isValid() {
}

/** Mark the channel as invalid, and release resources */
void invalidate(String message) {
void invalidate(String message, String invalidationCause) {
this.channelState.invalidate();
this.invalidationCause = invalidationCause;
this.rowBuffer.close("invalidate");
logger.logWarn(
"Channel is invalidated, name={}, channel sequencer={}, row sequencer={}, message={}",
Expand Down Expand Up @@ -500,7 +504,8 @@ private void checkValidation() {
if (!isValid()) {
this.owningClient.removeChannelIfSequencersMatch(this);
this.rowBuffer.close("checkValidation");
throw new SFException(ErrorCode.INVALID_CHANNEL, getFullyQualifiedName());
throw new SFException(
ErrorCode.INVALID_CHANNEL, getFullyQualifiedName(), this.invalidationCause);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ void registerBlobs(List<BlobMetadata> blobs, final int executionCount) {
String.format(
"Channel has been invalidated because of failure"
+ " response, name=%s, channel_sequencer=%d,"
+ " status_code=%d, message=%s,"
+ " status_code=%d, message=%s,"
+ " executionCount=%d",
channelStatus.getChannelName(),
channelStatus.getChannelSequencer(),
Expand All @@ -668,7 +668,8 @@ void registerBlobs(List<BlobMetadata> blobs, final int executionCount) {
chunkStatus.getSchemaName(),
chunkStatus.getTableName(),
channelStatus.getChannelName(),
channelStatus.getChannelSequencer());
channelStatus.getChannelSequencer(),
errorMessage);
}
}
})));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
0010=Missing {0} in config file.
0011=Failed to upload blob.
0012=Failed to cleanup resources during {0}.
0013=Channel {0} is invalid and might contain uncommitted rows, please consider reopening the channel to restart.
0013=Channel {0} is invalid and might contain uncommitted rows, please consider reopening the channel to restart. Channel invalidation cause: "{1}".
0014=Channel {0} is closed, please reopen the channel to restart.
0015=Invalid Snowflake URL, URL format: 'https://<account_name>.<region_name>.snowflakecomputing.com:443', 'https://' and ':443' are optional.
0016=Client is closed, please recreate to restart.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,23 +213,26 @@ public void testInvalidateChannel() {
channel1.getSchemaName(),
channel1.getTableName(),
channel1.getName(),
channel1.getChannelSequencer());
channel1.getChannelSequencer(),
"Invalidated by test");
Assert.assertFalse(channel1.isValid());

cache.invalidateChannelIfSequencersMatch(
channel2.getDBName(),
channel2.getSchemaName(),
channel2.getTableName(),
channel2.getName(),
channel2.getChannelSequencer());
channel2.getChannelSequencer(),
"Invalidated by test");
Assert.assertFalse(channel2.isValid());

cache.invalidateChannelIfSequencersMatch(
channel3.getDBName(),
channel3.getSchemaName(),
channel3.getTableName(),
channel3.getName(),
channel3.getChannelSequencer());
channel3.getChannelSequencer(),
"Invalidated by test");
Assert.assertFalse(channel3.isValid());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ public void testInvalidateChannels() {
Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix");
FlushService<StubChunkData> flushService =
new FlushService<>(client, channelCache, stage, false);
flushService.invalidateAllChannelsInBlob(blobData);
flushService.invalidateAllChannelsInBlob(blobData, "Invalidated by test");

Assert.assertFalse(channel1.isValid());
Assert.assertTrue(channel2.isValid());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void testChannelValid() {
UTC);

Assert.assertTrue(channel.isValid());
channel.invalidate("from testChannelValid");
channel.invalidate("from testChannelValid", "Invalidated by test");
Assert.assertFalse(channel.isValid());

// Can't insert rows to invalid channel
Expand Down Expand Up @@ -881,7 +881,7 @@ public void testDropOnCloseInvalidChannel() throws Exception {
Mockito.doReturn(response).when(client).getChannelsStatus(Mockito.any());

Assert.assertFalse(channel.isClosed());
channel.invalidate("test");
channel.invalidate("test", "Invalidated by test");
Mockito.doNothing().when(client).dropChannel(Mockito.any());
Assert.assertThrows(SFException.class, () -> channel.close(true).get());
Mockito.verify(client, Mockito.never()).dropChannel(Mockito.any());
Expand Down

0 comments on commit 4d5f072

Please sign in to comment.