Adding E2E for testing gRPC server (#13)
mrproliu authored Aug 7, 2024
1 parent c6ea170 commit c48d8db
Showing 17 changed files with 537 additions and 18 deletions.
54 changes: 54 additions & 0 deletions .github/workflows/CI.yaml
@@ -44,3 +44,57 @@ jobs:
make poetry
poetry install --only lint
make lint
docker:
name: Build Docker Image
runs-on: ubuntu-latest
steps:
- name: Checkout source codes
uses: actions/checkout@v3
with:
submodules: true
- name: Build and Save Docker image
run: |
docker build -t r3:latest .
docker save -o r3-image.tar r3:latest
- name: Upload docker images
uses: actions/upload-artifact@v3
with:
name: docker-image
path: r3-image.tar

e2e:
name: E2E test
runs-on: ubuntu-latest
needs: [ docker ]
timeout-minutes: 60
steps:
- uses: actions/checkout@v3
with:
submodules: true
- uses: actions/setup-python@v5
with:
python-version: '3.10'
- name: Install docker-compose
shell: bash
run: |
if ! command -v docker-compose > /dev/null 2>&1; then
echo "Installing docker-compose"
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
fi
- uses: actions/download-artifact@v3
name: Download docker images
with:
name: docker-image
path: docker-image
- name: Load docker images
run: |
find docker-image -name "*.tar" -exec docker load -i {} \;
find docker-image -name "*.tar" -exec rm {} \;
- name: Setup Python Path
run: echo "PYTHONPATH=$(pwd)" >> $GITHUB_ENV
- name: Run E2E test
uses: apache/skywalking-infra-e2e@cf589b4a0b9f8e6f436f78e9cfd94a1ee5494180
with:
e2e-file: $GITHUB_WORKSPACE/test/e2e/e2e.yaml
15 changes: 8 additions & 7 deletions models/Configuration.md
@@ -28,13 +28,14 @@ Currently, all similar content is replaced with `{var}` by default.

Drain is the core algorithm of URI Drain.

| Name | Type(Unit) | Environment Key | Default | Description |
|------------------|------------|------------------------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| sim_th | float | DRAIN_SIM_TH | 0.4 | The similarity threshold to decide if a new sequence should be merged into an existing cluster. |
| depth | int | DRAIN_DEPTH | 4 | Max depth levels of pattern. Minimum is 2. |
| max_children | int | DRAIN_MAX_CHILDREN | 100 | Max number of children of an internal node. |
| max_clusters | int | DRAIN_MAX_CLUSTERS | 1024 | Max number of tracked clusters (unlimited by default). When this number is reached, the model starts replacing old clusters with new ones according to the LRU policy. |
| extra_delimiters | string | DRAIN_EXTRA_DELIMITERS | \["/"\] | The extra delimiters to split the sequence. |
| Name | Type(Unit) | Environment Key | Default | Description |
|------------------------|------------|------------------------------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| sim_th | float | DRAIN_SIM_TH | 0.4 | The similarity threshold to decide if a new sequence should be merged into an existing cluster. |
| depth | int | DRAIN_DEPTH | 4 | Max depth levels of pattern. Minimum is 2. |
| max_children | int | DRAIN_MAX_CHILDREN | 100 | Max number of children of an internal node. |
| max_clusters | int | DRAIN_MAX_CLUSTERS | 1024 | Max number of tracked clusters (unlimited by default). When this number is reached, the model starts replacing old clusters with new ones according to the LRU policy. |
| extra_delimiters | string | DRAIN_EXTRA_DELIMITERS | \["/"\] | The extra delimiters to split the sequence. |
| analysis_min_url_count | int | DRAIN_ANALYSIS_MIN_URL_COUNT | 20 | The minimum number of unique URLs (per service) required to trigger the analysis. |

### Profiling

12 changes: 9 additions & 3 deletions models/uri_drain/template_miner_config.py
@@ -13,6 +13,7 @@

env_regular_regex = re.compile(r'\${(?P<ENV>[_A-Z0-9]+):(?P<DEF>.*)}')


class TemplateMinerConfig:
def __init__(self):
self.engine = "Drain"
@@ -26,6 +27,7 @@ def __init__(self):
self.drain_depth = 4
self.drain_max_children = 100
self.drain_max_clusters = None
self.drain_analysis_min_url_count = 20
self.masking_instructions = []
self.mask_prefix = "<"
self.mask_suffix = ">"
@@ -45,8 +47,10 @@ def load(self, config_filename: str):

self.engine = self.read_config_value(parser, section_drain, 'engine', str, self.engine)

self.profiling_enabled = self.read_config_value(parser, section_profiling, 'enabled', bool, self.profiling_enabled)
self.profiling_report_sec = self.read_config_value(parser, section_profiling, 'report_sec', int, self.profiling_report_sec)
self.profiling_enabled = self.read_config_value(parser, section_profiling, 'enabled', bool,
self.profiling_enabled)
self.profiling_report_sec = self.read_config_value(parser, section_profiling, 'report_sec', int,
self.profiling_report_sec)

self.snapshot_interval_minutes = self.read_config_value(parser, section_snapshot, 'snapshot_interval_minutes',
int, self.snapshot_interval_minutes)
@@ -76,6 +80,8 @@ def load(self, config_filename: str):
self.parameter_extraction_cache_capacity = self.read_config_value(parser, section_masking,
'parameter_extraction_cache_capacity', int,
self.parameter_extraction_cache_capacity)
self.drain_analysis_min_url_count = self.read_config_value(parser, section_drain, 'analysis_min_url_count', int,
self.drain_analysis_min_url_count)

masking_instructions = []
masking_list = json.loads(masking_instructions_str)
@@ -100,6 +106,6 @@ def read_config_value(self, parser, section, key, tp, default):
val = self.read_value_with_env(conf_value)
if tp == bool:
if val.lower() not in parser.BOOLEAN_STATES:
raise ValueError('Not a boolean: %s' % val)
raise ValueError(f'Not a boolean: {val}')
return parser.BOOLEAN_STATES[val.lower()]
return tp(val)
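As a rough illustration of the `${ENV_KEY:default}` placeholders that this config loader reads from `uri_drain.ini`, the sketch below resolves a value from the environment and falls back to the inline default. The regex has the same shape as `env_regular_regex` above. `resolve_placeholder` is a hypothetical helper, not the repository's `read_value_with_env`, whose body is not part of this diff.

```python
import os
import re

# Same shape as env_regular_regex in template_miner_config.py: ${ENV_KEY:default}
ENV_PLACEHOLDER = re.compile(r'\$\{(?P<ENV>[_A-Z0-9]+):(?P<DEF>.*)\}')


def resolve_placeholder(raw, environ=None):
    """Hypothetical helper: return the env value if set, else the inline default."""
    environ = os.environ if environ is None else environ
    match = ENV_PLACEHOLDER.fullmatch(raw.strip())
    if match is None:
        return raw  # plain value without a placeholder
    return environ.get(match.group('ENV'), match.group('DEF'))


# With the variable unset, the inline default "20" is used; the caller would
# then convert it with int(), as read_config_value does via tp(val).
default_value = resolve_placeholder('${DRAIN_ANALYSIS_MIN_URL_COUNT:20}', {})
override_value = resolve_placeholder(
    '${DRAIN_ANALYSIS_MIN_URL_COUNT:20}', {'DRAIN_ANALYSIS_MIN_URL_COUNT': '1'})
```

This is presumably why the E2E compose file can lower the threshold by setting `DRAIN_ANALYSIS_MIN_URL_COUNT` alone, without editing the ini file.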
2 changes: 1 addition & 1 deletion servers/simple/run.py
@@ -44,7 +44,7 @@ def run():
for service in miners:
shared_results_object.set_dict_field(service=service, value=miners[service].drain.cluster_patterns)

producer_process = multiprocessing.Process(target=run_server, args=(uri_main_queue, shared_results_object))
producer_process = multiprocessing.Process(target=run_server, args=(uri_main_queue, shared_results_object, config))
consumer_process = multiprocessing.Process(target=run_worker, args=(uri_main_queue, shared_results_object, config, miners))

producer_process.start()
15 changes: 8 additions & 7 deletions servers/simple/server.py
@@ -32,11 +32,12 @@ class HttpUriRecognitionServicer(ai_http_uri_recognition_pb2_grpc.HttpUriRecogni
TODO: SO_REUSEPORT for load balancing? << NOT going to work, it's stateful
"""

def __init__(self, uri_main_queue, shared_results_object):
def __init__(self, uri_main_queue, shared_results_object, conf):
super().__init__()
self.shared_results_object = shared_results_object
self.uri_main_queue = uri_main_queue
self.known_services = defaultdict(int) # service_name: received_count
self.conf = conf

async def fetchAllPatterns(self, request, context):
# TODO OAP SIDE OR THIS SIDE must save the version, e.g. oap should check if version is > got version, since
@@ -87,18 +88,18 @@ async def feedRawData(self, request, context):

# This is an experimental mechanism to avoid identifying non-restful uris unnecessarily.
self.known_services[service] += len(set(uris))
if self.known_services[service] < 20: # This is hard-coded as 20 in the SkyWalking UI as a heuristic
print(f'Unique Uri count too low for service {service}, skipping')
if self.known_services[service] < self.conf.drain_analysis_min_url_count:
print(f'Unique Uri count too low({self.known_services[service]} < {self.conf.drain_analysis_min_url_count}) for service {service}, skipping')
return Empty()
self.uri_main_queue.put((uris, service))
return Empty()


async def serve(uri_main_queue, shared_results_object):
async def serve(uri_main_queue, shared_results_object, conf):
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))

ai_http_uri_recognition_pb2_grpc.add_HttpUriRecognitionServiceServicer_to_server(
HttpUriRecognitionServicer(uri_main_queue=uri_main_queue, shared_results_object=shared_results_object), server)
HttpUriRecognitionServicer(uri_main_queue=uri_main_queue, shared_results_object=shared_results_object, conf=conf), server)

server.add_insecure_port('[::]:17128') # TODO: change to config injection

@@ -109,12 +110,12 @@ async def serve(uri_main_queue, shared_results_object):
await server.wait_for_termination() # timeout=5


def run_server(uri_main_queue, shared_results_object):
def run_server(uri_main_queue, shared_results_object, conf):
loop = asyncio.get_event_loop()
try:
# Here `serve(...)` is the core coroutine that may spawn any
# number of tasks
sys.exit(loop.run_until_complete(serve(uri_main_queue, shared_results_object)))
sys.exit(loop.run_until_complete(serve(uri_main_queue, shared_results_object, conf)))
except KeyboardInterrupt:
# Optionally show a message if the shutdown may take a while
print("Attempting graceful shutdown, press Ctrl+C again to exit…", flush=True)
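The threshold check in `feedRawData` can be sketched in isolation as follows. `UniqueUriGate` and `should_analyze` are hypothetical names used only for illustration. The counting mirrors the diff: uniqueness is computed per batch, so a URI repeated across batches is counted again.

```python
from collections import defaultdict


class UniqueUriGate:
    """Hypothetical stand-in for the per-service counting done in feedRawData."""

    def __init__(self, min_url_count=20):
        # Corresponds to conf.drain_analysis_min_url_count in the diff
        self.min_url_count = min_url_count
        self.known_services = defaultdict(int)  # service_name: received_count

    def should_analyze(self, service, uris):
        # Add the number of distinct URIs in this batch to the running total
        self.known_services[service] += len(set(uris))
        return self.known_services[service] >= self.min_url_count


gate = UniqueUriGate(min_url_count=3)
first = gate.should_analyze('test1', ['/a', '/a'])   # running total: 1
second = gate.should_analyze('test1', ['/b', '/c'])  # running total: 3
```

With the threshold set to 1, as in the E2E compose file, any batch containing at least one URI passes the gate.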
1 change: 1 addition & 0 deletions servers/simple/uri_drain.ini
@@ -34,6 +34,7 @@ depth = ${DRAIN_DEPTH:4}
max_children = ${DRAIN_MAX_CHILDREN:100}
max_clusters = ${DRAIN_MAX_CLUSTERS:1024}
extra_delimiters = ${DRAIN_EXTRA_DELIMITERS:["/"]}
analysis_min_url_count = ${DRAIN_ANALYSIS_MIN_URL_COUNT:20}

[PROFILING]
enabled = ${PROFILING_ENABLED:False}
13 changes: 13 additions & 0 deletions test/e2e/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2023 SkyAPM org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
68 changes: 68 additions & 0 deletions test/e2e/client.py
@@ -0,0 +1,68 @@
# Copyright 2023 SkyAPM org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run this file as a gRPC client for the E2E test: feed raw URIs to the URI Drain server or fetch the recognized patterns.
"""
import sys

import grpc
import yaml

from servers.protos.generated.ai_http_uri_recognition_pb2 import HttpUriRecognitionRequest, HttpRawUri
from servers.protos.generated.ai_http_uri_recognition_pb2_grpc import HttpUriRecognitionServiceStub

mode = sys.argv[1]
service_name = sys.argv[2]

channel = grpc.insecure_channel('localhost:17128')
stub = HttpUriRecognitionServiceStub(channel)


def feed_data():
file_path = sys.argv[3]
with open(file_path, 'r') as file:
urls = file.readlines()
stub.feedRawData(
HttpUriRecognitionRequest(service=service_name, unrecognizedUris=list(map(lambda x: HttpRawUri(name=x), urls))))
print("ok")


def fetch_data():
patterns = stub.fetchAllPatterns(HttpUriRecognitionRequest(service=service_name))
print(yaml.dump(HTTPUriRecognitionResp(sorted([p.pattern for p in patterns.patterns]), patterns.version), Dumper=NoTagDumper))


class HTTPUriRecognitionResp:

def __init__(self, patterns, version):
self.patterns = patterns
self.version = version


class NoTagDumper(yaml.SafeDumper):
def ignore_aliases(self, data):
return True


def remove_representer(dumper, data):
return dumper.represent_dict(data.__dict__)


yaml.add_representer(HTTPUriRecognitionResp, remove_representer, Dumper=NoTagDumper)

if __name__ == '__main__':
if mode == 'feed':
feed_data()
elif mode == 'fetch':
fetch_data()
33 changes: 33 additions & 0 deletions test/e2e/docker-compose.yml
@@ -0,0 +1,33 @@
# Copyright 2023 SkyAPM org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

version: '2.1'

services:
r3:
image: r3:latest
ports:
- "17128:17128"
networks:
- e2e
environment:
DRAIN_ANALYSIS_MIN_URL_COUNT: 1
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/17128"]
interval: 5s
timeout: 60s
retries: 120

networks:
e2e:
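The healthcheck in this compose file probes port 17128 through bash's `/dev/tcp`. A roughly equivalent probe in Python, using only the standard library, might look like the sketch below. `port_is_open` is a hypothetical helper and not part of this commit.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        # create_connection raises OSError (e.g. connection refused, timeout)
        # when nothing is accepting connections on the port
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling `port_is_open('127.0.0.1', 17128)` before running `client.py` would give a similar readiness signal when the server is started outside Docker.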
60 changes: 60 additions & 0 deletions test/e2e/e2e.yaml
@@ -0,0 +1,60 @@
# Copyright 2023 SkyAPM org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

setup:
env: compose
file: docker-compose.yml
timeout: 20m
steps:
- name: set PATH
command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
- name: make install
command: make install
- name: install dependency for client.py
command: pip install -r test/e2e/requirements.txt
- name: adding python path
command: export PYTHONPATH=$(pwd)

verify:
# verify with retry strategy
retry:
# max retry count
count: 20
# the interval between two retries, given as a duration (here 3 seconds).
interval: 3s
cases:
- query: python test/e2e/client.py feed test1 demo/Endpoint100_counterexamples.txt && sleep 1
expected: expected/feed-ok.yaml
- query: python test/e2e/client.py fetch test1
expected: expected/endpoint_counterexamples.yaml

- query: python test/e2e/client.py feed test2 demo/Endpoint100_trivial.txt && sleep 1
expected: expected/feed-ok.yaml
- query: python test/e2e/client.py fetch test2
expected: expected/endpoint_trivial.yaml

- query: python test/e2e/client.py feed test3 demo/Endpoint100_trivial_3k_repeat.txt && sleep 1
expected: expected/feed-ok.yaml
- query: python test/e2e/client.py fetch test3
expected: expected/endpoint_trivial_3k.yaml

- query: python test/e2e/client.py feed test4 demo/Endpoint200_hard.txt && sleep 1
expected: expected/feed-ok.yaml
- query: python test/e2e/client.py fetch test4
expected: expected/endpoint_hard.yaml

- query: python test/e2e/client.py feed test5 demo/Endpoint200_hard_3k_repeat.txt && sleep 1
expected: expected/feed-ok.yaml
- query: python test/e2e/client.py fetch test5
expected: expected/endpoint_hard_3k.yaml
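The `retry` settings above (`count: 20`, `interval: 3s`) describe a retry-until-match loop. The sketch below shows that idea in plain Python. It is not the skywalking-infra-e2e implementation, and `verify_with_retry` and its parameters are hypothetical.

```python
import time


def verify_with_retry(query, expected, count=20, interval_s=3.0, sleep=time.sleep):
    """Call `query` until it returns `expected`; give up after `count` attempts.

    Returns the 1-based attempt number that matched.
    """
    for attempt in range(1, count + 1):
        if query() == expected:
            return attempt
        if attempt < count:
            sleep(interval_s)  # wait before the next attempt
    raise AssertionError(f'no match after {count} attempts')
```

Under this reading, the `&& sleep 1` after each `feed` query and the retry loop both give the worker process time to mine patterns before `fetch` is compared against the expected file.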
18 changes: 18 additions & 0 deletions test/e2e/expected/endpoint_counterexamples.yaml
@@ -0,0 +1,18 @@
# Copyright 2023 SkyAPM org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

patterns:
- "/api/v1/usernames/{var}"
- "/api/v1/users/{var}"
version: "1"