Skip to content

Commit

Permalink
Support transmit eBPF Access Log Protocol (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Jan 18, 2024
1 parent 463bc98 commit 261daa3
Show file tree
Hide file tree
Showing 14 changed files with 367 additions and 21 deletions.
15 changes: 4 additions & 11 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,13 @@ Changes by Version
==================
Release Notes.

1.2.0
1.3.0
------------------
#### Features
* Introduce `pprof` module.
* Support export multiple `telemetry` service.
* Update the base docker image.
* Add timeout configuration for gRPC client.
* Reduce log print when the enqueue data to the pipeline error.
* Support transmit the Continuous Profiling protocol.
* Support native eBPF Access Log protocol.

#### Bug Fixes
* Fix [CVE-2022-41721](https://avd.aquasec.com/nvd/cve-2022-41721).
* Use Go 19 to build the Docker image to fix CVEs.

#### Issues and PR
- All issues are [here](https://github.com/apache/skywalking/milestone/170?closed=1)
- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Apr+milestone%3A1.2.0+is%3Aclosed)
- All issues are [here](https://github.com/apache/skywalking/milestone/188?closed=1)
- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Apr+milestone%3A1.3.0+is%3Aclosed)
21 changes: 21 additions & 0 deletions changes/changes-1.2.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
Changes by Version
==================
Release Notes.

1.2.0
------------------
#### Features
* Introduce `pprof` module.
* Support export multiple `telemetry` service.
* Update the base docker image.
* Add timeout configuration for gRPC client.
* Reduce log print when the enqueue data to the pipeline error.
* Support transmit the Continuous Profiling protocol.

#### Bug Fixes
* Fix [CVE-2022-41721](https://avd.aquasec.com/nvd/cve-2022-41721).
* Use Go 19 to build the Docker image to fix CVEs.

#### Issues and PR
- All issues are [here](https://github.com/apache/skywalking/milestone/170?closed=1)
- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Apr+milestone%3A1.2.0+is%3Aclosed)
26 changes: 26 additions & 0 deletions configs/satellite_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,32 @@ pipes:
client_name: grpc-client
forwarders:
- plugin_name: native-ebpf-profiling-grpc-forwarder
- common_config:
pipe_name: ebpf-accesslog-pipe
gatherer:
server_name: "grpc-server"
receiver:
plugin_name: "grpc-native-ebpf-accesslog-receiver"
queue:
plugin_name: "memory-queue"
# The maximum buffer event size.
event_buffer_size: ${SATELLITE_QUEUE_EVENT_BUFFER_SIZE:5000}
# The partition count of queue.
partition: ${SATELLITE_QUEUE_PARTITION:4}
processor:
filters:
sender:
fallbacker:
plugin_name: none-fallbacker
# The time interval between two flush operations. And the time unit is millisecond.
flush_time: ${SATELLITE_EBPFACCESSLOG_SENDER_FLUSH_TIME:1000}
# The maximum buffer elements.
max_buffer_size: ${SATELLITE_EBPFPACCESSLOG_SENDER_MAX_BUFFER_SIZE:200}
# The minimum flush elements.
min_flush_events: ${SATELLITE_EBPFACCESSLOG_SENDER_MIN_FLUSH_EVENTS:1}
client_name: grpc-client
forwarders:
- plugin_name: native-ebpf-accesslog-grpc-forwarder
- common_config:
pipe_name: otlp-metrics-v1-pipe
gatherer:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Forwarder/native-ebpf-accesslog-grpc-forwarder
## Description
This is a synchronization grpc forwarder with the SkyWalking native eBPF access log protocol.
## DefaultConfig
```yaml```
## Configuration
|Name|Type|Description|
|----|----|-----------|

6 changes: 4 additions & 2 deletions docs/en/setup/plugins/plugin-list.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
- [Envoy Metrics v2 GRPC Forwarder](./forwarder_envoy-metrics-v2-grpc-forwarder.md)
- [Envoy Metrics v3 GRPC Forwarder](./forwarder_envoy-metrics-v3-grpc-forwarder.md)
- [Native CDS GRPC Forwarder](./forwarder_native-cds-grpc-forwarder.md)
- [Native CLR GRPC Forwarder](./forwarder_native-clr-grpc-forwarder.md)
- [GRPC Native EBFP Access Log Forwarder](./forwarder_native-ebpf-accesslog-grpc-forwarder.md)
- [Native EBPF Profiling GRPC Forwarder](./forwarder_native-ebpf-profiling-grpc-forwarder.md)
- [Native Event GRPC Forwarder](./forwarder_native-event-grpc-forwarder.md)
- [Native JVM GRPC Forwarder](./forwarder_native-jvm-grpc-forwarder.md)
- [Native CLR GRPC Forwarder](./forwarder_native-clr-grpc-forwarder.md)
- [Native Log GRPC Forwarder](./forwarder_native-log-grpc-forwarder.md)
- [Native Log Kafka Forwarder](./forwarder_native-log-kafka-forwarder.md)
- [Native Management GRPC Forwarder](./forwarder_native-management-grpc-forwarder.md)
Expand All @@ -36,10 +37,11 @@
- [GRPC Envoy Metrics v2 Receiver](./receiver_grpc-envoy-metrics-v2-receiver.md)
- [GRPC Envoy Metrics v3 Receiver](./receiver_grpc-envoy-metrics-v3-receiver.md)
- [GRPC Native CDS Receiver](./receiver_grpc-native-cds-receiver.md)
- [GRPC Native CLR Receiver](./receiver_grpc-native-clr-receiver.md)
- [GRPC Native EBFP Accesslog Receiver](./receiver_grpc-native-ebpf-accesslog-receiver.md)
- [GRPC Native EBFP Profiling Receiver](./receiver_grpc-native-ebpf-profiling-receiver.md)
- [GRPC Native Event Receiver](./receiver_grpc-native-event-receiver.md)
- [GRPC Native JVM Receiver](./receiver_grpc-native-jvm-receiver.md)
- [GRPC Native CLR Receiver](./receiver_grpc-native-clr-receiver.md)
- [GRPC Native Log Receiver](./receiver_grpc-native-log-receiver.md)
- [GRPC Native Management Receiver](./receiver_grpc-native-management-receiver.md)
- [GRPC Native Meter Receiver](./receiver_grpc-native-meter-receiver.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Receiver/grpc-native-ebpf-accesslog-receiver
## Description
This is a receiver for SkyWalking native accesslog format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/ebpf/accesslog.proto.
## Support Forwarders
- [native-ebpf-accesslog-grpc-forwarder](forwarder_native-ebpf-accesslog-grpc-forwarder.md)
## DefaultConfig
```yaml```
## Configuration
|Name|Type|Description|
|----|----|-----------|

12 changes: 8 additions & 4 deletions docs/menu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,16 @@ catalog:
path: /en/setup/plugins/forwarder_envoy-metrics-v3-grpc-forwarder
- name: Native CDS GRPC Forwarder
path: /en/setup/plugins/forwarder_native-cds-grpc-forwarder
- name: Native CLR GRPC Forwarder
path: /en/setup/plugins/forwarder_native-clr-grpc-forwarder
- name: GRPC Native EBFP Access Log Forwarder
path: /en/setup/plugins/forwarder_native-ebpf-accesslog-grpc-forwarder
- name: Native EBPF Profiling GRPC Forwarder
path: /en/setup/plugins/forwarder_native-ebpf-profiling-grpc-forwarder
- name: Native Event GRPC Forwarder
path: /en/setup/plugins/forwarder_native-event-grpc-forwarder
- name: Native JVM GRPC Forwarder
path: /en/setup/plugins/forwarder_native-jvm-grpc-forwarder
- name: Native CLR GRPC Forwarder
path: /en/setup/plugins/forwarder_native-clr-grpc-forwarder
- name: Native Log GRPC Forwarder
path: /en/setup/plugins/forwarder_native-log-grpc-forwarder
- name: Native Log Kafka Forwarder
Expand Down Expand Up @@ -137,14 +139,16 @@ catalog:
path: /en/setup/plugins/receiver_grpc-envoy-metrics-v3-receiver
- name: GRPC Native CDS Receiver
path: /en/setup/plugins/receiver_grpc-native-cds-receiver
- name: GRPC Native CLR Receiver
path: /en/setup/plugins/receiver_grpc-native-clr-receiver
- name: GRPC Native EBFP Accesslog Receiver
path: /en/setup/plugins/receiver_grpc-native-ebpf-accesslog-receiver
- name: GRPC Native EBFP Profiling Receiver
path: /en/setup/plugins/receiver_grpc-native-ebpf-profiling-receiver
- name: GRPC Native Event Receiver
path: /en/setup/plugins/receiver_grpc-native-event-receiver
- name: GRPC Native JVM Receiver
path: /en/setup/plugins/receiver_grpc-native-jvm-receiver
- name: GRPC Native CLR Receiver
path: /en/setup/plugins/receiver_grpc-native-clr-receiver
- name: GRPC Native Log Receiver
path: /en/setup/plugins/receiver_grpc-native-log-receiver
- name: GRPC Native Management Receiver
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
google.golang.org/protobuf v1.31.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/apimachinery v0.26.2
skywalking.apache.org/repo/goapi v0.0.0-20230531132709-826aefddf3cb
skywalking.apache.org/repo/goapi v0.0.0-20240118085825-a7fa70cc38b7
)

require (
Expand Down
14 changes: 11 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -531,6 +532,7 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand All @@ -556,6 +558,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -605,11 +608,14 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -622,6 +628,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -683,6 +691,7 @@ golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -790,7 +799,6 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
Expand Down Expand Up @@ -858,5 +866,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kF
sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
skywalking.apache.org/repo/goapi v0.0.0-20230531132709-826aefddf3cb h1:rsExxPGSCqiTScUfph4R3uNfQbVvaqMXYz84Hx3W6NI=
skywalking.apache.org/repo/goapi v0.0.0-20230531132709-826aefddf3cb/go.mod h1:bW4dg0GUN4rMCMS8DLlaY3ZiKUAJ1fQYKoZ91Bl0kTk=
skywalking.apache.org/repo/goapi v0.0.0-20240118085825-a7fa70cc38b7 h1:iUx3ovyKy4IMlXv0hB/qRYvUsCIQkAU6DLnLoCt6Qck=
skywalking.apache.org/repo/goapi v0.0.0-20240118085825-a7fa70cc38b7/go.mod h1:oD2dxcDAHVIt95Ee7kJHgZ5f64QNhrqTjQYARwfafc4=
2 changes: 2 additions & 0 deletions plugins/forwarder/forwarder_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/envoymetricsv3"
grpc_nativecds "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativecds"
grpc_nativeclr "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeclr"
grpc_nativeebpfaccesslog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeebpfaccesslog"
grpc_nativeebpfprofiling "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeebpfprofiling"
grpc_nativeevent "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeevent"
grpc_nativejvm "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativejvm"
Expand Down Expand Up @@ -64,6 +65,7 @@ func RegisterForwarderPlugins() {
new(envoymetricsv2.Forwarder),
new(envoymetricsv3.Forwarder),
new(otlpmetricsv1.Forwarder),
new(grpc_nativeebpfaccesslog.Forwarder),
}
for _, forwarder := range forwarders {
plugin.RegisterPlugin(forwarder)
Expand Down
124 changes: 124 additions & 0 deletions plugins/forwarder/grpc/nativeebpfaccesslog/forwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package nativeebpfaccesslog

import (
"context"
"fmt"
"io"
"reflect"

"google.golang.org/grpc"

"github.com/apache/skywalking-satellite/internal/pkg/config"
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"

v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
)

const (
Name = "native-ebpf-accesslog-grpc-forwarder"
ShowName = "GRPC Native EBFP Access Log Forwarder"
)

type Forwarder struct {
config.CommonFields

accessClient v3.EBPFAccessLogServiceClient
}

func (f *Forwarder) Name() string {
return Name
}

func (f *Forwarder) ShowName() string {
return ShowName
}

func (f *Forwarder) Description() string {
return "This is a synchronization grpc forwarder with the SkyWalking native eBPF access log protocol."
}

func (f *Forwarder) DefaultConfig() string {
return ``
}

func (f *Forwarder) Prepare(connection interface{}) error {
client, ok := connection.(*grpc.ClientConn)
if !ok {
return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
f.Name(), reflect.TypeOf(connection).String())
}
f.accessClient = v3.NewEBPFAccessLogServiceClient(client)
return nil
}

func (f *Forwarder) Forward(batch event.BatchEvents) (err error) {
var stream v3.EBPFAccessLogService_CollectClient
for _, e := range batch {
data, ok := e.GetData().(*v1.SniffData_EBPFAccessLogList)
if !ok {
continue
}

stream, err = f.accessClient.Collect(context.Background())
if err != nil {
log.Logger.Errorf("open grpc stream error: %v", err)
return err
}

streamClosed := false
for _, message := range data.EBPFAccessLogList.Messages {
err := stream.SendMsg(server_grpc.NewOriginalData(message))
if err != nil {
log.Logger.Errorf("%s send log data error: %v", f.Name(), err)
f.closeStream(stream)
streamClosed = true
break
}
}

if !streamClosed {
f.closeStream(stream)
}
}

return nil
}

func (f *Forwarder) closeStream(stream v3.EBPFAccessLogService_CollectClient) {
_, err := stream.CloseAndRecv()
if err != nil && err != io.EOF {
log.Logger.Errorf("%s close stream error: %v", f.Name(), err)
}
}

func (f *Forwarder) ForwardType() v1.SniffType {
return v1.SniffType_EBPFAccessLogType
}

func (f *Forwarder) SyncForward(e *v1.SniffData) (*v1.SniffData, error) {
return nil, fmt.Errorf("unsupport sync forward")
}

func (f *Forwarder) SupportedSyncInvoke() bool {
return false
}
Loading

0 comments on commit 261daa3

Please sign in to comment.