Skip to content

Commit

Permalink
fix: adjusted exception handling
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Nov 20, 2024
1 parent ff0f3f4 commit 3b94ee7
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.hedera.hapi.block.BlockItemUnparsed;
import com.hedera.hapi.block.SubscribeStreamResponseUnparsed;
import com.hedera.pbj.runtime.OneOf;
import com.hedera.pbj.runtime.ParseException;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -137,7 +138,7 @@ public void onEvent(ObjectEvent<SubscribeStreamResponseUnparsed> event, long l,
LOGGER.log(ERROR, "Service is not running. Block item will not be processed further.");
}

} catch (BlockStreamProtocolException | IOException e) {
} catch (BlockStreamProtocolException | IOException | ParseException e) {

metricsService.get(StreamPersistenceHandlerError).increment();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,47 +116,42 @@ class BlockAsDirWriter implements BlockWriter<List<BlockItemUnparsed>> {
*/
@Override
public Optional<List<BlockItemUnparsed>> write(@NonNull final List<BlockItemUnparsed> blockItems)
throws IOException {
throws IOException, ParseException {

try {
final Bytes unparsedBlockHeader = blockItems.getFirst().blockHeader();
if (unparsedBlockHeader != null) {
resetState(BlockHeader.PROTOBUF.parse(unparsedBlockHeader));
}
final Bytes unparsedBlockHeader = blockItems.getFirst().blockHeader();
if (unparsedBlockHeader != null) {
resetState(BlockHeader.PROTOBUF.parse(unparsedBlockHeader));
}

for (BlockItemUnparsed blockItemUnparsed : blockItems) {
final Path blockItemFilePath = calculateBlockItemPath();
for (int retries = 0; ; retries++) {
try {
write(blockItemFilePath, blockItemUnparsed);
break;
} catch (IOException e) {

LOGGER.log(ERROR, "Error writing the BlockItem protobuf to a file: ", e);

// Remove the block if repairing the permissions fails
if (retries > 0) {
// Attempt to remove the block
blockRemover.remove(Long.parseLong(currentBlockDir.toString()));
throw e;
} else {
// Attempt to repair the permissions on the block path
// and the blockItem path
repairPermissions(blockNodeRootPath);
repairPermissions(calculateBlockPath());
LOGGER.log(INFO, "Retrying to write the BlockItem protobuf to a file");
}
for (BlockItemUnparsed blockItemUnparsed : blockItems) {
final Path blockItemFilePath = calculateBlockItemPath();
for (int retries = 0; ; retries++) {
try {
write(blockItemFilePath, blockItemUnparsed);
break;
} catch (IOException e) {

LOGGER.log(ERROR, "Error writing the BlockItem protobuf to a file: ", e);

// Remove the block if repairing the permissions fails
if (retries > 0) {
// Attempt to remove the block
blockRemover.remove(Long.parseLong(currentBlockDir.toString()));
throw e;
} else {
// Attempt to repair the permissions on the block path
// and the blockItem path
repairPermissions(blockNodeRootPath);
repairPermissions(calculateBlockPath());
LOGGER.log(INFO, "Retrying to write the BlockItem protobuf to a file");
}
}
}
}

if (blockItems.getLast().hasBlockProof()) {
metricsService.get(BlocksPersisted).increment();
return Optional.of(blockItems);
}
} catch (ParseException p) {
LOGGER.log(ERROR, "Error parsing BlockHeader: ", p);
throw new IOException(p);
if (blockItems.getLast().hasBlockProof()) {
metricsService.get(BlocksPersisted).increment();
return Optional.of(blockItems);
}

return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hedera.block.server.persistence.storage.write;

import com.hedera.pbj.runtime.ParseException;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.util.Optional;
Expand All @@ -35,5 +36,5 @@ public interface BlockWriter<V> {
* block proof signaling the end of the block, an empty optional otherwise.
* @throws IOException when failing to write the block item to storage.
*/
Optional<V> write(@NonNull final V blockItem) throws IOException;
Optional<V> write(@NonNull final V blockItem) throws IOException, ParseException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.hedera.hapi.block.SubscribeStreamResponseCode;
import com.hedera.hapi.block.SubscribeStreamResponseUnparsed;
import com.hedera.hapi.block.stream.output.BlockHeader;
import com.hedera.pbj.runtime.ParseException;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.swirlds.metrics.api.LongGauge;
import java.io.IOException;
Expand Down Expand Up @@ -135,7 +136,7 @@ public void testUnsubscribeEach() throws InterruptedException, IOException {
}

@Test
public void testMediatorPersistenceWithoutSubscribers() throws IOException {
public void testMediatorPersistenceWithoutSubscribers() throws IOException, ParseException {

final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext();
final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext);
Expand All @@ -162,7 +163,7 @@ public void testMediatorPersistenceWithoutSubscribers() throws IOException {
}

@Test
public void testMediatorPublishEventToSubscribers() throws IOException {
public void testMediatorPublishEventToSubscribers() throws IOException, ParseException {

final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext();
final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext);
Expand Down Expand Up @@ -384,7 +385,7 @@ public void testSubscribeWhenHandlerAlreadySubscribed() throws IOException {
// }

@Test
public void testMediatorBlocksPublishAfterException() throws IOException, InterruptedException {
public void testMediatorBlocksPublishAfterException() throws IOException, InterruptedException, ParseException {

final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext();
final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import com.hedera.hapi.block.SubscribeStreamRequest;
import com.hedera.hapi.block.SubscribeStreamResponseCode;
import com.hedera.hapi.block.SubscribeStreamResponseUnparsed;
import com.hedera.pbj.runtime.ParseException;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.hedera.pbj.runtime.grpc.ServiceInterface;
import com.hedera.pbj.runtime.io.buffer.Bytes;
Expand Down Expand Up @@ -153,7 +154,7 @@ public void tearDown() {
}

@Test
public void testPublishBlockStreamRegistrationAndExecution() throws IOException, NoSuchAlgorithmException {
public void testPublishBlockStreamRegistrationAndExecution() throws IOException, NoSuchAlgorithmException, ParseException {

final PbjBlockStreamServiceProxy pbjBlockStreamServiceProxy = buildBlockStreamService(blockWriter);

Expand Down Expand Up @@ -497,7 +498,7 @@ public void testSubAndUnsubWhileStreaming() throws InterruptedException {
}

@Test
public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOException {
public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOException, ParseException {
final ConcurrentHashMap<
BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>>,
BatchEventProcessor<ObjectEvent<SubscribeStreamResponseUnparsed>>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testRemoveBlockReadPermsRepairFailed() throws IOException, ParseExce
}

@Test
public void testRemoveBlockItemReadPerms() throws IOException {
public void testRemoveBlockItemReadPerms() throws IOException, ParseException {
final BlockWriter<List<BlockItemUnparsed>> blockWriter =
BlockAsDirWriterBuilder.newBuilder(blockNodeContext).build();
blockWriter.write(blockItems);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void testRemoveBlockWritePerms() throws IOException, ParseException {
}

@Test
public void testUnrecoverableIOExceptionOnWrite() throws IOException {
public void testUnrecoverableIOExceptionOnWrite() throws IOException, ParseException {

final BlockRemover blockRemover = new BlockAsDirRemover(Path.of(testConfig.rootPath()));

Expand Down

0 comments on commit 3b94ee7

Please sign in to comment.