From 4177d6dee687b8cd2040ba44c53806f1dfae2e6c Mon Sep 17 00:00:00 2001 From: dmbokhan Date: Sun, 27 Oct 2024 21:40:32 +0300 Subject: [PATCH] Add Address Space Hierarchy endpoint --- prsw/ripe_stat.py | 6 + prsw/stat/address_space_hierarchy.py | 130 ++++++++++++++++ .../unit/stat/test_address_space_hierarchy.py | 147 ++++++++++++++++++ 3 files changed, 283 insertions(+) create mode 100644 prsw/stat/address_space_hierarchy.py create mode 100644 tests/unit/stat/test_address_space_hierarchy.py diff --git a/prsw/ripe_stat.py b/prsw/ripe_stat.py index 1cc80d0..673c9b4 100644 --- a/prsw/ripe_stat.py +++ b/prsw/ripe_stat.py @@ -5,6 +5,7 @@ from .api import get from .stat.abuse_contact_finder import AbuseContactFinder +from .stat.address_space_hierarchy import AddressSpaceHierarchy from .stat.announced_prefixes import AnnouncedPrefixes from .stat.asn_neighbours import ASNNeighbours from .stat.looking_glass import LookingGlass @@ -87,6 +88,11 @@ def _get(self, path, params=None): def abuse_contact_finder(self) -> Type[AbuseContactFinder]: """Lazy alias to :class:`.stat.AbuseContactFinder`.""" return partial(AbuseContactFinder, self) + + @property + def address_space_hierarchy(self) -> Type[AddressSpaceHierarchy]: + """Lazy alias to :class:`.stat.AddressSpaceHierarchy`.""" + return partial(AddressSpaceHierarchy, self) @property def announced_prefixes(self) -> Type[AnnouncedPrefixes]: diff --git a/prsw/stat/address_space_hierarchy.py b/prsw/stat/address_space_hierarchy.py new file mode 100644 index 0000000..f7a888e --- /dev/null +++ b/prsw/stat/address_space_hierarchy.py @@ -0,0 +1,130 @@ +"""Provides the Address Space Hierarchy endpoint.""" +from collections import namedtuple + +import ipaddress +from datetime import datetime + +from prsw.validators import Validators + + +class AddressSpaceHierarchy: + """ + This data call returns address space objects (inetnum or inet6num) + from the RIPE Database related to the queried resource. + Less- and more-specific results are first-level only, further levels + would have to be retrieved iteratively. + + Reference: `` + + ========================== =============================================================== + Property Description + ========================== =============================================================== + ``resource`` The ASN this query is based on. + ``exact_inetnums`` A list containing exact matches for the queried resource + ``more_specific_inetnums`` A list containing first level more specific blocks underneath the queried resource. Some of these may be aggregated according to the 'aggr_levels_below' query parameter. + ``less_specific`` A list containing first level less specific (parent) blocks above the queried resource. + ``rir`` Name of the RIR where the results are from. + ``query_time`` Holds the time the query was based on + ========================== =============================================================== + + .. code-block:: python + + import prsw + + ripe = prsw.RIPEstat() + result = ripe.address_space_hierarchy('193.0.0.0/21') + + print(result) + """ + + PATH = "/address-space-hierarchy" + VERSION = "1.3" + + def __init__(self, RIPEstat, resource): + """ + Initialize and request AddressSpaceHierarchy. + + :param resource: The prefix or IP range the address space hierarchy should be returned for. + + """ + + if Validators._validate_ip_network(resource): + resource = ipaddress.ip_network(resource, strict=False) + else: + raise ValueError("prefix must be valid IP network") + + params = { + "preferred_version": AddressSpaceHierarchy.VERSION, + "resource": str(resource) + } + + self._api = RIPEstat._get(AddressSpaceHierarchy.PATH, params) + + @property + def resource(self): + """The prefix this query is based on.""" + return ipaddress.ip_network(self._api.data["resource"]) + + @property + def exact_inetnums(self): + """ + Returns a list containing exact matches for the queried resource + + .. code-block:: python + + import prsw + + ripe = prsw.RIPEstat() + result = ripe.address_space_hierarchy('193.0.0.0/21') + + for inetnum in result.exact_inetnums: + print(inetnum) + + """ + return self._api.data["exact"] + + @property + def more_specific_inetnums(self): + """ + Returns a list containing first level more specific blocks underneath + the queried resource. Some of these may be aggregated according to + the 'aggr_levels_below' query parameter. + + .. code-block:: python + + finder = ripe.address_space_hierarchy('193.0.0.0/21') + + for inetnum in finder.more_specific_inetnums: + print(inetnum) + + """ + return self._api.data["more_specific"] + + @property + def less_specific_inetnums(self): + """ + Returns a list containing first level less specific (parent) blocks + above the queried resource. + + .. code-block:: python + + finder = ripe.address_space_hierarchy('193.0.0.0/21') + + for inetnum in finder.less_specific_inetnums: + print(inetnum) + + """ + return self._api.data["less_specific"] + + @property + def rir(self): + """Name of the RIR where the results are from.""" + return self._api.data["rir"] + + @property + def query_time(self): + """**datetime** of used by query.""" + return datetime.fromisoformat( + self._api.data["query_time"] + ) + diff --git a/tests/unit/stat/test_address_space_hierarchy.py b/tests/unit/stat/test_address_space_hierarchy.py new file mode 100644 index 0000000..8f3560d --- /dev/null +++ b/tests/unit/stat/test_address_space_hierarchy.py @@ -0,0 +1,147 @@ +"""Test prsw.stat.address_space_hierarchy.""" + +import pytest +import ipaddress +from datetime import datetime +from typing import Iterable +from unittest.mock import patch + +from .. import UnitTest + +from prsw.api import API_URL, Output +from prsw.stat.address_space_hierarchy import AddressSpaceHierarchy + + +class TestAddressSpaceHierarchy(UnitTest): + + RESPONSE = { + "messages": [], + "see_also": [], + "version": "1.3", + "data_call_name": "address-space-hierarchy", + "data_call_status": "supported", + "cached": False, + "data": { + "rir": "ripe", + "resource": "193.0.0.0/21", + "exact": [ + { + "inetnum": "193.0.0.0 - 193.0.7.255", + "netname": "RIPE-NCC", + "descr": "RIPE Network Coordination Centre, Amsterdam, Netherlands", + "org": "ORG-RIEN1-RIPE", + "remarks": "Used for RIPE NCC infrastructure.", + "country": "NL", + "admin-c": "BRD-RIPE", + "tech-c": "OPS4-RIPE", + "status": "ASSIGNED PA", + "mnt-by": "RIPE-NCC-MNT", + "created": "2003-03-17T12:15:57Z", + "last-modified": "2017-12-04T14:42:31Z", + "source": "RIPE" + } + ], + "less_specific": [ + { + "inetnum": "193.0.0.0 - 193.0.23.255", + "netname": "NL-RIPENCC-OPS-990305", + "country": "NL", + "org": "ORG-RIEN1-RIPE", + "admin-c": "BRD-RIPE", + "tech-c": "OPS4-RIPE", + "status": "ALLOCATED PA", + "remarks": "Amsterdam, Netherlands", + "mnt-by": "RIPE-NCC-HM-MNT, RIPE-NCC-MNT", + "mnt-routes": "RIPE-NCC-MNT, RIPE-GII-MNT { 193.0.8.0/23 }", + "created": "2012-03-09T15:03:38Z", + "last-modified": "2024-07-24T15:35:02Z", + "source": "RIPE" + } + ], + "more_specific": [], + "query_time": "2024-10-10T14:42:39", + "parameters": { + "resource": "193.0.0.0/21", + "cache": None + } + }, + "query_id": "20241010144239-e4fea150-ac7e-4ad4-94e3-1207a9c00f73", + "process_time": 60, + "server_id": "app127", + "build_version": "live.2024.9.25.217", + "status": "ok", + "status_code": 200, + "time": "2024-10-10T14:42:39.989690" + } + + + def setup_method(self): + url = f"{API_URL}{AddressSpaceHierarchy.PATH}data.json?resource=193.0.0.0/21" + + self.api_response = Output(url, **TestAddressSpaceHierarchy.RESPONSE) + self.params = { + "preferred_version": AddressSpaceHierarchy.VERSION, + "resource": "193.0.0.0/21", + } + + return super().setup_method() + + @pytest.fixture(scope="session") + def mock_get(self): + self.setup_method() + + with patch.object(self.ripestat, "_get") as mocked_get: + mocked_get.return_value = self.api_response + + yield self + + mocked_get.assert_called_with(AddressSpaceHierarchy.PATH, self.params) + + def test__init__valid_resource(self, mock_get): + response = AddressSpaceHierarchy( + mock_get.ripestat, resource=self.params["resource"] + ) + + assert isinstance(response, AddressSpaceHierarchy) + + def test_resource(self, mock_get): + response = AddressSpaceHierarchy( + mock_get.ripestat, + resource=self.params["resource"], + ) + + assert isinstance(response.resource, ipaddress.IPv4Network) + assert response.resource == ipaddress.ip_network(self.params["resource"]) + + def test_exact_inetnums(self, mock_get): + response = AddressSpaceHierarchy( + mock_get.ripestat, + resource=self.params["resource"] + ) + + assert isinstance(response.exact_inetnums, Iterable) + + def test_more_specific_inetnums(self, mock_get): + response = AddressSpaceHierarchy( + mock_get.ripestat, + resource=self.params["resource"] + ) + + assert isinstance(response.more_specific_inetnums, Iterable) + + def test_less_specific_inetnums(self, mock_get): + response = AddressSpaceHierarchy( + mock_get.ripestat, + resource=self.params["resource"] + ) + + assert isinstance(response.less_specific_inetnums, Iterable) + + def test_query_time(self, mock_get): + response = AddressSpaceHierarchy(mock_get.ripestat, "193.0.0.0/21") + assert isinstance(response.query_time, datetime) + + query_time = self.RESPONSE["data"]["query_time"] + assert response.query_time == datetime.fromisoformat(query_time) + +