From 3d3538836f9914038f55f0a270af7b58e3fd8ab8 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Tue, 11 Jul 2023 17:07:10 +0530 Subject: [PATCH] Add shard id to remote store logs (#8574) --------- Signed-off-by: bansvaru --- .../opensearch/index/shard/IndexShard.java | 2 +- .../shard/RemoteStoreRefreshListener.java | 13 ++++-- .../index/translog/RemoteFsTranslog.java | 43 ++++++++++++------- .../transfer/TranslogTransferManager.java | 16 +++---- .../TranslogTransferManagerTests.java | 2 + 5 files changed, 47 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index e7720e9343b80..154e1a4f22242 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4589,7 +4589,7 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting); assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory; Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository(); - RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog()); + RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog(), logger); } /** diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index aaba74cd54341..46d52bc8ca5df 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -8,7 +8,6 @@ package org.opensearch.index.shard; -import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; @@ -22,6 +21,7 @@ import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.common.CheckedFunction; import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.logging.Loggers; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.engine.EngineException; @@ -60,7 +60,7 @@ */ public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener { - private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class); + private final Logger logger; /** * The initial retry interval at which the retry job gets scheduled after a failure. @@ -117,6 +117,7 @@ public RemoteStoreRefreshListener( SegmentReplicationCheckpointPublisher checkpointPublisher, RemoteRefreshSegmentTracker segmentTracker ) { + logger = Loggers.getLogger(getClass(), indexShard.shardId()); this.indexShard = indexShard; this.storeDirectory = indexShard.store().directory(); this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) @@ -155,7 +156,7 @@ public void onFailure(String file) { // Track upload failure segmentTracker.addUploadBytesFailed(latestFileNameSizeOnLocalMap.get(file)); } - }, remoteDirectory, storeDirectory, this::getChecksumOfLocalFile); + }, remoteDirectory, storeDirectory, this::getChecksumOfLocalFile, logger); } @Override @@ -470,6 +471,8 @@ private void updateFinalUploadStatusInSegmentTracker(boolean uploadStatus, long */ private static class FileUploader { + private final Logger logger; + private final UploadTracker uploadTracker; private final RemoteSegmentStoreDirectory remoteDirectory; @@ -482,12 +485,14 @@ public FileUploader( UploadTracker uploadTracker, RemoteSegmentStoreDirectory remoteDirectory, Directory storeDirectory, - CheckedFunction checksumProvider + CheckedFunction checksumProvider, + Logger logger ) { this.uploadTracker = uploadTracker; this.remoteDirectory = remoteDirectory; this.storeDirectory = storeDirectory; this.checksumProvider = checksumProvider; + this.logger = logger; } /** diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 1e565b97387d1..9e027b9765bbc 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -8,9 +8,9 @@ package org.opensearch.index.translog; -import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.common.SetOnce; +import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.util.FileSystemUtils; @@ -32,6 +32,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.HashSet; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; @@ -49,7 +50,7 @@ */ public class RemoteFsTranslog extends Translog { - private static final Logger logger = LogManager.getLogger(RemoteFsTranslog.class); + private final Logger logger; private final BlobStoreRepository blobStoreRepository; private final TranslogTransferManager translogTransferManager; private final FileTransferTracker fileTransferTracker; @@ -82,16 +83,19 @@ public RemoteFsTranslog( BooleanSupplier primaryModeSupplier ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); + logger = Loggers.getLogger(getClass(), shardId); this.blobStoreRepository = blobStoreRepository; this.primaryModeSupplier = primaryModeSupplier; fileTransferTracker = new FileTransferTracker(shardId); this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker); try { - download(translogTransferManager, location); + download(translogTransferManager, location, logger); Checkpoint checkpoint = readCheckpoint(location); this.readers.addAll(recoverFromFiles(checkpoint)); if (readers.isEmpty()) { - throw new IllegalStateException("at least one reader must be recovered"); + String errorMsg = String.format(Locale.ROOT, "%s at least one reader must be recovered", shardId); + logger.error(errorMsg); + throw new IllegalStateException(errorMsg); } boolean success = false; current = null; @@ -120,8 +124,13 @@ public RemoteFsTranslog( } } - public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location) throws IOException { - assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location, Logger logger) + throws IOException { + assert repository instanceof BlobStoreRepository : String.format( + Locale.ROOT, + "%s repository should be instance of BlobStoreRepository", + shardId + ); BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); TranslogTransferManager translogTransferManager = buildTranslogTransferManager( @@ -130,11 +139,11 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t shardId, fileTransferTracker ); - RemoteFsTranslog.download(translogTransferManager, location); + RemoteFsTranslog.download(translogTransferManager, location, logger); } - public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException { - logger.info("Downloading translog files from remote for shard {} ", translogTransferManager.getShardId()); + public static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException { + logger.trace("Downloading translog files from remote"); TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata(); if (translogMetadata != null) { if (Files.notExists(location)) { @@ -156,7 +165,7 @@ public static void download(TranslogTransferManager translogTransferManager, Pat location.resolve(Translog.CHECKPOINT_FILE_NAME) ); } - logger.info("Downloaded translog files from remote for shard {} ", translogTransferManager.getShardId()); + logger.trace("Downloaded translog files from remote"); } public static TranslogTransferManager buildTranslogTransferManager( @@ -321,8 +330,8 @@ public boolean syncNeeded() { @Override public void close() throws IOException { - assert Translog.calledFromOutsideOrViaTragedyClose() - : "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method"; + assert Translog.calledFromOutsideOrViaTragedyClose() : shardId + + "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method"; if (closed.compareAndSet(false, true)) { try (ReleasableLock lock = writeLock.acquire()) { sync(); @@ -340,12 +349,14 @@ protected long getMinReferencedGen() throws IOException { minGenerationForSeqNo(minSeqNoToKeep, current, readers) ); - assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + assert minReferencedGen >= getMinFileGeneration() : shardId + + " deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" + getMinFileGeneration() + "]"; - assert minReferencedGen <= currentFileGeneration() : "deletion policy requires a minReferenceGen of [" + assert minReferencedGen <= currentFileGeneration() : shardId + + " deletion policy requires a minReferenceGen of [" + minReferencedGen + "] which is higher than the current generation [" + currentFileGeneration() @@ -356,7 +367,7 @@ protected long getMinReferencedGen() throws IOException { protected void setMinSeqNoToKeep(long seqNo) { if (seqNo < this.minSeqNoToKeep) { throw new IllegalArgumentException( - "min seq number required can't go backwards: " + "current [" + this.minSeqNoToKeep + "] new [" + seqNo + "]" + shardId + " min seq number required can't go backwards: " + "current [" + this.minSeqNoToKeep + "] new [" + seqNo + "]" ); } this.minSeqNoToKeep = seqNo; @@ -416,7 +427,7 @@ private void deleteStaleRemotePrimaryTerms() { // of older primary term. if (olderPrimaryCleaned.trySet(Boolean.TRUE)) { // First we delete all stale primary terms folders from remote store - assert readers.isEmpty() == false : "Expected non-empty readers"; + assert readers.isEmpty() == false : shardId + " Expected non-empty readers"; long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get(); translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 6da0ee5521738..54140226e3744 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -8,7 +8,6 @@ package org.opensearch.index.translog.transfer; -import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.IndexInput; @@ -21,6 +20,7 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.logging.Loggers; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; @@ -61,8 +61,7 @@ public class TranslogTransferManager { private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000; - private static final Logger logger = LogManager.getLogger(TranslogTransferManager.class); - + private final Logger logger; private final static String METADATA_DIR = "metadata"; private final static String DATA_DIR = "data"; @@ -84,6 +83,7 @@ public TranslogTransferManager( this.remoteDataTransferPath = remoteBaseTransferPath.add(DATA_DIR); this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR); this.fileTransferTracker = fileTransferTracker; + this.logger = Loggers.getLogger(getClass(), shardId); } public ShardId getShardId() { @@ -200,7 +200,7 @@ public TranslogTransferMetadata readMetadata() throws IOException { exceptionSetOnce.set(e); } }, e -> { - logger.error(() -> new ParameterizedMessage("Exception while listing metadata files "), e); + logger.error(() -> new ParameterizedMessage("Exception while listing metadata files"), e); exceptionSetOnce.set((IOException) e); }), latch @@ -295,7 +295,7 @@ public void deleteGenerationAsync(long primaryTerm, Set generations, Runna * @param minPrimaryTermToKeep all primary terms below this primary term are deleted. */ public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) { - logger.info("Deleting primary terms from remote store lesser than {} for {}", minPrimaryTermToKeep, shardId); + logger.info("Deleting primary terms from remote store lesser than {}", minPrimaryTermToKeep); transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() { @Override public void onResponse(Set folders) { @@ -333,7 +333,7 @@ private void deletePrimaryTermAsync(long primaryTerm) { new ActionListener<>() { @Override public void onResponse(Void unused) { - logger.info("Deleted primary term {} for {}", primaryTerm, shardId); + logger.info("Deleted primary term {}", primaryTerm); } @Override @@ -349,12 +349,12 @@ public void delete() { transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() { @Override public void onResponse(Void unused) { - logger.info("Deleted all remote translog data for {}", shardId); + logger.info("Deleted all remote translog data"); } @Override public void onFailure(Exception e) { - logger.error("Exception occurred while cleaning translog ", e); + logger.error("Exception occurred while cleaning translog", e); } }); } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 924a9d039da28..5f8aa64457896 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -17,6 +17,7 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.index.Index; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; @@ -65,6 +66,7 @@ public void setUp() throws Exception { primaryTerm = randomNonNegativeLong(); generation = randomNonNegativeLong(); shardId = mock(ShardId.class); + when(shardId.getIndex()).thenReturn(new Index("index", "indexUUid")); minTranslogGeneration = randomLongBetween(0, generation); remoteBaseTransferPath = new BlobPath().add("base_path"); transferService = mock(TransferService.class);