Commit

add concurrent add/delete test

nginthfs committed Sep 12, 2024
1 parent cd8f892 commit e4ff6fc
Showing 1 changed file with 189 additions and 8 deletions.
197 changes: 189 additions & 8 deletions solr/core/src/test/org/apache/solr/storage/SizeAwareDirectoryTest.java
@@ -1,21 +1,42 @@
package org.apache.solr.storage;

import org.apache.logging.log4j.util.Strings;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.DirectoryFactory;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.FileOutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class SizeAwareDirectoryTest extends SolrTestCaseJ4 {

private final ConcurrentHashMap<String, Boolean> activeFiles = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Boolean> deletedFiles = new ConcurrentHashMap<>();
private String path;

@Override
@Before
public void setUp() throws Exception {
super.setUp();
activeFiles.clear();
deletedFiles.clear();
path = createTempDir().toString() + "/somedir";
}

@Test
public void testInitSize() throws Exception {
final String path = createTempDir().toString() + "/somedir";
CompressingDirectoryFactory dirFac = new CompressingDirectoryFactory();
try (dirFac) {
dirFac.initCoreContainer(null);
@@ -45,9 +66,9 @@ public void testInitSize() throws Exception {
}
}

@Test
public void testSizeTracking() throws Exception {
// after onDiskSize has been init(), the size should be correct using the LongAdder sum in SizeAwareDirectory
final String path = createTempDir().toString() + "/somedir";
CompressingDirectoryFactory dirFac = new CompressingDirectoryFactory();
try (dirFac) {
dirFac.initCoreContainer(null);
@@ -75,8 +96,8 @@ public void testSizeTracking() throws Exception {
}
}

@Test
public void testWriteBigFile() throws Exception {
final String path = createTempDir().toString() + "/somedir";
CompressingDirectoryFactory dirFac = new CompressingDirectoryFactory();
try (dirFac) {
dirFac.initCoreContainer(null);
@@ -103,9 +124,138 @@ public void testWriteBigFile() throws Exception {
}
}

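// concurrently create a batch of files, then run a second wave of creates while
// deleting a random tenth of the first batch, and verify onDiskSize still matches
// the on-disk sizes of the surviving files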
@Test
public void testSimultaneous() throws Exception {
CompressingDirectoryFactory dirFac = new CompressingDirectoryFactory();
try (dirFac) {
dirFac.initCoreContainer(null);
dirFac.init(new NamedList<>());

Directory dir =
dirFac.get(path, DirectoryFactory.DirContext.DEFAULT, DirectoryFactory.LOCK_TYPE_SINGLE);
try {
// small file first to initSize()
try (IndexOutput file = dir.createOutput("test_file", IOContext.DEFAULT)) {
file.writeInt(42);
} // implicitly close file
addToActiveFiles("test_file");

long expectedDiskSize = Files.size(Paths.get(path+"/test_file"));
assertEquals("directory size should be equal to on disk size of test files", expectedDiskSize, dirFac.onDiskSize(dir));

String createPrefix = "test_first_";
Random r = new Random(42);
int numCreateThreads = 100;
List<Thread> createThreads = getCreateThreads(dir, numCreateThreads, r, createPrefix);
startThreads(createThreads);
waitForThreads(createThreads);

List<Thread> createAndDeleteThreads = getCreateThreads(dir, numCreateThreads, r, "test_second_");
// randomly delete 10 percent of the files created above, while also creating some files
createAndDeleteThreads.addAll(getRandomDeleteThreads(dir, numCreateThreads / 10, new Random(84), createPrefix, numCreateThreads));
startThreads(createAndDeleteThreads);
waitForThreads(createAndDeleteThreads);

System.out.println("deleted some files " + Strings.join(new ArrayList<>(deletedFiles.keySet()), ','));

expectedDiskSize = expectedDiskSizeForFiles();
assertEquals("directory size should be equal to on disk size of test files", expectedDiskSize, dirFac.onDiskSize(dir));
} finally {
dirFac.release(dir);
}
}
}

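// join every thread, failing the test if any join is interrupted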
private void waitForThreads(List<Thread> threads) {
for (int i = 0; i < threads.size(); i++) {
try {
threads.get(i).join();
} catch (InterruptedException e) {
System.out.println("Thread was interrupted");
fail("thread was interrupted "+i);
}
}
}

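// one thread per file; sizes are random but always exceed one compression
// block, so every write exercises the compressed write path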
private List<Thread> getCreateThreads(Directory dir, int numThreads, Random r, String prefix) {
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
String name = prefix+i;
int size = r.nextInt(100000000) + CompressingDirectory.COMPRESSION_BLOCK_SIZE + 1;
threads.add(new Thread(new CreateFileTask(dir, name, size)));
}
return threads;
}

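// one thread per deletion, each targeting a random index among the files
// created with the given prefix; the fixed seed keeps runs reproducible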
private List<Thread> getRandomDeleteThreads(Directory dir, int numThreads, Random r, String prefix, int numActiveFiles) {
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
int index = r.nextInt(numActiveFiles);
threads.add(new Thread(new DeleteFileTask(dir, prefix, index)));
}
return threads;
}

private void startThreads(List<Thread> threads) {
for (Thread t : threads) {
t.start();
}
}

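// sum the on-disk sizes of every file still tracked as active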
private long expectedDiskSizeForFiles() throws Exception {
long fileSize = 0;
for (String name : activeFiles.keySet()) {
fileSize += Files.size(Paths.get(path+"/"+name));
}
return fileSize;
}

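// task that writes one random file of the given size, failing the test on any exception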
private class CreateFileTask implements Runnable {
final String name;
final int size;
final Directory dir;

public CreateFileTask(Directory dir, String name, int size) {
this.name = name;
this.size = size;
this.dir = dir;
}

public void run() {
try {
writeRandomFileOfSize(dir, name, size);
} catch (Exception e) {
fail("exception writing file"+name);
}
}
}

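// task that deletes the index-th active file with the given prefix, failing the test on any exception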
private class DeleteFileTask implements Runnable {
final String prefix;
final Directory dir;
final int index;

public DeleteFileTask(Directory dir, String prefix, int index) {
this.dir = dir;
this.prefix = prefix;
this.index = index;
}

public void run() {
String name = pickFromActiveSet(index, prefix);
try {
deleteFile(dir, name);
} catch (Exception e) {
fail("exception deleting file"+name);
}
}
}

@Test
public void testDelete() throws Exception {
// write a file, then another, then delete one of the files - the onDiskSize should update correctly
final String path = createTempDir().toString() + "/somedir";
CompressingDirectoryFactory dirFac = new CompressingDirectoryFactory();
try (dirFac) {
dirFac.initCoreContainer(null);
Expand All @@ -127,7 +277,7 @@ public void testDelete() throws Exception {
expectedDiskSize = Files.size(Paths.get(path+"/test_file")) + Files.size(Paths.get(path+"/test_file2"));
assertEquals("directory size should be equal to on disk size of test files", expectedDiskSize, dirFac.onDiskSize(dir));

dir.deleteFile("test_file2");
deleteFile(dir, "test_file2");
expectedDiskSize = Files.size(Paths.get(path+"/test_file"));
assertEquals("directory size should be equal to on disk size of test files", expectedDiskSize, dirFac.onDiskSize(dir));
} finally {
@@ -136,6 +286,12 @@ public void testDelete() throws Exception {
}
}

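// delete through the Directory and move the name from the active set to the deleted set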
private void deleteFile(Directory dir, String name) throws Exception {
dir.deleteFile(name);
activeFiles.remove(name);
addToDeletedFiles(name);
}

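// write a file whose contents cross the compression block-size boundary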
private void writeBlockSizeFile(Directory dir, String name) throws Exception {
try (IndexOutput file = dir.createOutput(name, IOContext.DEFAULT)) {
// write some small things first to force past blocksize boundary
@@ -148,15 +304,40 @@ private void writeBlockSizeFile(Directory dir, String name) throws Exception {
random.nextBytes(byteArray);
file.writeBytes(byteArray, blocksize);
} // implicitly close file
addToActiveFiles(name);
}

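// stream random bytes to the directory in fixed-size chunks rather than
// allocating a single buffer of the full file size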
private void writeRandomFileOfSize(Directory dir, String name, int size) throws Exception {
try (IndexOutput file = dir.createOutput(name, IOContext.DEFAULT)) {
// write the requested number of random bytes, large enough to force compression via dump()
Random random = new Random(42);
byte[] byteArray = new byte[size];
random.nextBytes(byteArray);
file.writeBytes(byteArray, size);
int bufferSize = 4096; // Chunk size
byte[] buffer = new byte[bufferSize];
int remainingBytes = size;

while (remainingBytes > 0) {
int bytesToWrite = Math.min(remainingBytes, bufferSize);
random.nextBytes(buffer);

file.writeBytes(buffer, bytesToWrite);

remainingBytes -= bytesToWrite;
}
} // implicitly close file
addToActiveFiles(name);
}

private void addToActiveFiles(String name) {
activeFiles.put(name, true);
}

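// snapshot the active file names matching the prefix and return the entry at index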
private String pickFromActiveSet(int index, String prefix) {
List<String> l = activeFiles.keySet().stream().filter(s -> s.startsWith(prefix)).collect(Collectors.toList());
assertTrue(l.size() > index);
return l.get(index);
}

private void addToDeletedFiles(String name) {
deletedFiles.put(name, true);
}
}
