Skip to content

Commit

Permalink
WIP abstract writer becomes local writer interface
Browse files Browse the repository at this point in the history
Signed-off-by: Atanas Atanasov <[email protected]>
  • Loading branch information
ata-nas committed Nov 21, 2024
1 parent 6126411 commit 945f514
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import dagger.Provides;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -74,16 +75,15 @@ static BlockWriter<List<BlockItem>> providesBlockWriter(
.type();
try {
return switch (persistenceType) {
case null -> throw new NullPointerException(
"Persistence StorageType cannot be [null], cannot create an instance of BlockWriter");
case BLOCK_AS_FILE -> BlockAsFileWriterBuilder.newBuilder(blockNodeContext, blockRemover, pathResolver)
.build();
case BLOCK_AS_DIR -> BlockAsDirWriterBuilder.newBuilder(blockNodeContext, blockRemover, pathResolver)
.build();
case NOOP -> new NoOpBlockWriter(blockNodeContext, blockRemover, pathResolver);
};
} catch (final IOException e) {
throw new RuntimeException("Failed to create BlockWriter", e);
// we cannot have checked exceptions with dagger @Provides
throw new UncheckedIOException("Failed to create BlockWriter", e);
}
}

Expand All @@ -99,8 +99,6 @@ static BlockWriter<List<BlockItem>> providesBlockWriter(
static BlockReader<Block> providesBlockReader(@NonNull final PersistenceStorageConfig config) {
final StorageType persistenceType = Objects.requireNonNull(config).type();
return switch (persistenceType) {
case null -> throw new NullPointerException(
"Persistence StorageType cannot be [null], cannot create an instance of BlockReader");
case BLOCK_AS_FILE -> BlockAsFileReaderBuilder.newBuilder().build();
case BLOCK_AS_DIR -> BlockAsDirReaderBuilder.newBuilder(config).build();
case NOOP -> new NoOpBlockReader();
Expand All @@ -119,8 +117,6 @@ static BlockReader<Block> providesBlockReader(@NonNull final PersistenceStorageC
static BlockRemover providesBlockRemover(@NonNull final PersistenceStorageConfig config) {
final StorageType persistenceType = Objects.requireNonNull(config).type();
return switch (persistenceType) {
case null -> throw new NullPointerException(
"Persistence StorageType cannot be [null], cannot create an instance of BlockRemover");
case BLOCK_AS_FILE -> new BlockAsFileRemover();
case BLOCK_AS_DIR -> new BlockAsDirRemover(Path.of(config.rootPath()));
case NOOP -> new NoOpRemover();
Expand All @@ -140,8 +136,6 @@ static PathResolver providesPathResolver(@NonNull final PersistenceStorageConfig
final StorageType persistenceType = Objects.requireNonNull(config).type();
final Path root = Path.of(config.rootPath());
return switch (persistenceType) {
case null -> throw new NullPointerException(
"Persistence StorageType cannot be [null], cannot create an instance of PathResolver");
case BLOCK_AS_FILE -> new BlockAsFilePathResolver(root);
case BLOCK_AS_DIR -> new BlockAsDirPathResolver(root);
case NOOP -> new NoOpPathResolver();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;

/**
* Use this configuration across the persistent storage package
Expand All @@ -46,27 +48,32 @@ public record PersistenceStorageConfig(
* directory in the current working directory
*/
public PersistenceStorageConfig {
Objects.requireNonNull(type);
Objects.requireNonNull(rootPath);

// verify rootPath prop
Path path = Path.of(rootPath);
if (rootPath.isEmpty()) {
path = Paths.get(rootPath).toAbsolutePath().resolve("data");
if (rootPath.isBlank()) {
path = Paths.get("").toAbsolutePath().resolve("data");
}

// Check if absolute
if (!path.isAbsolute()) {
throw new IllegalArgumentException(rootPath + " Root path must be absolute");
}

// Create Directory if it does not exist
try {
FileUtilities.createFolderPathIfNotExists(path, ERROR, BLOCK_NODE_ROOT_DIRECTORY_SEMANTIC_NAME);
} catch (final IOException e) {
final String message =
"Unable to instantiate [%s]! Unable to create the root directory for the block storage [%s]"
.formatted(this.getClass().getName(), path);
throw new RuntimeException(message, e);
throw new UncheckedIOException(message, e);
}

LOGGER.log(INFO, "Persistence Storage configuration persistence.storage.rootPath: " + path);
rootPath = path.toString();
LOGGER.log(INFO, "Persistence configuration persistence.storage.type: " + type);
LOGGER.log(INFO, "Persistence Storage Configuration: persistence.storage.rootPath=" + path);
LOGGER.log(INFO, "Persistence Storage Configuration: persistence.storage.type= " + type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ public BlockAsDirPathResolver(@NonNull final Path root) {

@Override
public Path resolvePathToBlock(final long blockNumber) {
throw new UnsupportedOperationException("Not implemented yet");
return root.resolve(String.valueOf(blockNumber));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import static com.hedera.block.server.Constants.BLOCK_FILE_EXTENSION;
import static com.hedera.block.server.Constants.BLOCK_NODE_ROOT_DIRECTORY_SEMANTIC_NAME;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.BlocksPersisted;
import static java.lang.System.Logger;
import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;
import static java.lang.System.Logger.Level.INFO;

import com.hedera.block.common.utils.FileUtilities;
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.path.PathResolver;
import com.hedera.block.server.persistence.storage.remove.BlockRemover;
Expand All @@ -51,12 +53,15 @@
* to remove the current, incomplete block (directory) before re-throwing the exception to the
* caller.
*/
class BlockAsDirWriter extends AbstractBlockWriter<List<BlockItem>> {
class BlockAsDirWriter implements LocalBlockWriter<List<BlockItem>> {
private final Logger LOGGER = System.getLogger(getClass().getName());
private final Path blockNodeRootPath;
private final MetricsService metricsService;
private final BlockRemover blockRemover;
private final PathResolver pathResolver;
private final FileAttribute<Set<PosixFilePermission>> folderPermissions;
private long blockNodeFileNameIndex;
private Path currentBlockDir;
private long currentBlockNumber;

/**
* Use the corresponding builder to construct a new BlockAsDirWriter with
Expand All @@ -76,16 +81,16 @@ class BlockAsDirWriter extends AbstractBlockWriter<List<BlockItem>> {
@NonNull final PathResolver pathResolver,
final FileAttribute<Set<PosixFilePermission>> folderPermissions)
throws IOException {
super(blockNodeContext.metricsService(), blockRemover, pathResolver);

LOGGER.log(INFO, "Initializing FileSystemBlockStorage");

this.metricsService = Objects.requireNonNull(blockNodeContext.metricsService());
this.blockRemover = Objects.requireNonNull(blockRemover);
this.pathResolver = Objects.requireNonNull(pathResolver);

final PersistenceStorageConfig config =
blockNodeContext.configuration().getConfigData(PersistenceStorageConfig.class);

final Path blockNodeRootPath = Path.of(config.rootPath());
this.blockNodeRootPath = Path.of(config.rootPath());
LOGGER.log(INFO, "Block Node Root Path: " + blockNodeRootPath);
this.blockNodeRootPath = blockNodeRootPath;

if (Objects.nonNull(folderPermissions)) {
this.folderPermissions = folderPermissions;
Expand Down Expand Up @@ -131,21 +136,21 @@ public Optional<List<BlockItem>> write(@NonNull final List<BlockItem> toWrite) t
// Remove the block if repairing the permissions fails
if (retries > 0) {
// Attempt to remove the block
blockRemover.remove(Long.parseLong(currentBlockDir.toString()));
blockRemover.remove(currentBlockNumber);
throw e;
} else {
// Attempt to repair the permissions on the block path
// and the blockItem path
repairPermissions(blockNodeRootPath);
repairPermissions(calculateBlockPath());
repairPermissions(pathResolver.resolvePathToBlock(currentBlockNumber));
LOGGER.log(INFO, "Retrying to write the BlockItem protobuf to a file");
}
}
}
}

if (toWrite.getLast().hasBlockProof()) {
incrementBlocksPersisted();
metricsService.get(BlocksPersisted).increment();
return Optional.of(toWrite);
} else {
return Optional.empty();
Expand Down Expand Up @@ -173,20 +178,24 @@ protected void write(@NonNull final Path blockItemFilePath, @NonNull final Block
private void resetState(@NonNull final BlockItem blockItem) throws IOException {
// Here a "block" is represented as a directory of BlockItems.
// Create the "block" directory based on the block_number
currentBlockDir = Path.of(String.valueOf(blockItem.blockHeader().number()));
currentBlockNumber = blockItem.blockHeader().number();

// Check the blockNodeRootPath permissions and
// attempt to repair them if possible
repairPermissions(blockNodeRootPath);

// Construct the path to the block directory
FileUtilities.createFolderPathIfNotExists(
calculateBlockPath(), DEBUG, folderPermissions, BLOCK_NODE_ROOT_DIRECTORY_SEMANTIC_NAME);
pathResolver.resolvePathToBlock(currentBlockNumber),
DEBUG,
folderPermissions,
BLOCK_NODE_ROOT_DIRECTORY_SEMANTIC_NAME);

// Reset
blockNodeFileNameIndex = 0;
}

// todo do we need this method at all?
private void repairPermissions(@NonNull final Path path) throws IOException {
final boolean isWritable = Files.isWritable(path);
if (!isWritable) {
Expand All @@ -200,13 +209,8 @@ private void repairPermissions(@NonNull final Path path) throws IOException {
@NonNull
private Path calculateBlockItemPath() {
// Build the path to a .blk file
final Path blockPath = calculateBlockPath();
final Path blockPath = pathResolver.resolvePathToBlock(currentBlockNumber);
blockNodeFileNameIndex++;
return blockPath.resolve(blockNodeFileNameIndex + BLOCK_FILE_EXTENSION);
}

@NonNull
private Path calculateBlockPath() {
return blockNodeRootPath.resolve(currentBlockDir);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package com.hedera.block.server.persistence.storage.write;

import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.BlocksPersisted;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.persistence.storage.path.PathResolver;
import com.hedera.block.server.persistence.storage.remove.BlockRemover;
import com.hedera.hapi.block.stream.Block;
Expand All @@ -28,19 +31,25 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* TODO: add documentation
*/
class BlockAsFileWriter extends AbstractBlockWriter<List<BlockItem>> {
class BlockAsFileWriter implements LocalBlockWriter<List<BlockItem>> {
private final MetricsService metricsService;
private final BlockRemover blockRemover; // todo do I need here?
private final PathResolver pathResolver;
private Block curentBlock; // fixme this is temporary just to explore the workflow and make proof of concept

BlockAsFileWriter(
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final BlockRemover blockRemover,
@NonNull final PathResolver pathResolver) {
super(blockNodeContext.metricsService(), blockRemover, pathResolver);
this.metricsService = Objects.requireNonNull(blockNodeContext.metricsService());
this.blockRemover = Objects.requireNonNull(blockRemover);
this.pathResolver = Objects.requireNonNull(pathResolver);
}

@Override
Expand All @@ -54,12 +63,15 @@ public Optional<List<BlockItem>> write(@NonNull final List<BlockItem> toWrite) t
}

if (toWrite.getLast().hasBlockProof()) {
metricsService.get(BlocksPersisted).increment();
return writeToFs(curentBlock);
} else {
return Optional.empty();
}
}

// todo we could recursively retry if exception occurs, then after a few attempts
// if we cannot persist, we must throw the initial exception
private Optional<List<BlockItem>> writeToFs(final Block blockToWrite) throws IOException {
final long number = blockToWrite.items().getFirst().blockHeader().number(); // fixme could be null, handle!

Expand All @@ -72,7 +84,7 @@ private Optional<List<BlockItem>> writeToFs(final Block blockToWrite) throws IOE
try (final FileOutputStream fos = new FileOutputStream(blockToWritePathResolved.toFile())) {
Block.PROTOBUF.toBytes(blockToWrite).writeTo(fos);
// todo what should be fallback logic if something goes wrong here? we attempt to resolve the path
// with proper perms? we
// with proper perms (is that necessary)? we must clean up and retry?
} catch (final IOException ioe) {
// todo handle properly
throw new UncheckedIOException(ioe);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.persistence.storage.write;

/**
* TODO: add documentation
*/
interface LocalBlockWriter<V> extends BlockWriter<V> {}
Loading

0 comments on commit 945f514

Please sign in to comment.