Skip to content

Commit

Permalink
Universal profiling integration: write shared memory (#3598)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: SylvainJuge <[email protected]>
  • Loading branch information
JonasKunz and SylvainJuge authored Apr 26, 2024
1 parent 35e0d6d commit cc4d8a1
Show file tree
Hide file tree
Showing 10 changed files with 555 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public class AgentInfo {
"com.blogspot.mydailyjava.weaklockfree",
"com.lmax.disruptor",
"com.dslplatform.json",
"com.googlecode.concurrentlinkedhashmap"
"com.googlecode.concurrentlinkedhashmap",
"co.elastic.otel"
));

private static final Set<String> agentRootPackages = new HashSet<>(Arrays.asList(
Expand Down
5 changes: 5 additions & 0 deletions apm-agent-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@
<artifactId>HdrHistogram</artifactId>
<version>2.1.11</version>
</dependency>
<dependency>
<groupId>co.elastic.otel</groupId>
<artifactId>jvmti-access</artifactId>
<version>0.3.0</version>
</dependency>
<!--
We can't use caffeine due to requiring Java 7.
As recommended by the author, we use concurrentlinkedhashmap-lru instead:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package co.elastic.apm.agent.configuration;

import org.stagemonitor.configuration.ConfigurationOption;
import org.stagemonitor.configuration.ConfigurationOptionProvider;

import static co.elastic.apm.agent.tracer.configuration.RangeValidator.isInRange;

public class UniversalProfilingConfiguration extends ConfigurationOptionProvider {

private static final String PROFILING_CATEGORY = "Profiling";

private final ConfigurationOption<Boolean> enabled = ConfigurationOption.booleanOption()
.key("universal_profiling_integration_enabled")
.tags("added[1.50.0]", "internal")
.configurationCategory(PROFILING_CATEGORY)
.description("If enabled, the apm agent will correlate it's transaction with the profiling data from elastic universal profiling running on the same host.")
.buildWithDefault(false);

private final ConfigurationOption<Long> bufferSize = ConfigurationOption.longOption()
.key("universal_profiling_integration_buffer_size")
.addValidator(isInRange(64L, Long.MAX_VALUE))
.tags("added[1.50.0]", "internal")
.configurationCategory(PROFILING_CATEGORY)
.description("The feature needs to buffer ended local-root spans for a short duration to ensure that all of its profiling data has been received." +
"This configuration option configures the buffer size in number of spans. " +
"The higher the number of local root spans per second, the higher this buffer size should be set.\n" +
"The agent will log a warning if it is not capable of buffering a span due to insufficient buffer size. " +
"This will cause the span to be exported immediately instead with possibly incomplete profiling correlation data.")
.buildWithDefault(4096L);

private final ConfigurationOption<String> socketDir = ConfigurationOption.stringOption()
.key("universal_profiling_integration_socket_dir")
.tags("added[1.50.0]", "internal")
.configurationCategory(PROFILING_CATEGORY)
.description("The extension needs to bind a socket to a file for communicating with the universal profiling host agent." +
"This configuration option can be used to change the location. " +
"Note that the total path name (including the socket) must not exceed 100 characters due to OS restrictions.\n" +
"If unset, the value of the `java.io.tmpdir` system property will be used.")
.build();

public boolean isEnabled() {
return enabled.get();
}

public long getBufferSize() {
return bufferSize.get();
}

public String getSocketDir() {
String dir = socketDir.get();
return dir == null || dir.isEmpty() ? System.getProperty("java.io.tmpdir") : dir;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import co.elastic.apm.agent.tracer.dispatch.HeaderGetter;
import co.elastic.apm.agent.tracer.reference.ReferenceCounted;
import co.elastic.apm.agent.tracer.reference.ReferenceCountedMap;
import co.elastic.apm.agent.universalprofiling.UniversalProfilingIntegration;
import co.elastic.apm.agent.util.DependencyInjectingServiceLoader;
import co.elastic.apm.agent.util.ExecutorUtils;
import com.dslplatform.json.JsonWriter;
Expand Down Expand Up @@ -143,6 +144,8 @@ protected ActiveStack initialValue() {
private final SpanConfiguration spanConfiguration;
private final List<ActivationListener> activationListeners;
private final MetricRegistry metricRegistry;

private final UniversalProfilingIntegration profilingIntegration;
private final ScheduledThreadPoolExecutor sharedPool;
private final int approximateContextSize;
private Sampler sampler;
Expand Down Expand Up @@ -261,6 +264,7 @@ public void onChange(ConfigurationOption<?> configurationOption, Double oldValue
// sets the assertionsEnabled flag to true if indeed enabled
//noinspection AssertWithSideEffects
assert assertionsEnabled = true;
profilingIntegration = new UniversalProfilingIntegration();
}

@Override
Expand Down Expand Up @@ -341,6 +345,7 @@ private void afterTransactionStart(@Nullable ClassLoader initiatingClassLoader,
if (serviceInfo != null) {
transaction.getTraceContext().setServiceInfo(serviceInfo.getServiceName(), serviceInfo.getServiceVersion());
}
profilingIntegration.afterTransactionStart(transaction);
}

public Transaction noopTransaction() {
Expand Down Expand Up @@ -525,8 +530,9 @@ public void endTransaction(Transaction transaction) {
if (!transaction.isNoop() &&
(transaction.isSampled() || apmServerClient.supportsKeepingUnsampledTransaction())) {
// we do report non-sampled transactions (without the context)
reporter.report(transaction);
profilingIntegration.correlateAndReport(transaction);
} else {
profilingIntegration.drop(transaction);
transaction.decrementReferences();
}
}
Expand Down Expand Up @@ -633,6 +639,7 @@ public synchronized void stop() {
logger.debug("Tracer stop stack trace: ", new Throwable("Expected - for debugging purposes"));
}

profilingIntegration.stop();
try {
configurationRegistry.close();
reporter.close();
Expand Down Expand Up @@ -738,6 +745,7 @@ private synchronized void startSync() {
}
apmServerClient.start();
reporter.start();
profilingIntegration.start(this);
for (LifecycleListener lifecycleListener : lifecycleListeners) {
try {
lifecycleListener.start(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public static SystemInfo create(final @Nullable String configuredHostname, final
return systemInfo.findContainerDetails();
}

static boolean isWindows(String osName) {
public static boolean isWindows(String osName) {
return osName.startsWith("Windows");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public int toBytes(byte[] bytes, int offset) {
return offset + data.length;
}

public void writeToBuffer(ByteBuffer buffer) {
buffer.put(data);
}

public void fromLongs(long... values) {
if (values.length * Long.BYTES != data.length) {
throw new IllegalArgumentException("Invalid number of long values");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package co.elastic.apm.agent.universalprofiling;

import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.otel.UniversalProfilingCorrelation;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class ProfilerSharedMemoryWriter {

private static final Logger log = LoggerFactory.getLogger(ProfilerSharedMemoryWriter.class);

private static final int TLS_MINOR_VERSION_OFFSET = 0;
private static final int TLS_VALID_OFFSET = 2;
private static final int TLS_TRACE_PRESENT_OFFSET = 3;
private static final int TLS_TRACE_FLAGS_OFFSET = 4;
private static final int TLS_TRACE_ID_OFFSET = 5;
private static final int TLS_SPAN_ID_OFFSET = 21;
private static final int TLS_LOCAL_ROOT_SPAN_ID_OFFSET = 29;
static final int TLS_STORAGE_SIZE = 37;

private static volatile int writeForMemoryBarrier = 0;

static ByteBuffer generateProcessCorrelationStorage(String serviceName, @Nullable String environment, String socketFilePath) {
ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
buffer.order(ByteOrder.nativeOrder());
buffer.position(0);

buffer.putChar((char) 1); // layout-minor-version
writeUtf8Str(buffer, serviceName);
writeUtf8Str(buffer, environment == null ? "" : environment);
writeUtf8Str(buffer, socketFilePath);
return buffer;
}

private static void writeUtf8Str(ByteBuffer buffer, String str) {
byte[] utf8 = str.getBytes(StandardCharsets.UTF_8);
buffer.putInt(utf8.length);
buffer.put(utf8);
}

/**
* This method ensures that all writes which happened prior to this method call are not moved
* after the method call due to reordering.
*
* <p>This is realized based on the Java Memory Model guarantess for volatile variables. Relevant
* resources:
*
* <ul>
* <li><a
* href="https://stackoverflow.com/questions/17108541/happens-before-relationships-with-volatile-fields-and-synchronized-blocks-in-jav">StackOverflow
* topic</a>
* <li><a href="https://gee.cs.oswego.edu/dl/jmm/cookbook.html">JSR Compiler Cookbook</a>
* </ul>
*/
private static void memoryStoreStoreBarrier() {
writeForMemoryBarrier = 42;
}

static void updateThreadCorrelationStorage(@Nullable AbstractSpan<?> newSpan) {
try {
ByteBuffer tls = UniversalProfilingCorrelation.getCurrentThreadStorage(true, TLS_STORAGE_SIZE);
// tls might be null if unsupported or something went wrong on initialization
if (tls != null) {
// the valid flag is used to signal the host-agent that it is reading incomplete data
tls.put(TLS_VALID_OFFSET, (byte) 0);
memoryStoreStoreBarrier();
tls.putChar(TLS_MINOR_VERSION_OFFSET, (char) 1);

if (newSpan != null) {
Transaction tx = newSpan.getParentTransaction();
tls.put(TLS_TRACE_PRESENT_OFFSET, (byte) 1);
tls.put(TLS_TRACE_FLAGS_OFFSET, newSpan.getTraceContext().getFlags());
tls.position(TLS_TRACE_ID_OFFSET);
newSpan.getTraceContext().getTraceId().writeToBuffer(tls);
tls.position(TLS_SPAN_ID_OFFSET);
newSpan.getTraceContext().getId().writeToBuffer(tls);
tls.position(TLS_LOCAL_ROOT_SPAN_ID_OFFSET);
tx.getTraceContext().getId().writeToBuffer(tls);
} else {
tls.put(TLS_TRACE_PRESENT_OFFSET, (byte) 0);
}
memoryStoreStoreBarrier();
tls.put(TLS_VALID_OFFSET, (byte) 1);
}
} catch (Exception e) {
log.error("Failed to write profiling correlation tls", e);
}
}
}

Loading

0 comments on commit cc4d8a1

Please sign in to comment.