diff --git a/.github/workflows/build_release.yml b/.github/workflows/build_release.yml index 16756a5..2b2668c 100644 --- a/.github/workflows/build_release.yml +++ b/.github/workflows/build_release.yml @@ -38,7 +38,7 @@ jobs: - name: Build executables (Linux) if: matrix.os == 'ubuntu-latest' run: | - for script in SetDNS.py SetHosts-IPv4.py SetHosts.py; do + for script in SetDNS.py SetHosts.py SetHosts_Classic.py; do pyinstaller --onefile "$script" mv "dist/${script%.py}" "dist/${script%.py}-Linux-x64" done @@ -47,7 +47,7 @@ jobs: - name: Build executables (Windows) if: matrix.os == 'windows-latest' run: | - $scripts = @("SetDNS.py", "SetHosts-IPv4.py", "SetHosts.py") + $scripts = @("SetDNS.py", "SetHosts.py", "SetHosts_Classic.py") if (Test-Path -Path "dist") { Remove-Item -Recurse -Force "dist" } @@ -70,7 +70,7 @@ jobs: - name: Build executables (macOS) if: matrix.os == 'macos-latest' run: | - for script in SetDNS.py SetHosts-IPv4.py SetHosts.py; do + for script in SetDNS.py SetHosts.py SetHosts_Classic.py; do pyinstaller --onefile "$script" mv "dist/${script%.py}" "dist/${script%.py}-macOS-x64" done diff --git a/README.md b/README.md index 98b8292..e87fc5c 100644 --- a/README.md +++ b/README.md @@ -5,10 +5,14 @@ ](https://github.com/sinspired/cnNetTool) [![GitHub last commit](https://img.shields.io/github/last-commit/sinspired/cnNetTool?logo=github&label=最后提交:)](ttps://github.com/sinspired/cnNetTool) -基于Python的网络小工具。 +全面解锁Github,解决加载慢、无法访问等问题!解锁Google翻译,支持chrome网页翻译及插件,解锁划词翻译,以及依赖Google翻译API的各种平台插件。解锁tinyMediaManager影视刮削。 -针对某些区域DNS污染问题,自动筛选解析速度最快的 DNS 服务器、自动设置hosts文件(包括tinyMediaManager刮削源tmdb.org、themoviedb.org,Google/chrome网页翻译等),支持Windows、Linux、MacOS。 +自动设置最佳DNS服务器。 +> 适合部分地区饱受dns污染困扰,访问 GitHub 卡顿、抽风、图裂,无法使用Chrome浏览器 自带翻译功能,无法刮削影视封面等问题。分别使用 `setDNS` 自动查找最快服务器并设置,使用 `setHosts` 自动查找DNS映射主机并设置。支持Windows、Linux、MacOS。Enjoy!❤ + +> [!NOTE] +> 首次运行大约需要2分钟以获取DNS主机,请耐心等待。后续运行速度大约10秒左右 # 安装 @@ -23,12 +27,12 @@ pip install -r requirements.txt # 参数说明 -**CloudflareBestIP** 可以接受以下参数: +**cnNetTool** 可以接受以下参数: ### DNS 服务器工具 `SetDNS.py` * --debug 启用调试日志 -* --show-availbale-list, --list 显示可用dns列表,通过 --num 控制娴熟数量 +* --show-availbale-list, --list 显示可用dns列表,通过 --num 控制显示数量 * --best-dns-num BEST_DNS_NUM, --num 显示最佳DNS服务器的数量 * --algorithm --mode {region,overall} 默认 `region` 平衡IPv4和ipv6 DNS * --show-resolutions, --show 显示域名解析结果 @@ -36,7 +40,7 @@ pip install -r requirements.txt ### Hosts文件工具 `SetHosts.py` * --log-level 设置日志输出等级,'DEBUG', 'INFO', 'WARNING', 'ERROR' -* --num-fastest 设置选择的最快IP数量 +* --num-fastest 限定Hosts主机 ip 数量 * --max-latency 设置允许的最大延迟(毫秒) 命令行键入 `-h` `help` 获取帮助 diff --git a/SetDNS.py b/SetDNS.py index 58e38c0..59a9f70 100644 --- a/SetDNS.py +++ b/SetDNS.py @@ -274,7 +274,7 @@ def set_dns_servers(ipv4_dns_list: list[str], ipv6_dns_list: list[str]): "static", ipv4_dns_list[0], ], - check=True + check=True, ) for dns in ipv4_dns_list[1:]: subprocess.run( @@ -288,7 +288,7 @@ def set_dns_servers(ipv4_dns_list: list[str], ipv6_dns_list: list[str]): dns, "index=2", ], - check=True + check=True, ) except subprocess.CalledProcessError as e: logger.error(f"设置IPv4 DNS for {interface}失败: {e}") @@ -308,7 +308,7 @@ def set_dns_servers(ipv4_dns_list: list[str], ipv6_dns_list: list[str]): "static", ipv6_dns_list[0], ], - check=True + check=True, ) for dns in ipv6_dns_list[1:]: subprocess.run( @@ -322,7 +322,7 @@ def set_dns_servers(ipv4_dns_list: list[str], ipv6_dns_list: list[str]): dns, "index=2", ], - check=True + check=True, ) except subprocess.CalledProcessError as e: logger.error(f"设置IPv6 DNS for {interface}失败: {e}") @@ -583,7 +583,19 @@ def run_as_admin(): if __name__ == "__main__": parser = argparse.ArgumentParser( - description="DNS解析器和设置工具,请使用管理员权限运行" + description=( + "------------------------------------------------------------\n" + "DNS解析器和设置工具,请使用管理员权限运行,自动设置最佳 DNS 服务器。\n" + "------------------------------------------------------------\n" + ), + epilog=( + "------------------------------------------------------------\n" + "项目: https://github.com/sinspired/cnNetTool\n" + "作者: Sinspired\n" + "邮箱: ggmomo@gmail.com\n" + "发布: 2024-11-11\n" + ), + formatter_class=argparse.RawTextHelpFormatter, # 允许换行格式 ) parser.add_argument("--debug", action="store_true", help="启用调试日志") parser.add_argument( diff --git a/SetHosts.py b/SetHosts.py index 6031511..b29f817 100644 --- a/SetHosts.py +++ b/SetHosts.py @@ -1,3 +1,4 @@ +from math import floor import os import sys from pathlib import Path @@ -13,7 +14,14 @@ from enum import Enum from datetime import datetime, timedelta, timezone from typing import List, Set, Optional, Dict, Tuple -from rich.progress import Progress, SpinnerColumn, TextColumn +from rich.progress import ( + Progress, + BarColumn, + TaskID, + TimeRemainingColumn, + SpinnerColumn, + TextColumn, +) from rich import print as rprint import ctypes import re @@ -102,42 +110,60 @@ def backup_hosts_file(hosts_file_path: str): f"\n[blue]已备份 [underline]{hosts_file_path}[/underline] 到 [underline]{backup_path}[/underline][/blue]" ) - def get_align_str( - i, - group_name, - reference_str="启动 setHosts 自动更新··· ", - ): + def get_formatted_line(char="-", color="green", width_percentage=0.97): """ - 创建一个经过填充的进度字符串,使其显示宽度与参考字符串相同 - - Args: - i: 当前处理的组索引 - group_name: 组名称 - reference_str: 参考字符串,用于对齐长度 + 生成格式化的分隔线 - Returns: - 调整后的格式化字符串 + 参数: + char: 要重复的字符 + color: rich支持的颜色名称 + width_percentage: 终端宽度的百分比(0.0-1.0) """ - # 计算参考字符串的显示宽度 - ref_width = wcwidth.wcswidth(reference_str) + # 获取终端宽度 + terminal_width = shutil.get_terminal_size().columns + # 计算目标宽度(终端宽度的指定百分比) + target_width = floor(terminal_width * width_percentage) - # 构建基础字符串(不包含尾部填充) - base_str = f"正在处理第 {i} 组域名: {group_name}" + # 生成重复字符 + line = char * target_width - # 计算基础字符串的显示宽度 - base_width = wcwidth.wcswidth(base_str) + # 返回带颜色标记的行 + return f"[{color}]{line}[/{color}]" - # 计算需要添加的空格数量 - # 需要考虑Rich标签不计入显示宽度 - padding_needed = ref_width - base_width + def get_formatted_output(text, fill_char=".", align_position=0.97): + """ + 格式化输出文本,确保不超出终端宽度 - # 确保填充不会为负数 - padding_needed = max(0, padding_needed) + 参数: + text: 要格式化的文本 + fill_char: 填充字符 + align_position: 终端宽度的百分比(0.0-1.0) + """ + # 获取终端宽度并计算目标宽度 + terminal_width = shutil.get_terminal_size().columns + target_width = floor(terminal_width * align_position) + + # 移除rich标记计算实际文本长度 + plain_text = ( + text.replace("[blue on green]", "").replace("[/blue on green]", "") + # .replace("[完成]", "") + ) - # 构建最终的格式化字符串 - formatted_str = f"\n[bold white on bright_black]正在处理第 [green]{i}[/green] 组域名: {group_name}{' ' * padding_needed}[/bold white on bright_black]" + if "[完成]" in text: + main_text = plain_text.strip() + completion_mark = "[完成]" + # 关键修改:直接从目标宽度减去主文本长度,不再额外预留[完成]的空间 + fill_count = target_width - len(main_text) - len(completion_mark) - 6 + fill_count = max(0, fill_count) - return formatted_str + filled_text = f"{main_text}{fill_char * fill_count}{completion_mark}" + return f"[blue on green]{filled_text}[/blue on green]" + else: + # 普通文本的处理保持不变 + fill_count = target_width - len(plain_text.strip()) - 6 + fill_count = max(0, fill_count) + filled_text = f"{plain_text.strip()}{' ' * fill_count}" + return f"[blue on green]{filled_text}[/blue on green]" # -------------------- 域名与分组管理 -------------------- # @@ -210,8 +236,8 @@ async def resolve_domain(self, domain: str) -> Set[str]: ips = set() # 1. 首先通过常规DNS服务器解析 - # dns_ips = await self._resolve_via_dns(domain) - # ips.update(dns_ips) + dns_ips = await self._resolve_via_dns(domain) + ips.update(dns_ips) # 2. 然后通过DNS_records解析 # 由于init时已经处理了过期文件,这里只需要检查域名是否在缓存中 @@ -339,6 +365,8 @@ class LatencyTester: def __init__(self, resolver: DomainResolver, hosts_num: int): self.resolver = resolver self.hosts_num = hosts_num + self.progress = None + self.current_task = None async def get_latency(self, ip: str, port: int = 443) -> float: try: @@ -392,24 +420,24 @@ async def get_host_average_latency( logging.debug(f"ping {ip} 时出错: {e}") return ip, float("inf") + def set_progress(self, progress: Progress, task_description: str = None): + """设置进度显示实例""" + self.progress = progress + if task_description: + self.current_task = self.progress.add_task(task_description, total=1) + async def get_lowest_latency_hosts( self, domains: List[str], file_ips: Set[str], latency_limit: int ) -> List[Tuple[str, float]]: all_ips = set() - if args.log.upper() == "INFO": - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}\n"), - transient=True, - ) as progress: - task = progress.add_task("正在解析域名...", total=len(domains)) - tasks = [self.resolver.resolve_domain(domain) for domain in domains] - - for ips in await asyncio.gather(*tasks): - all_ips.update(ips) - progress.update(task, advance=1) - all_ips.update(file_ips) + # 解析域名 + if self.progress and self.current_task: + self.progress.update(self.current_task, description="正在解析域名...") + tasks = [self.resolver.resolve_domain(domain) for domain in domains] + for ips in await asyncio.gather(*tasks): + all_ips.update(ips) + all_ips.update(file_ips) else: tasks = [self.resolver.resolve_domain(domain) for domain in domains] for ips in await asyncio.gather(*tasks): @@ -420,18 +448,15 @@ async def get_lowest_latency_hosts( f"[bright_black]- 找到 [bold bright_green]{len(all_ips)}[/bold bright_green] 个唯一IP地址[/bright_black]" ) - if args.log.upper() == "INFO": - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - transient=True, - ) as progress: - task = progress.add_task("正在 ping 所有IP地址...", total=len(all_ips)) - ping_tasks = [self.get_host_average_latency(ip) for ip in all_ips] - results = [] - for result in await asyncio.gather(*ping_tasks): - results.append(result) - progress.update(task, advance=1) + # Ping所有IP + if self.progress and self.current_task: + self.progress.update( + self.current_task, description="正在 ping 所有IP地址..." + ) + ping_tasks = [self.get_host_average_latency(ip) for ip in all_ips] + results = [] + for result in await asyncio.gather(*ping_tasks): + results.append(result) else: ping_tasks = [self.get_host_average_latency(ip) for ip in all_ips] results = [] @@ -461,6 +486,9 @@ async def get_lowest_latency_hosts( else: best_hosts = sorted(valid_results, key=lambda x: x[1])[: self.hosts_num] + if self.progress and self.current_task: + self.progress.update(self.current_task, advance=1) + rprint( f"[bold yellow]最快的 DNS主机 IP(优先选择 IPv6) 丨 延迟 < {latency_limit}ms :[/bold yellow]" ) @@ -495,7 +523,7 @@ def write_to_hosts_file(self, new_entries: List[str]): new_content = [] skip = False - skip_tags = ("# cnNetTool-SetHosts", "# Update", "# Star", "# GitHub") + skip_tags = ("# cnNetTool", "# Update", "# Star", "# GitHub") for line in existing_content: line = line.strip() @@ -535,10 +563,10 @@ def write_to_hosts_file(self, new_entries: List[str]): .replace("+0800", "+08:00") ) - rprint("\n[bold yellow]正在更新hosts文件...[/bold yellow]") + rprint("\n[bold yellow]正在更新 hosts 文件...[/bold yellow]") # 1. 添加标题 - new_content.append("\n# cnNetTool-SetHosts 脚本 Start\n") + new_content.append("\n# cnNetTool Start\n") # 2. 添加主机条目 for entry in new_entries: @@ -568,7 +596,7 @@ def write_to_hosts_file(self, new_entries: List[str]): [ f"\n# Update time: {update_time}", "# GitHub仓库: https://github.com/sinspired/cnNetTool", - "# cnNetTool-SetHosts 脚本 End\n", + "# cnNetTool End\n", ] ) @@ -590,76 +618,137 @@ def __init__( self.resolver = resolver self.tester = tester self.hosts_manager = hosts_manager + # 添加并发限制 + self.semaphore = asyncio.Semaphore(5) # 限制并发请求数 + # 添加计数器用于控制ipaddress.com的访问频率 + self.ipaddress_counter = 0 + self.ipaddress_limit = 20 # 每批次最大请求数 + self.ipaddress_delay = 60 # 批次之间的延迟(秒) + + # 添加进度显示实例 + self.progress = Progress( + "[progress.description]{task.description}", + BarColumn(), + "[progress.percentage]{task.percentage:>3.0f}%", + TimeRemainingColumn(), + ) - async def update_hosts(self): - # 更新hosts文件的主逻辑 - all_entries = [] - for i, group in enumerate(self.domain_groups, 1): - progress_str = Utils.get_align_str(i, group.name) - rprint(progress_str) + async def _resolve_domains_batch( + self, domains: List[str], task_id: TaskID + ) -> Dict[str, Set[str]]: + """批量解析域名,带进度更新""" + results = {} + total_domains = len(domains) + + async with self.semaphore: + for i, domain in enumerate(domains, 1): + try: + ips = await self.resolver.resolve_domain(domain) + results[domain] = ips + # 更新进度 + self.progress.update(task_id, advance=1) + except Exception as e: + logging.error(f"解析域名 {domain} 失败: {e}") + results[domain] = set() - all_ips = group.ips.copy() # 从预设IP开始 + if "_resolve_via_ipaddress" in str(self.resolver.resolve_domain): + self.ipaddress_counter += 1 + if self.ipaddress_counter >= self.ipaddress_limit: + await asyncio.sleep(self.ipaddress_delay) + self.ipaddress_counter = 0 - # 2. 根据不同组设置IP - if group.group_type == GroupType.SEPARATE: - for domain in group.domains: - rprint(f"\n为域名 {domain} 设置 DNS 映射主机") - all_ips = set() - resolved_ips = await self.resolver.resolve_domain(domain) - all_ips.update(resolved_ips) - if not all_ips: - logging.warning(f"{domain} 未找到任何可用IP。跳过该域名。") - continue - - fastest_ips = await self.tester.get_lowest_latency_hosts( - [domain], - all_ips, - self.resolver.max_latency, - ) - if not fastest_ips: - logging.warning(f"{domain} 未找到延迟满足要求的IP。") - continue + return results - new_entries = [f"{ip}\t{domain}" for ip, latency in fastest_ips] - all_entries.extend(new_entries) - else: - # 收集组内所有域名的DNS解析结果 - for domain in group.domains: - resolved_ips = await self.resolver.resolve_domain(domain) - all_ips.update(resolved_ips) + async def _process_domain_group(self, group: DomainGroup, index: int) -> List[str]: + """处理单个域名组""" + entries = [] + all_ips = group.ips.copy() - if not all_ips: - logging.warning(f"组 {group.name} 未找到任何可用IP。跳过该组。") - continue + # 创建该组的进度任务 + task_id = self.progress.add_task( + f"处理组 {group.name}", total=len(group.domains) + ) - rprint(f" 找到 {len(all_ips)} 个 DNS 主机记录") + # 为 LatencyTester 设置进度显示 + self.tester.set_progress(self.progress, f"处理组 {group.name} 的延迟测试") + + if group.group_type == GroupType.SEPARATE: + for domain in group.domains: + resolved_ips = await self._resolve_domains_batch([domain], task_id) + domain_ips = resolved_ips.get(domain, set()) + + if not domain_ips: + logging.warning(f"{domain} 未找到任何可用IP。跳过该域名。") + continue fastest_ips = await self.tester.get_lowest_latency_hosts( - # [group.domains[0]], # 只需传入一个域名,因为只是用来测试IP - group.domains, # 传入所有域名以获得更准确的延迟测试结果 - all_ips, + [domain], + domain_ips, self.resolver.max_latency, ) + if fastest_ips: + entries.extend(f"{ip}\t{domain}" for ip, latency in fastest_ips) + else: + logging.warning(f"{domain} 未找到延迟满足要求的IP。") + else: + resolved_ips_dict = await self._resolve_domains_batch( + group.domains, task_id + ) - if not fastest_ips: - logging.warning(f"组 {group.name} 未找到延迟满足要求的IP。") - continue + for ips in resolved_ips_dict.values(): + all_ips.update(ips) - rprint( - f"\n[bold]为组 {group.name} 内所有域名应用延迟最低的 DNS 映射主机IP:[/bold]" - ) + if not all_ips: + logging.warning(f"组 {group.name} 未找到任何可用IP。跳过该组。") + return entries + + logging.info(f"组 {group.name} 找到 {len(all_ips)} 个 DNS 主机记录") + + fastest_ips = await self.tester.get_lowest_latency_hosts( + group.domains, + all_ips, + self.resolver.max_latency, + ) + + if fastest_ips: for domain in group.domains: - new_entries = [f"{ip}\t{domain}" for ip, latency in fastest_ips] - rprint(f"[bright_black] - {domain}[/bright_black]") - all_entries.extend(new_entries) + entries.extend(f"{ip}\t{domain}" for ip, latency in fastest_ips) + logging.info(f"已处理域名: {domain}") + else: + logging.warning(f"组 {group.name} 未找到延迟满足要求的IP。") + + # 标记该组处理完成 + self.progress.update(task_id, completed=len(group.domains)) + return entries + + async def update_hosts(self): + """优化后的主更新函数,支持并发进度显示""" + cache_valid = self.resolver._is_dns_cache_valid() + + with self.progress: + if cache_valid: + # 并发处理所有组 + tasks = [ + self._process_domain_group(group, i) + for i, group in enumerate(self.domain_groups, 1) + ] + all_entries_lists = await asyncio.gather(*tasks) + all_entries = [ + entry for entries in all_entries_lists for entry in entries + ] + else: + # 顺序处理所有组 + all_entries = [] + for i, group in enumerate(self.domain_groups, 1): + entries = await self._process_domain_group(group, i) + all_entries.extend(entries) if all_entries: self.hosts_manager.write_to_hosts_file(all_entries) - rprint( - "\n[blue on green]Hosts 文件更新......................................... [完成][/blue on green]" - ) + rprint(Utils.get_formatted_output("Hosts文件更新[完成]")) else: - rprint("\n[bold red]警告: 没有有效条目可写入。hosts文件未更新。[/bold red]") + logging.warning("没有有效条目可写入") + rprint("[bold red]警告: 没有有效条目可写入。hosts文件未更新。[/bold red]") # -------------------- 权限提升模块-------------------- # @@ -693,7 +782,7 @@ def run_as_admin(): class Config: DOMAIN_GROUPS = [ DomainGroup( - name="GitHub", + name="GitHub Services", group_type=GroupType.SEPARATE, domains=[ "github.com", @@ -719,7 +808,7 @@ class Config: ips={}, ), DomainGroup( - name="GitHub Shared 01", + name="GitHub Asset", group_type=GroupType.SHARED, domains=[ "github.io", @@ -730,7 +819,7 @@ class Config: ips={}, ), DomainGroup( - name="GitHub Shared 02", + name="GitHub Static", group_type=GroupType.SHARED, domains=[ "avatars.githubusercontent.com", @@ -831,9 +920,23 @@ class Config: @staticmethod def get_dns_cache_file() -> Path: """获取 DNS 缓存文件路径,并确保目录存在。""" - current_dir = Path(__file__).resolve().parent # 获取当前脚本所在的目录 - dns_cache_dir = current_dir / "dns_cache" - dns_cache_dir.mkdir(exist_ok=True) # 确保缓存目录存在 + if getattr(sys, "frozen", False): + # 打包后的执行文件路径 + # current_dir = Path(sys.executable).resolve().parent + # dns_cache_dir = current_dir / "dns_cache" + + # 获取用户目录下的 .setHosts,以防止没有写入权限 + dns_cache_dir = ( + Path(os.getenv("USERPROFILE", os.getenv("HOME"))) + / ".setHosts" + / "dns_cache" + ) + else: + # 脚本运行时路径 + current_dir = Path(__file__).resolve().parent + dns_cache_dir = current_dir / "dns_cache" + + dns_cache_dir.mkdir(parents=True, exist_ok=True) # 确保目录存在 # (提示:dns_records.json 文件将存储 A、AAAA 等 DNS 资源记录缓存。) return dns_cache_dir / "dns_records.json" # 返回缓存文件路径 @@ -841,13 +944,10 @@ def get_dns_cache_file() -> Path: # -------------------- 主函数入口 -------------------- # async def main(): - rprint("[green]----------------------------------------------------------[/green]") - rprint( - "[blue on green]启动 setHosts 自动更新··· [/blue on green]" - ) - rprint( - "[green]----------------------------------------------------------[/green]\n" - ) + rprint(Utils.get_formatted_line()) # 默认绿色横线 + rprint(Utils.get_formatted_output("启动 setHosts 自动更新···")) + rprint(Utils.get_formatted_line()) # 默认绿色横线 + print() start_time = datetime.now() # 记录程序开始运行时间 diff --git a/SetHosts_Classic.py b/SetHosts_Classic.py new file mode 100644 index 0000000..bc62435 --- /dev/null +++ b/SetHosts_Classic.py @@ -0,0 +1,1039 @@ +import os +import sys +from pathlib import Path +import dns.resolver +import json +import shutil +import asyncio +import platform +import logging +import argparse +import aiohttp +import socket +from enum import Enum +from datetime import datetime, timedelta, timezone +from typing import List, Set, Optional, Dict, Tuple +from rich.progress import Progress, SpinnerColumn, TextColumn +from rich import print as rprint +import ctypes +import re +from functools import wraps + +import wcwidth + +# -------------------- 常量设置 -------------------- # +RESOLVER_TIMEOUT = 1 # DNS 解析超时时间 秒 +HOSTS_NUM = 1 # 每个域名限定Hosts主机 ipv4 数量 +MAX_LATENCY = 300 # 允许的最大延迟 +PING_TIMEOUT = 1 # ping 超时时间 +NUM_PINGS = 4 # ping次数 + +# 初始化日志模块 +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") + + +# -------------------- 解析参数 -------------------- # +def parse_args(): + parser = argparse.ArgumentParser( + description=( + "------------------------------------------------------------\n" + "Hosts文件更新工具,此工具可自动解析域名并优化系统的hosts文件\n" + "------------------------------------------------------------\n" + ), + epilog=( + "------------------------------------------------------------\n" + "项目: https://github.com/sinspired/cnNetTool\n" + "作者: Sinspired\n" + "邮箱: ggmomo@gmail.com\n" + "发布: 2024-11-11\n" + ), + formatter_class=argparse.RawTextHelpFormatter, # 允许换行格式 + ) + + parser.add_argument( + "--log", + default="info", + choices=["debug", "info", "warning", "error"], + help="设置日志输出等级", + ) + parser.add_argument( + "--hosts-num", + "--num", + default=HOSTS_NUM, + type=int, + help="限定Hosts主机 ip 数量", + ) + parser.add_argument( + "--max-latency", + "--max", + default=MAX_LATENCY, + type=int, + help="设置允许的最大延迟(毫秒)", + ) + return parser.parse_args() + + +args = parse_args() +logging.getLogger().setLevel(args.log.upper()) + + +# -------------------- 辅助功能模块 -------------------- # +class Utils: + @staticmethod + def is_ipv6(ip: str) -> bool: + return ":" in ip + + @staticmethod + def get_hosts_file_path() -> str: + os_type = platform.system().lower() + if os_type == "windows": + return r"C:\Windows\System32\drivers\etc\hosts" + elif os_type in ["linux", "darwin"]: + return "/etc/hosts" + else: + raise ValueError("不支持的操作系统") + + @staticmethod + def backup_hosts_file(hosts_file_path: str): + if os.path.exists(hosts_file_path): + backup_path = f"{hosts_file_path}.bak" + shutil.copy(hosts_file_path, backup_path) + rprint( + f"\n[blue]已备份 [underline]{hosts_file_path}[/underline] 到 [underline]{backup_path}[/underline][/blue]" + ) + + def get_align_str( + i, + group_name, + reference_str="启动 setHosts 自动更新··· ", + ): + """ + 创建一个经过填充的进度字符串,使其显示宽度与参考字符串相同 + + Args: + i: 当前处理的组索引 + group_name: 组名称 + reference_str: 参考字符串,用于对齐长度 + + Returns: + 调整后的格式化字符串 + """ + # 计算参考字符串的显示宽度 + ref_width = wcwidth.wcswidth(reference_str) + + # 构建基础字符串(不包含尾部填充) + base_str = f"正在处理第 {i} 组域名: {group_name}" + + # 计算基础字符串的显示宽度 + base_width = wcwidth.wcswidth(base_str) + + # 计算需要添加的空格数量 + # 需要考虑Rich标签不计入显示宽度 + padding_needed = ref_width - base_width + + # 确保填充不会为负数 + padding_needed = max(0, padding_needed) + + # 构建最终的格式化字符串 + formatted_str = f"\n[bold white on bright_black]正在处理第 [green]{i}[/green] 组域名: {group_name}{' ' * padding_needed}[/bold white on bright_black]" + + return formatted_str + + +# -------------------- 域名与分组管理 -------------------- # +class GroupType(Enum): + SHARED = "shared hosts" # 多个域名共用一组DNS主机 IP + SEPARATE = "separate hosts" # 每个域名独立拥有DNS主机 IP + + +class DomainGroup: + def __init__( + self, + name: str, + domains: List[str], + ips: Optional[Set[str]] = None, + group_type: GroupType = GroupType.SHARED, + ): + self.name = name + self.domains = domains if isinstance(domains, list) else [domains] + self.ips = ips or set() + self.group_type = group_type + + +# -------------------- 域名解析模块 -------------------- # +class DomainResolver: + # 设置缓存过期时间为1周 + DNS_CACHE_EXPIRY_TIME = timedelta(weeks=1) + + def __init__(self, dns_servers: List[str], max_latency: int, dns_cache_file: str): + self.dns_servers = dns_servers + self.max_latency = max_latency + self.dns_cache_file = Path(dns_cache_file) + self.dns_records = self._init_dns_cache() + + def _init_dns_cache(self) -> dict: + """初始化 DNS 缓存,如果缓存文件存在且未过期则加载,否则返回空字典""" + if self._is_dns_cache_valid(): + return self.load_hosts_cache() + # 如果 DNS 缓存过期,删除旧缓存文件 + if self.dns_cache_file.exists(): + self.dns_cache_file.unlink() + return {} + + def _is_dns_cache_valid(self) -> bool: + """检查 DNS 缓存是否有效""" + if not self.dns_cache_file.exists(): + return False + + file_age = datetime.now() - datetime.fromtimestamp( + os.path.getmtime(self.dns_cache_file) + ) + return file_age <= self.DNS_CACHE_EXPIRY_TIME + + def load_hosts_cache(self) -> Dict[str, Dict]: + try: + with open(self.dns_cache_file, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + logging.error(f"加载 DNS 缓存文件失败: {e}") + return {} + + def save_hosts_cache(self): + try: + with open(self.dns_cache_file, "w", encoding="utf-8") as f: + json.dump(self.dns_records, f, indent=4, ensure_ascii=False) + logging.debug(f"成功保存 DNS 缓存到文件 {self.dns_cache_file}") + except Exception as e: + logging.error(f"保存 DNS 缓存到文件时发生错误: {e}") + + async def resolve_domain(self, domain: str) -> Set[str]: + ips = set() + + # 1. 首先通过常规DNS服务器解析 + # dns_ips = await self._resolve_via_dns(domain) + # ips.update(dns_ips) + + # 2. 然后通过DNS_records解析 + # 由于init时已经处理了过期文件,这里只需要检查域名是否在缓存中 + if domain in self.dns_records: + domain_hosts = self.dns_records.get(domain, {}) + ipv4_ips = domain_hosts.get("ipv4", []) + ipv6_ips = domain_hosts.get("ipv6", []) + + ips.update(ipv4_ips + ipv6_ips) + else: + ipaddress_ips = await self._resolve_via_ipaddress(domain) + ips.update(ipaddress_ips) + + if ips: + logging.debug(f"成功解析 {domain}, 找到 {len(ips)} 个 DNS 主机") + else: + logging.debug(f"警告: 无法解析 {domain}") + + return ips + + async def _resolve_via_dns(self, domain: str) -> Set[str]: + ips = set() + for dns_server in self.dns_servers: + try: + resolver = dns.resolver.Resolver() + resolver.nameservers = [dns_server] + resolver.lifetime = RESOLVER_TIMEOUT + + for qtype in ["A", "AAAA"]: + try: + answers = await asyncio.to_thread( + resolver.resolve, domain, qtype + ) + ips.update(answer.address for answer in answers) + except dns.resolver.NoAnswer: + pass + + if ips: + logging.debug(f"成功使用 {dns_server} 解析 {domain}") + logging.debug(f"DNS_resolver:\n {ips}") + return ips + except Exception as e: + logging.debug(f"使用 {dns_server} 解析 {domain} 失败: {e}") + + return ips + + def retry_async(tries=3, delay=0): + def decorator(func): + @wraps(func) + async def wrapper(*args, **kwargs): + for attempt in range(tries): + try: + return await func(*args, **kwargs) + except Exception as e: + if attempt == tries - 1: + raise e + await asyncio.sleep(delay) + return None + + return wrapper + + return decorator + + @retry_async(tries=3) + async def _resolve_via_ipaddress(self, domain: str) -> Set[str]: + ips = set() + url = f"https://sites.ipaddress.com/{domain}" + headers = { + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/106.0.0.0 Safari/537.36" + } + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, headers=headers, timeout=5) as response: + if response.status != 200: + logging.info( + f"DNS_records(ipaddress.com) 查询请求失败: {response.status}" + ) + return ips + + content = await response.text() + # 匹配IPv4地址 + ipv4_pattern = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b" + ipv4_ips = set(re.findall(ipv4_pattern, content)) + + # 匹配IPv6地址 + ipv6_pattern = r"(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}" + ipv6_ips = set(re.findall(ipv6_pattern, content)) + + ips.update(ipv4_ips) + ips.update(ipv6_ips) + + if ips: + # 更新hosts缓存 + current_time = datetime.now().isoformat() + self.dns_records[domain] = { + "last_update": current_time, + "ipv4": list(ipv4_ips), + "ipv6": list(ipv6_ips), + "source": "DNS_records", + } + # 保存到文件 + self.save_hosts_cache() + logging.debug( + f"通过 ipaddress.com 成功解析 {domain} 并更新 DNS_records 缓存" + ) + logging.debug(f"DNS_records:\n {ips}") + else: + logging.warning( + f"ipaddress.com 未找到 {domain} 的 DNS_records 地址" + ) + + except Exception as e: + logging.error(f"通过DNS_records解析 {domain} 失败: {e}") + + return ips + + +# -------------------- 延迟测速模块 -------------------- # + + +class LatencyTester: + def __init__(self, resolver: DomainResolver, hosts_num: int): + self.resolver = resolver + self.hosts_num = hosts_num + + async def get_latency(self, ip: str, port: int = 443) -> float: + try: + # 使用 getaddrinfo 来获取正确的地址格式 + addrinfo = await asyncio.get_event_loop().getaddrinfo( + ip, port, family=socket.AF_UNSPEC, type=socket.SOCK_STREAM + ) + + for family, type, proto, canonname, sockaddr in addrinfo: + try: + start = asyncio.get_event_loop().time() + _, writer = await asyncio.wait_for( + asyncio.open_connection(sockaddr[0], sockaddr[1]), + timeout=PING_TIMEOUT, + ) + end = asyncio.get_event_loop().time() + writer.close() + await writer.wait_closed() + return (end - start) * 1000 + except asyncio.TimeoutError: + continue + except Exception as e: + logging.debug(f"连接测试失败 {ip} (sockaddr: {sockaddr}): {e}") + continue + + return float("inf") + except Exception as e: + logging.error(f"获取地址信息失败 {ip}: {e}") + return float("inf") + + async def get_host_average_latency( + self, ip: str, port: int = 443 + ) -> Tuple[str, float]: + try: + response_times = await asyncio.gather( + *[self.get_latency(ip, port) for _ in range(NUM_PINGS)] + ) + response_times = [t for t in response_times if t != float("inf")] + if response_times: + average_response_time = sum(response_times) / len(response_times) + else: + average_response_time = float("inf") + + if average_response_time == 0: + logging.error(f"{ip} 平均延迟为 0 ms,视为无效") + return ip, float("inf") + + logging.debug(f"{ip} 平均延迟: {average_response_time:.2f} ms") + return ip, average_response_time + except Exception as e: + logging.debug(f"ping {ip} 时出错: {e}") + return ip, float("inf") + + async def get_lowest_latency_hosts( + self, domains: List[str], file_ips: Set[str], latency_limit: int + ) -> List[Tuple[str, float]]: + all_ips = set() + + if args.log.upper() == "INFO": + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}\n"), + transient=True, + ) as progress: + task = progress.add_task("正在解析域名...", total=len(domains)) + tasks = [self.resolver.resolve_domain(domain) for domain in domains] + + for ips in await asyncio.gather(*tasks): + all_ips.update(ips) + progress.update(task, advance=1) + all_ips.update(file_ips) + else: + tasks = [self.resolver.resolve_domain(domain) for domain in domains] + for ips in await asyncio.gather(*tasks): + all_ips.update(ips) + all_ips.update(file_ips) + + rprint( + f"[bright_black]- 找到 [bold bright_green]{len(all_ips)}[/bold bright_green] 个唯一IP地址[/bright_black]" + ) + + if args.log.upper() == "INFO": + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + transient=True, + ) as progress: + task = progress.add_task("正在 ping 所有IP地址...", total=len(all_ips)) + ping_tasks = [self.get_host_average_latency(ip) for ip in all_ips] + results = [] + for result in await asyncio.gather(*ping_tasks): + results.append(result) + progress.update(task, advance=1) + else: + ping_tasks = [self.get_host_average_latency(ip) for ip in all_ips] + results = [] + for result in await asyncio.gather(*ping_tasks): + results.append(result) + + valid_results = [result for result in results if result[1] < latency_limit] + + if not valid_results: + logging.warning(f"未找到延迟小于 {latency_limit}ms 的IP。") + if results: + latency_limit = latency_limit * 2 + logging.info(f"放宽延迟限制为 {latency_limit}ms 重新搜索...") + valid_results = [ + result for result in results if result[1] < latency_limit + ] + if not valid_results: + return [] + + ipv4_results = [r for r in valid_results if not Utils.is_ipv6(r[0])] + ipv6_results = [r for r in valid_results if Utils.is_ipv6(r[0])] + + best_hosts = [] + if ipv4_results and ipv6_results: + best_hosts.append(min(ipv4_results, key=lambda x: x[1])) + best_hosts.append(min(ipv6_results, key=lambda x: x[1])) + else: + best_hosts = sorted(valid_results, key=lambda x: x[1])[: self.hosts_num] + + rprint( + f"[bold yellow]最快的 DNS主机 IP(优先选择 IPv6) 丨 延迟 < {latency_limit}ms :[/bold yellow]" + ) + for ip, time in best_hosts: + rprint( + f" [green]{ip}[/green] [bright_black]{time:.2f} ms[/bright_black]" + ) + return best_hosts + + +# -------------------- Hosts文件管理 -------------------- # +class HostsManager: + def __init__(self, resolver: DomainResolver): + # 自动根据操作系统获取hosts文件路径 + self.hosts_file_path = self._get_hosts_file_path() + self.resolver = resolver + + @staticmethod + def _get_hosts_file_path() -> str: + """根据操作系统自动获取 hosts 文件路径。""" + return Utils.get_hosts_file_path() + + def write_to_hosts_file(self, new_entries: List[str]): + Utils.backup_hosts_file(self.hosts_file_path) + + with open(self.hosts_file_path, "r") as f: + existing_content = f.read().splitlines() + + new_domains = { + entry.split()[1] for entry in new_entries if len(entry.split()) >= 2 + } + + new_content = [] + skip = False + skip_tags = ("# cnNetTool", "# Update", "# Star", "# GitHub") + + for line in existing_content: + line = line.strip() + + # 跳过标记块 + if any(line.startswith(tag) for tag in skip_tags): + skip = True + + if line == "": + skip = True + + if skip: + if line == "" or line.startswith("#"): + continue + skip = False + + # 非标记块内容保留 + if ( + not skip + and (line.startswith("#") or not line) + and not any(tag in line for tag in skip_tags) + ): + new_content.append(line) + continue + + # 检查域名是否为新条目 + parts = line.split() + if len(parts) >= 2 and parts[1] not in new_domains: + new_content.append(line) + else: + logging.debug(f"删除旧条目: {line}") + + update_time = ( + datetime.now(timezone.utc) + .astimezone(timezone(timedelta(hours=8))) + .strftime("%Y-%m-%d %H:%M:%S %z") + .replace("+0800", "+08:00") + ) + + rprint("\n[bold yellow]正在更新hosts文件...[/bold yellow]") + + # 1. 添加标题 + new_content.append("\n# cnNetTool Start\n") + + # 2. 添加主机条目 + for entry in new_entries: + # 分割 IP 和域名 + ip, domain = entry.strip().split(maxsplit=1) + + # 计算需要的制表符数量 + # IP 地址最长可能是 39 个字符 (IPv6) + # 我们使用制表符(8个空格)来对齐,确保视觉上的整齐 + ip_length = len(ip) + if ip_length <= 8: + tabs = "\t\t\t" # 两个制表符 + if ip_length <= 10: + tabs = "\t\t" # 两个制表符 + elif ip_length <= 16: + tabs = "\t" # 一个制表符 + else: + tabs = "\t" # 对于很长的IP,只使用一个空格 + + # 返回格式化后的条目 + formatedEntry = f"{ip}{tabs}{domain}" + new_content.append(formatedEntry) + rprint(f"+ {formatedEntry}") + + # 3. 添加项目描述 + new_content.extend( + [ + f"\n# Update time: {update_time}", + "# GitHub仓库: https://github.com/sinspired/cnNetTool", + "# cnNetTool End\n", + ] + ) + + # 4. 写入hosts文件 + with open(self.hosts_file_path, "w") as f: + f.write("\n".join(new_content)) + + +# -------------------- 主控制模块 -------------------- # +class HostsUpdater: + def __init__( + self, + domain_groups: List[DomainGroup], + resolver: DomainResolver, + tester: LatencyTester, + hosts_manager: HostsManager, + ): + self.domain_groups = domain_groups + self.resolver = resolver + self.tester = tester + self.hosts_manager = hosts_manager + + async def update_hosts(self): + # 更新hosts文件的主逻辑 + all_entries = [] + for i, group in enumerate(self.domain_groups, 1): + progress_str = Utils.get_align_str(i, group.name) + rprint(progress_str) + + all_ips = group.ips.copy() # 从预设IP开始 + + # 2. 根据不同组设置IP + if group.group_type == GroupType.SEPARATE: + for domain in group.domains: + rprint(f"\n为域名 {domain} 设置 DNS 映射主机") + all_ips = set() + resolved_ips = await self.resolver.resolve_domain(domain) + all_ips.update(resolved_ips) + if not all_ips: + logging.warning(f"{domain} 未找到任何可用IP。跳过该域名。") + continue + + fastest_ips = await self.tester.get_lowest_latency_hosts( + [domain], + all_ips, + self.resolver.max_latency, + ) + if not fastest_ips: + logging.warning(f"{domain} 未找到延迟满足要求的IP。") + continue + + new_entries = [f"{ip}\t{domain}" for ip, latency in fastest_ips] + all_entries.extend(new_entries) + else: + # 收集组内所有域名的DNS解析结果 + for domain in group.domains: + resolved_ips = await self.resolver.resolve_domain(domain) + all_ips.update(resolved_ips) + + if not all_ips: + logging.warning(f"组 {group.name} 未找到任何可用IP。跳过该组。") + continue + + rprint(f" 找到 {len(all_ips)} 个 DNS 主机记录") + + fastest_ips = await self.tester.get_lowest_latency_hosts( + # [group.domains[0]], # 只需传入一个域名,因为只是用来测试IP + group.domains, # 传入所有域名以获得更准确的延迟测试结果 + all_ips, + self.resolver.max_latency, + ) + + if not fastest_ips: + logging.warning(f"组 {group.name} 未找到延迟满足要求的IP。") + continue + + rprint( + f"\n[bold]为组 {group.name} 内所有域名应用延迟最低的 DNS 映射主机IP:[/bold]" + ) + for domain in group.domains: + new_entries = [f"{ip}\t{domain}" for ip, latency in fastest_ips] + rprint(f"[bright_black] - {domain}[/bright_black]") + all_entries.extend(new_entries) + + if all_entries: + self.hosts_manager.write_to_hosts_file(all_entries) + rprint( + "\n[blue on green]Hosts 文件更新......................................... [完成][/blue on green]" + ) + else: + rprint("\n[bold red]警告: 没有有效条目可写入。hosts文件未更新。[/bold red]") + + +# -------------------- 权限提升模块-------------------- # +class PrivilegeManager: + @staticmethod + def is_admin() -> bool: + try: + return os.getuid() == 0 + except AttributeError: + return ctypes.windll.shell32.IsUserAnAdmin() != 0 + + @staticmethod + def run_as_admin(): + if PrivilegeManager.is_admin(): + return + + if sys.platform.startswith("win"): + script = os.path.abspath(sys.argv[0]) + params = " ".join([script] + sys.argv[1:]) + ctypes.windll.shell32.ShellExecuteW( + None, "runas", sys.executable, params, None, 1 + ) + else: + os.execvp("sudo", ["sudo", "python3"] + sys.argv) + sys.exit(0) + + +# -------------------- 数据配置模块-------------------- # + + +class Config: + DOMAIN_GROUPS = [ + DomainGroup( + name="GitHub Services", + group_type=GroupType.SEPARATE, + domains=[ + "github.com", + "api.github.com", + "gist.github.com", + "alive.github.com", + "github.community", + "central.github.com", + "codeload.github.com", + "collector.github.com", + "vscode.dev", + "github.blog", + "live.github.com", + "education.github.com", + "github.global.ssl.fastly.net", + "pipelines.actions.githubusercontent.com", + "github-com.s3.amazonaws.com", + "github-cloud.s3.amazonaws.com", + "github-production-user-asset-6210df.s3.amazonaws.com", + "github-production-release-asset-2e65be.s3.amazonaws.com", + "github-production-repository-file-5c1aeb.s3.amazonaws.com", + ], + ips={}, + ), + DomainGroup( + name="GitHub Asset", + group_type=GroupType.SHARED, + domains=[ + "github.io", + "githubstatus.com", + "assets-cdn.github.com", + "github.githubassets.com", + ], + ips={}, + ), + DomainGroup( + name="GitHub Static", + group_type=GroupType.SHARED, + domains=[ + "avatars.githubusercontent.com", + "avatars0.githubusercontent.com", + "avatars1.githubusercontent.com", + "avatars2.githubusercontent.com", + "avatars3.githubusercontent.com", + "avatars4.githubusercontent.com", + "avatars5.githubusercontent.com", + "camo.githubusercontent.com", + "cloud.githubusercontent.com", + "desktop.githubusercontent.com", + "favicons.githubusercontent.com", + "github.map.fastly.net", + "raw.githubusercontent.com", + "media.githubusercontent.com", + "objects.githubusercontent.com", + "user-images.githubusercontent.com", + "private-user-images.githubusercontent.com", + ], + ips={}, + ), + DomainGroup( + name="TMDB API", + domains=[ + "tmdb.org", + "api.tmdb.org", + ], + ips={}, + ), + DomainGroup( + name="THE MOVIEDB", + domains=[ + "themoviedb.org", + "api.themoviedb.org", + "www.themoviedb.org", + "auth.themoviedb.org", + ], + ips={ + # "18.239.36.98", + # "108.160.169.178", + # "18.165.122.73", + # "13.249.146.88", + # "13.224.167.74", + # "13.249.146.96", + # "99.86.4.122", + # "108.160.170.44", + # "108.160.169.54", + # "98.159.108.58", + # "13.226.225.4", + # "31.13.80.37", + # "202.160.128.238", + # "13.224.167.16", + # "199.96.63.53", + # "104.244.43.6", + # "18.239.36.122", + # "66.220.149.32", + # "108.157.14.15", + # "202.160.128.14", + # "52.85.242.44", + # "199.59.149.207", + # "54.230.129.92", + # "54.230.129.11", + # "103.240.180.117", + # "66.220.148.145", + # "54.192.175.79", + # "143.204.68.100", + # "31.13.84.2", + # "18.239.36.64", + # "52.85.242.124", + # "54.230.129.83", + # "18.165.122.27", + # "13.33.88.3", + # "202.160.129.36", + # "108.157.14.112", + # "99.86.4.16", + # "199.59.149.237", + # "199.59.148.202", + # "54.230.129.74", + # "202.160.128.40", + # "199.16.156.39", + # "13.224.167.108", + # "192.133.77.133", + # "168.143.171.154", + # "54.192.175.112", + # "128.242.245.43", + # "54.192.175.108", + # "54.192.175.87", + # "199.59.148.229", + # "143.204.68.22", + # "13.33.88.122", + # "52.85.242.73", + # "18.165.122.87", + # "168.143.162.58", + # "103.228.130.61", + # "128.242.240.180", + # "99.86.4.8", + # "104.244.46.52", + # "199.96.58.85", + # "13.226.225.73", + # "128.121.146.109", + # "69.30.25.21", + # "13.249.146.22", + # "13.249.146.87", + # "157.240.12.5", + # "3.162.38.113", + # "143.204.68.72", + # "104.244.43.52", + # "13.224.167.10", + # "3.162.38.31", + # "3.162.38.11", + # "3.162.38.66", + # "202.160.128.195", + # "162.125.6.1", + # "104.244.43.128", + # "18.165.122.23", + # "99.86.4.35", + # "108.160.165.212", + # "108.157.14.27", + # "13.226.225.44", + # "157.240.9.36", + # "13.33.88.37", + # "18.239.36.92", + # "199.59.148.247", + # "13.33.88.97", + # "31.13.84.34", + # "124.11.210.175", + # "13.226.225.52", + # "31.13.86.21", + # "108.157.14.86", + # "143.204.68.36", + }, + ), + DomainGroup( + name="TMDB 封面", + domains=["image.tmdb.org", "images.tmdb.org"], + ips={ + # "89.187.162.242", + # "169.150.249.167", + # "143.244.50.209", + # "143.244.50.210", + # "143.244.50.88", + # "143.244.50.82", + # "169.150.249.165", + # "143.244.49.178", + # "143.244.49.179", + # "143.244.50.89", + # "143.244.50.212", + # "169.150.207.215", + # "169.150.249.163", + # "143.244.50.85", + # "143.244.50.91", + # "143.244.50.213", + # "169.150.249.164", + # "169.150.249.162", + # "169.150.249.166", + # "143.244.49.183", + # "143.244.49.177", + # "143.244.50.83", + # "138.199.9.104", + # "169.150.249.169", + # "143.244.50.214", + # "79.127.213.217", + # "143.244.50.87", + # "143.244.50.84", + # "169.150.249.168", + # "143.244.49.180", + # "143.244.50.86", + # "143.244.50.90", + # "143.244.50.211", + }, + ), + DomainGroup( + name="Google 翻译", + domains=[ + "translate.google.com", + "translate.googleapis.com", + "translate-pa.googleapis.com", + ], + ips={ + "216.239.32.40", + "2404:6800:4008:c15::94", + "2a00:1450:4001:829::201a", + "2404:6800:4008:c13::5a", + "2a00:1450:4001:803::201a", + "74.125.204.139", + "2607:f8b0:4004:c07::66", + "2607:f8b0:4004:c07::71", + "2607:f8b0:4004:c07::8a", + "2607:f8b0:4004:c07::8b", + "172.253.62.100", + "172.253.62.101", + "172.253.62.102", + "172.253.62.103", + "35.197.239.137", + "35.189.113.240", + "35.228.168.221", + "35.210.233.33", + }, + ), + DomainGroup( + name="JetBrain 插件下载", + domains=[ + "plugins.jetbrains.com", + "download.jetbrains.com", + "cache-redirector.jetbrains.com", + ], + ips={}, + ), + ] + + # DNS 服务器 + DNS_SERVERS = [ + "2402:4e00::", # DNSPod (IPv6) + "223.5.5.5", # Alibaba DNS (IPv4) + "119.29.29.29", # DNSPod (IPv4) + "2400:3200::1", # Alibaba DNS (IPv6) + "8.8.8.8", # Google Public DNS (IPv4) + "2001:4860:4860::8888", # Google Public DNS (IPv6) + "114.114.114.114", # 114 DNS + "208.67.222.222", # Open DNS (IPv4) + "2620:0:ccc::2", # Open DNS (IPv6) + ] + + @staticmethod + def get_dns_cache_file() -> Path: + """获取 DNS 缓存文件路径,并确保目录存在。""" + if getattr(sys, "frozen", False): + # 打包后的执行文件路径 + # current_dir = Path(sys.executable).resolve().parent + # dns_cache_dir = current_dir / "dns_cache" + + # 获取用户目录下的 .setHosts,以防止没有写入权限 + dns_cache_dir = ( + Path(os.getenv("USERPROFILE", os.getenv("HOME"))) + / ".setHosts" + / "dns_cache" + ) + else: + # 脚本运行时路径 + current_dir = Path(__file__).resolve().parent + dns_cache_dir = current_dir / "dns_cache" + + dns_cache_dir.mkdir(parents=True, exist_ok=True) # 确保目录存在 + + # (提示:dns_records.json 文件将存储 A、AAAA 等 DNS 资源记录缓存。) + return dns_cache_dir / "dns_records.json" # 返回缓存文件路径 + +# -------------------- 主函数入口 -------------------- # +async def main(): + rprint("[green]----------------------------------------------------------[/green]") + rprint( + "[blue on green]启动 setHosts 自动更新··· [/blue on green]" + ) + rprint( + "[green]----------------------------------------------------------[/green]\n" + ) + + start_time = datetime.now() # 记录程序开始运行时间 + + # 从配置类中加载DOMAIN_GROUPS、DNS_SERVERS和dns_cache_dir + DOMAIN_GROUPS = Config.DOMAIN_GROUPS + dns_servers = Config.DNS_SERVERS + dns_cache_file = Config.get_dns_cache_file() + + # 1.域名解析 + resolver = DomainResolver( + dns_servers=dns_servers, + max_latency=args.max_latency, + dns_cache_file=dns_cache_file, + ) + + # 2.延迟检测 + tester = LatencyTester(resolver=resolver, hosts_num=args.hosts_num) + + # 3.Hosts文件操作 + hosts_manager = HostsManager(resolver=resolver) + + # 4.初始化 Hosts更新器 参数 + updater = HostsUpdater( + domain_groups=DOMAIN_GROUPS, + resolver=resolver, + tester=tester, + hosts_manager=hosts_manager, + ) + + if not PrivilegeManager.is_admin(): + rprint( + "[bold red]需要管理员权限来修改hosts文件。正在尝试提升权限...[/bold red]" + ) + PrivilegeManager.run_as_admin() + + # 启动 Hosts更新器 + await updater.update_hosts() + + # 计算程序运行时间 + end_time = datetime.now() + total_time = end_time - start_time + rprint( + f"[bold]代码运行时间:[/bold] [cyan]{total_time.total_seconds():.2f} 秒[/cyan]" + ) + # input("\n任务执行完毕,按任意键退出!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/requirements.txt b/requirements.txt index 44821d8..f34510b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ +aiohttp==3.10.10 dnspython==2.7.0 -ping3==4.0.8 -prettytable==3.11.0 -rich==13.9.2 +prettytable==3.12.0 +rich==13.9.4 +wcwidth==0.2.13 diff --git a/setHosts_Classic.ico b/setHosts_Classic.ico new file mode 100644 index 0000000..b51f857 Binary files /dev/null and b/setHosts_Classic.ico differ