From 546e7367f4b3ecf1929ecd13d00716f7ada9dc0f Mon Sep 17 00:00:00 2001 From: Christopher Amin Date: Fri, 20 Jan 2023 16:06:41 +0100 Subject: [PATCH 1/3] 2.0.0 with new version of the AtlasStream class --- CHANGES.rst | 14 ++ docs/use.rst | 46 ++--- ripe/atlas/cousteau/stream.py | 328 +++++++++++++++++---------------- ripe/atlas/cousteau/version.py | 4 +- setup.py | 3 +- tests/test_real_server.py | 199 +++++++++++--------- tox.ini | 4 +- 7 files changed, 331 insertions(+), 267 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index fd51743..4e08a4d 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,20 @@ Releases History ================ +2.0.0 (release 2023-01-20) +-------------------------- +Changes: +~~~~~~~~ +- The AtlasStream class has been updated to use the new WebSocket interface +- AtlasStream objects can now be iterated as an alternative to using callbacks +- There used to be both start_stream() and subscribe() methods which did the same thing, + except that start_stream() had extra validation. This extra validation has been + added to subscribe(), and start_stream() is now an alias to it. +- bind_channel() was renamed to bind(), although it is maintained as an alias, and + there is a corresponding unbind() to remove a callback +- Deprecated event aliases were dropped, you have to use full event names like + "atlas_result" and "atlas_metadata" when binding + 1.5.1 (release 2022-05-23) -------------------------- Bug Fixes: diff --git a/docs/use.rst b/docs/use.rst index 796dcba..f95ec29 100644 --- a/docs/use.rst +++ b/docs/use.rst @@ -271,14 +271,14 @@ Example: Streaming API ------------- -Atlas supports getting results and other events through a stream to get them close to real time. The stream is implemented using WebSockets and the `Socket.IO`_ protocol. +Atlas supports getting results and other events through a WebSocket stream to get them close to real time. The AtlasStream class provides an interface to this stream, and supports +both callback-based and an iterator-based access. -Measurement Results +Measurement results ^^^^^^^^^^^^^^^^^^^ -Besides fetching results from main API it is possible to get results though streaming API. You have to use AtlasStream object and bind to "result" channel. You can start the a result stream by specifying at least the measurement ID in the stream parameters. -More details on the available parameters of the stream can be found on the `streaming documentation`_. + You have to create an AtlasStream object and subscribe to the "result" stream type. More details on the available parameters of the stream can be found on the `streaming documentation`_. -Example: +Example using the callback-interface: .. code:: python @@ -295,13 +295,13 @@ Example: atlas_stream.connect() # Bind function we want to run with every result message received - atlas_stream.bind_channel("atlas_result", on_result_response) + atlas_stream.bind("atlas_result", on_result_response) # Subscribe to new stream for 1001 measurement results stream_parameters = {"msm": 1001} - atlas_stream.start_stream(stream_type="result", **stream_parameters) + atlas_stream.subscribe(stream_type="result", **stream_parameters) - # Timeout all subscriptions after 5 secs. Leave seconds empty for no timeout. + # Process incoming events for 5 seconds, calling the callback defined above. # Make sure you have this line after you start *all* your streams atlas_stream.timeout(seconds=5) @@ -311,33 +311,27 @@ Example: Connection Events ^^^^^^^^^^^^^^^^^ -Besides results, streaming API supports also probe's connect/disconnect events. Again you have to use AtlasStream object but this time you have to bind to "probe" channel. -More info about additional parameters can be found on the `streaming documentation`_. +Besides results, the streaming API also supports probe connect/disconnect events. +Again you have to create an AtlasStream object, but this time you subscribe to the +"probestatus" stream type. More info about additional parameters can be found on +the `streaming documentation`_. -Example: +Example using the iterator-interface: .. code:: python from ripe.atlas.cousteau import AtlasStream - def on_result_response(*args): - """ - Function that will be called every time we receive a new event. - Args is a tuple, so you should use args[0] to access the real event. - """ - print(args[0]) - atlas_stream = AtlasStream() atlas_stream.connect() - # Probe's connection status results - atlas_stream.bind_channel("atlas_probe", on_result_response) stream_parameters = {"enrichProbes": True} - atlas_stream.start_stream(stream_type="probestatus", **stream_parameters) + atlas_stream.subscribe(stream_type="probestatus", **stream_parameters) + + # Iterate over the incoming results for 5 seconds + for event_name, payload in atlas_stream.iter(seconds=5): + print(event_name, payload) - # Timeout all subscriptions after 5 secs. Leave seconds empty for no timeout. - # Make sure you have this line after you start *all* your streams - atlas_stream.timeout(seconds=5) # Shut down everything atlas_stream.disconnect() @@ -346,8 +340,8 @@ Example: .. _streaming documentation: https://atlas.ripe.net/docs/result-streaming/ -Using Sagan Library -------------------- +Using the Sagan Library +----------------------- In case you need to do further processing with any of the results you can use our official RIPE Atlas results parsing library called `Sagan`_. An example of how to combine two libraries is the below: diff --git a/ripe/atlas/cousteau/stream.py b/ripe/atlas/cousteau/stream.py index 6e710c0..dd6317e 100644 --- a/ripe/atlas/cousteau/stream.py +++ b/ripe/atlas/cousteau/stream.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 RIPE NCC +# Copyright (c) 2023 RIPE NCC # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -12,77 +12,29 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import warnings + +from urllib.parse import urlparse import os +from typing import Dict, Callable, List, Tuple, Any, Optional, Iterator +from typing_extensions import TypeAlias +import json import logging -from functools import partial +import re +import socket +import time +import warnings +import select import requests -import socketio +import websocket from .version import __version__ - -from logging import NullHandler - LOG = logging.getLogger("atlas-stream") -LOG.addHandler(NullHandler()) - -client = socketio.Client() - - -class AtlasNamespace: - """ - socket.io event handlers for handling automatic resubscription and populating - the atlas-stream debug log. - - These are overidden by whatever the user supplies to AtlasStream.bind_channel(). - """ - - SUBSCRIPTIONS = {} - - def on_connect(self, *args): - LOG.debug("Connected to RIPE Atlas Stream") - for subscription in self.SUBSCRIPTIONS.values(): - LOG.debug("(Re)subscribing to {}".format(subscription)) - self.emit("atlas_subscribe", subscription) - - def on_disconnect(self, *args): - LOG.debug("Disconnected from RIPE Atlas Stream") - - def trigger_event(self, event, *args): - handler_name = "on_" + event - if hasattr(self, handler_name): - return getattr(self, handler_name)(*args) - else: - return self.on_unbound_event(event, *args) - - def on_unbound_event(self, event, *args): - LOG.debug(f"Received '{event}' event but no handler is set") - - def on_atlas_subscribed(self, params): - """ - Keep track of subscriptions so that they can be resumed on reconnect. - """ - LOG.debug("Subscribed to params: {}".format(params)) - self.SUBSCRIPTIONS[str(params)] = params - - def on_atlas_unsubscribed(self, params): - """ - Remove this subscription from the state so we don't try to resubscribe. - """ - LOG.debug("Unsubscribed from params: {}".format(params)) - del self.SUBSCRIPTIONS[str(params)] - - def on_atlas_error(self, *args): - LOG.error("Got an error from stream server: {}".format(args[0])) - - -class AtlasClientNamespace(AtlasNamespace, socketio.ClientNamespace): - pass +LOG.addHandler(logging.NullHandler()) -class AtlasStream(object): +class AtlasStream: # For the current list of events see: # https://atlas.ripe.net/docs/result-streaming/ @@ -93,7 +45,15 @@ class AtlasStream(object): EVENT_NAME_PROBESTATUS = "atlas_probestatus" EVENT_NAME_SUBSCRIBED = "atlas_subscribed" EVENT_NAME_UNSUBSCRIBED = "atlas_unsubscribed" - EVENT_NAME_REPLAY_FINISHED = "atlas_replay_finished" + + VALID_EVENTS = ( + EVENT_NAME_ERROR, + EVENT_NAME_RESULTS, + EVENT_NAME_METADATA, + EVENT_NAME_PROBESTATUS, + EVENT_NAME_SUBSCRIBED, + EVENT_NAME_UNSUBSCRIBED, + ) # Events that you can emit EVENT_NAME_SUBSCRIBE = "atlas_subscribe" @@ -101,119 +61,179 @@ class AtlasStream(object): # atlas_subscribe stream types STREAM_TYPE_RESULT = "result" - STREAM_TYPE_PROBE = "probe" + STREAM_TYPE_PROBESTATUS = "probestatus" STREAM_TYPE_METADATA = "metadata" - # These were deprecated long ago but were still documented before 1.5.0. - # There isn't much overhead to having them so we can keep them for a while longer. - DEPRECATED_CHANNELS = { - "result": EVENT_NAME_RESULTS, - "probe": EVENT_NAME_PROBESTATUS, - "error": EVENT_NAME_ERROR, - } + VALID_STREAM_TYPES = ( + STREAM_TYPE_RESULT, + STREAM_TYPE_PROBESTATUS, + STREAM_TYPE_METADATA, + ) - Client = socketio.Client - Namespace = AtlasClientNamespace + StreamParams: TypeAlias = Dict[str, Any] def __init__( self, - base_url="https://atlas-stream.ripe.net", - path="/stream/socket.io/", - proxies=None, - headers=None, - transport="websocket", - ): + base_url: str = "https://atlas-stream.ripe.net", + path: str = "/stream/", + headers: Optional[Dict[str, str]] = None, + proxies: Optional[Dict[str, str]] = None, + transport: str = "websocket", + ) -> None: """Initialize stream""" - self.base_url = base_url - self.path = path + base_url = re.sub("^http", "ws", base_url) + path = re.sub("socket.io/?$", "", path) + self.url = base_url.rstrip("/") + "/" + path.lstrip("/") self.session = requests.Session() - self.socketIO = self.Client(http_session=self.session) - self.namespace = self.Namespace() - self.socketIO.register_namespace(self.namespace) - self.transport = transport + if transport != "websocket": + warnings.warn( + "Ignoring AtlasStream transport other than 'websocket'", + DeprecationWarning, + ) - proxies = proxies or {} headers = headers or {} - - if not headers or not headers.get("User-Agent", None): + if not headers.get("User-Agent", None): user_agent = "RIPE ATLAS Cousteau v{0}".format(__version__) headers["User-Agent"] = user_agent - # Force polling if http_proxy or https_proxy point to a SOCKS URL - for scheme in "http", "https": - proxy = proxies.get(scheme) - if not proxy: - proxy = os.environ.get(f"{scheme}_proxy") - if proxy and proxy.startswith("socks"): - warnings.warn( - "SOCKS proxies do not currently work with the websocket transport, forcing polling" + self.headers = headers + self.proxies = proxies or {} + + self.callbacks: Dict[str, Callable] = {} + self.subscriptions: List[Dict] = [] + + self.ws: Optional[websocket.WebSocket] = None + + def _get_proxy_options(self): + """ + Get websocket-client proxy options from requests-style self.proxies dict or + http(x)_proxy env variables if present. + """ + scheme = "https" if self.url.startswith("wss:") else "http" + + proxy_url = self.proxies.get(scheme) + if not proxy_url: + for key, value in os.environ.items(): + if key.lower() == f"{scheme}_proxy": + proxy_url = value + break + + if not proxy_url: + return {} + + parsed = urlparse(proxy_url) + return { + "proxy_type": parsed.scheme, + "http_proxy_host": parsed.hostname, + "http_proxy_port": parsed.port, + } + + def connect(self) -> None: + while self.ws is None: + try: + self.ws = websocket.create_connection( + self.url, header=self.headers, **self._get_proxy_options() ) - self.transport = "polling" - break + except socket.error as exc: + LOG.debug(f"{exc} while connecting to RIPE Atlas Stream") + time.sleep(1) + continue + msg = "Connected to RIPE Atlas Stream" + LOG.debug(msg) + for subscription in self.subscriptions: + self.send(self.ws, self.EVENT_NAME_SUBSCRIBE, subscription) + + def disconnect(self) -> None: + """Removes the channel bindings and shuts down the connection.""" + if self.ws is not None: + self.ws.close() + self.ws = None + self.callbacks = {} - self.session.headers.update(headers) - self.session.proxies.update(proxies) + def bind(self, channel: str, callback: Callable) -> None: + """Bind given channel with the given callback""" + if channel not in self.VALID_EVENTS: + raise ValueError("Invalid event channel") + self.callbacks[channel] = callback - def connect(self): - """Initiate the channel we want to start streams from.""" + bind_channel = bind - return self.socketIO.connect( - self.base_url, socketio_path=self.path, transports=[self.transport] - ) + def unbind(self, channel: str): + self.callbacks.pop(channel, None) - def disconnect(self): - """Removes the channel bindings and shuts down the connection.""" - self.socketIO.disconnect() + def subscribe(self, stream_type: str, **parameters: Any) -> None: + """Requests new stream for given type and parameters""" + if stream_type not in self.VALID_STREAM_TYPES: + raise ValueError("You need to set a valid stream type") + parameters = dict(parameters, stream_type=stream_type) + self.subscriptions.append(parameters) + if self.ws: + self.send(self.ws, self.EVENT_NAME_SUBSCRIBE, parameters) - def unpack_results(self, callback, data): - if isinstance(data, list): - for result in data: - callback(result) - else: - callback(data) + start_stream = subscribe - def bind_channel(self, channel, callback): - """Bind given channel with the given callback""" - channel = self.DEPRECATED_CHANNELS.get(channel, channel) - - if channel == self.EVENT_NAME_RESULTS: - self.socketIO.on(channel, partial(self.unpack_results, callback)) - else: - self.socketIO.on(channel, callback) - - def start_stream(self, stream_type, **stream_parameters): - """Starts new stream for given type with given parameters""" - if stream_type: - self.subscribe(stream_type, **stream_parameters) - else: - self.handle_error("You need to set a stream type") - - def _get_stream_params(self, stream_type, parameters): - parameters["stream_type"] = stream_type - if stream_type == self.STREAM_TYPE_RESULT and "buffering" not in parameters: - parameters["buffering"] = True - return parameters - - def subscribe(self, stream_type, **parameters): - """Subscribe to stream with give parameters.""" - self.socketIO.emit( - self.EVENT_NAME_SUBSCRIBE, self._get_stream_params(stream_type, parameters) - ) - - def unsubscribe(self, stream_type, **parameters): + def unsubscribe(self, stream_type: str, **parameters: Any) -> None: """Unsubscribe from a previous subscription""" - self.socketIO.emit( - self.EVENT_NAME_UNSUBSCRIBE, - self._get_stream_params(stream_type, parameters), - ) + parameters = dict(parameters, stream_type=stream_type) + if parameters not in self.subscriptions: + return + if self.ws: + self.send(self.ws, self.EVENT_NAME_UNSUBSCRIBE, parameters) + self.subscriptions.remove(parameters) + + def send(self, ws: websocket.WebSocket, msg_type: str, payload: Any) -> None: + """ + Send a message to the server. + """ + ws.send(json.dumps([msg_type, payload])) + + def recv(self, ws: websocket.WebSocket) -> Tuple[str, Any]: + """ + Receive a single message from the server. + """ + msg = ws.recv() + event_name, payload = json.loads(msg) + return event_name, payload - def timeout(self, seconds=None): + def iter(self, seconds: Optional[float] = None) -> Iterator[Tuple[str, Any]]: + """ + Yield incoming events for `seconds` if specified, or else forever. """ - Times out all streams after n seconds or wait forever if seconds is - None + t0 = time.perf_counter() + while True: + if seconds is not None: + elapsed = time.perf_counter() - t0 + remaining = seconds - elapsed + if remaining < 0: + break + rlist, _, _ = select.select([self.ws], [], [], remaining) + if not rlist: + break + try: + yield self.recv(self.ws) + except Exception as exc: + LOG.error(f"{exc} while reading from RIPE Atlas stream") + if isinstance(exc, websocket.WebSocketException): + self.connect() + continue + else: + break + + def timeout(self, seconds: Optional[float] = None) -> None: + """ + Process events for `seconds` if specified, or else forever, calling + a bound callback for each event if one is defined. + """ + for event_name, payload in self.iter(seconds=seconds): + callback = self.callbacks.get(event_name) + if callback: + callback(payload) + + def __iter__(self): + """ + Yield incoming events. + + To stop iterating after a given timeout, see the `iter()` method. """ - if seconds is None: - self.socketIO.wait() - else: - self.socketIO.sleep(seconds) + return self.iter() diff --git a/ripe/atlas/cousteau/version.py b/ripe/atlas/cousteau/version.py index 025c5a8..980118e 100644 --- a/ripe/atlas/cousteau/version.py +++ b/ripe/atlas/cousteau/version.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 RIPE NCC +# Copyright (c) 2023 RIPE NCC # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -13,4 +13,4 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -__version__ = "1.5.1" +__version__ = "2.0.0" diff --git a/setup.py b/setup.py index 92aaab6..8251b67 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,8 @@ install_requires = [ "python-dateutil", "requests>=2.7.0", - "python-socketio[client]<5", + "websocket-client~=1.3.1", + "typing-extensions", ] # Get proper long description for package diff --git a/tests/test_real_server.py b/tests/test_real_server.py index e984dde..f9499db 100644 --- a/tests/test_real_server.py +++ b/tests/test_real_server.py @@ -1,4 +1,4 @@ -# Copyright (c) 2015 RIPE NCC +# Copyright (c) 2023 RIPE NCC # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -22,60 +22,90 @@ from ripe.atlas.cousteau import ( - AtlasSource, AtlasChangeSource, - AtlasRequest, AtlasCreateRequest, AtlasChangeRequest, - Ping, Dns, AtlasStopRequest, AtlasResultsRequest, - ProbeRequest, MeasurementRequest, Probe, Measurement, - AtlasStream, Ntp, Sslcert, Http, Traceroute, AnchorRequest + AtlasSource, + AtlasChangeSource, + AtlasRequest, + AtlasCreateRequest, + AtlasChangeRequest, + Ping, + Dns, + AtlasStopRequest, + AtlasResultsRequest, + ProbeRequest, + MeasurementRequest, + Probe, + Measurement, + AtlasStream, + Ntp, + Sslcert, + Http, + Traceroute, + AnchorRequest, ) class TestRealServer(unittest.TestCase): def setUp(self): - self.server = os.environ.get('ATLAS_SERVER', "") - self.create_key = os.environ.get('CREATE_KEY', "") - self.change_key = os.environ.get('CHANGE_KEY', "") - self.delete_key = os.environ.get('DELETE_KEY', "") + self.server = os.environ.get("ATLAS_SERVER", "") + self.create_key = os.environ.get("CREATE_KEY", "") + self.change_key = os.environ.get("CHANGE_KEY", "") + self.delete_key = os.environ.get("DELETE_KEY", "") self.delete_msm = None def test_create_delete_request(self): """Unittest for Atlas create and delete request""" - if self.server == "": + if not self.server or not self.create_key or not self.delete_key: pytest.skip("No ATLAS_SERVER defined") source = AtlasSource(**{"type": "area", "value": "WW", "requested": 38}) - ping = Ping(**{ - "target": "www.ripe.net", - "af": 4, - "description": "Cousteau testing", - "prefer_anchors": True - }) - traceroute = Traceroute(**{ - "target": "www.google.fr", - "af": 4, "protocol": "UDP", - "description": "Cousteau testing", - "dont_fragment": True - }) - dns = Dns(**{ - "target": "k.root-servers.net", "af": 4, - "description": "Cousteau testing", "query_type": "SOA", - "query_class": "IN", "query_argument": "nl", "retry": 6 - }) - ntp = Ntp(**{ - "target": "www.ripe.net", - "af": 4, - "description": "Cousteau testing", - "timeout": 1000 - }) - ssl = Sslcert(**{ - "target": "www.ripe.net", - "af": 4, - "description": "Cousteau testing", - }) - http = Http(**{ - "target": "www.ripe.net", - "af": 4, - "description": "Cousteau testing", - }) + ping = Ping( + **{ + "target": "www.ripe.net", + "af": 4, + "description": "Cousteau testing", + "prefer_anchors": True, + } + ) + traceroute = Traceroute( + **{ + "target": "www.google.fr", + "af": 4, + "protocol": "UDP", + "description": "Cousteau testing", + "dont_fragment": True, + } + ) + dns = Dns( + **{ + "target": "k.root-servers.net", + "af": 4, + "description": "Cousteau testing", + "query_type": "SOA", + "query_class": "IN", + "query_argument": "nl", + "retry": 6, + } + ) + ntp = Ntp( + **{ + "target": "www.ripe.net", + "af": 4, "description": "Cousteau testing", + "timeout": 1000, + } + ) + ssl = Sslcert( + **{ + "target": "www.ripe.net", + "af": 4, + "description": "Cousteau testing", + } + ) + http = Http( + **{ + "target": "www.ripe.net", + "af": 4, + "description": "Cousteau testing", + } + ) stop = datetime.utcnow() + timedelta(minutes=220) request = AtlasCreateRequest( **{ @@ -84,44 +114,49 @@ def test_create_delete_request(self): "key": self.create_key, "server": self.server, "measurements": [ping, traceroute, dns, ntp, ssl, http], - "sources": [source] + "sources": [source], } ) - result = namedtuple('Result', 'success response') + result = namedtuple("Result", "success response") (result.success, result.response) = request.create() print(result.response) self.assertTrue(result.success) self.delete_msm = result.response["measurements"][0] self.assertTrue(result.success) - kwargs = {"verify": False, "msm_id": self.delete_msm, "key": self.delete_key, "server": self.server} + kwargs = { + "verify": False, + "msm_id": self.delete_msm, + "key": self.delete_key, + "server": self.server, + } request = AtlasStopRequest(**kwargs) - result = namedtuple('Result', 'success response') + result = namedtuple("Result", "success response") (result.success, result.response) = request.create() print(result.response) self.assertTrue(result.success) def test_change_request(self): """Unittest for Atlas change request""" - if self.server == "": + if not self.server or not self.change_key: pytest.skip("No ATLAS_SERVER defined") - remove = AtlasChangeSource(**{ - "value": "6001", "requested": 1, "action": "remove", "type": "probes" - }) - add = AtlasChangeSource(**{ - "value": "6002", "requested": 1, "action": "add", "type": "probes" - }) + remove = AtlasChangeSource( + **{"value": "6001", "requested": 1, "action": "remove", "type": "probes"} + ) + add = AtlasChangeSource( + **{"value": "6002", "requested": 1, "action": "add", "type": "probes"} + ) request = AtlasChangeRequest( **{ "key": self.change_key, "verify": False, "msm_id": 1000032, "server": self.server, - "sources": [add, remove] + "sources": [add, remove], } ) - result = namedtuple('Result', 'success response') + result = namedtuple("Result", "success response") (result.success, result.response) = request.create() print(result.response) self.assertTrue(result.success) @@ -136,10 +171,10 @@ def test_result_request(self): "start": datetime(2011, 11, 21), "stop": datetime(2011, 11, 22), "verify": False, - "probe_ids": [743, 630] + "probe_ids": [743, 630], } - result = namedtuple('Result', 'success response') + result = namedtuple("Result", "success response") (result.success, result.response) = AtlasResultsRequest(**kwargs).create() print(result.success, result.response) self.assertTrue(result.response) @@ -192,7 +227,10 @@ def test_measurement_repr_request(self): Measurement(id=1000032, server=self.server, verify=False) def test_stream_results(self): - """Unittest for Atlas results request.""" + """ + Test receiving at least one MSM 1001 result in 5 seconds, using the callback + interface. + """ if self.server == "": pytest.skip("No ATLAS_SERVER defined") @@ -207,50 +245,45 @@ def on_result_response(*args): atlas_stream = AtlasStream() atlas_stream.connect() - channel = "result" + channel = "atlas_result" atlas_stream.bind_channel(channel, on_result_response) stream_parameters = {"msm": 1001} - atlas_stream.start_stream(stream_type="result", **stream_parameters) + atlas_stream.start_stream("result", **stream_parameters) + # Wait for 5 seconds of results (should always be something there for #1001) atlas_stream.timeout(seconds=5) atlas_stream.disconnect() self.assertNotEqual(results, []) def test_stream_probe(self): - """Unittest for Atlas probe connections request.""" + """ + Test receiving at least one probestatus event in 30 seconds, using + the iterator interface. + """ if self.server == "": pytest.skip("No ATLAS_SERVER defined") - results = [] - - def on_result_response(*args): - """ - Function that will be called every time we receive a new event. - Args is a tuple, so you should use args[0] to access the real message. - """ - results.append(args[0]) - atlas_stream = AtlasStream() atlas_stream.connect() channel = "atlas_probestatus" - atlas_stream.bind_channel(channel, on_result_response) stream_parameters = {"enrichProbes": True} - atlas_stream.start_stream(stream_type="probestatus", **stream_parameters) - atlas_stream.timeout(seconds=30) + atlas_stream.start_stream("probestatus", **stream_parameters) + # Wait for 2 messages (atlas_subscribed + atlas_probestatus) + found = False + for event_name, payload in atlas_stream.iter(30): + if event_name == channel: + found = True + break + if not found: + self.fail("No events") atlas_stream.disconnect() - self.assertNotEqual(results, []) def test_get_request(self): """Unittest for Atlas get request""" if self.server == "": pytest.skip("No ATLAS_SERVER defined") - request = AtlasRequest( - **{ - "verify": False, - "url_path": "/api/v2/anchors" - } - ) - result = namedtuple('Result', 'success response') + request = AtlasRequest(**{"verify": False, "url_path": "/api/v2/anchors"}) + result = namedtuple("Result", "success response") (result.success, result.response) = request.get() print(result.success, result.response) self.assertTrue(result.response["results"]) diff --git a/tox.ini b/tox.ini index 41b9517..f2abfde 100644 --- a/tox.ini +++ b/tox.ini @@ -5,4 +5,6 @@ deps= jsonschema commands= # flake8 --max-line-length=88 setup.py ripe/atlas/cousteau/ scripts/ tests/ - pytest -r a {posargs} + pytest {posargs} +setenv = + ATLAS_SERVER = atlas.ripe.net From 81dae1003f34c1c1f29acf78340f2d1a2227e17b Mon Sep 17 00:00:00 2001 From: Christopher Amin Date: Fri, 20 Jan 2023 16:10:40 +0100 Subject: [PATCH 2/3] Doc tweaks --- docs/use.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/use.rst b/docs/use.rst index f95ec29..c7c1743 100644 --- a/docs/use.rst +++ b/docs/use.rst @@ -272,9 +272,9 @@ Example: Streaming API ------------- Atlas supports getting results and other events through a WebSocket stream to get them close to real time. The AtlasStream class provides an interface to this stream, and supports -both callback-based and an iterator-based access. +both callback-based and iterator-based access. -Measurement results +Measurement Results ^^^^^^^^^^^^^^^^^^^ You have to create an AtlasStream object and subscribe to the "result" stream type. More details on the available parameters of the stream can be found on the `streaming documentation`_. From c48fc904ca778f6e4ade850d80c9c2190efd7d6f Mon Sep 17 00:00:00 2001 From: Christopher Amin Date: Fri, 20 Jan 2023 16:11:37 +0100 Subject: [PATCH 3/3] Remove 3.6 testing --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 182f5f2..1a07920 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -16,7 +16,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"] + python-version: ["3.7", "3.8", "3.9", "3.10"] steps: - uses: actions/checkout@v2