"""Synchronise the daily check-in status table (and final statistics) in README.md.

Reconstructed post-image of ``sync_status_readme.py`` as introduced by patch
7a96136e1cc6a186c02337244f1d3d2fd0844334 ("Update sync_status_readme.py",
Tue, 19 Nov 2024).

Every participant keeps a ``<nickname><FILE_SUFFIX>`` note file in the current
directory. For each day between START_DATE and END_DATE the script looks for a
``### <date>`` heading inside the participant's content section and renders:

* ``✅``  - an entry with more than 10 non-whitespace characters exists,
* ``⭕️`` - a past day without an entry,
* ``❌``  - the third missed day within one ISO week (participant eliminated),
* blank  - today without an entry yet, a future day, or any day after elimination.
"""
import logging
import os
import re
import subprocess
from datetime import datetime, timedelta

import pytz
import requests

# Constants
# NOTE: ``replace(tzinfo=...)`` re-labels the parsed wall-clock time as UTC; any
# offset given in the environment variable is discarded, not converted.
START_DATE = datetime.fromisoformat(os.environ.get(
    'START_DATE', '2024-06-24T00:00:00+00:00')).replace(tzinfo=pytz.UTC)
END_DATE = datetime.fromisoformat(os.environ.get(
    'END_DATE', '2024-07-14T23:59:59+00:00')).replace(tzinfo=pytz.UTC)
DEFAULT_TIMEZONE = 'Asia/Shanghai'
FILE_SUFFIX = os.environ.get('FILE_SUFFIX', '.md')
README_FILE = 'README.md'
FIELD_NAME = os.environ.get('FIELD_NAME', 'Name')
# NOTE(review): the marker literals arrived as empty strings, which makes every
# ``str.find`` below return 0 and turns content extraction / table replacement
# into no-ops. They look like HTML comments that were stripped in transit; the
# values below are assumed - confirm against README.md and the note template.
Content_START_MARKER = "<!-- Content_START -->"
Content_END_MARKER = "<!-- Content_END -->"
TABLE_START_MARKER = "<!-- START_COMMIT_TABLE -->"
TABLE_END_MARKER = "<!-- END_COMMIT_TABLE -->"
STATS_START_MARKER = "<!-- STATISTICALDATA_START -->"
STATS_END_MARKER = "<!-- STATISTICALDATA_END -->"
GITHUB_REPOSITORY_OWNER = os.environ.get('GITHUB_REPOSITORY_OWNER')
GITHUB_REPOSITORY = os.environ.get('GITHUB_REPOSITORY')

# Timeout (seconds) for the GitHub API request so a stalled connection cannot
# hang the workflow.
HTTP_TIMEOUT_SECONDS = 10

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')


def print_env():
    """Print the effective configuration constants to stdout."""
    print(f"""
          START_DATE: {START_DATE}
          END_DATE: {END_DATE}
          DEFAULT_TIMEZONE: {DEFAULT_TIMEZONE}
          FILE_SUFFIX: {FILE_SUFFIX}
          README_FILE: {README_FILE}
          FIELD_NAME: {FIELD_NAME}
          Content_START_MARKER: {Content_START_MARKER}
          Content_END_MARKER: {Content_END_MARKER}
          TABLE_START_MARKER: {TABLE_START_MARKER}
          TABLE_END_MARKER: {TABLE_END_MARKER}
          """)


def print_variables(*args, **kwargs):
    """Print ``name: value`` lines for debugging.

    Args:
        *args: Either dicts of ``name -> value`` pairs, or names (str) of
            module-level variables whose current value should be printed.
        **kwargs: Explicit ``name=value`` pairs; these override positional ones.

    Raises:
        NameError: If a positional name is not a module-level variable.
    """
    def format_value(value):
        # Multi-line strings are easier to read in a triple-quoted block.
        if isinstance(value, str) and ('\n' in value or '\r' in value):
            return f'"""\n{value}\n"""'
        return repr(value)

    variables = {}
    module_scope = globals()

    # Positional arguments: dicts are merged, strings are looked up by name.
    # A plain namespace lookup replaces the former ``eval(arg)``, which would
    # have executed arbitrary expressions.
    for arg in args:
        if isinstance(arg, dict):
            variables.update(arg)
        elif arg in module_scope:
            variables[arg] = module_scope[arg]
        else:
            raise NameError(f"name {arg!r} is not defined")

    # Keyword arguments
    variables.update(kwargs)

    for name, value in variables.items():
        print(f"{name}: {format_value(value)}")


def get_date_range():
    """Return one datetime per day from START_DATE through END_DATE (inclusive)."""
    day_count = (END_DATE - START_DATE).days + 1
    return [START_DATE + timedelta(days=offset) for offset in range(day_count)]


def get_user_timezone(file_content):
    """Return the timezone declared in a note file, or the default timezone.

    The declaration must be a front-matter block consisting solely of a
    ``timezone: <name>`` line between two ``---`` lines. Unknown names are
    logged and replaced by DEFAULT_TIMEZONE.
    """
    yaml_match = re.search(r'---\s*\ntimezone:\s*(\S+)\s*\n---', file_content)
    if yaml_match:
        try:
            return pytz.timezone(yaml_match.group(1))
        except pytz.exceptions.UnknownTimeZoneError:
            logging.warning(
                f"Unknown timezone: {yaml_match.group(1)}. Using default {DEFAULT_TIMEZONE}.")
    return pytz.timezone(DEFAULT_TIMEZONE)


def extract_content_between_markers(file_content):
    """Return the stripped text between the content markers, or "" if absent."""
    start_index = file_content.find(Content_START_MARKER)
    end_index = -1
    if start_index != -1:
        # Search for the end marker only after the start marker so a stray
        # earlier occurrence cannot produce a reversed (empty) slice.
        end_index = file_content.find(
            Content_END_MARKER, start_index + len(Content_START_MARKER))
    if start_index == -1 or end_index == -1:
        logging.warning("Content_START_MARKER markers not found in the file")
        return ""
    return file_content[start_index + len(Content_START_MARKER):end_index].strip()


def find_date_in_content(content, local_date):
    """Find the ``### <date>`` heading for ``local_date`` in ``content``.

    Accepted spellings: ``YYYY.MM.DD``, ``YYYY.M.D``, ``M.D``, ``YYYY/MM/DD``,
    ``M/D`` and ``MM.DD``.

    Returns:
        The ``re.Match`` of the first matching heading, or ``None``.
    """
    spellings = [
        local_date.strftime("%Y.%m.%d"),
        local_date.strftime("%Y.%m.%d").replace('.0', '.'),
        local_date.strftime("%m.%d").lstrip('0').replace('.0', '.'),
        local_date.strftime("%Y/%m/%d"),
        local_date.strftime("%m/%d").lstrip('0').replace('/0', '/'),
        local_date.strftime("%m.%d").zfill(5),
    ]
    # Drop duplicates while keeping the original priority order.
    alternatives = '|'.join(re.escape(s) for s in dict.fromkeys(spellings))
    # ``re.escape`` keeps '.' literal; ``(?!\d)`` stops "6.2" from matching the
    # heading of "6.24" (a prefix match would credit the wrong day).
    return re.search(r'###\s*(?:' + alternatives + r')(?!\d)', content)


def get_content_for_date(content, start_pos):
    """Return the text from ``start_pos`` up to the next date heading (or the end)."""
    # The optional year may be separated by '.' or '/', so that headings such
    # as "### 2024/06/25" also terminate the current entry.
    next_date_pattern = r'###\s*(\d{4}[\.\/])?(\d{1,2}[\.\/]\d{1,2})'
    remainder = content[start_pos:]
    next_date_match = re.search(next_date_pattern, remainder)
    if next_date_match:
        return remainder[:next_date_match.start()]
    return remainder


def check_md_content(file_content, date, user_tz):
    """Return True if the note has a non-trivial entry for ``date``.

    ``date`` is converted to ``user_tz`` to decide which calendar day's heading
    to look for. An entry counts when it holds more than 10 non-whitespace
    characters. Any error is logged and reported as "no entry".
    """
    try:
        content = extract_content_between_markers(file_content)
        local_date = date.astimezone(user_tz).replace(
            hour=0, minute=0, second=0, microsecond=0)
        current_date_match = find_date_in_content(content, local_date)

        if not current_date_match:
            logging.info(
                f"No match found for date {local_date.strftime('%Y-%m-%d')}")
            return False

        date_content = get_content_for_date(content, current_date_match.end())
        date_content = re.sub(r'\s', '', date_content)
        logging.info(
            f"Content length for {local_date.strftime('%Y-%m-%d')}: {len(date_content)}")
        return len(date_content) > 10
    except Exception as e:
        logging.error(f"Error in check_md_content: {str(e)}")
        return False


def get_user_study_status(nickname):
    """Build the per-day status map for one participant.

    Returns:
        dict mapping every datetime of ``get_date_range()`` to "✅", "⭕️" or
        " ". If the note file is missing or unreadable every day is "⭕️".
    """
    file_name = f"{nickname}{FILE_SUFFIX}"
    date_range = get_date_range()
    try:
        with open(file_name, 'r', encoding='utf-8') as file:
            file_content = file.read()
        user_tz = get_user_timezone(file_content)
        logging.info(
            f"File content length for {nickname}: {len(file_content)} user_tz: {user_tz}")
        today = datetime.now(user_tz).date()

        user_status = {}
        for date in date_range:
            # Compare whole calendar dates: comparing only ``.day`` confused
            # e.g. 24 June with 24 July.
            day = date.date()
            if day > today:
                user_status[date] = " "
            elif check_md_content(file_content, date, pytz.UTC):
                user_status[date] = "✅"
            elif day == today:
                # Today is not over yet, so a missing entry is not a miss.
                user_status[date] = " "
            else:
                user_status[date] = "⭕️"

        logging.info(f"Successfully processed file for user: {nickname}")
        return user_status
    except FileNotFoundError:
        logging.error(f"Error: Could not find file {file_name}")
    except Exception as e:
        logging.error(
            f"Unexpected error processing file for {nickname}: {str(e)}")
    return {date: "⭕️" for date in date_range}


def check_weekly_status(user_status, date, user_tz):
    """Return the status for ``date``, or "❌" once the week has over two misses.

    Args:
        user_status: Map as returned by ``get_user_study_status``.
        date: The day to evaluate (timezone-aware).
        user_tz: The participant's timezone.

    Returns:
        "❌" when more than two days of the week (up to ``date``/today) are
        missed, " " for future days, otherwise the stored status ("⭕️" when
        absent or on error).
    """
    try:
        local_date = date.astimezone(user_tz).replace(
            hour=0, minute=0, second=0, microsecond=0)
        current_date = datetime.now(user_tz).replace(
            hour=0, minute=0, second=0, microsecond=0)
        week_start = local_date - timedelta(days=local_date.weekday())
        range_days = {d.date() for d in get_date_range()}
        last_counted = min(local_date, current_date)

        # NOTE(review): status keys are derived from the UTC calendar date of
        # each *local* midnight, which is the previous day for timezones east
        # of UTC - confirm this mapping is intended.
        def status_key(day):
            return datetime.combine(
                day.astimezone(pytz.UTC).date(),
                datetime.min.time()).replace(tzinfo=pytz.UTC)

        week_dates = [
            day for day in (week_start + timedelta(days=x) for x in range(7))
            if day.astimezone(pytz.UTC).date() in range_days and day <= last_counted
        ]
        missing_days = sum(
            1 for day in week_dates
            if user_status.get(status_key(day), "⭕️") == "⭕️")

        if local_date > current_date:
            return " "
        if missing_days > 2:
            return "❌"
        return user_status.get(
            datetime.combine(date.date(), datetime.min.time()).replace(tzinfo=pytz.UTC),
            "⭕️")
    except Exception as e:
        logging.error(f"Error in check_weekly_status: {str(e)}")
        return "⭕️"


def get_all_user_files():
    """Return the nicknames (file names without FILE_SUFFIX) found in the cwd.

    Files whose names start with "template" or "readme" (case-insensitive) are
    excluded.
    """
    exclude_prefixes = ('template', 'readme')
    suffix = FILE_SUFFIX.lower()
    # ``len(name) - len(FILE_SUFFIX)`` (instead of a negative index) stays
    # correct even when the suffix is empty.
    return [name[:len(name) - len(FILE_SUFFIX)] for name in os.listdir('.')
            if name.lower().endswith(suffix)
            and not name.lower().startswith(exclude_prefixes)]


def update_readme(content):
    """Return ``content`` with the status table between the table markers rebuilt.

    Existing rows keep their order and are regenerated; participants that have
    a note file but no row yet are appended in sorted order. If the markers are
    missing or anything fails, ``content`` is returned unchanged.
    """
    try:
        start_index = content.find(TABLE_START_MARKER)
        end_index = -1
        if start_index != -1:
            end_index = content.find(
                TABLE_END_MARKER, start_index + len(TABLE_START_MARKER))
        if start_index == -1 or end_index == -1:
            logging.error(
                "Error: Couldn't find the table markers in README.md")
            return content

        date_range = get_date_range()
        new_table = [
            f'{TABLE_START_MARKER}\n',
            f'| {FIELD_NAME} | ' +
            ' | '.join(date.strftime("%m.%d").lstrip('0')
                       for date in date_range) + ' |\n',
            '| ------------- | ' +
            ' | '.join('----' for _ in date_range) + ' |\n'
        ]

        existing_users = set()
        # Skip the header and separator rows of the current table.
        table_rows = content[start_index +
                             len(TABLE_START_MARKER):end_index].strip().split('\n')[2:]

        for row in table_rows:
            match = re.match(r'\|\s*([^|]+)\s*\|', row)
            if not match:
                logging.warning(f"Skipping invalid row: {row}")
                continue
            display_name = match.group(1).strip()
            if not display_name:
                logging.warning(
                    f"Skipping empty display name in row: {row}")
                continue
            if display_name in existing_users:
                # A duplicated row would otherwise be regenerated twice.
                continue
            existing_users.add(display_name)
            new_table.append(generate_user_row(display_name))

        # Sorted so that repeated runs produce the same row order.
        for user in sorted(set(get_all_user_files()) - existing_users):
            if user.strip():
                new_table.append(generate_user_row(user))
                logging.info(f"Added new user: {user}")
            else:
                logging.warning(f"Skipping empty user: '{user}'")
        new_table.append(f'{TABLE_END_MARKER}\n')
        return (content[:start_index] + ''.join(new_table) +
                content[end_index + len(TABLE_END_MARKER):])
    except Exception as e:
        logging.error(f"Error in update_readme: {str(e)}")
        return content


def generate_user_row(user):
    """Render the markdown table row (newline-terminated) for one participant.

    A third miss within one ISO week is shown as "❌"; every later cell is left
    blank, as are days after the participant's current local date.
    """
    user_status = get_user_study_status(user)
    try:
        with open(f"{user}{FILE_SUFFIX}", 'r', encoding='utf-8') as file:
            user_tz = get_user_timezone(file.read())
    except (OSError, UnicodeDecodeError):
        # A table row without a readable note file must not abort the whole
        # README update; the status map above already marks every day missed.
        user_tz = pytz.timezone(DEFAULT_TIMEZONE)

    cells = [f"| {user} |"]
    is_eliminated = False
    absent_count = 0
    current_week = None
    today = datetime.now(user_tz).date()

    for date in get_date_range():
        user_date = date.astimezone(pytz.UTC).replace(
            hour=0, minute=0, second=0, microsecond=0)
        # Days after the participant's local "today" stay blank. Whole dates
        # are compared; comparing ``.day`` alone broke across month ends.
        if is_eliminated or user_date.date() > today:
            cells.append(" |")
            continue

        # Misses are counted per ISO (year, week); reset on a new week.
        week = tuple(user_date.isocalendar())[:2]
        if week != current_week:
            current_week = week
            absent_count = 0

        status = user_status.get(user_date, "")
        if status == "⭕️":
            absent_count += 1
            if absent_count > 2:
                is_eliminated = True
                cells.append(" ❌ |")
            else:
                cells.append(" ⭕️ |")
        else:
            cells.append(f" {status} |")

    return ''.join(cells) + '\n'


def get_repo_info():
    """Return ``(owner, repo)`` of the current GitHub repository.

    Uses ``GITHUB_REPOSITORY`` (set by GitHub Actions) when available, otherwise
    the ``remote.origin.url`` of the local git checkout.

    Returns:
        ``(owner, repo)``, or ``(None, None)`` if it cannot be determined.
    """
    full_repo = os.environ.get('GITHUB_REPOSITORY')
    if full_repo:
        # Running inside GitHub Actions.
        owner, _, repo = full_repo.partition('/')
        if owner and repo and '/' not in repo:
            return owner, repo
        logging.error(f"Unexpected GITHUB_REPOSITORY value: {full_repo}")
        return None, None

    # Local environment: derive the information from the git remote.
    try:
        remote_url = subprocess.check_output(
            ['git', 'config', '--get', 'remote.origin.url']).decode('utf-8').strip()
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing git executable.
        logging.error(
            "Failed to get repository information from git config")
        return None, None

    for prefix in ('https://github.com/', 'git@github.com:'):
        if remote_url.startswith(prefix):
            path = remote_url[len(prefix):].rstrip('/')
            break
    else:
        logging.error(f"Unsupported remote URL format: {remote_url}")
        return None, None

    parts = path.split('/')
    if len(parts) != 2 or not all(parts):
        logging.error(f"Unsupported remote URL format: {remote_url}")
        return None, None
    owner, repo = parts
    return owner, re.sub(r'\.git$', '', repo)


def get_fork_count():
    """Return the repository's fork count from the GitHub API, or None on failure."""
    owner, repo = get_repo_info()
    if not owner or not repo:
        logging.error("Failed to get repository information")
        return None

    api_url = f"https://api.github.com/repos/{owner}/{repo}"

    try:
        response = requests.get(api_url, timeout=HTTP_TIMEOUT_SECONDS)
        response.raise_for_status()
        return response.json()['forks_count']
    except (requests.RequestException, KeyError, ValueError) as e:
        # KeyError/ValueError: unexpected or non-JSON response body.
        logging.error(f"Error fetching fork count: {e}")
        return None


def calculate_statistics(content):
    """Compute participation statistics from the rendered status table.

    Returns:
        dict with ``total_participants``, ``completed_participants``,
        ``eliminated_participants``, ``elimination_rate`` (percent),
        ``fork_count``, ``perfect_attendance_users`` and ``completed_users``;
        ``None`` if the table markers are missing.
    """
    start_index = content.find(TABLE_START_MARKER)
    end_index = -1
    if start_index != -1:
        end_index = content.find(
            TABLE_END_MARKER, start_index + len(TABLE_START_MARKER))
    if start_index == -1 or end_index == -1:
        logging.error("Error: Couldn't find the table markers in README.md")
        return None

    table_content = content[start_index +
                            len(TABLE_START_MARKER):end_index].strip()
    rows = table_content.split('\n')[2:]  # Skip header and separator rows

    total_participants = 0
    eliminated_participants = 0
    perfect_attendance_users = []
    completed_users = []
    # Cells are stripped below, so a blank cell is '' (' ' kept for safety).
    non_eliminating = ('✅', '⭕️', '', ' ')

    for row in rows:
        cells = row.split('|')
        user_name = cells[1].strip() if len(cells) >= 3 else ''
        if not user_name:
            # Blank or malformed line: not a participant.
            continue
        total_participants += 1
        # Exclude the name cell and the empty element after the last '|'.
        statuses = [status.strip() for status in cells[2:-1]]

        if '❌' in statuses:
            eliminated_participants += 1
        elif all(status == '✅' for status in statuses):
            completed_users.append(user_name)
            perfect_attendance_users.append(user_name)
        elif all(status in non_eliminating for status in statuses):
            completed_users.append(user_name)

    elimination_rate = (eliminated_participants /
                        total_participants) * 100 if total_participants > 0 else 0

    return {
        'total_participants': total_participants,
        'completed_participants': len(completed_users),
        'eliminated_participants': eliminated_participants,
        'elimination_rate': elimination_rate,
        'fork_count': get_fork_count(),
        'perfect_attendance_users': perfect_attendance_users,
        'completed_users': completed_users
    }


def _format_statistics(stats):
    """Render the statistics dict as the markdown section written to the README."""
    return ''.join([
        "\n\n## 统计数据\n\n",
        f"- 总参与人数: {stats['total_participants']}\n",
        f"- 完成人数: {stats['completed_participants']}\n",
        f"- 完成用户: {', '.join(stats['completed_users'])}\n",
        f"- 全勤用户: {', '.join(stats['perfect_attendance_users'])}\n",
        f"- 淘汰人数: {stats['eliminated_participants']}\n",
        f"- 淘汰率: {stats['elimination_rate']:.2f}%\n",
        f"- Fork人数: {stats['fork_count']}\n",
    ])


def _insert_statistics(content, stats_content):
    """Replace the existing statistics section, or add one after the table."""
    stats_start = content.find(STATS_START_MARKER)
    stats_end = -1
    if stats_start != -1:
        stats_end = content.find(
            STATS_END_MARKER, stats_start + len(STATS_START_MARKER))

    if stats_start != -1 and stats_end != -1:
        # Replace existing statistical data
        return (content[:stats_start] + STATS_START_MARKER + "\n" +
                stats_content + STATS_END_MARKER +
                content[stats_end + len(STATS_END_MARKER):])

    section = "\n\n" + STATS_START_MARKER + "\n" + stats_content + STATS_END_MARKER
    end_table_index = content.find(TABLE_END_MARKER)
    if end_table_index != -1:
        # Add new statistical data right after the table end marker
        insert_position = end_table_index + len(TABLE_END_MARKER)
        return content[:insert_position] + section + content[insert_position:]

    logging.warning(
        f"{TABLE_END_MARKER} marker not found. Appending stats to the end.")
    return content + section


def main():
    """Rebuild the README status table and, after END_DATE, the statistics section.

    Errors are logged rather than raised so a failed sync does not crash the
    calling workflow.
    """
    try:
        print_variables(
            'START_DATE', 'END_DATE', 'DEFAULT_TIMEZONE',
            GITHUB_REPOSITORY_OWNER=GITHUB_REPOSITORY_OWNER,
            GITHUB_REPOSITORY=GITHUB_REPOSITORY,
            FILE_SUFFIX=FILE_SUFFIX,
            README_FILE=README_FILE,
            FIELD_NAME=FIELD_NAME,
            Content_START_MARKER=Content_START_MARKER,
            Content_END_MARKER=Content_END_MARKER,
            TABLE_START_MARKER=TABLE_START_MARKER,
            TABLE_END_MARKER=TABLE_END_MARKER
        )
        with open(README_FILE, 'r', encoding='utf-8') as file:
            content = file.read()
        new_content = update_readme(content)

        # Statistics are only published once the programme has ended.
        if datetime.now(pytz.UTC) > END_DATE:
            stats = calculate_statistics(new_content)
            if stats:
                new_content = _insert_statistics(
                    new_content, _format_statistics(stats))

        with open(README_FILE, 'w', encoding='utf-8') as file:
            file.write(new_content)
        logging.info("README.md has been successfully updated.")
    except Exception as e:
        logging.error(f"An error occurred in main function: {str(e)}")


if __name__ == "__main__":
    main()