Skip to content

Commit

Permalink
fix: jdk8 send jfr data error (#725)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengziyi0117 authored Oct 31, 2024
1 parent 54a3372 commit 26e5948
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskGrpc;
import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilingStatus;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Objects;
import java.nio.file.Files;
import java.util.concurrent.TimeUnit;

import static org.apache.skywalking.apm.agent.core.conf.Config.AsyncProfiler.DATA_CHUNK_SIZE;
Expand Down Expand Up @@ -87,69 +87,71 @@ public void statusChanged(GRPCChannelStatus status) {
this.status = status;
}

public void sendData(AsyncProfilerTask task, FileChannel channel) throws IOException, InterruptedException {
if (status != GRPCChannelStatus.CONNECTED || Objects.isNull(channel) || !channel.isOpen()) {
public void sendData(AsyncProfilerTask task, File dumpFile) throws IOException, InterruptedException {
if (status != GRPCChannelStatus.CONNECTED) {
return;
}

int size = Math.toIntExact(channel.size());
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<AsyncProfilerData> dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new ClientResponseObserver<AsyncProfilerData, AsyncProfilerCollectionResponse>() {
ClientCallStreamObserver<AsyncProfilerData> requestStream;

@Override
public void beforeStart(ClientCallStreamObserver<AsyncProfilerData> requestStream) {
this.requestStream = requestStream;
}
try (FileInputStream fileInputStream = new FileInputStream(dumpFile)) {
long fileSize = Files.size(dumpFile.toPath());
int size = Math.toIntExact(fileSize);
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<AsyncProfilerData> dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new ClientResponseObserver<AsyncProfilerData, AsyncProfilerCollectionResponse>() {
ClientCallStreamObserver<AsyncProfilerData> requestStream;

@Override
public void beforeStart(ClientCallStreamObserver<AsyncProfilerData> requestStream) {
this.requestStream = requestStream;
}

@Override
public void onNext(AsyncProfilerCollectionResponse value) {
if (AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) {
LOGGER.warn("JFR is too large to be received by the oap server");
} else {
ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE);
try {
while (channel.read(buf) > 0) {
buf.flip();
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder()
.setContent(ByteString.copyFrom(buf))
.build();
requestStream.onNext(asyncProfilerData);
buf.clear();
@Override
public void onNext(AsyncProfilerCollectionResponse value) {
if (AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) {
LOGGER.warn("JFR is too large to be received by the oap server");
} else {
byte[] buf = new byte[DATA_CHUNK_SIZE];
try {
int bytesRead;
while ((bytesRead = fileInputStream.read(buf)) != -1) {
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder()
.setContent(ByteString.copyFrom(buf, 0, bytesRead))
.build();
requestStream.onNext(asyncProfilerData);
}
} catch (IOException e) {
LOGGER.error("Failed to read JFR file and failed to upload to oap", e);
}
} catch (IOException e) {
LOGGER.error("Failed to read JFR file and failed to upload to oap", e);
}
}

requestStream.onCompleted();
}

@Override
public void onError(Throwable t) {
status.finished();
LOGGER.error(t, "Send async profiler task data to collector fail with a grpc internal exception.");
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
}
requestStream.onCompleted();
}

@Override
public void onCompleted() {
status.finished();
}
});
AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.setType(AsyncProfilingStatus.PROFILING_SUCCESS)
.setContentSize(size)
.setTaskId(task.getTaskId())
.build();
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build();
dataStreamObserver.onNext(asyncProfilerData);
@Override
public void onError(Throwable t) {
status.finished();
LOGGER.error(t, "Send async profiler task data to collector fail with a grpc internal exception.");
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
}

status.wait4Finish();
@Override
public void onCompleted() {
status.finished();
}
});
AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.setType(AsyncProfilingStatus.PROFILING_SUCCESS)
.setContentSize(size)
.setTaskId(task.getTaskId())
.build();
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build();
dataStreamObserver.onNext(asyncProfilerData);

status.wait4Finish();
}
}

public void sendError(AsyncProfilerTask task, String errorMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -90,20 +88,15 @@ private void stopWhenError(AsyncProfilerTask task, String errorMessage) {
}

private void stopWhenSuccess(AsyncProfilerTask task) {

// stop task and send data
try {
File dumpFile = task.stop(getAsyncProfiler());
// stop task
try (FileInputStream fileInputStream = new FileInputStream(dumpFile)) {
// upload file
FileChannel channel = fileInputStream.getChannel();

if (dumpFile != null && dumpFile.exists()) {
AsyncProfilerDataSender dataSender = ServiceManager.INSTANCE.findService(AsyncProfilerDataSender.class);
dataSender.sendData(task, channel);
}

if (!dumpFile.delete()) {
LOGGER.warn("Fail to delete the dump file of async profiler.");
dataSender.sendData(task, dumpFile);
if (!dumpFile.delete()) {
LOGGER.warn("Fail to delete the dump file of async profiler.");
}
}
} catch (Exception e) {
LOGGER.error("stop async profiler task error", e);
Expand Down

0 comments on commit 26e5948

Please sign in to comment.