Skip to content

Commit

Permalink
Support async-profiler grpc interface (apache#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengziyi0117 authored Nov 11, 2024
1 parent 76103da commit cb2032d
Show file tree
Hide file tree
Showing 126 changed files with 1,149 additions and 274 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ jobs:
- name: Lint
run: make lint
- name: Test
if: matrix.runner == 'ubuntu'
run: make test
- name: Build
run: make build
Expand Down
14 changes: 10 additions & 4 deletions .github/workflows/e2e-istio.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
fail-fast: true
matrix:
analyzer: [k8s-mesh, mx-mesh]
istio_version: [1.7.1, 1.8.2, 1.9.1]
istio_version: [ 1.15.0, 1.16.0, 1.17.0, 1.18.0 ]
name: Istio(${{ matrix.istio_version }})+ALS(${{ matrix.analyzer }})
env:
ISTIO_VERSION: ${{ matrix.istio_version }}
Expand All @@ -54,6 +54,12 @@ jobs:
- uses: apache/skywalking-infra-e2e@cf589b4a0b9f8e6f436f78e9cfd94a1ee5494180
with:
e2e-file: test/e2e/case/istio/als/e2e.yaml
- uses: actions/upload-artifact@v4
if: ${{ failure() }}
name: Upload Logs
with:
name: logs-${{ matrix.istio_version }}
path: "${{ env.SW_INFRA_E2E_LOG_DIR }}"

metrics-service:
runs-on: ubuntu-latest
Expand All @@ -62,7 +68,7 @@ jobs:
strategy:
fail-fast: true
matrix:
istio_version: [1.8.2, 1.9.1]
istio_version: [1.15.0, 1.16.0, 1.17.0, 1.18.0]
env:
ISTIO_VERSION: ${{ matrix.istio_version }}
steps:
Expand All @@ -80,9 +86,9 @@ jobs:
- uses: apache/skywalking-infra-e2e@cf589b4a0b9f8e6f436f78e9cfd94a1ee5494180
with:
e2e-file: test/e2e/case/istio/metrics/e2e.yaml
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v4
if: ${{ failure() }}
name: Upload Logs
with:
name: logs
name: logs-${{ matrix.istio_version }}
path: "${{ env.SW_INFRA_E2E_LOG_DIR }}"
6 changes: 4 additions & 2 deletions .github/workflows/e2e-native.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ jobs:
config: test/e2e/case/native-protocols/e2e.yaml
- name: Native Process/EBPF Profiling
config: test/e2e/case/native-ebpf/e2e.yaml
- name: Native Process/Async Profiler
config: test/e2e/case/native-asyncprofiler/e2e.yaml
fail-fast: true
name: Native E2E test
runs-on: ubuntu-latest
Expand All @@ -47,9 +49,9 @@ jobs:
uses: apache/skywalking-infra-e2e@cf589b4a0b9f8e6f436f78e9cfd94a1ee5494180
with:
e2e-file: $GITHUB_WORKSPACE/${{ matrix.test.config }}
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v4
if: ${{ failure() }}
name: Upload Logs
with:
name: logs
name: logs-${{ matrix.test.name }}
path: "${{ env.SW_INFRA_E2E_LOG_DIR }}"
26 changes: 26 additions & 0 deletions configs/satellite_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,32 @@ pipes:
client_name: grpc-client
forwarders:
- plugin_name: native-profile-grpc-forwarder
- common_config:
pipe_name: asyncprofilerpipe
gatherer:
server_name: "grpc-server"
receiver:
plugin_name: "grpc-native-async-profiler-receiver"
queue:
plugin_name: "memory-queue"
# The maximum buffer event size.
event_buffer_size: ${SATELLITE_QUEUE_EVENT_BUFFER_SIZE:5000}
# The partition count of queue.
partition: ${SATELLITE_QUEUE_PARTITION:4}
processor:
filters:
sender:
fallbacker:
plugin_name: none-fallbacker
# The time interval between two flush operations. And the time unit is millisecond.
flush_time: ${SATELLITE_PROFILEPIPE_SENDER_FLUSH_TIME:1000}
# The maximum buffer elements.
max_buffer_size: ${SATELLITE_PROFILEPIPE_SENDER_MAX_BUFFER_SIZE:200}
# The minimum flush elements.
min_flush_events: ${SATELLITE_PROFILEPIPE_SENDER_MIN_FLUSH_EVENTS:1}
client_name: grpc-client
forwarders:
- plugin_name: native-async-profiler-grpc-forwarder
- common_config:
pipe_name: cdspipe
gatherer:
Expand Down
Empty file modified docs/en/setup/plugins/client_grpc-client.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/client_kafka-client.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/fallbacker_none-fallbacker.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/fallbacker_timer-fallbacker.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_envoy-als-v2-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_envoy-als-v3-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Forwarder/native-async-profiler-grpc-forwarder
## Description
This is a grpc forwarder with the SkyWalking native async profiler protocol.
## DefaultConfig
```yaml```
## Configuration
|Name|Type|Description|
|----|----|-----------|

Empty file modified docs/en/setup/plugins/forwarder_native-cds-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_native-clr-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file.
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_native-event-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_native-jvm-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_native-log-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_native-log-kafka-forwarder.md
100755 → 100644
Empty file.
Empty file.
Empty file modified docs/en/setup/plugins/forwarder_native-meter-grpc-forwarder.md
100755 → 100644
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
2 changes: 2 additions & 0 deletions docs/en/setup/plugins/plugin-list.md
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- [Envoy ALS v3 GRPC Forwarder](./forwarder_envoy-als-v3-grpc-forwarder.md)
- [Envoy Metrics v2 GRPC Forwarder](./forwarder_envoy-metrics-v2-grpc-forwarder.md)
- [Envoy Metrics v3 GRPC Forwarder](./forwarder_envoy-metrics-v3-grpc-forwarder.md)
- [Native Async Profiler GRPC Forwarder](./forwarder_native-async-profiler-grpc-forwarder.md)
- [Native CDS GRPC Forwarder](./forwarder_native-cds-grpc-forwarder.md)
- [Native CLR GRPC Forwarder](./forwarder_native-clr-grpc-forwarder.md)
- [GRPC Native EBFP Access Log Forwarder](./forwarder_native-ebpf-accesslog-grpc-forwarder.md)
Expand All @@ -36,6 +37,7 @@
- [GRPC Envoy ALS v3 Receiver](./receiver_grpc-envoy-als-v3-receiver.md)
- [GRPC Envoy Metrics v2 Receiver](./receiver_grpc-envoy-metrics-v2-receiver.md)
- [GRPC Envoy Metrics v3 Receiver](./receiver_grpc-envoy-metrics-v3-receiver.md)
- [GRPC Native Async Profiler Receiver](./receiver_grpc-native-async-profiler-receiver.md)
- [GRPC Native CDS Receiver](./receiver_grpc-native-cds-receiver.md)
- [GRPC Native CLR Receiver](./receiver_grpc-native-clr-receiver.md)
- [GRPC Native EBFP Accesslog Receiver](./receiver_grpc-native-ebpf-accesslog-receiver.md)
Expand Down
Empty file modified docs/en/setup/plugins/queue_memory-queue.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/queue_mmap-queue.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/queue_none-queue.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-envoy-als-v2-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-envoy-als-v3-receiver.md
100755 → 100644
Empty file.
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Receiver/grpc-native-async-profiler-receiver
## Description
This is a receiver for SkyWalking native async-profiler format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/asyncprofiler/AsyncProfiler.proto.
## Support Forwarders
- [native-async-profiler-grpc-forwarder](forwarder_native-async-profiler-grpc-forwarder.md)
## DefaultConfig
```yaml```
## Configuration
|Name|Type|Description|
|----|----|-----------|

Empty file modified docs/en/setup/plugins/receiver_grpc-native-cds-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-clr-receiver.md
100755 → 100644
Empty file.
Empty file.
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-event-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-jvm-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-log-receiver.md
100755 → 100644
Empty file.
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-meter-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-process-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-profile-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-native-tracing-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_grpc-otlp-metrics-v1-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/receiver_http-native-log-receiver.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/server_grpc-server.md
100755 → 100644
Empty file.
Empty file modified docs/en/setup/plugins/server_http-server.md
100755 → 100644
Empty file.
4 changes: 4 additions & 0 deletions docs/menu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ catalog:
path: /en/setup/plugins/forwarder_envoy-metrics-v2-grpc-forwarder
- name: Envoy Metrics v3 GRPC Forwarder
path: /en/setup/plugins/forwarder_envoy-metrics-v3-grpc-forwarder
- name: Native Async Profiler GRPC Forwarder
path: /en/setup/plugins/forwarder_native-async-profiler-grpc-forwarder
- name: Native CDS GRPC Forwarder
path: /en/setup/plugins/forwarder_native-cds-grpc-forwarder
- name: Native CLR GRPC Forwarder
Expand Down Expand Up @@ -137,6 +139,8 @@ catalog:
path: /en/setup/plugins/receiver_grpc-envoy-metrics-v2-receiver
- name: GRPC Envoy Metrics v3 Receiver
path: /en/setup/plugins/receiver_grpc-envoy-metrics-v3-receiver
- name: GRPC Native Async Profiler Receiver
path: /en/setup/plugins/receiver_grpc-native-async-profiler-receiver
- name: GRPC Native CDS Receiver
path: /en/setup/plugins/receiver_grpc-native-cds-receiver
- name: GRPC Native CLR Receiver
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
k8s.io/apimachinery v0.26.2
skywalking.apache.org/repo/goapi v0.0.0-20240118085825-a7fa70cc38b7
skywalking.apache.org/repo/goapi v0.0.0-20241106011455-ef3dbfac3128
)

require (
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -542,7 +542,7 @@ golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down Expand Up @@ -619,14 +619,14 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk=
golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -640,7 +640,7 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -809,7 +809,7 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
Expand Down Expand Up @@ -877,5 +877,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kF
sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
skywalking.apache.org/repo/goapi v0.0.0-20240118085825-a7fa70cc38b7 h1:iUx3ovyKy4IMlXv0hB/qRYvUsCIQkAU6DLnLoCt6Qck=
skywalking.apache.org/repo/goapi v0.0.0-20240118085825-a7fa70cc38b7/go.mod h1:oD2dxcDAHVIt95Ee7kJHgZ5f64QNhrqTjQYARwfafc4=
skywalking.apache.org/repo/goapi v0.0.0-20241106011455-ef3dbfac3128 h1:wrHG0+CwYwqcgPWwJf/JI1x1sk+jD1Wdy93M1YuecO0=
skywalking.apache.org/repo/goapi v0.0.0-20241106011455-ef3dbfac3128/go.mod h1:+n8BMuS8eRdzdnGh15ElRGBXPi0eYZSs2TKySBDmRTE=
11 changes: 8 additions & 3 deletions internal/satellite/module/api/sync_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@

package api

import v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
import (
"google.golang.org/grpc"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
)

type SyncInvoker interface {
// SyncInvoke means synchronized process event
SyncInvoke(d *v1.SniffData) (*v1.SniffData, error)
// SyncInvoke means synchronized process event.
// The returned result grpc.ClientStream is the stream initiated by satellite to oap server,
// which is used to provide bidirectional stream support
SyncInvoke(d *v1.SniffData) (*v1.SniffData, grpc.ClientStream, error)
}
3 changes: 2 additions & 1 deletion internal/satellite/module/gatherer/receiver_gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/sirupsen/logrus"
"google.golang.org/grpc"

"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
Expand Down Expand Up @@ -178,7 +179,7 @@ func (r *ReceiverGatherer) Ack(lastOffset *event.Offset) {
r.runningQueue.Ack(lastOffset)
}

func (r *ReceiverGatherer) SyncInvoke(d *v1.SniffData) (*v1.SniffData, error) {
func (r *ReceiverGatherer) SyncInvoke(d *v1.SniffData) (*v1.SniffData, grpc.ClientStream, error) {
return r.processor.SyncInvoke(d)
}

Expand Down
4 changes: 3 additions & 1 deletion internal/satellite/module/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"errors"
"sync"

"google.golang.org/grpc"

"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
"github.com/apache/skywalking-satellite/internal/satellite/module/api"
Expand Down Expand Up @@ -92,7 +94,7 @@ func (p *Processor) processPerPartition(ctx context.Context, partition int, wg *
func (p *Processor) Shutdown() {
}

func (p *Processor) SyncInvoke(d *v1.SniffData) (*v1.SniffData, error) {
func (p *Processor) SyncInvoke(d *v1.SniffData) (*v1.SniffData, grpc.ClientStream, error) {
// direct send data to sender
return p.sender.SyncInvoke(d)
}
Expand Down
7 changes: 4 additions & 3 deletions internal/satellite/module/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/sirupsen/logrus"
"google.golang.org/grpc"

v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"

Expand Down Expand Up @@ -266,17 +267,17 @@ func (s *Sender) InputDataChannel(partition int) chan<- *event.OutputEventContex
return s.inputs[partition]
}

func (s *Sender) SyncInvoke(d *v1.SniffData) (*v1.SniffData, error) {
func (s *Sender) SyncInvoke(d *v1.SniffData) (*v1.SniffData, grpc.ClientStream, error) {
supportSyncInvoke := make([]forwarder.Forwarder, 0)
for inx := range s.runningForwarders {
if s.runningForwarders[inx].SupportedSyncInvoke() {
supportSyncInvoke = append(supportSyncInvoke, s.runningForwarders[inx])
}
}
if len(supportSyncInvoke) > 1 {
return nil, fmt.Errorf("only support single forwarder")
return nil, nil, fmt.Errorf("only support single forwarder")
} else if len(supportSyncInvoke) == 0 {
return nil, fmt.Errorf("could not found forwarder")
return nil, nil, fmt.Errorf("could not found forwarder")
}
return supportSyncInvoke[0].SyncForward(d)
}
Expand Down
18 changes: 15 additions & 3 deletions plugins/client/grpc/client_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ import (
"google.golang.org/grpc/metadata"
)

type ctxKey struct{}

var CtxBidirectionalStreamKey = ctxKey{}

// loadConfig use the client params to build the grpc client config.
func (c *Client) loadConfig() (*[]grpc.DialOption, error) {
options := make([]grpc.DialOption, 0)
Expand Down Expand Up @@ -68,14 +72,18 @@ func (c *Client) loadConfig() (*[]grpc.DialOption, error) {
if authHeader != nil {
ctx = metadata.NewOutgoingContext(ctx, authHeader)
}
supportBidirectionalStream := false
if b := ctx.Value(CtxBidirectionalStreamKey); b != nil {
supportBidirectionalStream = b.(bool)
}
timeout, timeoutFunc := context.WithTimeout(ctx, streamRequestTimeout)
clientStream, err := streamer(timeout, desc, cc, method, opts...)
if err != nil {
timeoutFunc()
c.reportError(err)
return nil, err
}
streamWrapper := &timeoutClientStream{clientStream, timeoutFunc}
streamWrapper := &timeoutClientStream{clientStream, supportBidirectionalStream, timeoutFunc}
return streamWrapper, err
}))
grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{},
Expand All @@ -100,11 +108,15 @@ func (c *Client) loadConfig() (*[]grpc.DialOption, error) {

type timeoutClientStream struct {
grpc.ClientStream
timeoutFunc context.CancelFunc
bidirectionalStream bool
timeoutFunc context.CancelFunc
}

func (t *timeoutClientStream) RecvMsg(m interface{}) error {
defer t.timeoutFunc()
if !t.bidirectionalStream {
defer t.timeoutFunc()
}

return t.ClientStream.RecvMsg(m)
}

Expand Down
5 changes: 4 additions & 1 deletion plugins/forwarder/api/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package api
import (
"reflect"

"google.golang.org/grpc"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"

"github.com/apache/skywalking-satellite/internal/pkg/plugin"
Expand All @@ -34,7 +35,9 @@ type Forwarder interface {
// Forward the batch events to the external services, such as Kafka MQ and SkyWalking OAP cluster.
Forward(batch event.BatchEvents) error
// SyncForward the single event to the external service with sync forward
SyncForward(event *v1.SniffData) (*v1.SniffData, error)
// The returned result grpc.ClientStream is the stream initiated by satellite to oap server,
// which is used to provide bidirectional stream support
SyncForward(event *v1.SniffData) (*v1.SniffData, grpc.ClientStream, error)
// ForwardType returns the supported event type.
ForwardType() v1.SniffType
// SupportedSyncInvoke return is support SyncForward
Expand Down
2 changes: 2 additions & 0 deletions plugins/forwarder/forwarder_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/envoyalsv3"
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/envoymetricsv2"
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/envoymetricsv3"
grpc_asyncprofiler "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeasyncprofiler"
grpc_nativecds "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativecds"
grpc_nativeclr "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeclr"
grpc_nativeebpfaccesslog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeebpfaccesslog"
Expand Down Expand Up @@ -66,6 +67,7 @@ func RegisterForwarderPlugins() {
new(envoymetricsv3.Forwarder),
new(otlpmetricsv1.Forwarder),
new(grpc_nativeebpfaccesslog.Forwarder),
new(grpc_asyncprofiler.Forwarder),
}
for _, forwarder := range forwarders {
plugin.RegisterPlugin(forwarder)
Expand Down
Loading

0 comments on commit cb2032d

Please sign in to comment.