Skip to content

Commit

Permalink
fixed spotlessApply
Browse files Browse the repository at this point in the history
  • Loading branch information
¨Claude committed Dec 20, 2024
1 parent db868ac commit 6fd112f
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,28 +172,27 @@ public final void close() {
@Override
public final boolean tryAdvance(final Consumer<? super SchemaAndValue> action) {
boolean result = false;
if (closed) {
return false;
}
try {
if (inputStream == null) {
try {
inputStream = inputOpened(inputStreamIOSupplier.get());
} catch (IOException e) {
logger.error("Error trying to open inputStream: {}", e.getMessage(), e);
close();
return false;
if (!closed) {
try {
if (inputStream == null) {
try {
inputStream = inputOpened(inputStreamIOSupplier.get());
} catch (IOException e) {
logger.error("Error trying to open inputStream: {}", e.getMessage(), e);
close();
return false;
}
}
result = doAdvance(action);
if (result) {
offsetManagerEntry.incrementRecordCount();
}
} catch (RuntimeException e) { // NOPMD must catch runtime exception here.
logger.error("Error trying to advance data: {}", e.getMessage(), e);
}
result = doAdvance(action);
if (result) {
offsetManagerEntry.incrementRecordCount();
if (!result) {
close();
}
} catch (RuntimeException e) { // NOPMD must catch runtime exception here.
logger.error("Error trying to advance data: {}", e.getMessage(), e);
}
if (!result) {
close();
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ public void commit() throws InterruptedException {
}

@Override
public void commitRecord(SourceRecord record) throws InterruptedException {
public void commitRecord(final SourceRecord record) throws InterruptedException {
if (LOGGER.isInfoEnabled()) {
Map<String, Object> map = (Map<String, Object>) record.sourceOffset();
S3OffsetManagerEntry entry = S3OffsetManagerEntry.wrap(map);
final Map<String, Object> map = (Map<String, Object>) record.sourceOffset();
final S3OffsetManagerEntry entry = S3OffsetManagerEntry.wrap(map);
LOGGER.info("Committed individual record {} {} {} committed", entry.getBucket(), entry.getKey(),
entry.getRecordCount());
}
Expand Down Expand Up @@ -162,7 +162,7 @@ List<SourceRecord> extractSourceRecords(final List<SourceRecord> results) {
final S3SourceRecord s3SourceRecord = sourceRecordIterator.next();
if (s3SourceRecord != null) {
try {
S3OffsetManagerEntry entry = s3SourceRecord.getOffsetManagerEntry();
final S3OffsetManagerEntry entry = s3SourceRecord.getOffsetManagerEntry();
offsetManager.updateCurrentOffsets(entry);
results.add(s3SourceRecord.getSourceRecord());
lastRecord = entry.getRecordCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static S3OffsetManagerEntry wrap(final Map<String, Object> properties) {
}
final Map<String, Object> ourProperties = new HashMap<>(properties);
long recordCount = 0;
Object recordCountProperty = ourProperties.computeIfAbsent(RECORD_COUNT, s -> 0L);
final Object recordCountProperty = ourProperties.computeIfAbsent(RECORD_COUNT, s -> 0L);
if (recordCountProperty instanceof Number) {
recordCount = ((Number) recordCountProperty).longValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ void testFromProperties() {
assertThat(entry.getRecordCount()).isEqualTo(1L);
assertThat(entry.getProperty("random_entry")).isEqualTo(5L);

S3OffsetManagerEntry other = entry.fromProperties(entry.getProperties());
final S3OffsetManagerEntry other = entry.fromProperties(entry.getProperties());
assertThat(other.getRecordCount()).isEqualTo(1L);
assertThat(other.getProperty("random_entry")).isEqualTo(5L);

Expand Down

0 comments on commit 6fd112f

Please sign in to comment.