Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support async-profiler grpc interface #156

Merged
merged 29 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
312771e
Added preliminary support for asyncprofiler
Nov 7, 2024
6bcfd12
Added preliminary support for asyncprofiler
Nov 7, 2024
71bdd67
feat: async profiler interface
zhengziyi0117 Nov 8, 2024
1155262
feat: async profiler interface
zhengziyi0117 Nov 8, 2024
6691030
feat: support BidirectionalStream
zhengziyi0117 Nov 8, 2024
f31af0c
feat: e2e test case
zhengziyi0117 Nov 8, 2024
c012243
fix: e2e test case and comment and license header
zhengziyi0117 Nov 8, 2024
811f866
fix: go lint ci
zhengziyi0117 Nov 9, 2024
6ac42f5
feat: update document
zhengziyi0117 Nov 9, 2024
e0f5105
fix: go lint ci && e2e test case
zhengziyi0117 Nov 9, 2024
3a4bd75
fix: update gha actions/upload-artifact version to v4
zhengziyi0117 Nov 9, 2024
148899b
fix: e2e test case
zhengziyi0117 Nov 10, 2024
7b31cc9
feat: receiver and forwarder document
zhengziyi0117 Nov 10, 2024
54acca7
fix: ebpf profiling e2e test
zhengziyi0117 Nov 10, 2024
06fa101
fix: tracing profile e2e test case
zhengziyi0117 Nov 11, 2024
e2afdd0
fix: tracing profile e2e test case
zhengziyi0117 Nov 11, 2024
14f3a21
fix: ebpf profiling test case
Nov 11, 2024
f42918b
Bump up Helm Chart to make ES run on recent Kubernetes
kezhenxu94 Nov 11, 2024
2271492
Fix ES
kezhenxu94 Nov 11, 2024
cc0ec97
Fix upload artifact
kezhenxu94 Nov 11, 2024
f5b0ea0
remove the UT test in the windows
mrproliu Nov 11, 2024
efbe303
Fix E2E
mrproliu Nov 11, 2024
c31d2cb
Fix Istio E2E
mrproliu Nov 11, 2024
158684b
Update ALS E2E
mrproliu Nov 11, 2024
c4ee5c8
Update Istio Metrics E2E
mrproliu Nov 11, 2024
00dff5d
Update Istio Metrics E2E
mrproliu Nov 11, 2024
9704fb7
Upload logs when ALS E2E failure
mrproliu Nov 11, 2024
7703a27
Try to fix ALS E2E
mrproliu Nov 11, 2024
680ba6c
Try to fix ALS E2E
mrproliu Nov 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ jobs:
- name: Lint
run: make lint
- name: Test
if: matrix.runner == 'ubuntu'
run: make test
- name: Build
run: make build
Expand Down
14 changes: 10 additions & 4 deletions .github/workflows/e2e-istio.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
fail-fast: true
matrix:
analyzer: [k8s-mesh, mx-mesh]
istio_version: [1.7.1, 1.8.2, 1.9.1]
istio_version: [ 1.15.0, 1.16.0, 1.17.0, 1.18.0 ]
name: Istio(${{ matrix.istio_version }})+ALS(${{ matrix.analyzer }})
env:
ISTIO_VERSION: ${{ matrix.istio_version }}
Expand All @@ -54,6 +54,12 @@ jobs:
- uses: apache/skywalking-infra-e2e@cf589b4a0b9f8e6f436f78e9cfd94a1ee5494180
with:
e2e-file: test/e2e/case/istio/als/e2e.yaml
- uses: actions/upload-artifact@v4
if: ${{ failure() }}
name: Upload Logs
with:
name: logs-${{ matrix.istio_version }}
path: "${{ env.SW_INFRA_E2E_LOG_DIR }}"

metrics-service:
runs-on: ubuntu-latest
Expand All @@ -62,7 +68,7 @@ jobs:
strategy:
fail-fast: true
matrix:
istio_version: [1.8.2, 1.9.1]
istio_version: [1.15.0, 1.16.0, 1.17.0, 1.18.0]
env:
ISTIO_VERSION: ${{ matrix.istio_version }}
steps:
Expand All @@ -80,9 +86,9 @@ jobs:
- uses: apache/skywalking-infra-e2e@cf589b4a0b9f8e6f436f78e9cfd94a1ee5494180
with:
e2e-file: test/e2e/case/istio/metrics/e2e.yaml
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v4
if: ${{ failure() }}
name: Upload Logs
with:
name: logs
name: logs-${{ matrix.istio_version }}
path: "${{ env.SW_INFRA_E2E_LOG_DIR }}"
6 changes: 4 additions & 2 deletions .github/workflows/e2e-native.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ jobs:
config: test/e2e/case/native-protocols/e2e.yaml
- name: Native Process/EBPF Profiling
config: test/e2e/case/native-ebpf/e2e.yaml
- name: Native Process/Async Profiler
config: test/e2e/case/native-asyncprofiler/e2e.yaml
fail-fast: true
name: Native E2E test
runs-on: ubuntu-latest
Expand All @@ -47,9 +49,9 @@ jobs:
uses: apache/skywalking-infra-e2e@cf589b4a0b9f8e6f436f78e9cfd94a1ee5494180
with:
e2e-file: $GITHUB_WORKSPACE/${{ matrix.test.config }}
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v4
if: ${{ failure() }}
name: Upload Logs
with:
name: logs
name: logs-${{ matrix.test.name }}
path: "${{ env.SW_INFRA_E2E_LOG_DIR }}"
26 changes: 26 additions & 0 deletions configs/satellite_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,32 @@ pipes:
client_name: grpc-client
forwarders:
- plugin_name: native-profile-grpc-forwarder
- common_config:
pipe_name: asyncprofilerpipe
gatherer:
server_name: "grpc-server"
receiver:
plugin_name: "grpc-native-async-profiler-receiver"
queue:
plugin_name: "memory-queue"
# The maximum buffer event size.
event_buffer_size: ${SATELLITE_QUEUE_EVENT_BUFFER_SIZE:5000}
# The partition count of queue.
partition: ${SATELLITE_QUEUE_PARTITION:4}
processor:
filters:
sender:
fallbacker:
plugin_name: none-fallbacker
# The time interval between two flush operations. And the time unit is millisecond.
flush_time: ${SATELLITE_PROFILEPIPE_SENDER_FLUSH_TIME:1000}
# The maximum buffer elements.
max_buffer_size: ${SATELLITE_PROFILEPIPE_SENDER_MAX_BUFFER_SIZE:200}
# The minimum flush elements.
min_flush_events: ${SATELLITE_PROFILEPIPE_SENDER_MIN_FLUSH_EVENTS:1}
client_name: grpc-client
forwarders:
- plugin_name: native-async-profiler-grpc-forwarder
- common_config:
pipe_name: cdspipe
gatherer:
Expand Down
Empty file modified docs/en/setup/plugins/client_grpc-client.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/client_kafka-client.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/fallbacker_none-fallbacker.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/fallbacker_timer-fallbacker.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_envoy-als-v2-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_envoy-als-v3-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Forwarder/native-async-profiler-grpc-forwarder
## Description
This is a grpc forwarder with the SkyWalking native async profiler protocol.
## DefaultConfig
```yaml```
## Configuration
|Name|Type|Description|
|----|----|-----------|

Empty file modified docs/en/setup/plugins/forwarder_native-cds-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_native-clr-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file.
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_native-event-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_native-jvm-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_native-log-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_native-log-kafka-forwarder.md
100755 → 100644
Empty file.
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_native-meter-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
2 changes: 2 additions & 0 deletions docs/en/setup/plugins/plugin-list.md
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- [Envoy ALS v3 GRPC Forwarder](./forwarder_envoy-als-v3-grpc-forwarder.md)
- [Envoy Metrics v2 GRPC Forwarder](./forwarder_envoy-metrics-v2-grpc-forwarder.md)
- [Envoy Metrics v3 GRPC Forwarder](./forwarder_envoy-metrics-v3-grpc-forwarder.md)
- [Native Async Profiler GRPC Forwarder](./forwarder_native-async-profiler-grpc-forwarder.md)
- [Native CDS GRPC Forwarder](./forwarder_native-cds-grpc-forwarder.md)
- [Native CLR GRPC Forwarder](./forwarder_native-clr-grpc-forwarder.md)
- [GRPC Native EBFP Access Log Forwarder](./forwarder_native-ebpf-accesslog-grpc-forwarder.md)
Expand All @@ -36,6 +37,7 @@
- [GRPC Envoy ALS v3 Receiver](./receiver_grpc-envoy-als-v3-receiver.md)
- [GRPC Envoy Metrics v2 Receiver](./receiver_grpc-envoy-metrics-v2-receiver.md)
- [GRPC Envoy Metrics v3 Receiver](./receiver_grpc-envoy-metrics-v3-receiver.md)
- [GRPC Native Async Profiler Receiver](./receiver_grpc-native-async-profiler-receiver.md)
- [GRPC Native CDS Receiver](./receiver_grpc-native-cds-receiver.md)
- [GRPC Native CLR Receiver](./receiver_grpc-native-clr-receiver.md)
- [GRPC Native EBFP Accesslog Receiver](./receiver_grpc-native-ebpf-accesslog-receiver.md)
Expand Down
Empty file modified docs/en/setup/plugins/queue_memory-queue.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/queue_mmap-queue.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/queue_none-queue.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-envoy-als-v2-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-envoy-als-v3-receiver.md
100755 → 100644
Empty file.
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Receiver/grpc-native-async-profiler-receiver
## Description
This is a receiver for SkyWalking native async-profiler format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/asyncprofiler/AsyncProfiler.proto.
## Support Forwarders
- [native-async-profiler-grpc-forwarder](forwarder_native-async-profiler-grpc-forwarder.md)
## DefaultConfig
```yaml```
## Configuration
|Name|Type|Description|
|----|----|-----------|

Empty file modified docs/en/setup/plugins/receiver_grpc-native-cds-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-clr-receiver.md
100755 → 100644
Empty file.
Empty file.
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-event-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-jvm-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-log-receiver.md
100755 → 100644
Empty file.
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-meter-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-process-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-profile-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-tracing-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-otlp-metrics-v1-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_http-native-log-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/server_grpc-server.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/server_http-server.md
100755 → 100644
Empty file.
4 changes: 4 additions & 0 deletions docs/menu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ catalog:
path: /en/setup/plugins/forwarder_envoy-metrics-v2-grpc-forwarder
- name: Envoy Metrics v3 GRPC Forwarder
path: /en/setup/plugins/forwarder_envoy-metrics-v3-grpc-forwarder
- name: Native Async Profiler GRPC Forwarder
path: /en/setup/plugins/forwarder_native-async-profiler-grpc-forwarder
- name: Native CDS GRPC Forwarder
path: /en/setup/plugins/forwarder_native-cds-grpc-forwarder
- name: Native CLR GRPC Forwarder
Expand Down Expand Up @@ -137,6 +139,8 @@ catalog:
path: /en/setup/plugins/receiver_grpc-envoy-metrics-v2-receiver
- name: GRPC Envoy Metrics v3 Receiver
path: /en/setup/plugins/receiver_grpc-envoy-metrics-v3-receiver
- name: GRPC Native Async Profiler Receiver
path: /en/setup/plugins/receiver_grpc-native-async-profiler-receiver
- name: GRPC Native CDS Receiver
path: /en/setup/plugins/receiver_grpc-native-cds-receiver
- name: GRPC Native CLR Receiver
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
k8s.io/apimachinery v0.26.2
skywalking.apache.org/repo/goapi v0.0.0-20240118085825-a7fa70cc38b7
skywalking.apache.org/repo/goapi v0.0.0-20241106011455-ef3dbfac3128
)

require (
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -542,7 +542,7 @@ golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down Expand Up @@ -619,14 +619,14 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk=
golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -640,7 +640,7 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -809,7 +809,7 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
Expand Down Expand Up @@ -877,5 +877,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kF
sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
skywalking.apache.org/repo/goapi v0.0.0-20240118085825-a7fa70cc38b7 h1:iUx3ovyKy4IMlXv0hB/qRYvUsCIQkAU6DLnLoCt6Qck=
skywalking.apache.org/repo/goapi v0.0.0-20240118085825-a7fa70cc38b7/go.mod h1:oD2dxcDAHVIt95Ee7kJHgZ5f64QNhrqTjQYARwfafc4=
skywalking.apache.org/repo/goapi v0.0.0-20241106011455-ef3dbfac3128 h1:wrHG0+CwYwqcgPWwJf/JI1x1sk+jD1Wdy93M1YuecO0=
skywalking.apache.org/repo/goapi v0.0.0-20241106011455-ef3dbfac3128/go.mod h1:+n8BMuS8eRdzdnGh15ElRGBXPi0eYZSs2TKySBDmRTE=
11 changes: 8 additions & 3 deletions internal/satellite/module/api/sync_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@

package api

import v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
import (
"google.golang.org/grpc"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
)

type SyncInvoker interface {
// SyncInvoke means synchronized process event
SyncInvoke(d *v1.SniffData) (*v1.SniffData, error)
// SyncInvoke means synchronized process event.
// The returned result grpc.ClientStream is the stream initiated by satellite to oap server,
// which is used to provide bidirectional stream support
SyncInvoke(d *v1.SniffData) (*v1.SniffData, grpc.ClientStream, error)
}
3 changes: 2 additions & 1 deletion internal/satellite/module/gatherer/receiver_gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/sirupsen/logrus"
"google.golang.org/grpc"

"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
Expand Down Expand Up @@ -178,7 +179,7 @@ func (r *ReceiverGatherer) Ack(lastOffset *event.Offset) {
r.runningQueue.Ack(lastOffset)
}

func (r *ReceiverGatherer) SyncInvoke(d *v1.SniffData) (*v1.SniffData, error) {
func (r *ReceiverGatherer) SyncInvoke(d *v1.SniffData) (*v1.SniffData, grpc.ClientStream, error) {
return r.processor.SyncInvoke(d)
}

Expand Down
4 changes: 3 additions & 1 deletion internal/satellite/module/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"errors"
"sync"

"google.golang.org/grpc"

"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
"github.com/apache/skywalking-satellite/internal/satellite/module/api"
Expand Down Expand Up @@ -92,7 +94,7 @@ func (p *Processor) processPerPartition(ctx context.Context, partition int, wg *
func (p *Processor) Shutdown() {
}

func (p *Processor) SyncInvoke(d *v1.SniffData) (*v1.SniffData, error) {
func (p *Processor) SyncInvoke(d *v1.SniffData) (*v1.SniffData, grpc.ClientStream, error) {
// direct send data to sender
return p.sender.SyncInvoke(d)
}
Expand Down
7 changes: 4 additions & 3 deletions internal/satellite/module/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/sirupsen/logrus"
"google.golang.org/grpc"

v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"

Expand Down Expand Up @@ -266,17 +267,17 @@ func (s *Sender) InputDataChannel(partition int) chan<- *event.OutputEventContex
return s.inputs[partition]
}

func (s *Sender) SyncInvoke(d *v1.SniffData) (*v1.SniffData, error) {
func (s *Sender) SyncInvoke(d *v1.SniffData) (*v1.SniffData, grpc.ClientStream, error) {
supportSyncInvoke := make([]forwarder.Forwarder, 0)
for inx := range s.runningForwarders {
if s.runningForwarders[inx].SupportedSyncInvoke() {
supportSyncInvoke = append(supportSyncInvoke, s.runningForwarders[inx])
}
}
if len(supportSyncInvoke) > 1 {
return nil, fmt.Errorf("only support single forwarder")
return nil, nil, fmt.Errorf("only support single forwarder")
} else if len(supportSyncInvoke) == 0 {
return nil, fmt.Errorf("could not found forwarder")
return nil, nil, fmt.Errorf("could not found forwarder")
}
return supportSyncInvoke[0].SyncForward(d)
}
Expand Down
18 changes: 15 additions & 3 deletions plugins/client/grpc/client_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ import (
"google.golang.org/grpc/metadata"
)

type ctxKey struct{}

var CtxBidirectionalStreamKey = ctxKey{}

// loadConfig use the client params to build the grpc client config.
func (c *Client) loadConfig() (*[]grpc.DialOption, error) {
options := make([]grpc.DialOption, 0)
Expand Down Expand Up @@ -68,14 +72,18 @@ func (c *Client) loadConfig() (*[]grpc.DialOption, error) {
if authHeader != nil {
ctx = metadata.NewOutgoingContext(ctx, authHeader)
}
supportBidirectionalStream := false
if b := ctx.Value(CtxBidirectionalStreamKey); b != nil {
supportBidirectionalStream = b.(bool)
}
timeout, timeoutFunc := context.WithTimeout(ctx, streamRequestTimeout)
clientStream, err := streamer(timeout, desc, cc, method, opts...)
if err != nil {
timeoutFunc()
c.reportError(err)
return nil, err
}
streamWrapper := &timeoutClientStream{clientStream, timeoutFunc}
streamWrapper := &timeoutClientStream{clientStream, supportBidirectionalStream, timeoutFunc}
return streamWrapper, err
}))
grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{},
Expand All @@ -100,11 +108,15 @@ func (c *Client) loadConfig() (*[]grpc.DialOption, error) {

type timeoutClientStream struct {
grpc.ClientStream
timeoutFunc context.CancelFunc
bidirectionalStream bool
timeoutFunc context.CancelFunc
}

func (t *timeoutClientStream) RecvMsg(m interface{}) error {
defer t.timeoutFunc()
if !t.bidirectionalStream {
defer t.timeoutFunc()
}

return t.ClientStream.RecvMsg(m)
}

Expand Down
5 changes: 4 additions & 1 deletion plugins/forwarder/api/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package api
import (
"reflect"

"google.golang.org/grpc"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"

"github.com/apache/skywalking-satellite/internal/pkg/plugin"
Expand All @@ -34,7 +35,9 @@ type Forwarder interface {
// Forward the batch events to the external services, such as Kafka MQ and SkyWalking OAP cluster.
Forward(batch event.BatchEvents) error
// SyncForward the single event to the external service with sync forward
SyncForward(event *v1.SniffData) (*v1.SniffData, error)
// The returned result grpc.ClientStream is the stream initiated by satellite to oap server,
// which is used to provide bidirectional stream support
SyncForward(event *v1.SniffData) (*v1.SniffData, grpc.ClientStream, error)
mrproliu marked this conversation as resolved.
Show resolved Hide resolved
// ForwardType returns the supported event type.
ForwardType() v1.SniffType
// SupportedSyncInvoke return is support SyncForward
Expand Down
2 changes: 2 additions & 0 deletions plugins/forwarder/forwarder_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/envoyalsv3"
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/envoymetricsv2"
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/envoymetricsv3"
grpc_asyncprofiler "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeasyncprofiler"
grpc_nativecds "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativecds"
grpc_nativeclr "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeclr"
grpc_nativeebpfaccesslog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeebpfaccesslog"
Expand Down Expand Up @@ -66,6 +67,7 @@ func RegisterForwarderPlugins() {
new(envoymetricsv3.Forwarder),
new(otlpmetricsv1.Forwarder),
new(grpc_nativeebpfaccesslog.Forwarder),
new(grpc_asyncprofiler.Forwarder),
}
for _, forwarder := range forwarders {
plugin.RegisterPlugin(forwarder)
Expand Down
Loading
Loading