Skip to content

Commit

Permalink
Quickly integrate APM tracking data
Browse files Browse the repository at this point in the history
**Phenomenon and reproduction steps**

**Root cause and solution**

**Impact**

**Test method**

**Affected branch(es)**

* main

**Checklist**

- [ ] Dependencies update required
- [ ] Common bug (similar problem in other repo)
  • Loading branch information
xiaochaoren1 committed Aug 23, 2023
1 parent 826e3dc commit 0e8de3d
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 79 deletions.
3 changes: 2 additions & 1 deletion app/app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ app:
spec:
l7_tracing_limit: 100
network_delay_us: 1000000
allow_multiple_trace_ids_in_tracing_result: false
allow_multiple_trace_ids_in_tracing_result: false
call_apm_api_to_supplement_trace: false
33 changes: 28 additions & 5 deletions app/app/application/application.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from sanic import Blueprint
from sanic.response import json as Response

from log import logger

from common.utils import json_response, format_response, app_exception
from common.const import API_PREFIX
from common.utils import json_response, format_response, app_exception, curl_perform
from common.const import API_PREFIX, HTTP_OK

from config import config
from .l7_flow_tracing import L7FlowTracing
from .tracing_completion import TracingCompletion
from models.models import FlowLogL7Tracing, TracingCompletionByExternalAppSpans
Expand All @@ -21,8 +21,31 @@
async def application_log_l7_tracing(request):
args = FlowLogL7Tracing(request.json)
args.validate()
status, response, failed_regions = await L7FlowTracing(
args, request.headers).query()
l7_flow_tracing = L7FlowTracing(args, request.headers)
if config.call_apm_api_to_supplement_trace:
trace_id, ch_res = await l7_flow_tracing.get_trace_id_by_id()
l7_flow_tracing.status.append("Query trace_id", ch_res)
if not trace_id:
status, response, failed_regions = l7_flow_tracing.status, {}, set(
)
else:
app_spans_res, app_spans_code = await curl_perform(
'get',
f"http://{config.querier_server}:{config.querier_port}/api/v1/adapter/tracing?traceid={trace_id}"
)
if app_spans_code != HTTP_OK:
log.Info("No app spans found!")
status, response, failed_regions = l7_flow_tracing.status, {}, set(
)
else:
args.app_spans = app_spans_res.get('data', {}).get('spans', [])
tracing_completion = TracingCompletion(args, request.headers)
tracing_completion.status.append("Query trace_id", ch_res)
status, response, failed_regions = await tracing_completion.query(
)
else:
status, response, failed_regions = await L7FlowTracing(
args, request.headers).query()
response_dict, code = format_response("Flow_Log_L7_Tracing", status,
response, args.debug, failed_regions)
return Response(json_response(**response_dict),
Expand Down
62 changes: 12 additions & 50 deletions app/app/application/l7_flow_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,22 @@ async def query(self):
async def get_id_by_trace_id(self, trace_id, time_filter):
    """Look up the ``_id`` of one ``l7_flow_log`` row that carries ``trace_id``.

    The query response is also recorded on ``self.status`` under the
    label ``"Query _id"``.

    Args:
        trace_id: Trace identifier; formatted into the SQL as a quoted string.
        time_filter: SQL predicate text that is ANDed onto the WHERE clause.

    Returns:
        The ``_id`` value of the first returned row, or ``""`` when the
        response data is not a DataFrame or contains no rows.
    """
    # NOTE(review): trace_id and time_filter are formatted directly into the
    # SQL text -- confirm every caller passes validated values only.
    sql = f"SELECT _id FROM l7_flow_log WHERE trace_id='{trace_id}' AND {time_filter} limit 1"
    resp = await self.query_ck(sql)
    self.status.append("Query _id", resp)
    data = resp["data"]
    # Single guard: the older `type(data) != DataFrame` check is subsumed by
    # this one, which additionally rejects an empty result set.
    if not isinstance(data, DataFrame) or data.empty:
        return ""
    # Positional access, so the lookup does not depend on the index labels.
    return data["_id"].iloc[0]

async def get_trace_id_by_id(self):
    """Resolve the ``trace_id`` of the flow identified by ``self.args["_id"]``.

    The lookup is restricted to rows whose ``time`` lies between
    ``self.start_time`` and ``self.end_time`` (inclusive).

    Returns:
        A ``(trace_id, resp)`` tuple, where ``resp`` is the raw response
        from ``self.query_ck``. ``trace_id`` is ``""`` when the response
        data is not a DataFrame or contains no rows. The tuple shape is
        the same on every path, so callers can always unpack two values.
    """
    time_filter = f"time>={self.start_time} AND time<={self.end_time}"
    _id = self.args.get("_id")
    # NOTE(review): _id is formatted directly into the SQL text -- confirm it
    # is validated as numeric before it reaches this method.
    sql = f"SELECT trace_id FROM l7_flow_log WHERE _id={_id} AND {time_filter} limit 1"
    resp = await self.query_ck(sql)
    data = resp["data"]
    if not isinstance(data, DataFrame) or data.empty:
        # Previously a bare "" was returned here, which made two-value
        # unpacking at the call site raise ValueError.
        return "", resp
    # Positional access, so the lookup does not depend on the index labels.
    return data["trace_id"].iloc[0], resp

async def trace_l7_flow(self,
time_filter: str,
base_filter: str,
Expand All @@ -184,10 +195,6 @@ async def trace_l7_flow(self,
app_metas = set()
x_request_metas = set()
l7_flow_ids = set()
networks = []
syscalls = []
apps = []
traceids = []
xrequests = []
related_map = defaultdict(list)

Expand Down Expand Up @@ -227,7 +234,6 @@ async def trace_l7_flow(self,
new_trace_ids -= trace_ids
trace_ids |= new_trace_ids
if new_trace_ids:
traceids = [L7TraceMeta(ntid) for ntid in new_trace_ids]
trace_ids_set = set([nxrid[1] for nxrid in new_trace_ids])
filters.append('(' + ' OR '.join([
"trace_id='{tid}'".format(tid=tid)
Expand Down Expand Up @@ -310,8 +316,6 @@ async def trace_l7_flow(self,
TAP_SIDE_CLIENT_APP, TAP_SIDE_SERVER_APP, TAP_SIDE_APP
] or not dataframe_flowmetas['span_id'][index]:
continue
#if dataframe_flowmetas['trace_id'][index] not in [0, '']:
# continue
if type(dataframe_flowmetas['span_id'][index]) == str and \
dataframe_flowmetas['span_id'][index] and \
type(dataframe_flowmetas['parent_span_id'][index]) == str and \
Expand Down Expand Up @@ -375,10 +379,6 @@ async def trace_l7_flow(self,
if type(new_flows) != DataFrame:
break

# if traceids:
# for trace_id in traceids:
# trace_id.set_relate(new_flows, related_map)

if xrequests:
for x_request in xrequests:
x_request.set_relate(new_flows, related_map)
Expand Down Expand Up @@ -506,27 +506,6 @@ async def query_all_flows(self, time_filter: str, l7_flow_ids: list,
return response["data"]


class L7TraceMeta:
    """
    trace_id tracing: relates flows that carry the same trace_id.
    """

    def __init__(self, flow_metas: Tuple):
        # flow_metas is read positionally: [0] is the flow _id, [1] its trace_id.
        self._id = flow_metas[0]
        self.trace_id = flow_metas[1]

    def __eq__(self, rhs):
        """Two metas are equal when their trace_id values are equal."""
        if not isinstance(rhs, L7TraceMeta):
            # Let Python fall back to its default comparison instead of
            # raising AttributeError on objects without a trace_id.
            return NotImplemented
        return self.trace_id == rhs.trace_id

    # Defining __eq__ already makes instances unhashable; state it explicitly.
    __hash__ = None

    def set_relate(self, df, related_map):
        """Record this flow as trace_id-related to matching rows of ``df``.

        For every row of ``df`` whose ``_id`` differs from this flow's and
        whose ``trace_id`` equals this flow's, ``"<self._id>-traceid"`` is
        appended to ``related_map[row _id]``. Nothing is recorded when this
        flow's trace_id is not a non-empty string.

        Args:
            df: Object exposing ``_id`` and ``trace_id`` as parallel
                iterables (the original code indexed them by row number).
            related_map: Mapping from flow _id to a list of relation tags;
                mutated in place.
        """
        # The trace_id check does not depend on the row, so do it once.
        if not isinstance(self.trace_id, str) or not self.trace_id:
            return
        tag = f"{self._id}-traceid"
        for flow_id, flow_trace_id in zip(df._id, df.trace_id):
            if flow_id == self._id:
                continue
            if flow_trace_id == self.trace_id:
                related_map[flow_id].append(tag)

class L7XrequestMeta:
"""
x_request_id追踪:
Expand Down Expand Up @@ -970,23 +949,6 @@ def attach_app_flow(self, flow: dict):
self.app_flow_of_direct_flows.append(flow)
return True

def attach_network(self, network: Networks):
    """Walk ``self.direct_flows`` and test each against ``network``.

    The matching branch is ``pass``, so this method currently changes no
    state and returns ``None``.

    NOTE(review): the condition reads ``flow``, which is neither a parameter
    nor a local of this method, and ``network_delay_us``, which is not
    defined in this method either -- presumably ``network`` was intended in
    place of ``flow``; confirm before relying on this code.
    """
    for index in range(len(self.direct_flows)):
        direct_flow = self.direct_flows[index]
        if (
                # Request-direction TCP SEQ needs no comparison (there is no
                # request-direction information), or it is equal
                direct_flow['type'] == L7_FLOW_TYPE_RESPONSE or
            (network.req_tcp_seq == direct_flow['req_tcp_seq']
             and abs(network.start_time_us - direct_flow['start_time_us'])
             < network_delay_us)) and (
                 # Response-direction TCP SEQ needs no comparison (the original
                 # comment again says "no request-direction information"), or
                 # it is equal
                 flow['type'] == L7_FLOW_TYPE_REQUEST
                 or direct_flow['type'] == L7_FLOW_TYPE_REQUEST or
                 (flow['resp_tcp_seq'] == direct_flow['resp_tcp_seq']
                  and abs(flow['end_time_us'] - direct_flow['end_time_us'])
                  < network_delay_us)):
            # No action is taken for a matching direct flow.
            pass


def merge_flow(flows: list, flow: dict) -> bool:
"""
Expand Down
46 changes: 23 additions & 23 deletions app/app/application/tracing_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, args, headers):
super().__init__(args, headers)
self.app_spans = [
app_span.to_primitive() for app_span in self.args.app_spans
]
] if not isinstance(self.args.app_spans, list) else self.args.app_spans
self.update_time()
self.complete_app_span()
self.app_spans_df = pd.DataFrame(self.app_spans)
Expand All @@ -38,32 +38,31 @@ async def query(self):
ntp_delay_us = self.args.get("ntp_delay_us", 10000)
self.failed_regions = set()
time_filter = f"time>={self.start_time} AND time<={self.end_time}"
_id = self.args.get("_id")
self.has_attributes = self.args.get("has_attributes", 0)
base_filter = f"_id={_id}"
rst = await self.trace_l7_flow(time_filter=time_filter,
base_filter=base_filter,
base_filter='',
return_fields=["related_ids"],
max_iteration=max_iteration,
network_delay_us=network_delay_us,
ntp_delay_us=ntp_delay_us)
if not rst:
return self.status, rst, self.failed_regions
rst.pop("services", None)
for res in rst.get("tracing", []):
res.pop("selftime", None)
res.pop("Enum(tap_side)", None)
res.pop("attribute", None)
res.pop("id", None)
res.pop("parent_id", None)
res.pop("childs", None)
res.pop("service_uid", None)
res.pop("service_uname", None)
res.pop("tap_port", None)
res.pop("tap_port_name", None)
res.pop("resource_from_vtap", None)
res.pop("set_parent_info", None)
res.pop("auto_instance", None)
if not config.call_apm_api_to_supplement_trace:
rst.pop("services", None)
for res in rst.get("tracing", []):
res.pop("selftime", None)
res.pop("Enum(tap_side)", None)
res.pop("attribute", None)
res.pop("id", None)
res.pop("parent_id", None)
res.pop("childs", None)
res.pop("service_uid", None)
res.pop("service_uname", None)
res.pop("tap_port", None)
res.pop("tap_port_name", None)
res.pop("resource_from_vtap", None)
res.pop("set_parent_info", None)
res.pop("auto_instance", None)
return self.status, rst, self.failed_regions

async def trace_l7_flow(self,
Expand Down Expand Up @@ -127,7 +126,6 @@ async def trace_l7_flow(self,
new_trace_ids -= trace_ids
trace_ids |= new_trace_ids
if new_trace_ids:
traceids = [L7TraceMeta(ntid) for ntid in new_trace_ids]
trace_ids_set = set([nxrid[1] for nxrid in new_trace_ids])
filters.append('(' + ' OR '.join([
"trace_id='{tid}'".format(tid=tid)
Expand Down Expand Up @@ -326,7 +324,7 @@ async def trace_l7_flow(self,
def update_time(self):
min_time = 0
max_time = 0
for app_span in self.args.app_spans:
for app_span in self.app_spans:
start_time_s = int(app_span.get("start_time_us", 0) / 1000000)
end_time_s = int(app_span.get("end_time_us", 0) / 1000000)
if not min_time:
Expand Down Expand Up @@ -361,7 +359,8 @@ def complete_app_span(self):
"process_id_1", "response_code", "request_id",
"subnet_id_0", "auto_service_type_1", "tap_id"
]:
app_span[tag_int] = 0
app_span[tag_int] = 0 if not app_span.get(
tag_int) else app_span[tag_int]
for tag_str in [
"x_request_id_0", "x_request_id_1", "auto_instance_0",
"auto_instance_1", "subnet_0", "app_service",
Expand All @@ -374,5 +373,6 @@ def complete_app_span(self):
"Enum(tap_side)", "tap_port_name", "endpoint",
"auto_service_1", "response_result", "tap"
]:
app_span[tag_str] = ""
app_span[tag_str] = "" if not app_span.get(
tag_str) else app_span[tag_str]
app_span["resource_from_vtap"] = (0, 0, "", 0, 0, "")
2 changes: 2 additions & 0 deletions app/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def parse_spec(self, cfg):
self.network_delay_us = spec.get('network_delay_us', 1000000)
self.allow_multiple_trace_ids_in_tracing_result = spec.get(
'allow_multiple_trace_ids_in_tracing_result', False)
self.call_apm_api_to_supplement_trace = spec.get(
'call_apm_api_to_supplement_trace', False)

def parse_querier(self, cfg):
querier = cfg.get('querier', dict())
Expand Down

0 comments on commit 0e8de3d

Please sign in to comment.