diff --git a/app/app/application/l7_flow_tracing.py b/app/app/application/l7_flow_tracing.py index 98a502e..4462b80 100644 --- a/app/app/application/l7_flow_tracing.py +++ b/app/app/application/l7_flow_tracing.py @@ -204,16 +204,19 @@ async def trace_l7_flow(self, related_map[dataframe_flowmetas['_id'][0]] = [ f"{dataframe_flowmetas['_id'][0]}-base" ] + # tempo api trace_id = self.args.get("trace_id") if self.args.get( "trace_id") else '' allow_multiple_trace_ids_in_tracing_result = config.allow_multiple_trace_ids_in_tracing_result call_apm_api_to_supplement_trace = config.call_apm_api_to_supplement_trace multi_trace_ids = set() + query_simple_trace_id = False for i in range(max_iteration): if type(dataframe_flowmetas) != DataFrame: break filters = [] - + new_trace_id_flows = pd.DataFrame() + new_trace_id_filters = [] # 主动注入的追踪信息 if not allow_multiple_trace_ids_in_tracing_result: delete_index = [] @@ -225,9 +228,19 @@ async def trace_l7_flow(self, delete_index.append(index) if not trace_id: trace_id = dataframe_flowmetas['trace_id'][index] - if trace_id: - filters.append(f"trace_id='{trace_id}'") + if trace_id and not query_simple_trace_id: + new_trace_id_filters.append(f"trace_id='{trace_id}'") + # Trace id query separately + new_trace_id_flows = await self.query_flowmetas( + time_filter, ' OR '.join(new_trace_id_filters)) + if type(new_trace_id_flows) != DataFrame: + break + new_trace_id_flows.rename(columns={'_id_str': '_id'}, + inplace=True) + query_simple_trace_id = True dataframe_flowmetas = dataframe_flowmetas.drop(delete_index) + dataframe_flowmetas = dataframe_flowmetas.reset_index( + drop=True) if call_apm_api_to_supplement_trace and trace_id not in multi_trace_ids: get_third_app_span_url = f"http://{config.querier_server}:{config.querier_port}/api/v1/adapter/tracing?traceid={trace_id}" app_spans_res, app_spans_code = await curl_perform( @@ -282,10 +295,17 @@ async def trace_l7_flow(self, trace_ids |= new_trace_ids if new_trace_ids: trace_ids_set = set([nxrid[1] for nxrid in new_trace_ids]) - filters.append('(' + ' OR '.join([ + new_trace_id_filters.append('(' + ' OR '.join([ "trace_id='{tid}'".format(tid=tid) for tid in trace_ids_set ]) + ')') + # Trace id query separately + new_trace_id_flows = await self.query_flowmetas( + time_filter, ' OR '.join(new_trace_id_filters)) + if type(new_trace_id_flows) != DataFrame: + break + new_trace_id_flows.rename(columns={'_id_str': '_id'}, + inplace=True) # 新的网络追踪信息 new_network_metas = set() @@ -407,22 +427,11 @@ async def trace_l7_flow(self, ]) + ')' filters.append(x_request_filters) - # L7 Flow ID信息 - l7_flow_ids |= set(dataframe_flowmetas['_id']) - len_of_flows = len(l7_flow_ids) - - if not filters: + if not filters and not new_trace_id_filters: break - - if not allow_multiple_trace_ids_in_tracing_result and trace_id: - new_filters = [] - new_filters.append(f"({'OR '.join(filters)})") - new_filters.append(f"(trace_id='{trace_id}' OR trace_id='')") - new_flows = await self.query_flowmetas( - time_filter, ' AND '.join(new_filters)) - else: - new_flows = await self.query_flowmetas(time_filter, - ' OR '.join(filters)) + # Non-trace_id relational queries + new_flows = await self.query_flowmetas(time_filter, + ' OR '.join(filters)) if type(new_flows) != DataFrame: break new_flows.rename(columns={'_id_str': '_id'}, inplace=True) @@ -442,11 +451,17 @@ async def trace_l7_flow(self, if apps: for app in apps: app.set_relate(new_flows, related_map) - dataframe_flowmetas = pd.concat([dataframe_flowmetas, new_flows], - join="outer", - ignore_index=True).drop_duplicates( - ["_id"]).reset_index(drop=True) - if len(set(dataframe_flowmetas['_id'])) - len_of_flows < 1: + # Merge all flows and check if any new flows are generated + old_flows_length = len(dataframe_flowmetas) + dataframe_flowmetas = pd.concat( + [dataframe_flowmetas, new_flows, new_trace_id_flows], + join="outer", + ignore_index=True).drop_duplicates(["_id" + ]).reset_index(drop=True) + # L7 Flow ID信息 + l7_flow_ids |= set(dataframe_flowmetas['_id']) + new_flows_length = len(dataframe_flowmetas) + if old_flows_length == new_flows_length: break if not l7_flow_ids: return {} @@ -521,7 +536,11 @@ async def query_flowmetas(self, time_filter: str, base_filter=base_filter, l7_tracing_limit=config.l7_tracing_limit) response = await self.query_ck(sql) - self.status.append("Query FlowMetas", response) + # Hit Select Limit + status_discription = "Query FlowMetas" + if len(response.get("data", [])) == config.l7_tracing_limit: + status_discription += " Hit Select Limit" + self.status.append(status_discription, response) return response.get("data", []) async def query_all_flows(self, time_filter: str, l7_flow_ids: list, @@ -1621,20 +1640,26 @@ def sort_tracing(self): for i, trace in enumerate(self.traces) } spans = [] + finded_child_ids = [] for trace in self.traces: if trace["parent_id"] == -1: spans.append(trace) - spans.extend(self.find_child(trace["childs"])) + spans.extend(self.find_child(trace["childs"], + finded_child_ids)) return spans - def find_child(self, child_ids): + def find_child(self, child_ids, finded_child_ids): spans = [] for _id in child_ids: if _id not in self.uid_index_map: continue + # Avoid ring + if _id in finded_child_ids: + continue trace = self.traces[self.uid_index_map[_id]] spans.append(trace) - spans.extend(self.find_child(trace["childs"])) + finded_child_ids.append(_id) + spans.extend(self.find_child(trace["childs"], finded_child_ids)) return spans @@ -1799,7 +1824,6 @@ def network_flow_sort(traces): const.TAP_SIDE_SERVER_GATEWAY_HAPERVISOR ] or trace['tap'] != "虚拟网络": response_duration_sort = True - break if trace['tap_side'] in [const.TAP_SIDE_LOCAL, const.TAP_SIDE_REST]: local_rest_traces.append(trace) elif trace['tap_side'] in [ @@ -1810,7 +1834,7 @@ def network_flow_sort(traces): sorted_traces.append(trace) if response_duration_sort: sorted_traces = sorted( - traces, + sorted_traces + local_rest_traces, key=lambda x: (-x['response_duration'], const.TAP_SIDE_RANKS.get(x['tap_side']), x['tap_side'])) @@ -1821,7 +1845,7 @@ def network_flow_sort(traces): sorted_traces.append(sys_trace) return sorted_traces sorted_traces = sorted( - sorted_traces, + sorted_traces + sys_traces, key=lambda x: (const.TAP_SIDE_RANKS.get(x['tap_side']), x['tap_side'])) if not sorted_traces: sorted_traces += local_rest_traces @@ -1848,11 +1872,17 @@ def network_flow_sort(traces): def get_parent_trace(parent_trace, traces): + if not traces: + return parent_trace for trace in traces: if trace.get('_uid') == parent_trace.get('_uid'): continue if trace.get('x_request_id_0') == parent_trace.get('x_request_id_1'): - return get_parent_trace(trace, traces) + # Avoid ring + new_traces = [ + i for i in traces if i.get('_uid') != trace.get('_uid') + ] + return get_parent_trace(trace, new_traces) else: return parent_trace