Skip to content

Commit

Permalink
Fix thread context in getRepositoryData (elastic#99627)
Browse files Browse the repository at this point in the history
Listeners which subscribe to `BlobStoreRepository#repoDataInitialized`
are today completed in the thread context of the thread which first
triggers the initialization of repository generation tracking, but we
must instead capture each listener's own thread context to avoid
cross-context pollution.
  • Loading branch information
DaveCTurner authored Sep 20, 2023
1 parent ebd5ead commit 0ad8aba
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 122 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/99627.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 99627
summary: Fix thread context in `getRepositoryData`
area: Snapshot/Restore
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.SingleResultDeduplicator;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.RefCountingRunnable;
Expand Down Expand Up @@ -68,6 +67,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
Expand Down Expand Up @@ -1788,36 +1788,41 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
// master-eligible or not.
assert clusterService.localNode().isMasterNode() : "should only load repository data on master nodes";

if (lifecycle.started() == false) {
listener.onFailure(notStartedException());
return;
}
while (true) {
// retry loop, in case the state changes underneath us somehow

if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
listener.onFailure(corruptedStateException(null, null));
return;
}
final RepositoryData cached = latestKnownRepositoryData.get();
// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
// the latest known repository generation
if (bestEffortConsistency == false && cached.getGenId() == latestKnownRepoGen.get()) {
listener.onResponse(cached);
return;
}
if (metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN && isReadOnly() == false) {
logger.debug(
"[{}] loading repository metadata for the first time, trying to determine correct generation and to store "
+ "it in the cluster state",
metadata.name()
);
initializeRepoGenerationTracking(listener);
} else {
logger.trace(
"[{}] loading un-cached repository data with best known repository generation [{}]",
metadata.name(),
latestKnownRepoGen
);
repoDataLoadDeduplicator.execute(listener);
if (lifecycle.started() == false) {
listener.onFailure(notStartedException());
return;
}

if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
listener.onFailure(corruptedStateException(null, null));
return;
}
final RepositoryData cached = latestKnownRepositoryData.get();
// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
// the latest known repository generation
if (bestEffortConsistency == false && cached.getGenId() == latestKnownRepoGen.get()) {
listener.onResponse(cached);
return;
}
if (metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN && isReadOnly() == false) {
logger.debug("""
[{}] loading repository metadata for the first time, trying to determine correct generation and to store it in the \
cluster state""", metadata.name());
if (initializeRepoGenerationTracking(listener)) {
return;
} // else there was a concurrent modification, retry from the start
} else {
logger.trace(
"[{}] loading un-cached repository data with best known repository generation [{}]",
metadata.name(),
latestKnownRepoGen
);
repoDataLoadDeduplicator.execute(listener);
return;
}
}
}

Expand All @@ -1826,7 +1831,8 @@ private RepositoryException notStartedException() {
}

// Listener used to ensure that repository data is only initialized once in the cluster state by #initializeRepoGenerationTracking
private ListenableActionFuture<RepositoryData> repoDataInitialized;
@Nullable // unless we're in the process of initializing repo-generation tracking
private SubscribableListener<RepositoryData> repoDataInitialized;

/**
* Method used to set the current repository generation in the cluster state's {@link RepositoryMetadata} to the latest generation that
Expand All @@ -1835,103 +1841,120 @@ private RepositoryException notStartedException() {
* have a consistent view of the {@link RepositoryData} before any data has been written to the repository.
*
* @param listener listener to resolve with new repository data
* @return {@code true} if this method at least started the initialization process successfully and will eventually complete the
* listener, {@code false} if there was some concurrent state change which prevents us from starting repo generation tracking (typically
* that some other node got there first) and the caller should check again and possibly retry or complete the listener in some other
* way.
*/
private void initializeRepoGenerationTracking(ActionListener<RepositoryData> listener) {
private boolean initializeRepoGenerationTracking(ActionListener<RepositoryData> listener) {
final SubscribableListener<RepositoryData> listenerToSubscribe;
final ActionListener<RepositoryData> listenerToComplete;

synchronized (this) {
if (repoDataInitialized == null) {
// double check the generation since we checked it outside the mutex in the caller and it could have changed by a
// double-check the generation since we checked it outside the mutex in the caller and it could have changed by a
// concurrent initialization of the repo metadata and just load repository normally in case we already finished the
// initialization
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
getRepositoryData(listener);
return;
return false; // retry
}
logger.trace("[{}] initializing repository generation in cluster state", metadata.name());
repoDataInitialized = new ListenableActionFuture<>();
repoDataInitialized.addListener(listener);
final Consumer<Exception> onFailure = e -> {
logger.warn(
() -> format("[%s] Exception when initializing repository generation in cluster state", metadata.name()),
e
);
final ActionListener<RepositoryData> existingListener;
synchronized (BlobStoreRepository.this) {
existingListener = repoDataInitialized;
repoDataInitialized = null;
repoDataInitialized = listenerToSubscribe = new SubscribableListener<>();
listenerToComplete = new ActionListener<>() {
private ActionListener<RepositoryData> acquireAndClearRepoDataInitialized() {
synchronized (BlobStoreRepository.this) {
assert repoDataInitialized == listenerToSubscribe;
repoDataInitialized = null;
return listenerToSubscribe;
}
}
existingListener.onFailure(e);
};
repoDataLoadDeduplicator.execute(
ActionListener.wrap(
repoData -> submitUnbatchedTask(
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
RepositoryMetadata metadata = getRepoMetadata(currentState);
// No update to the repository generation should have occurred concurrently in general except
// for
// extreme corner cases like failing over to an older version master node and back to the
// current
// node concurrently
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
throw new RepositoryException(
metadata.name(),
"Found unexpected initialized repo metadata [" + metadata + "]"
);
}
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(currentState.getMetadata())
.putCustom(
RepositoriesMetadata.TYPE,
RepositoriesMetadata.get(currentState)
.withUpdatedGeneration(metadata.name(), repoData.getGenId(), repoData.getGenId())
)
)
.build();
}

@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}
@Override
public void onResponse(RepositoryData repositoryData) {
acquireAndClearRepoDataInitialized().onResponse(repositoryData);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
logger.trace(
"[{}] initialized repository generation in cluster state to [{}]",
metadata.name(),
repoData.getGenId()
);
// Resolve listeners on generic pool since some callbacks for repository data do additional IO
threadPool.generic().execute(() -> {
final ActionListener<RepositoryData> existingListener;
synchronized (BlobStoreRepository.this) {
existingListener = repoDataInitialized;
repoDataInitialized = null;
}
existingListener.onResponse(repoData);
logger.trace(
"[{}] called listeners after initializing repository to generation [{}]",
metadata.name(),
repoData.getGenId()
);
});
}
}
),
onFailure
)
);
@Override
public void onFailure(Exception e) {
logger.warn(
() -> format("[%s] Exception when initializing repository generation in cluster state", metadata.name()),
e
);
acquireAndClearRepoDataInitialized().onFailure(e);
}
};
} else {
logger.trace(
"[{}] waiting for existing initialization of repository metadata generation in cluster state",
metadata.name()
);
repoDataInitialized.addListener(listener);
}
listenerToComplete = null;
listenerToSubscribe = repoDataInitialized;
}
}

if (listenerToComplete != null) {
SubscribableListener
// load the current repository data
.newForked(repoDataLoadDeduplicator::execute)
// write its generation to the cluster state
.<RepositoryData>andThen(
(l, repoData) -> submitUnbatchedTask(
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return getClusterStateWithUpdatedRepositoryGeneration(currentState, repoData);
}

@Override
public void onFailure(Exception e) {
l.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
l.onResponse(repoData);
}
}
)
)
// fork to generic pool since we're on the applier thread and some callbacks for repository data do additional IO
.<RepositoryData>andThen((l, repoData) -> {
logger.trace("[{}] initialized repository generation in cluster state to [{}]", metadata.name(), repoData.getGenId());
threadPool.generic().execute(ActionRunnable.supply(ActionListener.runAfter(l, () -> {
logger.trace(
"[{}] called listeners after initializing repository to generation [{}]",
metadata.name(),
repoData.getGenId()
);
}), () -> repoData));
})
// and finally complete the listener
.addListener(listenerToComplete);
}

listenerToSubscribe.addListener(listener, EsExecutors.DIRECT_EXECUTOR_SERVICE, threadPool.getThreadContext());
return true;
}

private ClusterState getClusterStateWithUpdatedRepositoryGeneration(ClusterState currentState, RepositoryData repoData) {
// In theory we might have failed over to a different master which initialized the repo and then failed back to this node, so we
// must check the repository generation in the cluster state is still unknown here.
final RepositoryMetadata repoMetadata = getRepoMetadata(currentState);
if (repoMetadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
throw new RepositoryException(repoMetadata.name(), "Found unexpected initialized repo metadata [" + repoMetadata + "]");
}
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(currentState.getMetadata())
.putCustom(
RepositoriesMetadata.TYPE,
RepositoriesMetadata.get(currentState)
.withUpdatedGeneration(repoMetadata.name(), repoData.getGenId(), repoData.getGenId())
)
)
.build();
}

/**
Expand Down
Loading

0 comments on commit 0ad8aba

Please sign in to comment.