Skip to content

Commit

Permalink
Universal Profiling integration: open socket for communication (#3602)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasKunz authored May 2, 2024
1 parent b3297f7 commit c339b00
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,46 @@
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.util.ExecutorUtils;
import co.elastic.otel.JvmtiAccess;
import co.elastic.otel.UniversalProfilingCorrelation;
import co.elastic.otel.profiler.DecodeException;
import co.elastic.otel.profiler.ProfilerMessage;
import co.elastic.otel.profiler.ProfilerRegistrationMessage;
import co.elastic.otel.profiler.TraceCorrelationMessage;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Base64;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

public class UniversalProfilingIntegration {

/**
* The frequency at which the processor polls the unix domain socket for new messages from the
* profiler.
*/
static final long POLL_FREQUENCY_MS = 20;

private static final Logger log = LoggerFactory.getLogger(UniversalProfilingIntegration.class);

private volatile ElasticApmTracer tracer;

// Visible for testing
volatile boolean isActive = false;

// Visible for testing
String socketPath = null;

private ScheduledExecutorService executor;

private ActivationListener activationListener = new ActivationListener() {

@Override
Expand Down Expand Up @@ -70,22 +95,51 @@ public void start(ElasticApmTracer tracer) {
try {
log.debug("Starting universal profiling correlation");

socketPath = openProfilerSocket(config.getSocketDir());

CoreConfiguration coreConfig = tracer.getConfig(CoreConfiguration.class);
ByteBuffer processCorrelationStorage = ProfilerSharedMemoryWriter.generateProcessCorrelationStorage(
coreConfig.getServiceName(), coreConfig.getEnvironment(), "");
coreConfig.getServiceName(), coreConfig.getEnvironment(), socketPath);
UniversalProfilingCorrelation.setProcessStorage(processCorrelationStorage);

executor = ExecutorUtils.createSingleThreadSchedulingDaemonPool("profiling-integration");
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
periodicTimer();
}
}, POLL_FREQUENCY_MS, POLL_FREQUENCY_MS, TimeUnit.MILLISECONDS);

isActive = true;
tracer.registerSpanListener(activationListener);
} catch (Exception e) {
log.error("Failed to start universal profiling integration", e);
if (socketPath != null) {
try {
UniversalProfilingCorrelation.stopProfilerReturnChannel();
socketPath = null;
} catch (Exception e2) {
log.error("Failed to clean up universal profiling integration socket", e2);
}
}
}
}

private void periodicTimer() {
consumeProfilerMessages();
}

public void stop() {
try {
if (executor != null) {
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
executor = null;
}
if (isActive) {
UniversalProfilingCorrelation.stopProfilerReturnChannel();
JvmtiAccess.destroy();
isActive = false;
}
} catch (Exception e) {
log.error("Failed to stop universal profiling integration", e);
Expand Down Expand Up @@ -114,4 +168,65 @@ public void correlateAndReport(Transaction endedTransaction) {
public void drop(Transaction endedTransaction) {
//TODO: remove dropped transactions from correlation storage without reporting
}


private String openProfilerSocket(String socketDir) {
Path dir = Paths.get(socketDir);
if (!Files.exists(dir) && !dir.toFile().mkdirs()) {
throw new IllegalArgumentException("Could not create directory '" + socketDir + "'");
}
Path socketFile;
do {
socketFile = dir.resolve(randomSocketFileName());
} while (Files.exists(socketFile));

String absolutePath = socketFile.toAbsolutePath().toString();
log.debug("Opening profiler correlation socket {}", absolutePath);
UniversalProfilingCorrelation.startProfilerReturnChannel(absolutePath);
return absolutePath;
}

private String randomSocketFileName() {
StringBuilder name = new StringBuilder("essock");
String allowedChars = "abcdefghijklmonpqrstuvwxzyABCDEFGHIJKLMONPQRSTUVWXYZ0123456789";
Random rnd = new Random();
for (int i = 0; i < 8; i++) {
int idx = rnd.nextInt(allowedChars.length());
name.append(allowedChars.charAt(idx));
}
return name.toString();
}

private void consumeProfilerMessages() {
try {
while (true) {
try {
ProfilerMessage message =
UniversalProfilingCorrelation.readProfilerReturnChannelMessage();
if (message == null) {
break;
} else if (message instanceof TraceCorrelationMessage) {
handleMessage((TraceCorrelationMessage) message);
} else if (message instanceof ProfilerRegistrationMessage) {
handleMessage((ProfilerRegistrationMessage) message);
} else {
log.debug("Received unknown message type from profiler: {}", message);
}
} catch (DecodeException e) {
log.warn("Failed to read profiler message", e);
// intentionally no break here, subsequent messages might be decodeable
}
}
} catch (Exception e) {
log.error("Cannot read from profiler socket", e);
}
}

private void handleMessage(ProfilerRegistrationMessage message) {
//TODO: handle message
}

private void handleMessage(TraceCorrelationMessage message) {
//TODO: handle message
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.stagemonitor.configuration.ConfigurationRegistry;
Expand All @@ -46,6 +47,9 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.function.Consumer;

import static co.elastic.apm.agent.testutils.assertions.Assertions.assertThat;
Expand All @@ -62,6 +66,16 @@ public class UniversalProfilingIntegrationTest {

private TestObjectPoolFactory poolFactory;

@TempDir
Path tempDir;

void setupTracer() {
setupTracer(config -> {
UniversalProfilingConfiguration conf = config.getConfig(UniversalProfilingConfiguration.class);
doReturn(true).when(conf).isEnabled();
doReturn(tempDir.toAbsolutePath().toString()).when(conf).getSocketDir();
});
}
void setupTracer(Consumer<ConfigurationRegistry> configCustomizer) {
ConfigurationRegistry config = SpyConfiguration.createSpyConfig();
configCustomizer.accept(config);
Expand Down Expand Up @@ -96,6 +110,7 @@ public void ensureDisabledOnWindows() {

verify(mockTracer, never()).registerSpanListener(any());
assertThat(universalProfilingIntegration.isActive).isFalse();
assertThat(universalProfilingIntegration.socketPath).isNull();
}

@Test
Expand All @@ -107,6 +122,7 @@ public void ensureDisabledByDefault() {

verify(mockTracer, never()).registerSpanListener(any());
assertThat(universalProfilingIntegration.isActive).isFalse();
assertThat(universalProfilingIntegration.socketPath).isNull();
}

@Nested
Expand All @@ -115,8 +131,7 @@ class SharedMemory {

@Test
public void testNestedActivations() {
setupTracer(conf -> doReturn(true)
.when(conf.getConfig(UniversalProfilingConfiguration.class)).isEnabled());
setupTracer();

Transaction first = tracer.startRootTransaction(null);
Transaction second = tracer.startRootTransaction(null);
Expand Down Expand Up @@ -168,7 +183,9 @@ public void testNestedActivations() {
@ValueSource(strings = {"my nameßspace", ""})
public void testProcessStoragePopulated(String environment) {
setupTracer(conf -> {
doReturn(true).when(conf.getConfig(UniversalProfilingConfiguration.class)).isEnabled();
UniversalProfilingConfiguration profConfig = conf.getConfig(UniversalProfilingConfiguration.class);
doReturn(true).when(profConfig).isEnabled();
doReturn(tempDir.toAbsolutePath().toString()).when(profConfig).getSocketDir();
CoreConfiguration core = conf.getConfig(CoreConfiguration.class);
doReturn("service Ä 1").when(core).getServiceName();
doReturn(environment).when(core).getEnvironment();
Expand All @@ -179,7 +196,8 @@ public void testProcessStoragePopulated(String environment) {
assertThat(buffer.getChar()).describedAs("layout-minor-version").isEqualTo((char) 1);
assertThat(readUtf8Str(buffer)).isEqualTo("service Ä 1");
assertThat(readUtf8Str(buffer)).isEqualTo(environment);
assertThat(readUtf8Str(buffer)).describedAs("socket-path").isEqualTo("");
assertThat(readUtf8Str(buffer)).describedAs("socket-path")
.startsWith(tempDir.toAbsolutePath().toString() + "/essock");
}

private String readUtf8Str(ByteBuffer buffer) {
Expand Down Expand Up @@ -220,6 +238,58 @@ static void checkTlsIs(@Nullable AbstractSpan<?> span) {
}
}

@DisabledOnOs(OS.WINDOWS)
@Nested
class SpanCorrelation {

@Test
void badSocketPath() throws Exception {
Path notADir = tempDir.resolve("not_a_dir");
Files.createFile(notADir);
String absPath = notADir.toAbsolutePath().toString();

ConfigurationRegistry configRegistry = SpyConfiguration.createSpyConfig();
UniversalProfilingConfiguration config = configRegistry.getConfig(UniversalProfilingConfiguration.class);

doReturn(true).when(config).isEnabled();
doReturn(absPath).when(config).getSocketDir();

UniversalProfilingIntegration universalProfilingIntegration = new UniversalProfilingIntegration();
ElasticApmTracer mockTracer = MockTracer.create(configRegistry);

universalProfilingIntegration.start(mockTracer);

verify(mockTracer, never()).registerSpanListener(any());
assertThat(universalProfilingIntegration.isActive).isFalse();
assertThat(universalProfilingIntegration.socketPath).isNull();
}

@Test
void socketParentDirCreated() throws Exception {
Path subDirs = tempDir.resolve("create/me");
String absolute = subDirs.toAbsolutePath().toString();

ConfigurationRegistry configRegistry = SpyConfiguration.createSpyConfig();
UniversalProfilingConfiguration config = configRegistry.getConfig(UniversalProfilingConfiguration.class);

doReturn(true).when(config).isEnabled();
doReturn(absolute).when(config).getSocketDir();

UniversalProfilingIntegration universalProfilingIntegration = new UniversalProfilingIntegration();
ElasticApmTracer mockTracer = MockTracer.create(configRegistry);

universalProfilingIntegration.start(mockTracer);
try {
assertThat(Paths.get(universalProfilingIntegration.socketPath)).exists();
assertThat(universalProfilingIntegration.socketPath).startsWith(absolute + "/");
} finally {
universalProfilingIntegration.stop();
}
}

}


private static byte[] idToBytes(Id id) {
byte[] buff = new byte[32];
int len = id.toBytes(buff, 0);
Expand Down

0 comments on commit c339b00

Please sign in to comment.