Cisco: Fixing formatting and deprecated options (datetime.utcnow)
PiperOrigin-RevId: 627838938
Capirca Team committed Apr 24, 2024
1 parent f03e6c4 commit 8619edb
Showing 1 changed file with 74 additions and 34 deletions.
108 changes: 74 additions & 34 deletions capirca/lib/cisco.py
Expand Up @@ -441,6 +441,7 @@ def GetProtocol(port_num, proto, platform='cisco'):
except KeyError:
return port_num


class Term(aclgenerator.Term):
"""A single ACL Term."""
ALLOWED_PROTO_STRINGS = ['eigrp', 'gre', 'icmp', 'igmp', 'igrp', 'ip',
Expand Down Expand Up @@ -573,29 +574,41 @@ def __str__(self):

# options
opts = [str(x) for x in self.term.option]
if ((self.PROTO_MAP['tcp'] in protocol or 'tcp' in protocol)
and ('tcp-established' in opts or 'established' in opts)):
if (self.PROTO_MAP['tcp'] in protocol or 'tcp' in protocol) and (
'tcp-established' in opts or 'established' in opts
):
if 'established' not in self.options:
self.options.append('established')
# Using both 'fragments' and 'is-fragment', ref Github Issue #187
if ('ip' in protocol) and (('fragments' in opts) or
('is-fragment' in opts)):
if ('ip' in protocol) and (
('fragments' in opts) or ('is-fragment' in opts)
):
if 'fragments' not in self.options:
self.options.append('fragments')
# ACL-based Forwarding
if (self.platform == 'ciscoxr'
) and not self.term.action and self.term.next_ip and (
'nexthop1' not in opts):
if (
(self.platform == 'ciscoxr')
and not self.term.action
and self.term.next_ip
and ('nexthop1' not in opts)
):
if len(self.term.next_ip) > 1:
raise CiscoNextIpError('The following term has more than one next IP '
'value: %s' % self.term.name)
if (not isinstance(self.term.next_ip[0], nacaddr.IPv4) and
not isinstance(self.term.next_ip[0], nacaddr.IPv6)):
raise CiscoNextIpError('Next IP value must be an IP address. '
'Invalid term: %s' % self.term.name)
raise CiscoNextIpError(
'The following term has more than one next IP value: %s'
% self.term.name
)
if not isinstance(self.term.next_ip[0], nacaddr.IPv4) and not isinstance(
self.term.next_ip[0], nacaddr.IPv6
):
raise CiscoNextIpError(
'Next IP value must be an IP address. Invalid term: %s'
% self.term.name
)
if self.term.next_ip[0].num_addresses > 1:
raise CiscoNextIpError('The following term has a subnet instead of a '
'host: %s' % self.term.name)
raise CiscoNextIpError(
'The following term has a subnet instead of a host: %s'
% self.term.name
)
nexthop = self.term.next_ip[0].network_address
nexthop_protocol = 'ipv4' if nexthop.version == 4 else 'ipv6'
self.options.append('nexthop1 %s %s' % (nexthop_protocol, nexthop))
Expand Down Expand Up @@ -930,7 +943,7 @@ def _BuildTokens(self):

def _TranslatePolicy(self, pol, exp_info):
self.cisco_policies = []
current_date = datetime.datetime.utcnow().date()
current_date = datetime.datetime.now(datetime.timezone.utc).date()
exp_info_date = current_date + datetime.timedelta(weeks=exp_info)

# a mixed filter outputs both ipv4 and ipv6 acls in the same output file
Expand Down Expand Up @@ -998,45 +1011,72 @@ def _TranslatePolicy(self, pol, exp_info):
continue

# Ignore if the term is for a different AF
if term.restrict_address_family and term.restrict_address_family != af:
if (
term.restrict_address_family
and term.restrict_address_family != af
):
continue

if term.expiration:
if term.expiration <= exp_info_date:
logging.info('INFO: Term %s in policy %s expires '
'in less than two weeks.', term.name, filter_name)
logging.info(
'INFO: Term %s in policy %s expires in less than two weeks.',
term.name,
filter_name,
)
if term.expiration <= current_date:
logging.warning('WARNING: Term %s in policy %s is expired and '
'will not be rendered.', term.name, filter_name)
logging.warning(
'WARNING: Term %s in policy %s is expired and '
'will not be rendered.',
term.name,
filter_name,
)
continue

# render terms based on filter type
if next_filter == 'standard':
# keep track of sequence numbers across terms
new_terms.append(TermStandard(term, filter_name, self._PLATFORM,
self.verbose))
new_terms.append(
TermStandard(term, filter_name, self._PLATFORM, self.verbose)
)
elif next_filter == 'extended':
enable_dsmo = (len(filter_options) > 2 and
filter_options[2] == 'enable_dsmo')
enable_dsmo = (
len(filter_options) > 2 and filter_options[2] == 'enable_dsmo'
)
new_terms.append(
Term(term, proto_int=self._PROTO_INT, enable_dsmo=enable_dsmo,
term_remark=self._TERM_REMARK, platform=self._PLATFORM,
verbose=self.verbose))
Term(
term,
proto_int=self._PROTO_INT,
enable_dsmo=enable_dsmo,
term_remark=self._TERM_REMARK,
platform=self._PLATFORM,
verbose=self.verbose,
)
)
elif next_filter == 'object-group':
obj_target.AddTerm(term)
new_terms.append(self._GetObjectGroupTerm(term, filter_name,
verbose=self.verbose))
new_terms.append(
self._GetObjectGroupTerm(
term, filter_name, verbose=self.verbose
)
)
elif next_filter == 'inet6':
new_terms.append(
Term(
term, 6, proto_int=self._PROTO_INT,
platform=self._PLATFORM, verbose=self.verbose))
term,
6,
proto_int=self._PROTO_INT,
platform=self._PLATFORM,
verbose=self.verbose,
)
)

# cisco requires different name for the v4 and v6 acls
if filter_type == 'mixed' and next_filter == 'inet6':
filter_name = 'ipv6-%s' % filter_name
self.cisco_policies.append((header, filter_name, [next_filter],
new_terms, obj_target))
self.cisco_policies.append(
(header, filter_name, [next_filter], new_terms, obj_target)
)

def _GetObjectGroupTerm(self, term, filter_name, verbose=True):
"""Returns an ObjectGroupTerm object."""
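Review bot: The expiry notice in `_TranslatePolicy` still hard-codes "less than two weeks" although the window is `current_date + datetime.timedelta(weeks=exp_info)`, so with any other `exp_info` the log line misstates how soon the term stops being rendered. Because an expired term is dropped from the generated ACL with only a warning, that notice is the operator's advance signal and should carry the real value, e.g. `'INFO: Term %s in policy %s expires in less than %s weeks.', term.name, filter_name, exp_info`. The switch to `datetime.datetime.now(datetime.timezone.utc).date()` keeps the cutoff on the UTC calendar day; a short comment such as `# expiration is compared against the current UTC date` would help anyone whose policy dates are, presumably, written with local time in mind. `_TranslatePolicy` begins and ends outside the hunks shown here.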
