Skip to content

Commit

Permalink
fix liveinput drift
Browse files Browse the repository at this point in the history
  • Loading branch information
nginthfs committed Sep 17, 2024
1 parent cc32c88 commit f4a966b
Showing 1 changed file with 13 additions and 53 deletions.
66 changes: 13 additions & 53 deletions solr/core/src/java/org/apache/solr/storage/SizeAwareDirectory.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,14 @@ public class SizeAwareDirectory extends FilterDirectory
private boolean initialized = false;
private volatile long reconciledTimeNanos;
private volatile LongAdder size = new LongAdder();
// TODO(nickginther): do not use this value, there is currently a bug where it drifts
private volatile LongAdder onDiskSize = new LongAdder();
private volatile SizeWriter sizeWriter =
(size, onDiskSize, name) -> {
this.size.add(size);
this.onDiskSize.add(onDiskSize);
};

private final FileSizeMap fileSizeMap = new FileSizeMap();
private final ConcurrentHashMap<String, Sizes> fileSizeMap = new ConcurrentHashMap<>();

private final ConcurrentHashMap<String, SizeAccountingIndexOutput> liveOutputs =
new ConcurrentHashMap<>();
Expand All @@ -69,46 +68,6 @@ private interface SizeWriter {
void apply(long size, long onDiskSize, String name);
}

private static class FileSizeMap implements Accountable {
private final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(FileSizeMap.class);
private final ConcurrentHashMap<String, Sizes> fileSizeMap = new ConcurrentHashMap<>();
private volatile LongAdder onDiskSize = new LongAdder();

public Sizes add(String name, Sizes sizes) {
Sizes extant = fileSizeMap.put(name, sizes);
if (extant != null) {
onDiskSize.add(-extant.onDiskSize);
}
onDiskSize.add(sizes.onDiskSize);
return extant;
}

public Sizes remove(String name) {
Sizes sizes = fileSizeMap.remove(name);
if (sizes != null) {
onDiskSize.add(-sizes.onDiskSize);
}
return sizes;
}

public long onDiskSize() {
return onDiskSize.sum();
}

public Sizes get(String key) {
return fileSizeMap.get(key);
}

public int size() {
return fileSizeMap.size();
}

@Override
public long ramBytesUsed() {
return RAM_BYTES_USED + RamUsageEstimator.sizeOfMap(fileSizeMap);
}
}

private static class Sizes implements Accountable {
private static final long RAM_BYTES_USED =
RamUsageEstimator.shallowSizeOfInstance(SizeAccountingIndexOutput.class);
Expand Down Expand Up @@ -140,7 +99,7 @@ public SizeAwareDirectory(Directory in, long reconcileTTLNanos) {
@Override
public long ramBytesUsed() {
return BASE_RAM_BYTES_USED
+ fileSizeMap.ramBytesUsed()
+ RamUsageEstimator.sizeOfMap(fileSizeMap)
+ RamUsageEstimator.sizeOfMap(liveOutputs);
}

Expand Down Expand Up @@ -168,13 +127,14 @@ public long onDiskFileLength(String name) throws IOException {
if (live.backing instanceof CompressingDirectory.SizeReportingIndexOutput) {
return ((CompressingDirectory.SizeReportingIndexOutput) live.backing).getBytesWritten();
}
return live.backing.getFilePointer();
// backing IndexOutput does not allow us to get onDiskSize
return 0;
} else if (in instanceof DirectoryFactory.OnDiskSizeDirectory) {
// fallback delegate to wrapped Directory
return ((DirectoryFactory.OnDiskSizeDirectory) in).onDiskFileLength(name);
} else {
// fallback delegate to wrapped Directory
return fileLength(name);
// directory does not implement onDiskSize
return 0;
}
}

Expand All @@ -195,7 +155,7 @@ public long onDiskSize() throws IOException {
if (initialized
&& (reconcileThreshold == null
|| System.nanoTime() - reconciledTimeNanos < reconcileTTLNanos)) {
return fileSizeMap.onDiskSize();
return onDiskSize.sum();
}
return initSize().onDiskSize;
}
Expand Down Expand Up @@ -252,7 +212,7 @@ private Sizes initSize() throws IOException {
if (fileSize > 0) {
// whether the file exists or not, we don't care about it if it has zero size.
// more often though, 0 size means the file isn't there.
fileSizeMap.add(file, sizes);
fileSizeMap.put(file, sizes);
if (DirectoryFactory.sizeOf(in, file) == 0) {
// during reconciliation, we have to check for file presence _after_ adding
// to the map, to prevent a race condition that could leak entries into `fileSizeMap`
Expand Down Expand Up @@ -383,7 +343,7 @@ private static final class SizeAccountingIndexOutput extends IndexOutput impleme

private final IndexOutput backing;

private final FileSizeMap fileSizeMap;
private final ConcurrentHashMap<String, Sizes> fileSizeMap;

private final ConcurrentHashMap<String, SizeAccountingIndexOutput> liveOutputs;

Expand All @@ -394,7 +354,7 @@ private static final class SizeAccountingIndexOutput extends IndexOutput impleme
private SizeAccountingIndexOutput(
String name,
IndexOutput backing,
FileSizeMap fileSizeMap,
ConcurrentHashMap<String, Sizes> fileSizeMap,
ConcurrentHashMap<String, SizeAccountingIndexOutput> liveOutputs,
SizeWriter sizeWriter) {
super("byteSize(" + name + ")", name);
Expand Down Expand Up @@ -427,15 +387,15 @@ public void close() throws IOException {
try (backing) {
// TODO
} finally {
long onDiskSize = size;
long onDiskSize = 0;
if (backing instanceof CompressingDirectory.SizeReportingIndexOutput) {
long finalBytesWritten = getBytesWritten(backing);
onDiskSize = finalBytesWritten;
// logical size should already be set through writeByte(s), but we need to finalize the
// on-disk size here
sizeWriter.apply(0, finalBytesWritten - lastBytesWritten, name);
}
fileSizeMap.add(name, new Sizes(backing.getFilePointer(), onDiskSize));
fileSizeMap.put(name, new Sizes(backing.getFilePointer(), onDiskSize));
liveOutputs.remove(name);
}
}
Expand Down Expand Up @@ -483,7 +443,7 @@ public long ramBytesUsed() {
@Override
public void rename(String source, String dest) throws IOException {
in.rename(source, dest);
Sizes extant = fileSizeMap.add(dest, fileSizeMap.remove(source));
Sizes extant = fileSizeMap.put(dest, fileSizeMap.remove(source));
assert extant == null; // it's illegal for dest to already exist
}
}

0 comments on commit f4a966b

Please sign in to comment.