Skip to content

Commit

Permalink
Merge pull request impstation#1023 from irismessage/changelog-webhook…
Browse files Browse the repository at this point in the history
…-debug

Fix sending webhook updates to github
  • Loading branch information
Darkmajia authored Dec 14, 2024
2 parents 256b175 + 199086d commit 1ba6f38
Showing 1 changed file with 120 additions and 87 deletions.
207 changes: 120 additions & 87 deletions Tools/actions_changelogs_since_last_run.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,76 @@
#!/usr/bin/env python3

#
# Sends updates to a Discord webhook for new changelog entries since the last GitHub Actions publish run.
# Automatically figures out the last run and changelog contents with the GitHub API.
#
"""
Sends updates to a Discord webhook for new changelog entries since the last GitHub Actions publish run.
Automatically figures out the last run and changelog contents with the GitHub API.
"""

import io
import itertools
import os
from pathlib import Path
from typing import Any, Iterable

import requests
import yaml
from typing import Any, Iterable

GITHUB_API_URL = os.environ.get("GITHUB_API_URL", "https://api.github.com")
GITHUB_REPOSITORY = os.environ["GITHUB_REPOSITORY"]
GITHUB_RUN = os.environ["GITHUB_RUN_ID"]
GITHUB_TOKEN = os.environ["GITHUB_TOKEN"]
DEBUG = True
DEBUG_CHANGELOG_FILE_OLD = Path("../Changelog-Impstation-old.yml")
GITHUB_API_URL = os.environ.get("GITHUB_API_URL", "https://api.github.com")

# https://discord.com/developers/docs/resources/webhook
DISCORD_SPLIT_LIMIT = 2000
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL")

CHANGELOG_FILE = "Resources/Changelog/Impstation.yml"

TYPES_TO_EMOJI = {
"Fix": "🐛",
"Add": "🆕",
"Remove": "❌",
"Tweak": "⚒️"
}
TYPES_TO_EMOJI = {"Fix": "🐛", "Add": "🆕", "Remove": "❌", "Tweak": "⚒️"}

ChangelogEntry = dict[str, Any]


def main():
if not DISCORD_WEBHOOK_URL:
print("No discord webhook URL found, skipping discord send")
return

session = requests.Session()
session.headers["Authorization"] = f"Bearer {GITHUB_TOKEN}"
session.headers["Accept"] = "Accept: application/vnd.github+json"
session.headers["X-GitHub-Api-Version"] = "2022-11-28"
if DEBUG:
# to debug this script locally, you can use
# a separate local file as the old changelog
last_changelog_stream = DEBUG_CHANGELOG_FILE_OLD.read_text()
else:
# when running this normally in a GitHub actions workflow,
# it will get the old changelog from the GitHub API
last_changelog_stream = get_last_changelog()

most_recent = get_most_recent_workflow(session)
last_sha = most_recent['head_commit']['id']
print(f"Last successful publish job was {most_recent['id']}: {last_sha}")
last_changelog = yaml.safe_load(get_last_changelog(session, last_sha))
last_changelog = yaml.safe_load(last_changelog_stream)
with open(CHANGELOG_FILE, "r") as f:
cur_changelog = yaml.safe_load(f)

diff = diff_changelog(last_changelog, cur_changelog)
send_to_discord(diff)
message_lines = changelog_entries_to_message_lines(diff)
send_message_lines(message_lines)


def get_most_recent_workflow(sess: requests.Session) -> Any:
workflow_run = get_current_run(sess)
def get_most_recent_workflow(
sess: requests.Session, github_repository: str, github_run: str
) -> Any:
workflow_run = get_current_run(sess, github_repository, github_run)
past_runs = get_past_runs(sess, workflow_run)
for run in past_runs['workflow_runs']:
for run in past_runs["workflow_runs"]:
# First past successful run that isn't our current run.
if run["id"] == workflow_run["id"]:
continue

return run


def get_current_run(sess: requests.Session) -> Any:
resp = sess.get(f"{GITHUB_API_URL}/repos/{GITHUB_REPOSITORY}/actions/runs/{GITHUB_RUN}")
def get_current_run(
sess: requests.Session, github_repository: str, github_run: str
) -> Any:
resp = sess.get(
f"{GITHUB_API_URL}/repos/{github_repository}/actions/runs/{github_run}"
)
resp.raise_for_status()
return resp.json()

Expand All @@ -73,32 +79,55 @@ def get_past_runs(sess: requests.Session, current_run: Any) -> Any:
"""
Get all successful workflow runs before our current one.
"""
params = {
"status": "success",
"created": f"<={current_run['created_at']}"
}
params = {"status": "success", "created": f"<={current_run['created_at']}"}
resp = sess.get(f"{current_run['workflow_url']}/runs", params=params)
resp.raise_for_status()
return resp.json()


def get_last_changelog(sess: requests.Session, sha: str) -> str:
def get_last_changelog() -> str:
github_repository = os.environ["GITHUB_REPOSITORY"]
github_run = os.environ["GITHUB_RUN_ID"]
github_token = os.environ["GITHUB_TOKEN"]

session = requests.Session()
session.headers["Authorization"] = f"Bearer {github_token}"
session.headers["Accept"] = "Accept: application/vnd.github+json"
session.headers["X-GitHub-Api-Version"] = "2022-11-28"

most_recent = get_most_recent_workflow(session, github_repository, github_run)
last_sha = most_recent["head_commit"]["id"]
print(f"Last successful publish job was {most_recent['id']}: {last_sha}")
last_changelog_stream = get_last_changelog_by_sha(
session, last_sha, github_repository
)

return last_changelog_stream


def get_last_changelog_by_sha(
sess: requests.Session, sha: str, github_repository: str
) -> str:
"""
Use GitHub API to get the previous version of the changelog YAML (Actions builds are fetched with a shallow clone)
"""
params = {
"ref": sha,
}
headers = {
"Accept": "application/vnd.github.raw"
}
headers = {"Accept": "application/vnd.github.raw"}

resp = sess.get(f"{GITHUB_API_URL}/repos/{GITHUB_REPOSITORY}/contents/{CHANGELOG_FILE}", headers=headers, params=params)
resp = sess.get(
f"{GITHUB_API_URL}/repos/{github_repository}/contents/{CHANGELOG_FILE}",
headers=headers,
params=params,
)
resp.raise_for_status()
return resp.text


def diff_changelog(old: dict[str, Any], cur: dict[str, Any]) -> Iterable[ChangelogEntry]:
def diff_changelog(
old: dict[str, Any], cur: dict[str, Any]
) -> Iterable[ChangelogEntry]:
"""
Find all new entries not present in the previous publish.
"""
Expand All @@ -108,69 +137,73 @@ def diff_changelog(old: dict[str, Any], cur: dict[str, Any]) -> Iterable[Changel

def get_discord_body(content: str):
return {
"content": content,
# Do not allow any mentions.
"allowed_mentions": {
"parse": []
},
# SUPPRESS_EMBEDS
"flags": 1 << 2
}
"content": content,
# Do not allow any mentions.
"allowed_mentions": {"parse": []},
# SUPPRESS_EMBEDS
"flags": 1 << 2,
}


def send_discord(content: str):
def send_discord_webhook(lines: list[str]):
content = "".join(lines)
body = get_discord_body(content)

response = requests.post(DISCORD_WEBHOOK_URL, json=body)
response.raise_for_status()


def send_to_discord(entries: Iterable[ChangelogEntry]) -> None:
if not DISCORD_WEBHOOK_URL:
print(f"No discord webhook URL found, skipping discord send")
return
def changelog_entries_to_message_lines(entries: Iterable[ChangelogEntry]) -> list[str]:
"""Process structured changelog entries into a list of lines making up a formatted message."""
message_lines = []

message_content = io.StringIO()
# We need to manually split messages to avoid discord's character limit
# With that being said this isn't entirely robust
# e.g. a sufficiently large CL breaks it, but that's a future problem

for name, group in itertools.groupby(entries, lambda x: x["author"]):
# Need to split text to avoid discord character limit
group_content = io.StringIO()
group_content.write(f"**{name}** updated:\n")
for contributor_name, group in itertools.groupby(entries, lambda x: x["author"]):
message_lines.append(f"**{contributor_name}** updated:\n")

for entry in group:
url = entry.get("url")
if url and not url.strip():
url = None

for change in entry["changes"]:
emoji = TYPES_TO_EMOJI.get(change['type'], "❓")
message = change['message']
url = entry.get("url")
if url and url.strip():
group_content.write(f"{emoji} - {message} [PR]({url}) \n")
emoji = TYPES_TO_EMOJI.get(change["type"], "❓")
message = change["message"]

# if a single line is longer than the limit, it needs to be truncated
if len(message) > DISCORD_SPLIT_LIMIT:
message = message[: DISCORD_SPLIT_LIMIT - 100] + " [...]"

if url is not None:
line = f"{emoji} - {message} [PR]({url}) \n"
else:
group_content.write(f"{emoji} - {message}\n")
line = f"{emoji} - {message}\n"

message_lines.append(line)

return message_lines


def send_message_lines(message_lines: list[str]):
"""Join a list of message lines into chunks that are each below Discord's message length limit, and send them."""
chunk_lines = []
chunk_length = 0

for line in message_lines:
line_length = len(line)
new_chunk_length = chunk_length + line_length

group_text = group_content.getvalue()
message_text = message_content.getvalue()
message_length = len(message_text)
group_length = len(group_text)
if new_chunk_length > DISCORD_SPLIT_LIMIT:
send_discord_webhook(chunk_lines)

# If adding the text would bring it over the group limit then send the message and start a new one
if message_length + group_length >= DISCORD_SPLIT_LIMIT:
print("Split changelog and sending to discord")
send_discord(message_text)
new_chunk_length = line_length
chunk_lines.clear()

# Reset the message
message_content = io.StringIO()
chunk_lines.append(line)
chunk_length = new_chunk_length

# Flush the group to the message
message_content.write(group_text)

# Clean up anything remaining
message_text = message_content.getvalue()
if len(message_text) > 0:
print("Sending final changelog to discord")
send_discord(message_text)
if chunk_lines:
send_discord_webhook(chunk_lines)


main()
if __name__ == "__main__":
main()

0 comments on commit 1ba6f38

Please sign in to comment.