From e4ff6fca342fd2b2dc0a47b028a8c16df376350a Mon Sep 17 00:00:00 2001
From: Nick Ginther
Date: Thu, 12 Sep 2024 16:32:25 -0500
Subject: [PATCH] add concurrent add/delete test

---
 .../solr/storage/SizeAwareDirectoryTest.java | 204 ++++++++++++++++++-
 1 file changed, 196 insertions(+), 8 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/storage/SizeAwareDirectoryTest.java b/solr/core/src/test/org/apache/solr/storage/SizeAwareDirectoryTest.java
index 8d4e95957da..666ce7ccb0a 100644
--- a/solr/core/src/test/org/apache/solr/storage/SizeAwareDirectoryTest.java
+++ b/solr/core/src/test/org/apache/solr/storage/SizeAwareDirectoryTest.java
@@ -1,21 +1,42 @@
 package org.apache.solr.storage;
 
+import org.apache.logging.log4j.util.Strings;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.DirectoryFactory;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
+import java.io.FileOutputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 public class SizeAwareDirectoryTest extends SolrTestCaseJ4 {
+  private ConcurrentHashMap<String, Boolean> activeFiles = new ConcurrentHashMap<>();
+  private ConcurrentHashMap<String, Boolean> deletedFiles = new ConcurrentHashMap<>();
+  private String path;
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    activeFiles.clear();
+    deletedFiles.clear();
+    path = createTempDir().toString() + "/somedir";
+  }
+
+  @Test
   public void testInitSize() throws Exception {
-    final String path = createTempDir().toString() + "/somedir";
     CompressingDirectoryFactory dirFac = new CompressingDirectoryFactory();;
     try (dirFac) {
       dirFac.initCoreContainer(null);
       dirFac.init(new NamedList<>());
@@ -45,9 +66,9 @@ public void testInitSize() throws Exception {
     }
   }
 
+  @Test
   public void testSizeTracking() throws Exception {
     // after onDiskSize has been init(), the size should be correct using the LongAdder sum in SizeAwareDirectory
-    final String path = createTempDir().toString() + "/somedir";
     CompressingDirectoryFactory dirFac = new CompressingDirectoryFactory();;
     try (dirFac) {
       dirFac.initCoreContainer(null);
@@ -75,8 +96,8 @@ public void testSizeTracking() throws Exception {
     }
   }
 
+  @Test
   public void testWriteBigFile() throws Exception {
-    final String path = createTempDir().toString() + "/somedir";
     CompressingDirectoryFactory dirFac = new CompressingDirectoryFactory();;
     try (dirFac) {
       dirFac.initCoreContainer(null);
@@ -103,9 +124,143 @@ public void testWriteBigFile() throws Exception {
     }
   }
 
+  @Test
+  public void testSimultaneous() throws Exception {
+    CompressingDirectoryFactory dirFac = new CompressingDirectoryFactory();
+    try (dirFac) {
+      dirFac.initCoreContainer(null);
+      dirFac.init(new NamedList<>());
+
+      Directory dir =
+          dirFac.get(path, DirectoryFactory.DirContext.DEFAULT, DirectoryFactory.LOCK_TYPE_SINGLE);
+      try {
+        // small file first to initSize()
+        try (IndexOutput file = dir.createOutput("test_file", IOContext.DEFAULT)) {
+          file.writeInt(42);
+        } // implicitly close file
+        addToActiveFiles("test_file");
+
+        long expectedDiskSize = Files.size(Paths.get(path + "/test_file"));
+        assertEquals("directory size should be equal to on disk size of test files", expectedDiskSize, dirFac.onDiskSize(dir));
+
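+        // phase 1: concurrently create an initial batch of files under
+        // createPrefix; phase 2 below deletes from that batch while creating more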
+        String createPrefix = "test_first_";
+        Random r = new Random(42);
+        int numCreateThreads = 100;
+        List<Thread> createThreads = getCreateThreads(dir, numCreateThreads, r, createPrefix);
+        startThreads(createThreads);
+        waitForThreads(createThreads);
+
+        List<Thread> createAndDeleteThreads = getCreateThreads(dir, numCreateThreads, r, "test_second_");
+        // randomly delete 10 percent of the files created above, while also creating some files
+        createAndDeleteThreads.addAll(getRandomDeleteThreads(dir, numCreateThreads / 10, new Random(84), createPrefix, numCreateThreads));
+        startThreads(createAndDeleteThreads);
+        waitForThreads(createAndDeleteThreads);
+
+        // log which files were deleted to aid debugging on failure
+        System.out.println("deleted some files " + Strings.join(new ArrayList<>(deletedFiles.keySet()), ','));
+
+        expectedDiskSize = expectedDiskSizeForFiles();
+        assertEquals("directory size should be equal to on disk size of test files", expectedDiskSize, dirFac.onDiskSize(dir));
+      } finally {
+        dirFac.release(dir);
+      }
+    }
+  }
+
+  private void waitForThreads(List<Thread> threads) {
+    for (int i = 0; i < threads.size(); i++) {
+      try {
+        threads.get(i).join();
+      } catch (InterruptedException e) {
+        fail("thread was interrupted " + i);
+      }
+    }
+  }
+
+  private List<Thread> getCreateThreads(Directory dir, int numThreads, Random r, String prefix) {
+    List<Thread> threads = new ArrayList<>();
+    for (int i = 0; i < numThreads; i++) {
+      String name = prefix + i;
+      int size = r.nextInt(100000000) + CompressingDirectory.COMPRESSION_BLOCK_SIZE + 1;
+      threads.add(new Thread(new CreateFileTask(dir, name, size)));
+    }
+    return threads;
+  }
+
+  private List<Thread> getRandomDeleteThreads(Directory dir, int numThreads, Random r, String prefix, int numActiveFiles) {
+    List<Thread> threads = new ArrayList<>();
+    for (int i = 0; i < numThreads; i++) {
+      // bound the index so it stays valid even after the other delete tasks shrink the active set
+      int index = r.nextInt(numActiveFiles - numThreads);
+      threads.add(new Thread(new DeleteFileTask(dir, prefix, index)));
+    }
+    return threads;
+  }
+
+  private void startThreads(List<Thread> threads) {
+    for (Thread t : threads) {
+      t.start();
+    }
+  }
+
+  private long expectedDiskSizeForFiles() throws Exception {
+    long fileSize = 0;
+    for (String name : activeFiles.keySet()) {
+      fileSize += Files.size(Paths.get(path + "/" + name));
+    }
+    return fileSize;
+  }
+
+  private class CreateFileTask implements Runnable {
+    final String name;
+    final int size;
+    final Directory dir;
+
+    public CreateFileTask(Directory dir, String name, int size) {
+      this.name = name;
+      this.size = size;
+      this.dir = dir;
+    }
+
+    public void run() {
+      try {
+        writeRandomFileOfSize(dir, name, size);
+      } catch (Exception e) {
+        fail("exception writing file " + name);
+      }
+    }
+  }
+
+  private class DeleteFileTask implements Runnable {
+    final String prefix;
+    final Directory dir;
+    final int index;
+
+    public DeleteFileTask(Directory dir, String prefix, int index) {
+      this.dir = dir;
+      this.prefix = prefix;
+      this.index = index;
+    }
+
+    public void run() {
+      String name = null;
+      try {
+        // pick and delete under a shared lock so two delete tasks never race on the same file
+        synchronized (activeFiles) {
+          name = pickFromActiveSet(index, prefix);
+          deleteFile(dir, name);
+        }
+      } catch (Exception e) {
+        fail("exception deleting file " + name);
+      }
+    }
+  }
+
+  @Test
   public void testDelete() throws Exception {
     // write a file, then another, then delete one of the files - the onDiskSize should update correctly
-    final String path = createTempDir().toString() + "/somedir";
     CompressingDirectoryFactory dirFac = new CompressingDirectoryFactory();;
     try (dirFac) {
       dirFac.initCoreContainer(null);
@@ -127,7 +282,7 @@ public void testDelete() throws Exception {
       expectedDiskSize =
           Files.size(Paths.get(path+"/test_file")) + Files.size(Paths.get(path+"/test_file2"));
       assertEquals("directory size should be equal to on disk size of test files", expectedDiskSize, dirFac.onDiskSize(dir));
-      dir.deleteFile("test_file2");
+      deleteFile(dir, "test_file2");
       expectedDiskSize = Files.size(Paths.get(path+"/test_file"));
       assertEquals("directory size should be equal to on disk size of test files", expectedDiskSize, dirFac.onDiskSize(dir));
     } finally {
@@ -136,6 +291,12 @@ public void testDelete() throws Exception {
     }
   }
 
+  private void deleteFile(Directory dir, String name) throws Exception {
+    dir.deleteFile(name);
+    activeFiles.remove(name);
+    addToDeletedFiles(name);
+  }
+
   private void writeBlockSizeFile(Directory dir, String name) throws Exception {
     try (IndexOutput file = dir.createOutput(name, IOContext.DEFAULT)) {
       // write some small things first to force past blocksize boundary
@@ -148,15 +309,42 @@ private void writeBlockSizeFile(Directory dir, String name) throws Exception {
       random.nextBytes(byteArray);
       file.writeBytes(byteArray, blocksize);
     } // implicitly close file
+    addToActiveFiles(name);
   }
 
   private void writeRandomFileOfSize(Directory dir, String name, int size) throws Exception {
     try (IndexOutput file = dir.createOutput(name, IOContext.DEFAULT)) {
       // write a giant blocksize thing to force compression with dump()
       Random random = new Random(42);
-      byte[] byteArray = new byte[size];
-      random.nextBytes(byteArray);
-      file.writeBytes(byteArray, size);
+      int bufferSize = 4096; // Chunk size
+      byte[] buffer = new byte[bufferSize];
+      int remainingBytes = size;
+
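+      // refill a small buffer each pass so memory stays bounded even when
+      // size approaches 100 MB (see getCreateThreads)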
+      while (remainingBytes > 0) {
+        int bytesToWrite = Math.min(remainingBytes, bufferSize);
+        random.nextBytes(buffer);
+
+        file.writeBytes(buffer, bytesToWrite);
+
+        remainingBytes -= bytesToWrite;
+      }
     } // implicitly close file
+    addToActiveFiles(name);
   }
 
+  private void addToActiveFiles(String name) {
+    activeFiles.put(name, true);
+  }
+
+  private String pickFromActiveSet(int index, String prefix) {
+    List<String> l = activeFiles.keySet().stream().filter(s -> s.startsWith(prefix)).collect(Collectors.toList());
+    assertTrue(l.size() > index);
+    return l.get(index);
+  }
+
+  private void addToDeletedFiles(String name) {
+    deletedFiles.put(name, true);
+  }
 }