Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

firewall: T6257: Show member information for dynamic groups in op-mode (backport #3369) #3385

Merged
merged 1 commit into from
May 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 70 additions & 8 deletions src/op_mode/firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import argparse
import ipaddress
import json
import re
import tabulate
import textwrap
Expand Down Expand Up @@ -89,10 +90,38 @@ def get_nftables_details(family, hook, priority):
out[rule_id] = rule
return out

def output_firewall_vertical(rules, headers):
def get_nftables_group_members(family, table, name):
prefix = 'ip6' if family == 'ipv6' else 'ip'
out = []

try:
results_str = cmd(f'sudo nft -j list set {prefix} {table} {name}')
results = json.loads(results_str)
except:
return out

if 'nftables' not in results:
return out

for obj in results['nftables']:
if 'set' not in obj:
continue

set_obj = obj['set']

if 'elem' in set_obj:
for elem in set_obj['elem']:
if isinstance(elem, str):
out.append(elem)
elif isinstance(elem, dict) and 'elem' in elem:
out.append(elem['elem'])

return out

def output_firewall_vertical(rules, headers, adjust=True):
for rule in rules:
adjusted_rule = rule + [""] * (len(headers) - len(rule)) # account for different header length, like default-action
transformed_rule = [[header, textwrap.fill(adjusted_rule[i].replace('\n', ' '), 65)] for i, header in enumerate(headers)] # create key-pair list from headers and rules lists; wrap at 100 char
adjusted_rule = rule + [""] * (len(headers) - len(rule)) if adjust else rule # account for different header length, like default-action
transformed_rule = [[header, textwrap.fill(adjusted_rule[i].replace('\n', ' '), 65)] for i, header in enumerate(headers) if i < len(adjusted_rule)] # create key-pair list from headers and rules lists; wrap at 100 char

print(tabulate.tabulate(transformed_rule, tablefmt="presto"))
print()
Expand Down Expand Up @@ -453,6 +482,7 @@ def find_references(group_type, group_name):
return out

rows = []
header_tail = []

for group_type, group_type_conf in firewall['group'].items():
##
Expand All @@ -479,21 +509,53 @@ def find_references(group_type, group_name):
rows.append(row)

else:
if not args.detail:
header_tail = ['Timeout', 'Expires']

for dynamic_type in ['address_group', 'ipv6_address_group']:
family = 'ipv4' if dynamic_type == 'address_group' else 'ipv6'
prefix = 'DA_' if dynamic_type == 'address_group' else 'DA6_'
if dynamic_type in firewall['group']['dynamic_group']:
for dynamic_name, dynamic_conf in firewall['group']['dynamic_group'][dynamic_type].items():
references = find_references(dynamic_type, dynamic_name)
row = [dynamic_name, textwrap.fill(dynamic_conf.get('description') or '', 50), dynamic_type + '(dynamic)', '\n'.join(references) or 'N/D']
row.append('N/D')
rows.append(row)

members = get_nftables_group_members(family, 'vyos_filter', f'{prefix}{dynamic_name}')

if not members:
if args.detail:
row.append('N/D')
else:
row += ["N/D"] * 3
rows.append(row)
continue

for idx, member in enumerate(members):
val = member.get('val', 'N/D')
timeout = str(member.get('timeout', 'N/D'))
expires = str(member.get('expires', 'N/D'))

if args.detail:
row.append(f'{val} (timeout: {timeout}, expires: {expires})')
continue

if idx > 0:
row = [""] * 4

row += [val, timeout, expires]
rows.append(row)

if args.detail:
header_tail += [""] * (len(members) - 1)
rows.append(row)

if rows:
print('Firewall Groups\n')
if args.detail:
header = ['Name', 'Description','Type', 'References', 'Members']
output_firewall_vertical(rows, header)
header = ['Name', 'Description', 'Type', 'References', 'Members'] + header_tail
output_firewall_vertical(rows, header, adjust=False)
else:
header = ['Name', 'Type', 'References', 'Members']
header = ['Name', 'Type', 'References', 'Members'] + header_tail
for i in rows:
rows[rows.index(i)].pop(1)
print(tabulate.tabulate(rows, header))
Expand Down
Loading