Skip to content

Commit

Permalink
Strengthen locking in FsBlobContainer register impl (elastic#107830)
Browse files Browse the repository at this point in the history
Expands the JVM-wide mutex to prevent all concurrent operations on
file-based registers, but then introduces an artificial mechanism for
emulating write/write contention within a single JVM.
  • Loading branch information
DaveCTurner authored Apr 24, 2024
1 parent 05d4bc3 commit fee9097
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ default void compareAndSetRegister(
* @param purpose The purpose of the operation
* @param key key of the value to get
* @param listener a listener, completed with the value read from the register or {@code OptionalBytesReference#MISSING} if the value
* could not be read due to concurrent activity.
* could not be read due to concurrent activity (which should not happen).
*/
default void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
compareAndExchangeRegister(purpose, key, BytesArray.EMPTY, BytesArray.EMPTY, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Strings;
Expand Down Expand Up @@ -60,6 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import static java.util.Collections.unmodifiableMap;

Expand Down Expand Up @@ -392,74 +394,108 @@ private static OutputStream blobOutputStream(Path file) throws IOException {
}

@Override
@SuppressForbidden(reason = "write to channel that we have open for locking purposes already directly")
public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
// no lock to acquire here, we are emulating the lack of read/read and read/write contention in cloud repositories
ActionListener.completeWith(
listener,
() -> doUncontendedCompareAndExchangeRegister(path.resolve(key), BytesArray.EMPTY, BytesArray.EMPTY)
);
}

@Override
public void compareAndExchangeRegister(
OperationPurpose purpose,
String key,
BytesReference expected,
BytesReference updated,
ActionListener<OptionalBytesReference> listener
) {
ActionListener.completeWith(listener, () -> {
BlobContainerUtils.ensureValidRegisterContent(updated);
try (LockedFileChannel lockedFileChannel = LockedFileChannel.open(path.resolve(key))) {
final FileChannel fileChannel = lockedFileChannel.fileChannel();
final ByteBuffer readBuf = ByteBuffer.allocate(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH);
while (readBuf.remaining() > 0) {
if (fileChannel.read(readBuf) == -1) {
break;
}
}
final var found = new BytesArray(readBuf.array(), readBuf.arrayOffset(), readBuf.position());
readBuf.clear();
if (fileChannel.read(readBuf) != -1) {
throw new IllegalStateException(
"register contains more than [" + BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH + "] bytes"
);
ActionListener.completeWith(listener, () -> doCompareAndExchangeRegister(path.resolve(key), expected, updated));
}

private static final KeyedLock<Path> writeMutexes = new KeyedLock<>();

private static OptionalBytesReference doCompareAndExchangeRegister(Path registerPath, BytesReference expected, BytesReference updated)
throws IOException {
// Emulate write/write contention as might happen in cloud repositories, at least for the case where the writers are all in this
// JVM (e.g. for an ESIntegTestCase).
try (var mutex = writeMutexes.tryAcquire(registerPath)) {
return mutex == null
? OptionalBytesReference.MISSING
: doUncontendedCompareAndExchangeRegister(registerPath, expected, updated);
}
}

@SuppressForbidden(reason = "write to channel that we have open for locking purposes already directly")
private static OptionalBytesReference doUncontendedCompareAndExchangeRegister(
Path registerPath,
BytesReference expected,
BytesReference updated
) throws IOException {
BlobContainerUtils.ensureValidRegisterContent(updated);
try (LockedFileChannel lockedFileChannel = LockedFileChannel.open(registerPath)) {
final FileChannel fileChannel = lockedFileChannel.fileChannel();
final ByteBuffer readBuf = ByteBuffer.allocate(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH);
while (readBuf.remaining() > 0) {
if (fileChannel.read(readBuf) == -1) {
break;
}
}
final var found = new BytesArray(readBuf.array(), readBuf.arrayOffset(), readBuf.position());
readBuf.clear();
if (fileChannel.read(readBuf) != -1) {
throw new IllegalStateException(
"register contains more than [" + BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH + "] bytes"
);
}

if (expected.equals(found)) {
var pageStart = 0L;
final var iterator = updated.iterator();
BytesRef page;
while ((page = iterator.next()) != null) {
final var writeBuf = ByteBuffer.wrap(page.bytes, page.offset, page.length);
while (writeBuf.remaining() > 0) {
fileChannel.write(writeBuf, pageStart + writeBuf.position());
}
pageStart += page.length;
if (expected.equals(found)) {
var pageStart = 0L;
final var iterator = updated.iterator();
BytesRef page;
while ((page = iterator.next()) != null) {
final var writeBuf = ByteBuffer.wrap(page.bytes, page.offset, page.length);
while (writeBuf.remaining() > 0) {
fileChannel.write(writeBuf, pageStart + writeBuf.position());
}
fileChannel.force(true);
pageStart += page.length;
}
return OptionalBytesReference.of(found);
} catch (OverlappingFileLockException e) {
return OptionalBytesReference.MISSING;
fileChannel.force(true);
}
});
return OptionalBytesReference.of(found);
} catch (OverlappingFileLockException e) {
assert false : e; // should be impossible, we protect against all concurrent operations within this JVM
return OptionalBytesReference.MISSING;
}
}

private record LockedFileChannel(FileChannel fileChannel, Closeable fileLock) implements Closeable {

// Avoid concurrently opening/closing locked files, because this can trip an assertion within the JDK (see #93955 for details).
// Perhaps it would work with finer-grained locks too, but we don't currently need to be fancy here.
private static final Object mutex = new Object();
//
// Also, avoid concurrent operations on FsBlobContainer registers within a single JVM with a simple blocking lock, to avoid
// OverlappingFileLockException. FileChannel#lock blocks on concurrent operations on the file in a different process. This emulates
// the lack of read/read and read/write contention that can happen on a cloud repository register.
private static final ReentrantLock mutex = new ReentrantLock();

static LockedFileChannel open(Path path) throws IOException {
synchronized (mutex) {
List<Closeable> resources = new ArrayList<>(2);
try {
final FileChannel fileChannel = openOrCreateAtomic(path);
resources.add(fileChannel);
List<Closeable> resources = new ArrayList<>(3);
try {
mutex.lock();
resources.add(mutex::unlock);

final Closeable fileLock = fileChannel.lock()::close;
resources.add(fileLock);
final FileChannel fileChannel = openOrCreateAtomic(path);
resources.add(fileChannel);

final var result = new LockedFileChannel(fileChannel, fileLock);
resources.clear();
return result;
} finally {
IOUtils.closeWhileHandlingException(resources);
}
final Closeable fileLock = fileChannel.lock()::close;
resources.add(fileLock);

final var result = new LockedFileChannel(fileChannel, fileLock);
resources.clear();
return result;
} finally {
IOUtils.closeWhileHandlingException(resources);
}
}

Expand All @@ -476,9 +512,7 @@ private static FileChannel openOrCreateAtomic(Path path) throws IOException {

@Override
public void close() throws IOException {
synchronized (mutex) {
IOUtils.close(fileLock, fileChannel);
}
IOUtils.close(fileLock, fileChannel, mutex::unlock);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand All @@ -52,6 +54,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.startsWith;

@LuceneTestCase.SuppressFileSystems("*") // we do our own mocking
Expand Down Expand Up @@ -238,6 +241,79 @@ public void testCompareAndExchange() throws Exception {
);
}

public void testRegisterContention() throws Exception {
final Path path = PathUtils.get(createTempDir().toString());
final FsBlobContainer container = new FsBlobContainer(
new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false),
BlobPath.EMPTY,
path
);

final String contendedKey = randomAlphaOfLength(10);
final String uncontendedKey = randomAlphaOfLength(10);

final var startValue = new BytesArray(randomByteArrayOfLength(8));
final var finalValue = randomValueOtherThan(startValue, () -> new BytesArray(randomByteArrayOfLength(8)));

final var p = randomPurpose();
assertTrue(PlainActionFuture.get(l -> container.compareAndSetRegister(p, contendedKey, BytesArray.EMPTY, startValue, l)));
assertTrue(PlainActionFuture.get(l -> container.compareAndSetRegister(p, uncontendedKey, BytesArray.EMPTY, startValue, l)));

final var threads = new Thread[between(2, 5)];
final var startBarrier = new CyclicBarrier(threads.length + 1);
final var casSucceeded = new AtomicBoolean();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(
i == 0
// first thread does an uncontended write, which must succeed
? () -> {
safeAwait(startBarrier);
final OptionalBytesReference result = PlainActionFuture.get(
l -> container.compareAndExchangeRegister(p, uncontendedKey, startValue, finalValue, l)
);
// NB calling .bytesReference() asserts that the result is present, there was no contention
assertEquals(startValue, result.bytesReference());
}
// other threads try and do contended writes, which may fail and need retrying
: () -> {
safeAwait(startBarrier);
while (casSucceeded.get() == false) {
final OptionalBytesReference result = PlainActionFuture.get(
l -> container.compareAndExchangeRegister(p, contendedKey, startValue, finalValue, l)
);
if (result.isPresent() && result.bytesReference().equals(startValue)) {
casSucceeded.set(true);
}
}
},
"write-thread-" + i
);
threads[i].start();
}

safeAwait(startBarrier);
while (casSucceeded.get() == false) {
for (var key : new String[] { contendedKey, uncontendedKey }) {
// NB calling .bytesReference() asserts that the read did not experience contention
assertThat(
PlainActionFuture.<OptionalBytesReference, RuntimeException>get(l -> container.getRegister(p, key, l)).bytesReference(),
oneOf(startValue, finalValue)
);
}
}

for (Thread thread : threads) {
thread.join();
}

for (var key : new String[] { contendedKey, uncontendedKey }) {
assertEquals(
finalValue,
PlainActionFuture.<OptionalBytesReference, RuntimeException>get(l -> container.getRegister(p, key, l)).bytesReference()
);
}
}

public void testAtomicWriteMetadataWithoutAtomicOverwrite() throws IOException {
this.fileSystem = new FilterFileSystemProvider("nooverwritefs://", fileSystem) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,17 @@ public Map<String, BlobMetadata> listBlobsByPrefix(OperationPurpose purpose, Str
return blobMetadataByName;
}

@Override
public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
assertPurpose(purpose);
final var register = registers.get(key);
if (register == null) {
listener.onResponse(OptionalBytesReference.EMPTY);
} else {
listener.onResponse(OptionalBytesReference.of(register.get()));
}
}

@Override
public void compareAndExchangeRegister(
OperationPurpose purpose,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,13 @@ public void onFailure(Exception e) {
};

if (request.getInitialRead() > request.getRequestCount()) {
blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, initialValueListener);
blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, initialValueListener.delegateFailure((l, r) -> {
if (r.isPresent()) {
l.onResponse(r);
} else {
l.onFailure(new IllegalStateException("register read failed due to contention"));
}
}));
} else {
blobContainer.compareAndExchangeRegister(
OperationPurpose.REPOSITORY_ANALYSIS,
Expand Down

0 comments on commit fee9097

Please sign in to comment.