Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
acharneski committed Nov 12, 2023
1 parent acc2e78 commit f64d144
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 54 deletions.
115 changes: 64 additions & 51 deletions core/src/main/java/com/simiacryptus/skyenet/OutputInterceptor.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.simiacryptus.skyenet;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -16,83 +16,96 @@ private OutputInterceptor() {
private static final PrintStream originalOut = System.out;
private static final PrintStream originalErr = System.err;
private static final AtomicBoolean isSetup = new AtomicBoolean(false);
private static final Object globalStreamLock = new Object();

public static void setupInterceptor() {
if (isSetup.getAndSet(true)) return;
System.setOut(createInterceptorStream(originalOut));
System.setErr(createInterceptorStream(originalErr));
System.setOut(new PrintStream(new OutputStreamRouter(originalOut)));
System.setErr(new PrintStream(new OutputStreamRouter(originalErr)));
}

private static final ByteArrayOutputStream centralStream = new ByteArrayOutputStream();

public static void initThreadOutputStream() {
setOutputStream(new ByteArrayOutputStream());
}

public static void resetThreadOutputStream() {
setOutputStream(centralStream);
}
private static final ByteArrayOutputStream globalStream = new ByteArrayOutputStream();

private static final Map<Thread, ByteArrayOutputStream> threadLocalBuffer = new WeakHashMap<>();

public static void setOutputStream(ByteArrayOutputStream stream) {
threadLocalBuffer.put(Thread.currentThread(), stream);
}

public static ByteArrayOutputStream getOutputStream() {
return threadLocalBuffer.getOrDefault(Thread.currentThread(), new ByteArrayOutputStream());
private static ByteArrayOutputStream getThreadOutputStream() {
Thread currentThread = Thread.currentThread();
ByteArrayOutputStream outputStream;
synchronized (threadLocalBuffer) {
if ((outputStream = threadLocalBuffer.get(currentThread)) != null) return outputStream;
outputStream = new ByteArrayOutputStream();
threadLocalBuffer.put(currentThread, outputStream);
}
return outputStream;
}

public static String getThreadOutput() {
return getOutputStream().toString();
ByteArrayOutputStream outputStream = getThreadOutputStream();
try {
outputStream.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
return outputStream.toString();
}

public static void clearThreadOutput() {
getOutputStream().reset();
getThreadOutputStream().reset();
}

public static String getGlobalOutput() {
return centralStream.toString();
synchronized (globalStreamLock) {
return globalStream.toString();
}
}

public static void clearGlobalOutput() {
centralStream.reset();
synchronized (globalStreamLock) {
globalStream.reset();
}
}

public static PrintStream createInterceptorStream(PrintStream originalStream) {
private static class OutputStreamRouter extends ByteArrayOutputStream {
private final PrintStream originalStream;
int maxGlobalBuffer = 8 * 1024 * 1024;
int maxThreadBuffer = 1024 * 1024;
return new PrintStream(new ByteArrayOutputStream() {
@Override
public void write(int b) {
originalStream.write(b);
if(centralStream.size() > maxGlobalBuffer) {
centralStream.reset();
}
centralStream.write(b);
ByteArrayOutputStream stream = getOutputStream();
if(stream.size() > maxThreadBuffer) {
stream.reset();
}
stream.write(b);
}

@Override
public void write(byte[] b, int off, int len) {
originalStream.write(b, off, len);
if(centralStream.size() > 1024 * 1024) {
centralStream.reset();
public OutputStreamRouter(PrintStream originalStream) {
this.originalStream = originalStream;
}

@Override
public void write(int b) {
originalStream.write(b);
synchronized (globalStreamLock) {
if (globalStream.size() > maxGlobalBuffer) {
globalStream.reset();
}
centralStream.write(b, off, len);
ByteArrayOutputStream threadStream = getOutputStream();
if (threadStream != null) {
if (threadStream.size() > 1024 * 1024) {
threadStream.reset();
}
threadStream.write(b, off, len);
globalStream.write(b);
}
ByteArrayOutputStream threadOutputStream = getThreadOutputStream();
if (threadOutputStream.size() > maxThreadBuffer) {
threadOutputStream.reset();
}
threadOutputStream.write(b);
}

@Override
public void write(byte[] b, int off, int len) {
originalStream.write(b, off, len);
synchronized (globalStreamLock) {
if (globalStream.size() > maxGlobalBuffer) {
globalStream.reset();
}
globalStream.write(b, off, len);
}
});
ByteArrayOutputStream threadOutputStream = getThreadOutputStream();
if (threadOutputStream.size() > maxThreadBuffer) {
threadOutputStream.reset();
}
threadOutputStream.write(b, off, len);
}
}

}


Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@ public void testThreadedInterceptor() throws InterruptedException {
String threadName = Thread.currentThread().getName();
System.out.println("Thread: " + threadName + " output");
System.err.println("Thread: " + threadName + " error");

String expectedOutput = "Thread: " + threadName + " output\nThread: " + threadName + " error\n";
String threadOutput = OutputInterceptor.getThreadOutput().replace("\r", "");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
String expectedOutput = ("Thread: " + threadName + " output\nThread: " + threadName + " error\n").trim();
String threadOutput = OutputInterceptor.getThreadOutput().replace("\r", "").trim();
if (threadOutput.trim().equals(expectedOutput.trim())) {
successCounter.incrementAndGet();
} else {
synchronized (lock) {
System.out.println("Expected:\n " + expectedOutput.replaceAll("\n", "\n "));
System.out.println("Actual:\n " + threadOutput.replaceAll("\n", "\n "));
System.out.flush();
}
}
};
Expand Down

0 comments on commit f64d144

Please sign in to comment.