Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log exceptions for dynamically added streams with misaligned regions, rather than allowing them to propagate #1326

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public class Scheduler implements Runnable {
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count";
private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count";
private static final String IGNORED_STREAMS_COUNT = "IgnoredStreams.Count";

private final SchedulerLog slog = new SchedulerLog();

Expand Down Expand Up @@ -504,10 +505,17 @@ Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
final StreamConfig streamConfig = newStreamConfigMap.get(streamIdentifier);
try {
currentStreamConfigMap.put(streamIdentifier, streamConfig);
} catch (IllegalArgumentException e) {
log.error("Failed to add stream {} to application. This stream will not be processed.",
streamConfig.streamIdentifier(), e);
MetricsUtil.addCount(metricsScope, IGNORED_STREAMS_COUNT, 1, MetricsLevel.DETAILED);
continue;
}
log.info("Found new stream to process: {}. Syncing shards of that stream.", streamConfig);
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(streamConfig);
shardSyncTaskManager.submitShardSyncTask();
currentStreamConfigMap.put(streamIdentifier, streamConfig);
streamsSynced.add(streamIdentifier);
} else {
log.debug("{} is already being processed - skipping shard sync.", streamIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ public void testOrphanStreamConfigIsPopulatedWithArn() {
}

@Test
public void testMismatchingArnRegionAndKinesisClientRegionThrowsException() {
public void testMismatchingArnRegionAndKinesisClientRegionOnSchedulerConstructionThrowsException() {
final Region streamArnRegion = Region.US_WEST_1;
Assert.assertNotEquals(streamArnRegion, kinesisClient.serviceClientConfiguration().region());

Expand All @@ -1169,6 +1169,35 @@ public void testMismatchingArnRegionAndKinesisClientRegionThrowsException() {
leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig));
}

@Test
public void testDynamicallyAddedStreamsWithRegionMismatchingKinesisClientRegionAreIgnored() throws Exception {
final Region mismatchingStreamRegion = Region.US_WEST_1;
final Region kinesisClientRegion = kinesisClient.serviceClientConfiguration().region();
Assert.assertNotEquals(mismatchingStreamRegion, kinesisClientRegion);

final StreamIdentifier streamWithMismatchingRegion = StreamIdentifier.multiStreamInstance(
Arn.fromString(constructStreamArnStr(mismatchingStreamRegion, TEST_ACCOUNT, "stream-1")), TEST_EPOCH);

final StreamIdentifier streamWithMatchingRegion = StreamIdentifier.multiStreamInstance(
Arn.fromString(constructStreamArnStr(kinesisClientRegion, TEST_ACCOUNT, "stream-2")), TEST_EPOCH);

when(multiStreamTracker.streamConfigList()).thenReturn(
Collections.emptyList(), // returned on scheduler construction
Arrays.asList( // returned on stream sync
new StreamConfig(streamWithMismatchingRegion, TEST_INITIAL_POSITION),
new StreamConfig(streamWithMatchingRegion, TEST_INITIAL_POSITION)));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig,
leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);

final Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
final Set<StreamIdentifier> currentStreamConfigMapKeys = scheduler.currentStreamConfigMap().keySet();
assertFalse(Sets.union(currentStreamConfigMapKeys, syncedStreams).contains(streamWithMismatchingRegion));
assertTrue(Sets.union(currentStreamConfigMapKeys, syncedStreams).contains(streamWithMatchingRegion));
}

private static String constructStreamIdentifierSer(long accountId, String streamName) {
return String.join(":", String.valueOf(accountId), streamName, String.valueOf(TEST_EPOCH));
}
Expand Down
Loading