diff --git a/nacos/client.py b/nacos/client.py index ff527ad..6094bd9 100644 --- a/nacos/client.py +++ b/nacos/client.py @@ -7,6 +7,7 @@ import platform import time import hmac +import random try: import ssl @@ -41,10 +42,11 @@ logger = logging.getLogger(__name__) DEBUG = False -VERSION = "0.1.11" +VERSION = "0.1.13-beta" DEFAULT_GROUP_NAME = "DEFAULT_GROUP" DEFAULT_NAMESPACE = "" +ADDRESS_SERVER_TIMEOUT = 3 # in seconds WORD_SEPARATOR = u'\x02' LINE_SEPARATOR = u'\x01' @@ -213,6 +215,16 @@ def parse_nacos_server_addr(server_addr): port = int(sp[1]) if len(sp) > 1 else 8848 return sp[0], port +def is_ipv4_address(address): + try: + socket.inet_aton(address) + except socket.error: + return False + return True + +ADDRESS_URL_PTN = "http://%s/diamond-server/diamond" + +ADDRESS_URL_PTN_FOR_UNIT = "http://%s/diamond-server/diamond-unit-%s?nofix=1" class NacosClient: debug = False @@ -232,12 +244,41 @@ def set_debugging(): def get_md5(content): return hashlib.md5(content.encode("UTF-8")).hexdigest() if content is not None else None - def __init__(self, server_addresses, endpoint=None, namespace=None, ak=None, sk=None, username=None, password=None): + def __init__(self, server_addresses=None, endpoint=None, namespace=None, ak=None, sk=None, username=None, password=None, unit_name=None): self.server_list = list() - + default_port = "8080" try: - for server_addr in server_addresses.split(","): - self.server_list.append(parse_nacos_server_addr(server_addr.strip())) + if not server_addresses == None: + for server_addr in server_addresses.split(","): + self.server_list.append(parse_nacos_server_addr(server_addr.strip())) + else: + if endpoint == None: + endpoint = "jmenv.tbsite.net:8080" + if ":" not in endpoint: + endpoint = endpoint + ":" + default_port + if "?" not in endpoint and "#" not in endpoint: + if unit_name == None: + url = ADDRESS_URL_PTN % endpoint + else: + url = ADDRESS_URL_PTN_FOR_UNIT % endpoint + server_list_content = urlopen(url, timeout=ADDRESS_SERVER_TIMEOUT).read() + logger.debug("[get-server-list] content from endpoint:%s" % server_list_content) + + if server_list_content: + for server_info in server_list_content.decode().strip().split("\n"): + sp = server_info.strip().split(":") + if len(sp) == 1: + # endpoint中没有指定port + self.server_list.append((sp[0], default_port)) + else: + try: + port = sp.strip().split("/")[0] + self.server_list.append((sp[0], int(port))) + except ValueError: + logger.warning("[get-server-list] bad server address:%s ignored" % server_info) + + random.shuffle(self.server_list) + except Exception as ex: logger.exception("[init] bad server address for %s" % server_addresses) raise ex @@ -273,9 +314,77 @@ def __init__(self, server_addresses, endpoint=None, namespace=None, ak=None, sk= self.snapshot_base = DEFAULTS["SNAPSHOT_BASE"] self.no_snapshot = False self.proxies = None + self.unit_name = unit_name logger.info("[client-init] endpoint:%s, tenant:%s" % (endpoint, namespace)) + def get_server_list(endpoint, default_port=8080, cai_enabled=True, unit_name=None): + server_list = list() + if not cai_enabled: + logger.info("[get-server-list] cai server is not used, regard endpoint:%s as server." % endpoint) + content = endpoint.encode() + else: + try: + # use 8080 as default port. + if ":" not in endpoint: + endpoint = endpoint + ":8080" + + url = ADDRESS_URL_PTN % endpoint + + if unit_name: + logger.info("[get-server-list] getting server for unit:%s" % unit_name) + url = ADDRESS_URL_PTN_FOR_UNIT % (endpoint, unit_name) + + content = urlopen(url, timeout=ADDRESS_SERVER_TIMEOUT).read() + logger.debug("[get-server-list] content from endpoint:%s" % content) + except (URLError, OSError, socket.timeout) as e: + logger.error("[get-server-list] get server from %s failed, cause:%s" % (endpoint, e)) + return server_list + + if content: + for server_info in content.decode().strip().split("\n"): + sp = server_info.strip().split(":") + if len(sp) == 1: + server_list.append((sp[0], default_port, is_ipv4_address(sp[0]))) + else: + try: + server_list.append((sp[0], int(sp[1]), is_ipv4_address(sp[0]))) + except ValueError: + logger.warning("[get-server-list] bad server address:%s ignored" % server_info) + + random.shuffle(server_list) + + return server_list + + + def _refresh_server_list(self): + with self.server_list_lock: + if self.server_refresh_running: + logger.warning("[refresh-server] task is running, aborting") + return + self.server_refresh_running = True + + while True: + try: + time.sleep(30) + logger.debug("[refresh-server] try to refresh server list") + server_list = self.get_server_list(self.endpoint, 443 if self.tls_enabled else 8080, self.cai_enabled, + self.unit_name) + logger.debug( + "[refresh-server] server_num:%s server_list:%s" % (len(server_list), server_list)) + if not server_list: + logger.error("[refresh-server] empty server_list get from %s, do not refresh" % self.endpoint) + continue + with self.server_list_lock: + self.server_list = server_list + self.server_offset = 0 + if self.current_server not in server_list: + logger.warning("[refresh-server] %s is not effective, change one" % str(self.current_server)) + self.current_server = server_list[self.server_offset] + except Exception as e: + logger.exception("[refresh-server] exception %s occur" % str(e)) + + def set_options(self, **kwargs): for k, v in kwargs.items(): if k not in OPTIONS: @@ -1166,3 +1275,6 @@ def stop_subscribe(self): if DEBUG: NacosClient.set_debugging() + +if __name__ == "__main__": + client = NacosClient(None, None) diff --git a/test/client_test.py b/test/client_test.py index 3d25daa..fd69aaf 100644 --- a/test/client_test.py +++ b/test/client_test.py @@ -21,7 +21,7 @@ USERNAME = None PASSWORD = None -client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE, username=USERNAME, password=PASSWORD) +client = nacos.NacosClient(endpoint="jmenv.tbsite.net") # Set the following option if http requests need through by proxy # client.set_options(proxies={"http":"192.168.56.1:809"})