CRT library uses massive CPU on large instances #836

Open
LarryFinn opened this issue Oct 11, 2024 · 24 comments
Labels
bug This issue is a bug. p2 This is a standard priority issue

Comments

@LarryFinn

Describe the bug

We recently upgraded our CRT library from 0.24.0 to 0.31.1 and noticed massive CPU spikes. It seems that the library tries to use all the CPU the machine has, which is beyond what the kube container is allocated. I've tried toggling maxConcurrency and targetThroughputInGbps, but it has no effect. I've also decreased the "apparent" CPU count in Java by using -XX:ActiveProcessorCount=10. The very odd part is that in Datadog the CPU spikes seem to come from lock functions. Attached are two files from a load test. You can see the change in CPU time for the underlying library.

Screenshot 2024-10-11 at 9 35 05 AM
Screenshot 2024-10-11 at 9 33 34 AM
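
One quick sanity check for the -XX:ActiveProcessorCount setting mentioned above is to print what the JVM itself reports. The sketch below is only an illustration (the class name CpuBudgetCheck is made up here). Keep in mind that the flag changes the processor count the JVM reports to Java code; a native library that asks the operating system directly would presumably not see it. Whether the CRT does that is an assumption, not something confirmed in this report.

```java
// Hypothetical helper for illustration; not part of aws-crt-java.
final class CpuBudgetCheck {

    /** Processor count as seen by Java code; this value honors -XX:ActiveProcessorCount. */
    static int jvmVisibleProcessors() {
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        System.out.println("JVM-visible processors: " + jvmVisibleProcessors());
    }
}
```

If this prints the container's CPU allocation while the profiler still shows more cores busy, that would point at threads created outside the JVM's sizing logic.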

Expected Behavior

Should not utilize all cpu available

Current Behavior

Utilizes all cpu available

Reproduction Steps

We see this in load tests; I am not sure how to reproduce it reliably.

Possible Solution

No response

Additional Information/Context

No response

aws-crt-java version used

0.31.1

Java version used

17

Operating System and version

Ubuntu 20.04.6 x86_64

@LarryFinn LarryFinn added bug This issue is a bug. needs-triage This issue or PR still needs to be triaged. labels Oct 11, 2024
@jmklix
Member

jmklix commented Oct 14, 2024

Can you provide a minimal reproduction code sample that produces the same CPU spikes that you're seeing? Does this only happen when you're running on a large instance?

@jmklix jmklix added response-requested Waiting on additional info and feedback. Will move to 'closing-soon' in 7 days. p2 This is a standard priority issue and removed needs-triage This issue or PR still needs to be triaged. labels Oct 14, 2024
@LarryFinn
Author

LarryFinn commented Oct 20, 2024

Sorry it took so long; work got busy and it was hard to make this minimal. I think there are two things at play here:

  1. We use multiple S3 clients, which isn't a best practice, but I wouldn't expect this outcome.
  2. We upload as we download.

The problem is definitely more related to point 1, but I'm surprised it happens. Here's some code; it might not compile exactly because I had some different inputs, but I think you can figure it out.
var s3CrtAsyncClientBuilder =  S3AsyncClient.crtBuilder()
                .maxConcurrency(Integer.MAX_VALUE);
   private void multiClientUpload() {
        var concurrency = 30;
        var futures = new ArrayList<CompletableFuture<Integer>>(concurrency);
        var clients = new ArrayList<S3AsyncClient>(concurrency * 2);
        var managers = new ArrayList<S3TransferManager>(concurrency * 2);
        long fileSize = fileSize(SOME_PATH);
        for (int i = 0; i < concurrency; i++) {
            var dlClient = s3CrtAsyncClientBuilder.build();
            clients.add(dlClient);
            var dlMgr = S3TransferManager.builder().s3Client(dlClient).build();
            managers.add(dlMgr);
            var ulClient = s3CrtAsyncClientBuilder.build();
            clients.add(ulClient);
            var ulMgr = S3TransferManager.builder().s3Client(ulClient).build();
            managers.add(ulMgr);
            futures.add(doRequestAndUpload(SOME_PATH, i, dlMgr, ulMgr, fileSize));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        managers.forEach(S3TransferManager::close);
        clients.forEach(S3AsyncClient::close);
    }

    private CompletableFuture<Integer> doRequestAndUpload(
            String path,
            int i,
            S3TransferManager mgr,
            S3TransferManager uploadMgr,
            long fileSize) {
        return doRequest(path, i, mgr, stream -> {
            var body =
                    new InputStreamWithExecutorAsyncRequestBody(AsyncRequestBodyFromInputStreamConfiguration.builder()
                            .executor(testPool)
                            .inputStream(stream)
                            .maxReadLimit(8 * 1024 * 1024)
                            .contentLength(fileSize)
                            .build());
            var uploadRequest = UploadRequest.builder()
                    .requestBody(body)
                    .putObjectRequest(PutObjectRequest.builder()
                            .bucket(BUCKET_HERE)
                            .key("blarg/" + i + ".csv")
                            .build())
                    .build();
            var result = uploadMgr.upload(uploadRequest).completionFuture().join();
            return i;
        });
    }

    private long fileSize(String path) {
        try (var client = s3CrtAsyncClientBuilder.build()) {
            return client.headObject(b -> b.bucket(BUCKET_HERE).key(path))
                    .thenApply(HeadObjectResponse::contentLength)
                    .join();
        }
    }
    private CompletableFuture<Integer> doRequest(
            String path,
            int i,
            S3TransferManager mgr,
            Function<InputStream, Integer> consumer) {
        DownloadRequest<ResponseInputStream<GetObjectResponse>> downloadRequest;
        var downloadRequestBuilder = DownloadRequest.builder()
                .getObjectRequest(GetObjectRequest.builder()
                        .bucket(SOME_BUCKET)
                        .key(path)
                        .build());
            downloadRequest = downloadRequestBuilder
                    .responseTransformer(AsyncResponseTransformer.toBlockingInputStream())
                    .build();
        return mgr.download(downloadRequest)
                .completionFuture()
                .thenApplyAsync(
                        input -> {
                            return consumer.apply(input.result());
                        },
                        testPool);
    }

multiClientUpload is what triggers the whole thing. I made a cached thread pool as testPool. The file I used is about 500 MB with ~6M lines. My guess is that the multiple clients need to do some locking JNI-wise, and that the lock is shared across clients.

@github-actions github-actions bot removed the response-requested Waiting on additional info and feedback. Will move to 'closing-soon' in 7 days. label Oct 20, 2024
@waahm7
Contributor

waahm7 commented Oct 21, 2024

@LarryFinn Thank you for the code. Can you please explain a bit more about the exact problem that you are facing? Is it causing the application to crash, or is it slow, etc.?

> noticed massive cpu spikes. It seems that the library tries to use all the cpu the machine has, which is beyond what the kube container is allocated. I've tried toggling maxConcurrency and targetThroughputInGbps but it has no affects.

Each CRT client will create N threads, but they are green threads and will not have much overhead. The number of threads doesn't correlate with the concurrency. You can limit the overall concurrency by lowering maxConcurrency and targetThroughput. So, even though CRT will create N threads, it should not add much overhead, as we will be limiting the concurrency per client.

> we use multiple s3 clients, which isn't a best practice but i wouldnt expect this outcome

Any reason you need multiple S3 clients? It is not a best practice, since each client will create its own thread pool and other resources, which leads to duplicated work and resources. The CRT client is designed to maximize performance with a single client, and having multiple clients will not improve performance.

> my guess is the multiple clients need to do some locking JNI wise and that locks across clients

Yes, clients do need to acquire JNI locks. Is the problem that multiple clients are just waiting to acquire the lock?

@LarryFinn
Author

LarryFinn commented Oct 21, 2024

It seems like the clients are waiting to acquire JNI locks. That's all I can discern from the profiling information. When I don't use multiple clients, our code actually gets stuck a bit. Even if using multiple clients isn't a best practice, I wouldn't expect the CPU jump.
I ran the snippet above with shared clients vs. non-shared clients, and the CPU usage is about 10x with multiple clients, which is really bizarre.

@waahm7
Contributor

waahm7 commented Oct 21, 2024

> When I don't use multiple clients, our code actually get stuck a bit.

Can you explain a bit more about the use case, workload and the error that you are getting? I think it might be more useful to focus on getting one client to work properly.

> Even if using multiple clients isnt a best practice, i wouldnt expect the cpu jump.

That will depend on how many clients are getting created in parallel. Each client will have its own thread pool and will use the CPU. So N clients will have N times the resource usage.
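
To make the "N clients, N times the resources" point concrete, here is a minimal sketch using only JDK classes. FakeClient is a hypothetical stand-in that owns a fixed worker pool; the real CRT client sizes and manages its threads natively, and none of that is shown in this thread, so treat the numbers as illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustration only: FakeClient is a made-up stand-in, not an AWS SDK or CRT class.
final class PerClientPoolSketch {

    /** A pretend client that eagerly starts its own worker threads. */
    static final class FakeClient implements AutoCloseable {
        private final ThreadPoolExecutor pool;

        FakeClient(int threads) {
            pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>());
            pool.prestartAllCoreThreads(); // start every worker up front
        }

        int threadCount() {
            return pool.getPoolSize();
        }

        @Override
        public void close() {
            pool.shutdownNow();
        }
    }

    /** Creates the given number of clients and returns how many worker threads exist in total. */
    static int totalThreads(int clients, int threadsPerClient) {
        List<FakeClient> created = new ArrayList<>();
        try {
            int total = 0;
            for (int i = 0; i < clients; i++) {
                FakeClient client = new FakeClient(threadsPerClient);
                created.add(client);
                total += client.threadCount();
            }
            return total;
        } finally {
            created.forEach(FakeClient::close);
        }
    }

    public static void main(String[] args) {
        System.out.println("1 client:   " + totalThreads(1, 4) + " threads");
        System.out.println("60 clients: " + totalThreads(60, 4) + " threads");
    }
}
```

The 60-client case mirrors the reproduction above, which builds a download and an upload client for each of 30 transfers. The per-client thread count of 4 is an arbitrary choice for the sketch.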

@LarryFinn
Author

> Even if using multiple clients isnt a best practice, i wouldnt expect the cpu jump

I'm not an expert on the C code, but it definitely looks like the Datadog profiler is showing a massive amount of time and CPU spent in locking, which doesn't make sense, even with more threads doing more things.

> Can you explain a bit more about the use case, workload and the error that you are getting? I think it might be more useful to focus on getting one client to work properly.

Yeah, we store a lot of results in S3, and we run set operations across multiple S3 files. So as we download S3 files, we process them and upload the results. It's hard to say what happens with a single shared client, but sometimes it just gets stuck. Java thread dumps don't really show anything too interesting, and I'm not sure if there is a better way to debug what's going on.
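
For narrowing down a hang like this from inside the process, one option is to list only the Java threads that are parked and the frame they are parked in. The following is a minimal sketch with a made-up class name (WaitingThreadReport). It relies only on Thread.getAllStackTraces(), so it covers Java threads; native threads that are not attached to the JVM would not appear in it.

```java
import java.util.Map;

// Hypothetical helper for illustration; not part of the AWS SDK or CRT.
final class WaitingThreadReport {

    /** Lists every Java thread that is BLOCKED, WAITING, or TIMED_WAITING, with its top stack frame. */
    static String report() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            Thread.State state = thread.getState();
            boolean parked = state == Thread.State.BLOCKED
                    || state == Thread.State.WAITING
                    || state == Thread.State.TIMED_WAITING;
            if (!parked) {
                continue;
            }
            StackTraceElement[] frames = entry.getValue();
            String top = frames.length > 0 ? frames[0].toString() : "(no Java frames)";
            sb.append(thread.getName())
              .append(" [").append(state).append("] at ")
              .append(top)
              .append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(report());
    }
}
```

Calling report() a few seconds apart while the upload is stuck shows whether the same threads stay parked in the same frames, which is easier to scan than a full thread dump.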

@LarryFinn
Author

LarryFinn commented Oct 22, 2024

@waahm7 I created a simple example to show the issues we've been having. It's contrived, but whatever.
This method, which shares a single client, gets stuck:

   public void manual() throws IOException {
        var client = s3AsyncClientBuilder.build();
        var mgr = S3TransferManager.builder().s3Client(client).build();
        var file = Path.of("/tmp/some-long-csv.csv");
        var length = file.toFile().length();
        var streams = new ArrayList<PipedOutputStream>();
        var futures = new ArrayList<CompletableFuture<CompletedUpload>>();
        for (int i =0; i < 10; i++) {
            var putRequest = PutObjectRequest.builder().bucket("my-bucket").key("prefix/" + i).build();
            PipedOutputStream outputStream = new PipedOutputStream();
            streams.add(outputStream);
            InputStream inputStream = new PipedInputStream(outputStream);

            var uploadRequest = UploadRequest.builder()
                .putObjectRequest(putRequest)
                .requestBody(AsyncRequestBody.fromInputStream(inputStream, length, uploadExecutor))
                .build();

            futures.add(mgr.upload(uploadRequest).completionFuture());
        }
        try (var fis = new BufferedReader(new FileReader(file.toFile()))) {
            String line;
            while ((line = fis.readLine()) != null) {
                for (var stream : streams) {
                    stream.write((line + "\n").getBytes());
                }
            }
        }
        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
    }

but this one doesn't:

  public void manualNotShared() throws IOException {
        var file = Path.of("/tmp/some-long-csv.csv");
        var length = file.toFile().length();
        var streams = new ArrayList<PipedOutputStream>();
        var futures = new ArrayList<CompletableFuture<CompletedUpload>>();
        for (int i =0; i < 10; i++) {
            var client = s3AsyncClientBuilder.build();
            var mgr = S3TransferManager.builder().s3Client(client).build();
            var putRequest = PutObjectRequest.builder().bucket("my-bucket").key("prefix/" + i).build();
            PipedOutputStream outputStream = new PipedOutputStream();
            streams.add(outputStream);
            InputStream inputStream = new PipedInputStream(outputStream);

            var uploadRequest = UploadRequest.builder()
                .putObjectRequest(putRequest)
                .requestBody(AsyncRequestBody.fromInputStream(inputStream, length, uploadExecutor))
                .build();

            futures.add(mgr.upload(uploadRequest).completionFuture());
        }
        try (var fis = new BufferedReader(new FileReader(file.toFile()))) {
            String line;
            while ((line = fis.readLine()) != null) {
                for (var stream : streams) {
                    stream.write((line + "\n").getBytes());
                }
            }
        }
        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
    }

My thought is that the AWS thread pool is smaller than 10 (or whatever the number of uploads is), so this gets blocked. If I use a small number like 3, it is fine.
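
The blocking pattern guessed at above can be reproduced with JDK classes alone, without any AWS code. The sketch below is an illustration under stated assumptions, not a diagnosis of the CRT: a single writer fans the same data out to several PipedOutputStreams in order, and a fixed pool drains them. PipedInputStream has a small fixed buffer (1024 bytes by default), so as soon as one pipe has no active reader, the writer blocks on it and every other pipe stops receiving data too. The class name PipeStarvationSketch and the sizes used are made up for the example.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustration only: plain JDK pipes and executors, no AWS classes involved.
final class PipeStarvationSketch {

    /** Returns true if a single writer feeding pipeCount pipes finishes within timeoutMillis. */
    static boolean writerCompletes(int pipeCount, int readerThreads, long timeoutMillis) throws Exception {
        ExecutorService readers = Executors.newFixedThreadPool(readerThreads);
        ExecutorService writer = Executors.newSingleThreadExecutor();
        try {
            List<PipedOutputStream> outs = new ArrayList<>();
            for (int i = 0; i < pipeCount; i++) {
                PipedOutputStream out = new PipedOutputStream();
                PipedInputStream in = new PipedInputStream(out); // default buffer is 1024 bytes
                outs.add(out);
                Runnable drain = () -> {
                    byte[] buf = new byte[256];
                    try {
                        while (in.read(buf) != -1) {
                            // discard; a real consumer would upload these bytes
                        }
                    } catch (IOException ignored) {
                        // interrupted during shutdown
                    }
                };
                readers.execute(drain); // tasks beyond readerThreads wait in the queue
            }
            Runnable fanOut = () -> {
                byte[] line = new byte[128];
                try {
                    for (int round = 0; round < 64; round++) {
                        for (PipedOutputStream out : outs) {
                            out.write(line); // blocks once this pipe's buffer is full
                        }
                    }
                    for (PipedOutputStream out : outs) {
                        out.close();
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            };
            Future<?> done = writer.submit(fanOut);
            try {
                done.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false; // writer is stuck waiting for a reader that never ran
            }
        } finally {
            writer.shutdownNow();
            readers.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("4 pipes, 4 readers: " + writerCompletes(4, 4, 5000));
        System.out.println("4 pipes, 2 readers: " + writerCompletes(4, 2, 1000));
    }
}
```

With as many reader threads as pipes the writer should finish; with fewer, it should stall on the first pipe whose reader is still queued. If something similar limits how many request bodies are being read at once in the shared-client case, it would match "3 is fine, 10 gets stuck", but that is a hypothesis to check against CRT trace logs rather than a confirmed cause.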

@TingDaoK
Contributor

TingDaoK commented Oct 23, 2024

Let's focus on the hang with a single client first!

So, I tried your code sample with a minor change to add the executors:

    private static final Logger logger = LoggerFactory.getLogger(App.class);

    public static void main(String... args) throws Exception {
        S3AsyncClient client = S3AsyncClient.builder().build();
        S3TransferManager mgr = S3TransferManager.builder().s3Client(client).build();
        Path file = Path.of("/Users/dengket/project/graalvm/test-crt/sample-project-crt/file.txt");
        long length = file.toFile().length();
        ArrayList<PipedOutputStream> streams = new ArrayList<PipedOutputStream>();
        ArrayList<InputStream> input_streams = new ArrayList<InputStream>();
        var futures = new ArrayList<CompletableFuture<CompletedUpload>>();

        // Executor service for asynchronous operations
        ExecutorService uploadExecutor = Executors.newFixedThreadPool(20);  // Define thread pool for upload tasks

        for (int i =0; i < 20; i++) {
            var putRequest = PutObjectRequest.builder().bucket(bucket).key("prefix/" + i).build();
            PipedOutputStream outputStream = new PipedOutputStream();
            streams.add(outputStream);
            InputStream inputStream = new PipedInputStream(outputStream);
            input_streams.add(inputStream);

            var uploadRequest = UploadRequest.builder()
                    .putObjectRequest(putRequest)
                    .requestBody(AsyncRequestBody.fromInputStream(inputStream, length, uploadExecutor))
                    .build();

            futures.add(mgr.upload(uploadRequest).completionFuture());
        }

        // Run the file reading and writing task asynchronously
        uploadExecutor.submit(() -> {
            try (var fis = new BufferedReader(new FileReader(file.toFile()))) {
                String line;
                while ((line = fis.readLine()) != null) {
                    for (var stream : streams) {
                        stream.write((line + "\n").getBytes());
                    }
                }

                // Close all PipedOutputStreams after writing is done
                for (var stream : streams) {
                    stream.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
        mgr.close();
        client.close();
        uploadExecutor.shutdown();
    }

And I cannot reproduce the hang.
I used a 1 MB file to test and it works fine, but when I bump the file size, I get a bunch of errors that seem to be caused by the connection being idle too long without sending any data. And, it's hanging, I got a bunch of failures reported as expected.

What kind of hang are you seeing? Is it hanging while waiting for the futures to complete? If so, can you provide any trace-level logs from CRT? (Refer to here to enable the log)

@LarryFinn
Author

LarryFinn commented Oct 24, 2024

@TingDaoK you didn't use the CRT client in your example.
You have S3AsyncClient client = S3AsyncClient.builder().build(); but it should be
S3AsyncClient client = S3AsyncClient.crtBuilder().build()
I believe your example will fail if you use crtBuilder too.
My code freezes and makes no progress. Here are some thread dumps (I made it do 15 concurrent uploads instead of 10):

   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <no object reference available>
        at java.io.PipedInputStream.awaitSpace(PipedInputStream.java:273)
        at java.io.PipedInputStream.receive(PipedInputStream.java:231)
        - locked <0x0000000782fd1000> (a java.io.PipedInputStream)
        at java.io.PipedOutputStream.write(PipedOutputStream.java:150)
        at java.io.OutputStream.write(OutputStream.java:127)
......

"upload-0" #73 prio=5 os_prio=31 cpu=2.75ms elapsed=28.52s tid=0x000000010f01ba00 nid=0x14403 in Object.wait()  [0x000000029b93a000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <no object reference available>
	at java.io.PipedInputStream.read(PipedInputStream.java:326)
	- locked <0x0000000782dacb20> (a java.io.PipedInputStream)
	at java.io.PipedInputStream.read(PipedInputStream.java:377)
	- locked <0x0000000782dacb20> (a java.io.PipedInputStream)
	at java.io.FilterInputStream.read(FilterInputStream.java:132)
	at software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream.read(SdkLengthAwareInputStream.java:65)
	at java.io.FilterInputStream.read(FilterInputStream.java:106)
	at software.amazon.awssdk.utils.async.InputStreamConsumingPublisher.doBlockingWrite(InputStreamConsumingPublisher.java:55)
	at software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody.writeInputStream(BlockingInputStreamAsyncRequestBody.java:92)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.doBlockingWrite(InputStreamWithExecutorAsyncRequestBody.java:107)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.lambda$subscribe$0(InputStreamWithExecutorAsyncRequestBody.java:80)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody$$Lambda$1781/0x00000090017c7bb0.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(Thread.java:840)

   Locked ownable synchronizers:
	- <0x0000000782fd4e20> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"upload-1" #74 prio=5 os_prio=31 cpu=0.48ms elapsed=28.49s tid=0x000000010ea61200 nid=0x11303 in Object.wait()  [0x000000029bb46000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <no object reference available>
	at java.io.PipedInputStream.read(PipedInputStream.java:326)
	- locked <0x0000000782fdc240> (a java.io.PipedInputStream)
	at java.io.PipedInputStream.read(PipedInputStream.java:377)
	- locked <0x0000000782fdc240> (a java.io.PipedInputStream)
	at java.io.FilterInputStream.read(FilterInputStream.java:132)
	at software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream.read(SdkLengthAwareInputStream.java:65)
	at java.io.FilterInputStream.read(FilterInputStream.java:106)
	at software.amazon.awssdk.utils.async.InputStreamConsumingPublisher.doBlockingWrite(InputStreamConsumingPublisher.java:55)
	at software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody.writeInputStream(BlockingInputStreamAsyncRequestBody.java:92)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.doBlockingWrite(InputStreamWithExecutorAsyncRequestBody.java:107)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.lambda$subscribe$0(InputStreamWithExecutorAsyncRequestBody.java:80)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody$$Lambda$1781/0x00000090017c7bb0.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(Thread.java:840)

   Locked ownable synchronizers:
	- <0x0000000782fdc330> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"upload-2" #75 prio=5 os_prio=31 cpu=0.51ms elapsed=28.49s tid=0x0000000140160e00 nid=0x11503 in Object.wait()  [0x000000029bd52000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <no object reference available>
	at java.io.PipedInputStream.read(PipedInputStream.java:326)
	- locked <0x0000000782fd8750> (a java.io.PipedInputStream)
	at java.io.PipedInputStream.read(PipedInputStream.java:377)
	- locked <0x0000000782fd8750> (a java.io.PipedInputStream)
	at java.io.FilterInputStream.read(FilterInputStream.java:132)
	at software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream.read(SdkLengthAwareInputStream.java:65)
	at java.io.FilterInputStream.read(FilterInputStream.java:106)
	at software.amazon.awssdk.utils.async.InputStreamConsumingPublisher.doBlockingWrite(InputStreamConsumingPublisher.java:55)
	at software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody.writeInputStream(BlockingInputStreamAsyncRequestBody.java:92)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.doBlockingWrite(InputStreamWithExecutorAsyncRequestBody.java:107)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.lambda$subscribe$0(InputStreamWithExecutorAsyncRequestBody.java:80)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody$$Lambda$1781/0x00000090017c7bb0.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(Thread.java:840)

   Locked ownable synchronizers:
	- <0x0000000782fd8840> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"upload-3" #76 prio=5 os_prio=31 cpu=0.46ms elapsed=28.48s tid=0x000000012e10d200 nid=0x14203 in Object.wait()  [0x000000029bf5e000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <no object reference available>
	at java.io.PipedInputStream.read(PipedInputStream.java:326)
	- locked <0x0000000782ff2900> (a java.io.PipedInputStream)
	at java.io.PipedInputStream.read(PipedInputStream.java:377)
	- locked <0x0000000782ff2900> (a java.io.PipedInputStream)
	at java.io.FilterInputStream.read(FilterInputStream.java:132)
	at software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream.read(SdkLengthAwareInputStream.java:65)
	at java.io.FilterInputStream.read(FilterInputStream.java:106)
	at software.amazon.awssdk.utils.async.InputStreamConsumingPublisher.doBlockingWrite(InputStreamConsumingPublisher.java:55)
	at software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody.writeInputStream(BlockingInputStreamAsyncRequestBody.java:92)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.doBlockingWrite(InputStreamWithExecutorAsyncRequestBody.java:107)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.lambda$subscribe$0(InputStreamWithExecutorAsyncRequestBody.java:80)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody$$Lambda$1781/0x00000090017c7bb0.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(Thread.java:840)

   Locked ownable synchronizers:
	- <0x0000000782ff29f0> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"upload-4" #77 prio=5 os_prio=31 cpu=0.50ms elapsed=28.48s tid=0x000000012ba57a00 nid=0x11603 in Object.wait()  [0x000000029c16a000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <no object reference available>
	at java.io.PipedInputStream.read(PipedInputStream.java:326)
	- locked <0x0000000782fdfd30> (a java.io.PipedInputStream)
	at java.io.PipedInputStream.read(PipedInputStream.java:377)
	- locked <0x0000000782fdfd30> (a java.io.PipedInputStream)
	at java.io.FilterInputStream.read(FilterInputStream.java:132)
	at software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream.read(SdkLengthAwareInputStream.java:65)
	at java.io.FilterInputStream.read(FilterInputStream.java:106)
	at software.amazon.awssdk.utils.async.InputStreamConsumingPublisher.doBlockingWrite(InputStreamConsumingPublisher.java:55)
	at software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody.writeInputStream(BlockingInputStreamAsyncRequestBody.java:92)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.doBlockingWrite(InputStreamWithExecutorAsyncRequestBody.java:107)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.lambda$subscribe$0(InputStreamWithExecutorAsyncRequestBody.java:80)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody$$Lambda$1781/0x00000090017c7bb0.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(Thread.java:840)

   Locked ownable synchronizers:
	- <0x0000000782fdfe20> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"upload-5" #78 prio=5 os_prio=31 cpu=0.43ms elapsed=28.47s tid=0x000000014015f800 nid=0x11703 in Object.wait()  [0x000000029c376000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <no object reference available>
	at java.io.PipedInputStream.read(PipedInputStream.java:326)
	- locked <0x0000000783004010> (a java.io.PipedInputStream)
	at java.io.PipedInputStream.read(PipedInputStream.java:377)
	- locked <0x0000000783004010> (a java.io.PipedInputStream)
	at java.io.FilterInputStream.read(FilterInputStream.java:132)
	at software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream.read(SdkLengthAwareInputStream.java:65)
	at java.io.FilterInputStream.read(FilterInputStream.java:106)
	at software.amazon.awssdk.utils.async.InputStreamConsumingPublisher.doBlockingWrite(InputStreamConsumingPublisher.java:55)
	at software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody.writeInputStream(BlockingInputStreamAsyncRequestBody.java:92)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.doBlockingWrite(InputStreamWithExecutorAsyncRequestBody.java:107)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.lambda$subscribe$0(InputStreamWithExecutorAsyncRequestBody.java:80)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody$$Lambda$1781/0x00000090017c7bb0.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(Thread.java:840)

   Locked ownable synchronizers:
	- <0x00000007830040e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"upload-6" #79 prio=5 os_prio=31 cpu=0.46ms elapsed=28.46s tid=0x0000000129c3da00 nid=0x13e03 in Object.wait()  [0x000000029c582000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <no object reference available>
	at java.io.PipedInputStream.read(PipedInputStream.java:326)
	- locked <0x0000000782fe36b0> (a java.io.PipedInputStream)
	at java.io.PipedInputStream.read(PipedInputStream.java:377)
	- locked <0x0000000782fe36b0> (a java.io.PipedInputStream)
	at java.io.FilterInputStream.read(FilterInputStream.java:132)
	at software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream.read(SdkLengthAwareInputStream.java:65)
	at java.io.FilterInputStream.read(FilterInputStream.java:106)
	at software.amazon.awssdk.utils.async.InputStreamConsumingPublisher.doBlockingWrite(InputStreamConsumingPublisher.java:55)
	at software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody.writeInputStream(BlockingInputStreamAsyncRequestBody.java:92)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.doBlockingWrite(InputStreamWithExecutorAsyncRequestBody.java:107)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.lambda$subscribe$0(InputStreamWithExecutorAsyncRequestBody.java:80)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody$$Lambda$1781/0x00000090017c7bb0.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(Thread.java:840)

   Locked ownable synchronizers:
	- <0x00000007831ff398> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"upload-7" #80 prio=5 os_prio=31 cpu=0.40ms elapsed=28.46s tid=0x000000012c809c00 nid=0x11903 in Object.wait()  [0x000000029c78e000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait([email protected]/Native Method)
	- waiting on <no object reference available>
	at java.io.PipedInputStream.read([email protected]/PipedInputStream.java:326)
	- locked <0x0000000782feac90> (a java.io.PipedInputStream)
	at java.io.PipedInputStream.read([email protected]/PipedInputStream.java:377)
	- locked <0x0000000782feac90> (a java.io.PipedInputStream)
	at java.io.FilterInputStream.read([email protected]/FilterInputStream.java:132)
	at software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream.read(SdkLengthAwareInputStream.java:65)
	at java.io.FilterInputStream.read([email protected]/FilterInputStream.java:106)
	at software.amazon.awssdk.utils.async.InputStreamConsumingPublisher.doBlockingWrite(InputStreamConsumingPublisher.java:55)
	at software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody.writeInputStream(BlockingInputStreamAsyncRequestBody.java:92)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.doBlockingWrite(InputStreamWithExecutorAsyncRequestBody.java:107)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.lambda$subscribe$0(InputStreamWithExecutorAsyncRequestBody.java:80)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody$$Lambda$1781/0x00000090017c7bb0.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
	at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
	at java.lang.Thread.run([email protected]/Thread.java:840)

   Locked ownable synchronizers:
	- <0x000000078320c248> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"upload-8" #81 prio=5 os_prio=31 cpu=0.66ms elapsed=28.46s tid=0x000000012ba40c00 nid=0x11a03 in Object.wait()  [0x000000029c99a000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait([email protected]/Native Method)
	- waiting on <no object reference available>
	at java.io.PipedInputStream.read([email protected]/PipedInputStream.java:326)
	- locked <0x0000000782fe71a0> (a java.io.PipedInputStream)
	at java.io.PipedInputStream.read([email protected]/PipedInputStream.java:377)
	- locked <0x0000000782fe71a0> (a java.io.PipedInputStream)
	at java.io.FilterInputStream.read([email protected]/FilterInputStream.java:132)
	at software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream.read(SdkLengthAwareInputStream.java:65)
	at java.io.FilterInputStream.read([email protected]/FilterInputStream.java:106)
	at software.amazon.awssdk.utils.async.InputStreamConsumingPublisher.doBlockingWrite(InputStreamConsumingPublisher.java:55)
	at software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody.writeInputStream(BlockingInputStreamAsyncRequestBody.java:92)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.doBlockingWrite(InputStreamWithExecutorAsyncRequestBody.java:107)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.lambda$subscribe$0(InputStreamWithExecutorAsyncRequestBody.java:80)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody$$Lambda$1781/0x00000090017c7bb0.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
	at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
	at java.lang.Thread.run([email protected]/Thread.java:840)

   Locked ownable synchronizers:
	- <0x0000000783229048> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"upload-9" #82 prio=5 os_prio=31 cpu=0.42ms elapsed=28.46s tid=0x000000012ba4a600 nid=0x13b03 in Object.wait()  [0x000000029cba6000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait([email protected]/Native Method)
	- waiting on <no object reference available>
	at java.io.PipedInputStream.read([email protected]/PipedInputStream.java:326)
	- locked <0x0000000783007b00> (a java.io.PipedInputStream)
	at java.io.PipedInputStream.read([email protected]/PipedInputStream.java:377)
	- locked <0x0000000783007b00> (a java.io.PipedInputStream)
	at java.io.FilterInputStream.read([email protected]/FilterInputStream.java:132)
	at software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream.read(SdkLengthAwareInputStream.java:65)
	at java.io.FilterInputStream.read([email protected]/FilterInputStream.java:106)
	at software.amazon.awssdk.utils.async.InputStreamConsumingPublisher.doBlockingWrite(InputStreamConsumingPublisher.java:55)
	at software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody.writeInputStream(BlockingInputStreamAsyncRequestBody.java:92)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.doBlockingWrite(InputStreamWithExecutorAsyncRequestBody.java:107)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.lambda$subscribe$0(InputStreamWithExecutorAsyncRequestBody.java:80)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody$$Lambda$1781/0x00000090017c7bb0.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
	at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
	at java.lang.Thread.run([email protected]/Thread.java:840)

   Locked ownable synchronizers:
	- <0x0000000783212a58> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"upload-10" #83 prio=5 os_prio=31 cpu=0.45ms elapsed=28.45s tid=0x000000012c80a600 nid=0x11c03 in Object.wait()  [0x000000029cdb2000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait([email protected]/Native Method)
	- waiting on <no object reference available>
	at java.io.PipedInputStream.read([email protected]/PipedInputStream.java:326)
	- locked <0x0000000782fe71e8> (a java.io.PipedInputStream)
	at java.io.PipedInputStream.read([email protected]/PipedInputStream.java:377)
	- locked <0x0000000782fe71e8> (a java.io.PipedInputStream)
	at java.io.FilterInputStream.read([email protected]/FilterInputStream.java:132)
	at software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream.read(SdkLengthAwareInputStream.java:65)
	at java.io.FilterInputStream.read([email protected]/FilterInputStream.java:106)
	at software.amazon.awssdk.utils.async.InputStreamConsumingPublisher.doBlockingWrite(InputStreamConsumingPublisher.java:55)
	at software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody.writeInputStream(BlockingInputStreamAsyncRequestBody.java:92)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.doBlockingWrite(InputStreamWithExecutorAsyncRequestBody.java:107)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody.lambda$subscribe$0(InputStreamWithExecutorAsyncRequestBody.java:80)
	at software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody$$Lambda$1781/0x00000090017c7bb0.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
	at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
	at java.lang.Thread.run([email protected]/Thread.java:840)

   Locked ownable synchronizers:
	- <0x0000000783219548> (a java.util.concurrent.ThreadPoolExecutor$Worker)

A gist of the CRT log is here: https://gist.github.com/LarryFinn/833db0013c982e2392279d6e2b15dc54

@LarryFinn
Author

My gut says this has something to do with how uploads work and the event loop pool in the CRT client. If you look at the gist, 14 start up (which is the number of cores on my laptop), and my gut says that because I'm doing 15 uploads that block each other, the event loop pool cannot handle that.

@TingDaoK
Contributor

TingDaoK commented Nov 4, 2024

Yeah, sorry, I made the mistake of using the default Java client instead of the CRT client.

I made a couple of modifications to the example I have, and I made sure I am using the CRT client with the following code:

        S3AsyncClient client = S3AsyncClient.crtBuilder().build();
        S3TransferManager mgr = S3TransferManager.builder().s3Client(client).build();
        Path file = Path.of("/Users/dengket/project/graalvm/test-crt/sample-project-crt/file.txt");
        long length = file.toFile().length();
        ArrayList<PipedOutputStream> streams = new ArrayList<PipedOutputStream>();
        ArrayList<InputStream> input_streams = new ArrayList<InputStream>();
        var futures = new ArrayList<CompletableFuture<CompletedUpload>>();

        // Executor service for asynchronous operations
        ExecutorService uploadExecutor = Executors.newFixedThreadPool(22);  // Define thread pool for upload tasks
        ThreadPoolExecutor threadPool = (ThreadPoolExecutor) uploadExecutor;

        for (int i =0; i < 20; i++) {
            var putRequest = PutObjectRequest.builder().bucket("bucket").key("prefix/" + "1").build();
            PipedOutputStream outputStream = new PipedOutputStream();
            streams.add(outputStream);
            InputStream inputStream = new PipedInputStream(outputStream);
            input_streams.add(inputStream);

            var uploadRequest = UploadRequest.builder()
                    .putObjectRequest(putRequest)
                    .requestBody(AsyncRequestBody.fromInputStream(inputStream, length, uploadExecutor))
                    .build();

            futures.add(mgr.upload(uploadRequest).completionFuture());
        }
        int activeCount = threadPool.getActiveCount();
        System.out.println(activeCount);
        try (var fis = new BufferedReader(new FileReader(file.toFile()))) {
            String line;
            while ((line = fis.readLine()) != null) {
                for (var stream : streams) {
                    stream.write((line + "\n").getBytes());
                }
            }

            for (var stream : streams) {
                stream.close();
            }
        }
        activeCount = threadPool.getActiveCount();
        System.out.println(activeCount);

        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
        mgr.close();
        client.close();
        uploadExecutor.shutdown();

I did reproduce the hang when the number of threads in Executors.newFixedThreadPool() is too small (in my test above, it hangs when the executor has only 2 threads).

But with 20 threads, which equals the number of concurrent requests, it didn't hang.

I believe each request submits a task to the executor, which becomes a problem when the pool has too few threads. https://github.com/aws/aws-sdk-java-v2/blob/master/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBody.java#L80

If you have far fewer threads than requests, the tasks will block each other.
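
The executor behaviour described above can be checked without S3. The following is a minimal sketch using only the JDK: `ExecutorStarvationDemo` and `startedTasks` are hypothetical names, and each task simply blocks on a latch to stand in for a blocking stream read. It illustrates fixed-pool queuing in general, not the SDK's actual internals.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorStarvationDemo {

    /** Submits `requests` blocking tasks to a fixed pool and reports how many actually started. */
    public static int startedTasks(int poolSize, int requests) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch expectedStarts = new CountDownLatch(Math.min(poolSize, requests));
        AtomicInteger started = new AtomicInteger();
        try {
            for (int i = 0; i < requests; i++) {
                executor.submit(() -> {
                    started.incrementAndGet();
                    expectedStarts.countDown();
                    try {
                        release.await(); // stands in for a read that blocks until data arrives
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Wait until every pool thread is busy, then give queued tasks a chance to start.
            expectedStarts.await(5, TimeUnit.SECONDS);
            Thread.sleep(200);
            return started.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool=2,  requests=20 -> started=" + startedTasks(2, 20));
        System.out.println("pool=20, requests=20 -> started=" + startedTasks(20, 20));
    }
}
```

With a pool of 2, only two tasks ever start; the other 18 sit in the queue behind tasks that are themselves blocked. If one of the queued tasks is the one whose data would unblock the others, nothing makes progress.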

@LarryFinn
Author

@TingDaoK which version of the libraries are you using? I'm still seeing it get stuck for some number of concurrent sends regardless of thread pool size (I was originally using a cached thread pool anyway, so the size was moot).

@LarryFinn
Author

I modified the loop to produce a little more output (I'm doing 8 concurrent writes):

            var lineNumber = 0;
            String line;
            while ((line = fis.readLine()) != null) {
                lineNumber++;
                for (int j = 0; j < streams.size(); j++) {
                    LOG.infof("Writing to stream=%d line=%d", j, lineNumber);
                    streams.get(j).write((line + "\n").getBytes());
                }
            }

and the output looks like


2024-11-04 11:17:24.285  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=0 line=1
2024-11-04 11:17:24.288  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=1 line=1
2024-11-04 11:17:24.289  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=2 line=1
2024-11-04 11:17:24.289  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=3 line=1
2024-11-04 11:17:24.289  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=4 line=1
2024-11-04 11:17:24.289  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=5 line=1
2024-11-04 11:17:24.289  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=6 line=1
2024-11-04 11:17:24.289  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=7 line=1
2024-11-04 11:17:24.290  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=0 line=2
2024-11-04 11:17:24.290  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=1 line=2
2024-11-04 11:17:24.290  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=2 line=2
2024-11-04 11:17:24.290  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=3 line=2
2024-11-04 11:17:24.290  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=4 line=2
2024-11-04 11:17:24.290  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=5 line=2
2024-11-04 11:17:24.290  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=6 line=2
2024-11-04 11:17:24.291  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=7 line=2
2024-11-04 11:17:24.291  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=0 line=3
2024-11-04 11:17:24.291  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=1 line=3
2024-11-04 11:17:24.291  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=2 line=3
2024-11-04 11:17:24.291  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=3 line=3
2024-11-04 11:17:24.291  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=4 line=3
2024-11-04 11:17:24.291  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=5 line=3
2024-11-04 11:17:24.292  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=6 line=3
2024-11-04 11:17:24.292  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=7 line=3
2024-11-04 11:17:24.292  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=0 line=4
2024-11-04 11:17:24.292  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=1 line=4
2024-11-04 11:17:24.292  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=2 line=4
2024-11-04 11:17:24.292  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=3 line=4
2024-11-04 11:17:24.292  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=4 line=4
2024-11-04 11:17:24.293  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=5 line=4
2024-11-04 11:17:24.293  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=6 line=4
2024-11-04 11:17:24.293  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=7 line=4
2024-11-04 11:17:24.293  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=0 line=5
2024-11-04 11:17:24.293  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=1 line=5
2024-11-04 11:17:24.293  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=2 line=5
2024-11-04 11:17:24.293  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=3 line=5
2024-11-04 11:17:24.293  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=4 line=5
2024-11-04 11:17:24.294  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=5 line=5
2024-11-04 11:17:24.294  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=6 line=5
2024-11-04 11:17:24.294  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=7 line=5
2024-11-04 11:17:24.294  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=0 line=6
2024-11-04 11:17:24.294  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=1 line=6
2024-11-04 11:17:24.294  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=2 line=6
2024-11-04 11:17:24.294  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=3 line=6
2024-11-04 11:17:24.295  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=4 line=6
2024-11-04 11:17:24.295  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=5 line=6
2024-11-04 11:17:24.295  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=6 line=6
2024-11-04 11:17:24.295  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=7 line=6
2024-11-04 11:17:24.295  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=0 line=7
2024-11-04 11:17:24.295  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=1 line=7
2024-11-04 11:17:24.295  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=2 line=7
2024-11-04 11:17:24.296  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=3 line=7
2024-11-04 11:17:24.296  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=4 line=7
2024-11-04 11:17:24.296  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=5 line=7
2024-11-04 11:17:24.296  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=6 line=7
2024-11-04 11:17:24.296  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=7 line=7
2024-11-04 11:17:24.296  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=0 line=8
2024-11-04 11:17:24.296  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=1 line=8
2024-11-04 11:17:24.296  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=2 line=8
2024-11-04 11:17:24.297  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=3 line=8
2024-11-04 11:17:24.297  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=4 line=8
2024-11-04 11:17:24.297  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=5 line=8
2024-11-04 11:17:24.297  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=6 line=8
2024-11-04 11:17:24.297  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=7 line=8
2024-11-04 11:17:24.297  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=0 line=9
2024-11-04 11:17:24.297  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=1 line=9
2024-11-04 11:17:24.297  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=2 line=9
2024-11-04 11:17:24.298  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=3 line=9
2024-11-04 11:17:24.298  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=4 line=9
2024-11-04 11:17:24.298  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=5 line=9
2024-11-04 11:17:24.298  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=6 line=9
2024-11-04 11:17:24.298  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=7 line=9
2024-11-04 11:17:24.298  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=0 line=10
2024-11-04 11:17:24.298  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=1 line=10
2024-11-04 11:17:24.298  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=2 line=10
2024-11-04 11:17:24.299  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=3 line=10
2024-11-04 11:17:24.299  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=4 line=10
2024-11-04 11:17:24.299  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=5 line=10
2024-11-04 11:17:24.299  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=6 line=10
2024-11-04 11:17:24.299  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=7 line=10
2024-11-04 11:17:24.299  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=0 line=11
2024-11-04 11:17:24.299  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=1 line=11
2024-11-04 11:17:24.300  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=2 line=11
2024-11-04 11:17:24.300  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=3 line=11
2024-11-04 11:17:24.300  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=4 line=11
2024-11-04 11:17:24.300  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=5 line=11
2024-11-04 11:17:24.300  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=6 line=11
2024-11-04 11:17:24.300  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=7 line=11
2024-11-04 11:17:24.300  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=0 line=12
2024-11-04 11:17:25.464  INFO ${sys:PID} [    Test worker] c.a.s.s.u.f.p.LarryTest                  : Writing to stream=1 line=12

@TingDaoK
Contributor

TingDaoK commented Nov 4, 2024

I am using the latest version of CRT Java, but I also tried the one you mentioned.

It could be related to your file size. How large is the data you are trying to upload?

@LarryFinn
Author

$ du -h /tmp/TEXT-subscribers-100.csv
87M /tmp/TEXT-subscribers-100.csv
$ wc -l /tmp/TEXT-subscribers-100.csv
1000000 /tmp/TEXT-subscribers-100.csv

@TingDaoK
Contributor

TingDaoK commented Nov 4, 2024

I did reproduce the issue when I bumped up the file size. I will reach out to the Java SDK team for their support on this.

Quick question: does your use case require PipedInputStream and PipedOutputStream to provide data asynchronously?

@LarryFinn
Author

One of our big use-cases needs this, or something like this, yes

@TingDaoK
Contributor

TingDaoK commented Nov 22, 2024

Okay, big thanks to @zoewangg for helping debug this.

Firstly, in your code sample:

        try (var fis = new BufferedReader(new FileReader(file.toFile()))) {
            String line;
            while ((line = fis.readLine()) != null) {
                for (var stream : streams) {
                    stream.write((line + "\n").getBytes());
                }
            

You are writing to all the streams from the main thread. But CRT has scheduling logic that tries to get one part for each request and then focuses on the first request before working on the others. CRT keeps reading until it has enough data for the first request to send as a part.

So the main thread writes the same amount of data to every stream, while the CRT client works on the requests in its own order. I believe the Java side blocks because CRT requests data that has not been provided yet, while the main thread is trying to provide data that CRT is not consuming, and that leads to the hang.

Apparently my reproduction code, which sends the requests from another thread pool, has a bug that leads to multiple threads writing into the same stream. Sorry for the confusion.

The proper way to provide data to the CRT client is something like:

        ExecutorService writeExecutor = Executors.newFixedThreadPool(50);  // Define thread pool for upload tasks

        for (int j = 0; j < streams.size(); j++) {
            final int finalJ = j;
            writeExecutor.submit(() -> {
                    try (BufferedReader fis = new BufferedReader(new FileReader(file.toFile()))) {
                        char[] buffer = new char[chunkSize];
                        int bytesRead;
                        while ((bytesRead = fis.read(buffer)) != -1) {
                            String dataChunk = new String(buffer, 0, bytesRead);
                            logger.info("before writing to stream=" + finalJ + "  bytes=" + dataChunk.length());
                            streams.get(finalJ).write((dataChunk + "\n").getBytes());
                            logger.info("after writing to stream=" + finalJ + "  bytes=" + dataChunk.length());
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            );

        }

So, for each stream, submit a task that reads from the file and writes to the stream. That way, when CRT asks for data, there is a thread ready to provide it without blocking on the others. You can check out some sample code here
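
The difference between a single round-robin writer and one writer per stream can be reproduced with plain JDK pipes. This is a minimal sketch under stated assumptions: `PipeOrderingDemo`, `completes`, and the 8 KiB `PART_SIZE` are hypothetical, and the consumer thread only imitates a client that drains one full part from the first stream before touching the next. It does not use the CRT client or the SDK.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PipeOrderingDemo {
    // Hypothetical "part" size; much larger than PipedInputStream's default 1024-byte buffer.
    static final int PART_SIZE = 8 * 1024;
    static final int STREAMS = 2;

    /** Returns true if the consumer received one full part from every stream before the timeout. */
    public static boolean completes(boolean writerPerStream, long timeoutMillis) {
        ExecutorService pool = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // a stuck thread must not keep the JVM alive
            return t;
        });
        try {
            PipedOutputStream[] outs = new PipedOutputStream[STREAMS];
            PipedInputStream[] ins = new PipedInputStream[STREAMS];
            for (int i = 0; i < STREAMS; i++) {
                outs[i] = new PipedOutputStream();
                ins[i] = new PipedInputStream(outs[i]);
            }
            // Consumer: drains a full part from stream 0 before touching stream 1.
            Future<?> consumer = pool.submit(() -> {
                for (PipedInputStream in : ins) {
                    in.readNBytes(PART_SIZE);
                }
                return null;
            });
            byte[] line = new byte[128];
            if (writerPerStream) {
                // One writer task per stream: a blocked writer stalls only its own stream.
                for (PipedOutputStream out : outs) {
                    pool.submit(() -> {
                        writePart(out, line);
                        return null;
                    });
                }
            } else {
                // Single writer, round-robin over all streams, as in the original loop.
                pool.submit(() -> {
                    for (int written = 0; written < PART_SIZE; written += line.length) {
                        for (PipedOutputStream out : outs) {
                            out.write(line); // blocks once a pipe nobody reads is full
                        }
                    }
                    return null;
                });
            }
            consumer.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    private static void writePart(PipedOutputStream out, byte[] line) throws IOException {
        for (int written = 0; written < PART_SIZE; written += line.length) {
            out.write(line);
        }
        out.close();
    }

    public static void main(String[] args) {
        System.out.println("single round-robin writer completes: " + completes(false, 1500));
        System.out.println("one writer per stream completes:     " + completes(true, 5000));
    }
}
```

In the round-robin case the writer stalls on the second pipe once its 1024-byte buffer is full, while the consumer is still waiting for more data on the first pipe, so neither side can move. With one writer per stream, the blocked writer simply waits until the consumer reaches its stream.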

@LarryFinn
Author

Hi @TingDaoK, thanks for getting back to me!
The problem with the suggested solution is that it is functionally different. In my example I'm writing different lines of a source file to different S3 files. Imagine you have one source file that you want to split across 4 destination S3 files. The example you posted writes the same file to all the different destination S3 files. Is CRT just not capable of doing this?

@TingDaoK
Contributor

I believe your example here:

        try (var fis = new BufferedReader(new FileReader(file.toFile()))) {
            String line;
            while ((line = fis.readLine()) != null) {
                for (var stream : streams) {
                    stream.write((line + "\n").getBytes());
                }
            }
        }

is writing the same line to all the streams.

But you can still write different lines to different streams by changing how the write works.
For example, if you want to split the file evenly across multiple S3 objects, you can modify the code as follows:

uploadExecutor.submit(() -> {
                    try (BufferedReader fis = new BufferedReader(new FileReader(file.toFile()))) {
                        char[] buffer = new char[chunkSize];
                        int bytesRead;
                        int lines = 0;
                        while ((bytesRead = fis.read(buffer)) != -1) {
                            lines++;
                            if(lines%streams.size() == finalJ) {
                                String dataChunk = new String(buffer, 0, bytesRead);
                                logger.info("before writing to stream=" + finalJ + "  bytes=" + dataChunk.length());
                                streams.get(finalJ).write((dataChunk + "\n").getBytes());
                                logger.info("after writing to stream=" + finalJ + "  bytes=" + dataChunk.length());
                            }
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            );

Note: you will also need to update your content length carefully.

But anyhow, the idea is the streams should NOT block each other.
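
As a rough illustration of the content-length note above: when lines are distributed round-robin, each destination's length has to be computed from the lines it actually receives. The sketch below is an assumption-laden example, not the thread's code: `RoundRobinSplit` and `contentLengths` are hypothetical names, lines are indexed from 0, each line is written with a trailing `\n`, and UTF-8 encoding is assumed.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class RoundRobinSplit {

    /**
     * Byte length each destination receives when line i (0-based) goes to
     * destination i % destinations and every line is written with a trailing '\n'.
     */
    public static long[] contentLengths(List<String> lines, int destinations) {
        long[] lengths = new long[destinations];
        for (int i = 0; i < lines.size(); i++) {
            byte[] encoded = (lines.get(i) + "\n").getBytes(StandardCharsets.UTF_8);
            lengths[i % destinations] += encoded.length;
        }
        return lengths;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a,1", "bb,2", "ccc,3", "d,4", "ee,5");
        // Destination 0 gets lines 0, 2, 4 (4 + 6 + 5 bytes); destination 1 gets lines 1, 3 (5 + 4 bytes).
        System.out.println(Arrays.toString(contentLengths(lines, 2))); // prints [15, 9]
    }
}
```

Computing the lengths this way needs a pass over the source before the uploads start, which is one of the costs of declaring a content length up front.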

@LarryFinn
Author

@TingDaoK oh sorry, you're absolutely right about what the example code is doing. The example I gave is a bit different from what we actually do, so I got confused. In your prior comment you wrote
"we have a scheduling logic that will try to get one part for each request and then focus on the first request before working on the others. And CRT will keep reading until the data for the first request is enough to be sent as a part." Does this have anything to do with the AWS CRT event loop thread pool size or something? It seems to be fine for a smaller number of threads. Also, how big is the part size typically, and can we configure it?

The reason I'm digging into this is that our actual use case is a bit more complicated. We could buffer some amount of the data in memory to make sure the first concurrent send/request completes its part. I'm just a little hesitant about buffering a lot of data in memory, since we deal with very big datasets.

@TingDaoK
Contributor

I still don't fully understand which resource blocked the program and led to the hang.

The CRT event loop pool size is half the number of CPUs. The part size is 8 MB by default, and it's configurable.

I think that when the order in which you provide the data differs from the order in which the client asks for it, you cannot avoid buffering the data in memory. Either it is buffered before you submit it to the client, or the client has to buffer it somewhere.

In C, we do have an alternative API to provide data asynchronously, which can put some requests on hold and work on other requests. It has not been integrated with Java yet, but it has been integrated with mountpoint, https://github.com/awslabs/mountpoint-s3, which may be a tool you are interested in.

If you only want Java support, maybe create a feature request and we can prioritize it. With the current API, we cannot change the scheduling logic on the client, so in your case you will have to buffer the data.
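
One way to picture the buffering mentioned above is a small per-destination buffer that holds data until a complete part is available. The following is only a sketch under assumptions: `PartBuffer`, `append`, and `flush` are hypothetical names, the part size is whatever the caller chooses, and how a finished part is handed to the client is deliberately left out.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartBuffer {
    private final int partSize;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    public PartBuffer(int partSize) {
        if (partSize <= 0) {
            throw new IllegalArgumentException("partSize must be positive");
        }
        this.partSize = partSize;
    }

    /** Appends data and returns every complete part that is now available (possibly none). */
    public List<byte[]> append(byte[] data) {
        pending.write(data, 0, data.length);
        List<byte[]> ready = new ArrayList<>();
        if (pending.size() >= partSize) {
            byte[] all = pending.toByteArray();
            int offset = 0;
            while (all.length - offset >= partSize) {
                ready.add(Arrays.copyOfRange(all, offset, offset + partSize));
                offset += partSize;
            }
            pending.reset();
            pending.write(all, offset, all.length - offset); // keep the incomplete tail
        }
        return ready;
    }

    /** Returns whatever is left (the final, possibly short, part) and clears the buffer. */
    public byte[] flush() {
        byte[] rest = pending.toByteArray();
        pending.reset();
        return rest;
    }

    public static void main(String[] args) {
        PartBuffer buffer = new PartBuffer(4);
        System.out.println(buffer.append(new byte[3]).size());
        System.out.println(buffer.append(new byte[6]).size());
        System.out.println(buffer.flush().length);
    }
}
```

With one such buffer per destination, the memory held at any moment is bounded by roughly one part per destination plus the most recent write, rather than by the size of the dataset.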

@LarryFinn
Author

Ah cool, thanks so much for the info! Is there any way to dig into the original question about pthread lock system calls when using multiple S3 CRT clients, or is the answer just "don't use multiple clients"?

@TingDaoK
Contributor

TingDaoK commented Nov 27, 2024

You can create multiple clients, but with multiple clients the network event loops will increase, as each client will try to take the network bandwidth to meet its target throughput. So it's not recommended.

And with multiple clients, the scheduling logic stays the same: each client will work first on the first request assigned to it.

I guess you can try having one client for downloads and multiple clients for uploads, with only one request per upload client. You would also need to configure the target throughput of each client so that together they match the total network bandwidth you have.
