Separate trace_id queries (#141)
**Phenomenon and reproduction steps**

**Root cause and solution**

Trace_id filters were previously OR-joined into the same `query_flowmetas` call as every other relational filter. They are now collected in `new_trace_id_filters` and queried separately, with the resulting flows merged into `dataframe_flowmetas` at the end of each iteration; a plain trace_id is also queried at most once (`query_simple_trace_id`).
**Impacts**

**Test method**

**Affected branch(es)**

* main

**Checklist**

- [ ] Dependencies update required
- [ ] Common bug (similar problem in other repo)
xiaochaoren1 authored Oct 25, 2023
1 parent 81e67f5 commit dd95c27
Showing 1 changed file with 62 additions and 32 deletions.
94 changes: 62 additions & 32 deletions app/app/application/l7_flow_tracing.py
@@ -204,16 +204,19 @@ async def trace_l7_flow(self,
related_map[dataframe_flowmetas['_id'][0]] = [
f"{dataframe_flowmetas['_id'][0]}-base"
]
# Tempo API
trace_id = self.args.get("trace_id") if self.args.get(
"trace_id") else ''
allow_multiple_trace_ids_in_tracing_result = config.allow_multiple_trace_ids_in_tracing_result
call_apm_api_to_supplement_trace = config.call_apm_api_to_supplement_trace
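# Config switches (inferred from their names): whether one tracing result
# may contain multiple trace_ids, and whether to call the APM adapter API
# for supplementary app spans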
multi_trace_ids = set()
query_simple_trace_id = False
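# Ensures the separate query for the injected trace_id runs at most once
# across the iterations below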
for i in range(max_iteration):
if type(dataframe_flowmetas) != DataFrame:
break
filters = []

new_trace_id_flows = pd.DataFrame()
new_trace_id_filters = []
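# Core change of this commit: trace_id filters are collected here and
# queried separately instead of being OR-joined into `filters`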
# Actively injected tracing information
if not allow_multiple_trace_ids_in_tracing_result:
delete_index = []
@@ -225,9 +228,19 @@ async def trace_l7_flow(self,
delete_index.append(index)
if not trace_id:
trace_id = dataframe_flowmetas['trace_id'][index]
if trace_id:
filters.append(f"trace_id='{trace_id}'")
if trace_id and not query_simple_trace_id:
new_trace_id_filters.append(f"trace_id='{trace_id}'")
# Query trace_id separately
new_trace_id_flows = await self.query_flowmetas(
time_filter, ' OR '.join(new_trace_id_filters))
if type(new_trace_id_flows) != DataFrame:
break
new_trace_id_flows.rename(columns={'_id_str': '_id'},
inplace=True)
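# query_flowmetas returns ids in the `_id_str` column (a string form, per
# the name); rename to `_id` so frames can be merged and deduplicated on one key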
query_simple_trace_id = True
dataframe_flowmetas = dataframe_flowmetas.drop(delete_index)
dataframe_flowmetas = dataframe_flowmetas.reset_index(
drop=True)
if call_apm_api_to_supplement_trace and trace_id not in multi_trace_ids:
get_third_app_span_url = f"http://{config.querier_server}:{config.querier_port}/api/v1/adapter/tracing?traceid={trace_id}"
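# Fetch third-party app spans for this trace_id from the APM adapter endpoint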
app_spans_res, app_spans_code = await curl_perform(
@@ -282,10 +295,17 @@ async def trace_l7_flow(self,
trace_ids |= new_trace_ids
if new_trace_ids:
trace_ids_set = set([nxrid[1] for nxrid in new_trace_ids])
filters.append('(' + ' OR '.join([
new_trace_id_filters.append('(' + ' OR '.join([
"trace_id='{tid}'".format(tid=tid)
for tid in trace_ids_set
]) + ')')
# Query trace_id separately
new_trace_id_flows = await self.query_flowmetas(
time_filter, ' OR '.join(new_trace_id_filters))
if type(new_trace_id_flows) != DataFrame:
break
new_trace_id_flows.rename(columns={'_id_str': '_id'},
inplace=True)
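# Flows found for these trace_ids are merged into dataframe_flowmetas
# together with the relational query results at the end of the iteration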

# New network tracing information
new_network_metas = set()
@@ -407,22 +427,11 @@ async def trace_l7_flow(self,
]) + ')'
filters.append(x_request_filters)

# L7 Flow ID information
l7_flow_ids |= set(dataframe_flowmetas['_id'])
len_of_flows = len(l7_flow_ids)

if not filters:
if not filters and not new_trace_id_filters:
break

if not allow_multiple_trace_ids_in_tracing_result and trace_id:
new_filters = []
new_filters.append(f"({'OR '.join(filters)})")
new_filters.append(f"(trace_id='{trace_id}' OR trace_id='')")
new_flows = await self.query_flowmetas(
time_filter, ' AND '.join(new_filters))
else:
new_flows = await self.query_flowmetas(time_filter,
' OR '.join(filters))
# Non-trace_id relational queries
new_flows = await self.query_flowmetas(time_filter,
' OR '.join(filters))
if type(new_flows) != DataFrame:
break
new_flows.rename(columns={'_id_str': '_id'}, inplace=True)
@@ -442,11 +451,17 @@ async def trace_l7_flow(self,
if apps:
for app in apps:
app.set_relate(new_flows, related_map)
dataframe_flowmetas = pd.concat([dataframe_flowmetas, new_flows],
join="outer",
ignore_index=True).drop_duplicates(
["_id"]).reset_index(drop=True)
if len(set(dataframe_flowmetas['_id'])) - len_of_flows < 1:
# Merge all flows and check if any new flows are generated
old_flows_length = len(dataframe_flowmetas)
dataframe_flowmetas = pd.concat(
[dataframe_flowmetas, new_flows, new_trace_id_flows],
join="outer",
ignore_index=True).drop_duplicates(["_id"
]).reset_index(drop=True)
# L7 Flow ID information
l7_flow_ids |= set(dataframe_flowmetas['_id'])
new_flows_length = len(dataframe_flowmetas)
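# Stop iterating once a pass adds no new flows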
if old_flows_length == new_flows_length:
break
if not l7_flow_ids:
return {}
@@ -521,7 +536,11 @@ async def query_flowmetas(self, time_filter: str,
base_filter=base_filter,
l7_tracing_limit=config.l7_tracing_limit)
response = await self.query_ck(sql)
self.status.append("Query FlowMetas", response)
# Flag when the result hit the select limit
status_description = "Query FlowMetas"
if len(response.get("data", [])) == config.l7_tracing_limit:
status_description += " Hit Select Limit"
self.status.append(status_description, response)
return response.get("data", [])

async def query_all_flows(self, time_filter: str, l7_flow_ids: list,
@@ -1621,20 +1640,26 @@ def sort_tracing(self):
for i, trace in enumerate(self.traces)
}
spans = []
found_child_ids = []
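# Child ids already visited; guards the recursion below against cycles
# in the parent/child relations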
for trace in self.traces:
if trace["parent_id"] == -1:
spans.append(trace)
spans.extend(self.find_child(trace["childs"]))
spans.extend(self.find_child(trace["childs"],
finded_child_ids))
return spans

def find_child(self, child_ids):
def find_child(self, child_ids, found_child_ids):
spans = []
for _id in child_ids:
if _id not in self.uid_index_map:
continue
# Avoid cycles: skip ids already visited
if _id in found_child_ids:
continue
trace = self.traces[self.uid_index_map[_id]]
spans.append(trace)
spans.extend(self.find_child(trace["childs"]))
found_child_ids.append(_id)
spans.extend(self.find_child(trace["childs"], found_child_ids))
return spans


@@ -1799,7 +1824,6 @@ def network_flow_sort(traces):
const.TAP_SIDE_SERVER_GATEWAY_HAPERVISOR
] or trace['tap'] != "虚拟网络":
response_duration_sort = True
break
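# (the early break is removed here, so the loop keeps running and every
# trace is still bucketed into the tap_side lists below)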
if trace['tap_side'] in [const.TAP_SIDE_LOCAL, const.TAP_SIDE_REST]:
local_rest_traces.append(trace)
elif trace['tap_side'] in [
@@ -1810,7 +1834,7 @@ def network_flow_sort(traces):
sorted_traces.append(trace)
if response_duration_sort:
sorted_traces = sorted(
traces,
sorted_traces + local_rest_traces,
key=lambda x:
(-x['response_duration'], const.TAP_SIDE_RANKS.get(x['tap_side']),
x['tap_side']))
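# Order primarily by response_duration descending, breaking ties by
# tap_side rank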
@@ -1821,7 +1845,7 @@ def network_flow_sort(traces):
sorted_traces.append(sys_trace)
return sorted_traces
sorted_traces = sorted(
sorted_traces,
sorted_traces + sys_traces,
key=lambda x: (const.TAP_SIDE_RANKS.get(x['tap_side']), x['tap_side']))
if not sorted_traces:
sorted_traces += local_rest_traces
@@ -1848,11 +1872,17 @@ def network_flow_sort(traces):


def get_parent_trace(parent_trace, traces):
if not traces:
return parent_trace
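# Walk the candidates for one whose x_request_id_0 matches this
# trace's x_request_id_1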
for trace in traces:
if trace.get('_uid') == parent_trace.get('_uid'):
continue
if trace.get('x_request_id_0') == parent_trace.get('x_request_id_1'):
return get_parent_trace(trace, traces)
# Avoid cycles
new_traces = [
i for i in traces if i.get('_uid') != trace.get('_uid')
]
return get_parent_trace(trace, new_traces)
else:
return parent_trace
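# (the else pairs with the for loop above: it runs only when no
# matching trace was found, so parent_trace is already the root)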

