Skip to content

Commit

Permalink
Merge pull request #3908 from vyos/mergify/bp/circinus/pr-3763
Browse files Browse the repository at this point in the history
ipsec: T6148: Fixed reset command by adding init after terminating (backport #3763)
  • Loading branch information
c-po authored Jul 31, 2024
2 parents 10bf3c8 + 9be079a commit cdfa92e
Show file tree
Hide file tree
Showing 2 changed files with 458 additions and 168 deletions.
136 changes: 100 additions & 36 deletions python/vyos/ipsec.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020-2023 VyOS maintainers and contributors <[email protected]>
# Copyright 2020-2024 VyOS maintainers and contributors <[email protected]>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand All @@ -13,50 +13,58 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.

#Package to communicate with Strongswan VICI
# Package to communicate with Strongswan VICI


class ViciInitiateError(Exception):
"""
VICI can't initiate a session.
VICI can't initiate a session.
"""

pass


class ViciCommandError(Exception):
"""
VICI can't execute a command by any reason.
VICI can't execute a command by any reason.
"""

pass


def get_vici_sas():
from vici import Session as vici_session

try:
session = vici_session()
except Exception:
raise ViciInitiateError("IPsec not initialized")
raise ViciInitiateError('IPsec not initialized')
try:
sas = list(session.list_sas())
return sas
except Exception:
raise ViciCommandError(f'Failed to get SAs')
raise ViciCommandError('Failed to get SAs')


def get_vici_connections():
from vici import Session as vici_session

try:
session = vici_session()
except Exception:
raise ViciInitiateError("IPsec not initialized")
raise ViciInitiateError('IPsec not initialized')
try:
connections = list(session.list_conns())
return connections
except Exception:
raise ViciCommandError(f'Failed to get connections')
raise ViciCommandError('Failed to get connections')


def get_vici_sas_by_name(ike_name: str, tunnel: str) -> list:
"""
Find sas by IKE_SA name and/or CHILD_SA name
and return list of OrdinaryDicts with SASs info
If tunnel is not None return value is list of OrdenaryDicts contained only
Find installed SAs by IKE_SA name and/or CHILD_SA name
and return list with SASs info.
If tunnel is not None return a list contained only
CHILD_SAs wich names equal tunnel value.
:param ike_name: IKE SA name
:type ike_name: str
Expand All @@ -70,7 +78,7 @@ def get_vici_sas_by_name(ike_name: str, tunnel: str) -> list:
try:
session = vici_session()
except Exception:
raise ViciInitiateError("IPsec not initialized")
raise ViciInitiateError('IPsec not initialized')
vici_dict = {}
if ike_name:
vici_dict['ike'] = ike_name
Expand All @@ -80,7 +88,31 @@ def get_vici_sas_by_name(ike_name: str, tunnel: str) -> list:
sas = list(session.list_sas(vici_dict))
return sas
except Exception:
raise ViciCommandError(f'Failed to get SAs')
raise ViciCommandError('Failed to get SAs')


def get_vici_connection_by_name(ike_name: str) -> list:
"""
Find loaded SAs by IKE_SA name and return list with SASs info
:param ike_name: IKE SA name
:type ike_name: str
:return: list of Ordinary Dicts with SASs
:rtype: list
"""
from vici import Session as vici_session

try:
session = vici_session()
except Exception:
raise ViciInitiateError('IPsec is not initialized')
vici_dict = {}
if ike_name:
vici_dict['ike'] = ike_name
try:
sas = list(session.list_conns(vici_dict))
return sas
except Exception:
raise ViciCommandError('Failed to get SAs')


def terminate_vici_ikeid_list(ike_id_list: list) -> None:
Expand All @@ -94,19 +126,17 @@ def terminate_vici_ikeid_list(ike_id_list: list) -> None:
try:
session = vici_session()
except Exception:
raise ViciInitiateError("IPsec not initialized")
raise ViciInitiateError('IPsec is not initialized')
try:
for ikeid in ike_id_list:
session_generator = session.terminate(
{'ike-id': ikeid, 'timeout': '-1'})
session_generator = session.terminate({'ike-id': ikeid, 'timeout': '-1'})
# a dummy `for` loop is required because of requirements
# from vici. Without a full iteration on the output, the
# command to vici may not be executed completely
for _ in session_generator:
pass
except Exception:
raise ViciCommandError(
f'Failed to terminate SA for IKE ids {ike_id_list}')
raise ViciCommandError(f'Failed to terminate SA for IKE ids {ike_id_list}')


def terminate_vici_by_name(ike_name: str, child_name: str) -> None:
Expand All @@ -123,9 +153,9 @@ def terminate_vici_by_name(ike_name: str, child_name: str) -> None:
try:
session = vici_session()
except Exception:
raise ViciInitiateError("IPsec not initialized")
raise ViciInitiateError('IPsec is not initialized')
try:
vici_dict: dict= {}
vici_dict: dict = {}
if ike_name:
vici_dict['ike'] = ike_name
if child_name:
Expand All @@ -138,16 +168,48 @@ def terminate_vici_by_name(ike_name: str, child_name: str) -> None:
pass
except Exception:
if child_name:
raise ViciCommandError(
f'Failed to terminate SA for IPSEC {child_name}')
raise ViciCommandError(f'Failed to terminate SA for IPSEC {child_name}')
else:
raise ViciCommandError(
f'Failed to terminate SA for IKE {ike_name}')
raise ViciCommandError(f'Failed to terminate SA for IKE {ike_name}')


def vici_initiate_all_child_sa_by_ike(ike_sa_name: str, child_sa_list: list) -> bool:
"""
Initiate IKE SA with scpecified CHILD_SAs in list
Args:
ike_sa_name (str): an IKE SA connection name
child_sa_list (list): a list of child SA names
Returns:
bool: a result of initiation command
"""
from vici import Session as vici_session

try:
session = vici_session()
except Exception:
raise ViciInitiateError('IPsec is not initialized')

try:
for child_sa_name in child_sa_list:
session_generator = session.initiate(
{'ike': ike_sa_name, 'child': child_sa_name, 'timeout': '-1'}
)
# a dummy `for` loop is required because of requirements
# from vici. Without a full iteration on the output, the
# command to vici may not be executed completely
for _ in session_generator:
pass
return True
except Exception:
raise ViciCommandError(f'Failed to initiate SA for IKE {ike_sa_name}')


def vici_initiate(ike_sa_name: str, child_sa_name: str, src_addr: str,
dst_addr: str) -> bool:
"""Initiate IKE SA connection with specific peer
def vici_initiate(
ike_sa_name: str, child_sa_name: str, src_addr: str, dst_addr: str
) -> bool:
"""Initiate IKE SA with one child_sa connection with specific peer
Args:
ike_sa_name (str): an IKE SA connection name
Expand All @@ -163,21 +225,23 @@ def vici_initiate(ike_sa_name: str, child_sa_name: str, src_addr: str,
try:
session = vici_session()
except Exception:
raise ViciInitiateError("IPsec not initialized")
raise ViciInitiateError('IPsec is not initialized')

try:
session_generator = session.initiate({
'ike': ike_sa_name,
'child': child_sa_name,
'timeout': '-1',
'my-host': src_addr,
'other-host': dst_addr
})
session_generator = session.initiate(
{
'ike': ike_sa_name,
'child': child_sa_name,
'timeout': '-1',
'my-host': src_addr,
'other-host': dst_addr,
}
)
# a dummy `for` loop is required because of requirements
# from vici. Without a full iteration on the output, the
# command to vici may not be executed completely
for _ in session_generator:
pass
return True
except Exception:
raise ViciCommandError(f'Failed to initiate SA for IKE {ike_sa_name}')
raise ViciCommandError(f'Failed to initiate SA for IKE {ike_sa_name}')
Loading

0 comments on commit cdfa92e

Please sign in to comment.