Skip to content

Commit

Permalink
avahi: add connectivity checker to verify IP addresses are reachable
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Belanger <[email protected]>
  • Loading branch information
Martin Belanger committed Jul 13, 2023
1 parent cb082b7 commit 689df67
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 90 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
New features:

- Support for nBFT (NVMe-oF Boot Table).
- The Avahi driver will now verify reachability of services discovered through mDNS to make sure all discovered IP addresses can be connected to. This avoids invoking the NVMe kernel driver with invalid IP addresses and getting error messages in the syslog.
- The Avahi driver will now print an error message if the same IP address is found on multiple interfaces. This indicates a misconfiguration of the network.

Bug fixes:

* For TCP transport: use `sysfs` controller `src_addr` attribute when matching to a configured "candidate" controller. This is to determine when an existing controller (located under the `sysfs`) can be reused instead of creating a new one. This avoids creating unnecessary duplicate connections.
* Udev event handling: use `systemctl restart` instead of `systemctl start`. There is a small chance that a `start` operation has not completed when a new `start` is required. Issuing a `start` while a `start` is being performed has no effect. However, a `restart` will be handled properly.
* `stafd`: Do not delete and recreate DC objects on kernel events indicating that an NVMe device associated with a discovery controller was removed by the kernel. This was done to kick-start the reconnect process, but it also caused the DLPE (Discovery Log Page Entries) cache to be lost. This could potentially result in `stacd` disconnecting from I/O controllers. Instead, keep the existing DC object, which contains a valid DLPE cache, and simply restart the "retry to connect" timer. This way, the DLPE cache is maintained throughout the process of reconnecting to the DC.

## Changes with release 2.2.3

Expand Down
2 changes: 1 addition & 1 deletion meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
project(
'nvme-stas',
meson_version: '>= 0.53.0',
version: '2.3-rc1',
version: '2.3-rc2',
license: 'Apache-2.0',
default_options: [
'buildtype=release',
Expand Down
278 changes: 189 additions & 89 deletions staslib/avahi.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import dasbus.client.proxy
import dasbus.client.observer
from gi.repository import GLib
from staslib import defs, conf, gutil
from staslib import defs, conf, gutil, iputil


def _txt2dict(txt: list):
Expand Down Expand Up @@ -54,6 +54,141 @@ def _proto2trans(protocol):
return None


def mk_service_key(interface, protocol, name, stype, domain):
    '''Build the hashable identifier of a service.

    The key is simply the 5 arguments packed, in order, into a tuple:
    (interface, protocol, name, stype, domain).
    '''
    key = interface, protocol, name, stype, domain
    return key


def fmt_service_str(interface, protocol, name, stype, domain, flags):  # pylint: disable=too-many-arguments
    '''Return service identifier as a string.

    The string is only used for human consumption (logs, str(Service)).
    It contains the interface index and name, the protocol, the service
    type, the domain, the flags (numeric and decoded) and the service name.

    The interface name is looked up from its index. When the lookup fails,
    the placeholder "unknown" is used instead of letting the exception
    propagate: a formatting helper should never abort its caller.
    '''
    try:
        ifname = socket.if_indextoname(interface)
    except (OSError, OverflowError):
        # OSError: no interface with that index (presumably it was removed
        # in the meantime). An out-of-range index may surface as
        # OverflowError instead.
        ifname = 'unknown'

    return (
        f'interface={interface}:{(ifname + ","):<9} '
        f'protocol={Avahi.protocol_as_string(protocol)}, '
        f'stype={stype}, '
        f'domain={domain}, '
        f'flags={flags}:{(Avahi.result_flags_as_string(flags) + ","):<12} '
        f'name={name}'
    )


# ******************************************************************************
class Service:  # pylint: disable=too-many-instance-attributes
    '''Object used to keep track of the services discovered from the avahi-daemon.

    A Service is built from the 6-element tuple
    (interface, protocol, name, stype, domain, flags). It is later completed
    by set_identity(), which records the connection parameters and, for the
    "tcp" transport, starts a connectivity check. The callback supplied at
    construction is invoked (without arguments) once the reachability of the
    service is known.
    '''

    # Read-only accessors
    interface_name = property(lambda self: self._interface_name)
    interface = property(lambda self: self._interface_id)
    ip_family = property(lambda self: self._ip_family)
    reachable = property(lambda self: self._reachable)
    protocol = property(lambda self: self._protocol_id)
    key_str = property(lambda self: self._key_str)
    domain = property(lambda self: self._domain)
    stype = property(lambda self: self._stype)
    data = property(lambda self: self._data)
    name = property(lambda self: self._name)
    key = property(lambda self: self._key)
    ip = property(lambda self: self._ip)

    def __init__(self, args, identified_cback):
        '''
        @param args: (interface, protocol, name, stype, domain, flags)
        @param identified_cback: Callable taking no argument. Invoked when
                                 the reachability of the service is known.
        '''
        self._identified_cback = identified_cback
        self._interface_id = args[0]
        self._protocol_id = args[1]
        self._name = args[2]
        self._stype = args[3]
        self._domain = args[4]
        self._flags = args[5]
        self._ip_family = 4 if self._protocol_id == Avahi.PROTO_INET else 6

        self._interface_name = socket.if_indextoname(self._interface_id).strip()
        self._protocol_name = Avahi.protocol_as_string(self._protocol_id)
        self._flags_str = '(' + Avahi.result_flags_as_string(self._flags) + '),'

        self._key = mk_service_key(self._interface_id, self._protocol_id, self._name, self._stype, self._domain)
        self._key_str = f'({self._interface_name}, {self._protocol_name}, {self._name}.{self._domain}, {self._stype})'

        self._id = fmt_service_str(
            self._interface_id, self._protocol_id, self._name, self._stype, self._domain, self._flags
        )

        self._ip = None
        self._resolver = None
        self._data = {}
        self._reachable = False
        self._connect_checker = None

    def info(self):
        '''Return debug info.

        The result is a copy of the service data augmented with a
        "reachable" entry. A copy is used so that the debug-only entry never
        ends up in the dictionary exposed through the `data` property.
        '''
        info = dict(self._data)
        info['reachable'] = str(self._reachable)
        return info

    def __str__(self):
        return self._id

    def set_identity(self, transport, address, port, txt):  # pylint: disable=too-many-arguments
        '''Complete identification and check connectivity (if needed).

        The connection parameters are saved in the `data` dictionary. For
        transports other than "tcp" the service is immediately declared
        reachable and the "identified" callback is invoked. For "tcp" a
        connectivity check is started and the callback is invoked from
        _tcp_connect_check_cback() when the check completes.

        This method does not return a value.

        @param transport: Transport type (e.g. "tcp")
        @param address: Address of the service (string)
        @param port: Port of the service
        @param txt: TXT record as a dictionary
        '''
        traddr = address.strip()
        trsvcid = str(port).strip()
        # host-iface permitted for tcp alone and not rdma
        host_iface = self._interface_name if transport == 'tcp' else ''
        self._data = {
            'transport': transport,
            'traddr': traddr,
            'trsvcid': trsvcid,
            # host-iface permitted for tcp alone and not rdma
            'host-iface': host_iface,
            'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip()
            if conf.NvmeOptions().discovery_supp
            else defs.WELL_KNOWN_DISC_NQN,
        }

        self._ip = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True)

        # The identity may be set more than once. Make sure a check started
        # for a previous identity is released and cannot report a stale
        # result for the new one.
        self._close_connect_checker()

        if transport != 'tcp':
            self._reachable = True
            self._notify_identified()
            return

        self._reachable = False
        connect_checker = gutil.TcpChecker(traddr, trsvcid, host_iface, self._tcp_connect_check_cback)

        try:
            connect_checker.connect()
        except RuntimeError as err:
            logging.error('Unable to verify connectivity: %s', err)
            connect_checker.close()
            connect_checker = None

        self._connect_checker = connect_checker

    def _tcp_connect_check_cback(self, connected):
        '''Invoked with the result of the connectivity check'''
        self._close_connect_checker()
        self._reachable = connected
        self._notify_identified()

    def _notify_identified(self):
        '''Invoke the "identified" callback, if one was registered'''
        if self._identified_cback is not None:
            self._identified_cback()

    def _close_connect_checker(self):
        '''Close and forget the pending connectivity checker, if any'''
        if self._connect_checker is not None:
            self._connect_checker.close()
            self._connect_checker = None

    def set_resolver(self, resolver):
        '''Set the resolver object'''
        self._resolver = resolver

    def close(self):
        '''Close this object and release all resources'''
        self._close_connect_checker()

        if self._resolver is not None:
            try:
                self._resolver.Free()
                dasbus.client.proxy.disconnect_proxy(self._resolver)
            except (AttributeError, dasbus.error.DBusError) as ex:
                logging.debug('Service.close() - Failed to Free() resolver. %s', ex)
            self._resolver = None


# ******************************************************************************
class Avahi: # pylint: disable=too-many-instance-attributes
'''@brief Avahi Server proxy. Set up the D-Bus connection to the Avahi
Expand Down Expand Up @@ -182,16 +317,10 @@ def kill(self):

def info(self) -> dict:
'''@brief return debug info about this object'''
services = dict()
for service, obj in self._services.items():
interface, protocol, name, stype, domain = service
key = f'({socket.if_indextoname(interface)}, {Avahi.protos.get(protocol, "unknown")}, {name}.{domain}, {stype})'
services[key] = obj.get('data', {})

info = {
'avahi wake up timer': str(self._kick_avahi_tmr),
'service types': list(self._stypes),
'services': services,
'services': {service.key_str: service.info() for service in self._services.values()},
}

return info
Expand All @@ -217,7 +346,7 @@ def get_controllers(self) -> list:
[...]
]
'''
return [service['data'] for service in self._services.values() if len(service['data'])]
return [service.data for service in self._services.values() if service.reachable]

def config_stypes(self, stypes: list):
'''@brief Configure the service types that we want to discover.
Expand All @@ -234,18 +363,17 @@ def kick_start(self):
'''
self._kick_avahi_tmr.clear()

def _remove_service(self, service_to_rm: typing.Tuple[int, int, str, str, str]):
    '''Remove a service from the cache and release its resources.

    Nothing happens when the service is not in the cache.

    @param service_to_rm: Key of the service to remove, i.e. the tuple
                          (interface, protocol, name, stype, domain).
    '''
    # A default is required: pop() raises KeyError on an unknown key otherwise.
    service = self._services.pop(service_to_rm, None)
    if service is not None:
        service.close()

def _disconnect(self):
logging.debug('Avahi._disconnect()')
for service in self._services.values():
resolver = service.pop('resolver', None)
if resolver is not None:
try:
resolver.Free()
dasbus.client.proxy.disconnect_proxy(resolver)
except (AttributeError, dasbus.error.DBusError) as ex:
logging.debug('Avahi._disconnect() - Failed to Free() resolver. %s', ex)
service.close()

self._services = dict()
self._services.clear()

for browser in self._service_browsers.values():
try:
Expand Down Expand Up @@ -296,15 +424,9 @@ def _configure_browsers(self):
logging.debug('Avahi._configure_browsers() - Failed to Free() browser. %s', ex)

# Find the cached services corresponding to stype_to_rm and remove them
services_to_rm = [service for service in self._services if service[3] == stype_to_rm]
for service in services_to_rm:
resolver = self._services.pop(service, {}).pop('resolver', None)
if resolver is not None:
try:
resolver.Free()
dasbus.client.proxy.disconnect_proxy(resolver)
except (AttributeError, dasbus.error.DBusError) as ex:
logging.debug('Avahi._configure_browsers() - Failed to Free() resolver. %s', ex)
services_to_rm = [service.key for service in self._services.values() if service.stype == stype_to_rm]
for service_to_rm in services_to_rm:
self._remove_service(service_to_rm)

for stype in stypes_to_add:
try:
Expand All @@ -329,31 +451,25 @@ def _service_discovered(
args: typing.Tuple[int, int, str, str, str, int],
*_user_data,
):
(interface, protocol, name, stype, domain, flags) = args
logging.debug(
'Avahi._service_discovered() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
interface,
socket.if_indextoname(interface),
Avahi.protocol_as_string(protocol),
stype,
domain,
flags,
'(' + Avahi.result_flags_as_string(flags) + '),',
name,
)
service = Service(args, self._change_cb)
logging.debug('Avahi._service_discovered() - %s', service)

service = (interface, protocol, name, stype, domain)
if service not in self._services:
if service.key not in self._services:
try:
obj_path = self._avahi.ServiceResolverNew(
interface, protocol, name, stype, domain, Avahi.PROTO_UNSPEC, Avahi.LOOKUP_USE_MULTICAST
service.interface,
service.protocol,
service.name,
service.stype,
service.domain,
Avahi.PROTO_UNSPEC,
Avahi.LOOKUP_USE_MULTICAST,
)
self._services[service] = {
'resolver': self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path),
'data': {},
}
service.set_resolver(self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path))
except dasbus.error.DBusError as ex:
logging.warning('Failed to create resolver: "%s", "%s", "%s". %s', interface, name, stype, ex)
logging.warning('Failed to create resolver - %s: %s', service, ex)

self._services[service.key] = service

def _service_removed(
self,
Expand All @@ -367,27 +483,14 @@ def _service_removed(
):
(interface, protocol, name, stype, domain, flags) = args
logging.debug(
'Avahi._service_removed() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
interface,
socket.if_indextoname(interface),
Avahi.protocol_as_string(protocol),
stype,
domain,
flags,
'(' + Avahi.result_flags_as_string(flags) + '),',
name,
'Avahi._service_removed() - %s',
fmt_service_str(interface, protocol, name, stype, domain, flags),
)

service = (interface, protocol, name, stype, domain)
resolver = self._services.pop(service, {}).pop('resolver', None)
if resolver is not None:
try:
resolver.Free()
dasbus.client.proxy.disconnect_proxy(resolver)
except (AttributeError, dasbus.error.DBusError) as ex:
logging.debug('Avahi._service_removed() - Failed to Free() resolver. %s', ex)

self._change_cb()
service_key = mk_service_key(interface, protocol, name, stype, domain)
self._remove_service(service_key)
if self._change_cb is not None:
self._change_cb()

def _service_identified( # pylint: disable=too-many-locals
self,
Expand All @@ -402,38 +505,21 @@ def _service_identified( # pylint: disable=too-many-locals
(interface, protocol, name, stype, domain, host, aprotocol, address, port, txt, flags) = args
txt = _txt2dict(txt)
logging.debug(
'Avahi._service_identified() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s, host=%s, aprotocol=%s, address=%s, port=%s, txt=%s',
interface,
socket.if_indextoname(interface),
Avahi.protocol_as_string(protocol),
stype,
domain,
flags,
'(' + Avahi.result_flags_as_string(flags) + '),',
name,
'Avahi._service_identified() - %s, host=%s, aprotocol=%s, port=%s, address=%s, txt=%s',
fmt_service_str(interface, protocol, name, stype, domain, flags),
host,
Avahi.protocol_as_string(aprotocol),
address,
port,
address,
txt,
)

service = (interface, protocol, name, stype, domain)
if service in self._services:
service_key = mk_service_key(interface, protocol, name, stype, domain)
service = self._services.get(service_key, None)
if service is not None:
transport = _proto2trans(txt.get('p'))
if transport is not None:
self._services[service]['data'] = {
'transport': transport,
'traddr': address.strip(),
'trsvcid': str(port).strip(),
# host-iface permitted for tcp alone and not rdma
'host-iface': socket.if_indextoname(interface).strip() if transport == 'tcp' else '',
'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip()
if conf.NvmeOptions().discovery_supp
else defs.WELL_KNOWN_DISC_NQN,
}

self._change_cb()
service.set_identity(transport, address, port, txt)
else:
logging.error(
'Received invalid/undefined protocol in mDNS TXT field: address=%s, iface=%s, TXT=%s',
Expand All @@ -442,6 +528,8 @@ def _service_identified( # pylint: disable=too-many-locals
txt,
)

self._check_for_duplicate_ips()

def _failure_handler( # pylint: disable=no-self-use
self,
_connection,
Expand All @@ -456,3 +544,15 @@ def _failure_handler( # pylint: disable=no-self-use
if 'ServiceResolver' not in interface_name or 'TimeoutError' not in error:
# ServiceResolver may fire a timeout event after being Free'd(). This seems to be normal.
logging.error('Avahi._failure_handler() - name=%s, error=%s', interface_name, error)

def _check_for_duplicate_ips(self):
    '''This is to identify misconfigured networks where the
    same IP addresses are discovered on two or more interfaces.

    An error is logged for each IP address found on more than one distinct
    interface. Several services sharing an IP address on the same interface
    are not reported.
    '''
    ifaces_by_ip = {}
    for service in self._services.values():
        if service.ip is None:
            continue  # No IP address recorded for this service

        # Record each interface once per IP (first-seen order) so that
        # multiple services on one interface do not look like a duplicate.
        ifaces = ifaces_by_ip.setdefault(service.ip.compressed, [])
        if service.interface_name not in ifaces:
            ifaces.append(service.interface_name)

    for ip, ifaces in ifaces_by_ip.items():
        if len(ifaces) > 1:
            logging.error('IP address %s was found on multiple interfaces: %s', ip, ','.join(ifaces))
Loading

0 comments on commit 689df67

Please sign in to comment.