Skip to content

Commit

Permalink
WIP migrating to ghapi for continuous API support
Browse files Browse the repository at this point in the history
  • Loading branch information
primetheus committed Aug 22, 2023
1 parent 713b364 commit b53e012
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 84 deletions.
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ tox-gh-actions = "*"

[packages]
"github3.py" = "*"
ghapi = "*"
flask = "*"
pyyaml = "*"
ldap3 = "*"
Expand Down
268 changes: 184 additions & 84 deletions githubapp/core.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,64 @@
"""
Flask extension for rapid GitHub app development
"""
import os.path
import hmac
import time
import logging
import distutils

from flask import abort, current_app, jsonify, request, _app_ctx_stack
from github3 import GitHub, GitHubEnterprise
from flask import abort, current_app, jsonify, make_response, request, _app_ctx_stack
from ghapi.all import GhApi
import requests
import jwt

LOG = logging.getLogger(__name__)

STATUS_FUNC_CALLED = "HIT"
STATUS_NO_FUNC_CALLED = "MISS"


class GitHubApp(object):
class GitHubAppError(Exception):
pass


class GitHubAppValidationError(Exception):
pass


class GitHubAppBadCredentials(Exception):
pass


class GithubUnauthorized(Exception):
pass


class GithubAppUnkownObject(Exception):
pass


class InstallationAuthorization:
"""
This class represents InstallationAuthorizations
"""
The GitHubApp object provides the central interface for interacting GitHub hooks

def __init__(self, token, expires_at):
self.token = token
self.expires_at = expires_at

def token(self):
return self._token

def expires_at(self):
return self._expires_at

def expired(self):
if self.expires_at:
return time.time() > self.expires_at
return False


class GitHubApp(object):
"""The GitHubApp object provides the central interface for interacting GitHub hooks
and creating GitHub app clients.
GitHubApp object allows using the "on" decorator to make GitHub hooks to functions
Expand All @@ -29,24 +70,12 @@ class GitHubApp(object):

def __init__(self, app=None):
self._hook_mappings = {}
self._access_token = None
if app is not None:
self.init_app(app)

@staticmethod
def load_env(app):
app.config["GITHUBAPP_ID"] = int(os.environ["APP_ID"])
app.config["GITHUBAPP_SECRET"] = os.environ["WEBHOOK_SECRET"]
if "GHE_HOST" in os.environ:
app.config["GITHUBAPP_URL"] = "https://{}".format(os.environ["GHE_HOST"])
app.config["VERIFY_SSL"] = bool(
distutils.util.strtobool(os.environ.get("VERIFY_SSL", "false"))
)
with open(os.environ["PRIVATE_KEY_PATH"], "rb") as key_file:
app.config["GITHUBAPP_KEY"] = key_file.read()

def init_app(self, app):
"""
Initializes GitHubApp app by setting configuration variables.
"""Initializes GitHubApp app by setting configuration variables.
The GitHubApp instance is given the following configuration variables by calling on Flask's configuration:
Expand All @@ -62,7 +91,8 @@ def init_app(self, app):
`GITHUBAPP_SECRET`:
Secret used to secure webhooks as bytes or utf-8 encoded string (required).
Secret used to secure webhooks as bytes or utf-8 encoded string (required). set to `False` to disable
verification (not recommended for production).
Default: None
`GITHUBAPP_URL`:
Expand All @@ -75,14 +105,19 @@ def init_app(self, app):
Path used for GitHub hook requests as a string.
Default: '/'
"""
self.load_env(app)
required_settings = ["GITHUBAPP_ID", "GITHUBAPP_KEY", "GITHUBAPP_SECRET"]
for setting in required_settings:
if not app.config.get(setting):
if not setting in app.config:
raise RuntimeError(
"Flask-GitHubApp requires the '%s' config var to be set" % setting
"Flask-GitHubApplication requires the '%s' config var to be set"
% setting
)

if app.config.get("GITHUBAPP_URL"):
self.base_url = app.config.get("GITHUBAPP_URL")
else:
self.base_url = "https://api.github.com"

app.add_url_rule(
app.config.get("GITHUBAPP_ROUTE", "/"),
view_func=self._flask_view_func,
Expand Down Expand Up @@ -111,16 +146,6 @@ def secret(self):
def _api_url(self):
return current_app.config["GITHUBAPP_URL"]

@property
def client(self):
"""Unauthenticated GitHub client"""
if current_app.config.get("GITHUBAPP_URL"):
return GitHubEnterprise(
current_app.config["GITHUBAPP_URL"],
verify=current_app.config["VERIFY_SSL"],
)
return GitHub()

@property
def payload(self):
"""GitHub hook payload"""
Expand All @@ -132,58 +157,97 @@ def payload(self):
)

@property
def installation_client(self):
def installation_token(self):
return self._access_token

def client(self, installation_id=None):
"""GitHub client authenticated as GitHub app installation"""
ctx = _app_ctx_stack.top
if ctx is not None:
if not hasattr(ctx, "githubapp_installation"):
client = self.client
client.login_as_app_installation(
self.key, self.id, self.payload["installation"]["id"]
)
ctx.githubapp_installation = client
if installation_id is None:
installation_id = self.payload["installation"]["id"]
self._access_token = self.get_access_token(installation_id).token
ctx.githubapp_installation = GhApi(token=self._access_token)
return ctx.githubapp_installation

@property
def app_client(self):
"""GitHub client authenticated as GitHub app"""
ctx = _app_ctx_stack.top
if ctx is not None:
if not hasattr(ctx, "githubapp_app"):
client = self.client
client.login_as_app(self.key, self.id)
ctx.githubapp_app = client
return ctx.githubapp_app

@property
def installation_token(self):
def _create_jwt(self, expiration=60):
"""
Creates a signed JWT, valid for 60 seconds by default.
The expiration can be extended beyond this, to a maximum of 600 seconds.
:param expiration: int
:return string:
"""
now = int(time.time())
payload = {"iat": now, "exp": now + expiration, "iss": self.id}
encrypted = jwt.encode(payload, key=self.key, algorithm="RS256")

:return:
if isinstance(encrypted, bytes):
encrypted = encrypted.decode("utf-8")
return encrypted

def get_access_token(self, installation_id, user_id=None):
"""
Get an access token for the given installation id.
POSTs https://api.github.com/app/installations/<installation_id>/access_tokens
:param user_id: int
:param installation_id: int
:return: :class:`github.InstallationAuthorization.InstallationAuthorization`
"""
return self.installation_client.session.auth.token
body = {}
if user_id:
body = {"user_id": user_id}
response = requests.post(
f"{self.base_url}/app/installations/{installation_id}/access_tokens",
headers={
"Authorization": f"Bearer {self._create_jwt()}",
"Accept": "application/vnd.github.v3+json",
"User-Agent": "Flask-GithubApplication/Python",
},
json=body,
)
if response.status_code == 201:
return InstallationAuthorization(
token=response.json()["token"], expires_at=response.json()["expires_at"]
)
elif response.status_code == 403:
raise GitHubAppBadCredentials(
status=response.status_code, data=response.text
)
elif response.status_code == 404:
raise GithubAppUnkownObject(status=response.status_code, data=response.text)
raise Exception(status=response.status_code, data=response.text)

def app_installation(self, installation_id=None):
def list_installations(self, per_page=30, page=1):
"""
Login as installation when triggered on a non-webhook event.
This is necessary for scheduling tasks
:param installation_id:
:return:
GETs https://api.github.com/app/installations
:return: :obj: `list` of installations
"""
"""GitHub client authenticated as GitHub app installation"""
ctx = _app_ctx_stack.top
if installation_id is None:
raise RuntimeError("Installation ID is not specified.")
if ctx is not None:
if not hasattr(ctx, "githubapp_installation"):
client = self.client
client.login_as_app_installation(self.key, self.id, installation_id)
ctx.githubapp_installation = client
return ctx.githubapp_installation
params = {"page": page, "per_page": per_page}

response = requests.get(
f"{self.base_url}/app/installations",
headers={
"Authorization": f"Bearer {self._create_jwt()}",
"Accept": "application/vnd.github.v3+json",
"User-Agent": "Flask-GithubApplication/python",
},
params=params,
)
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
raise GithubUnauthorized(status=response.status_code, data=response.text)
elif response.status_code == 403:
raise GitHubAppBadCredentials(
status=response.status_code, data=response.text
)
elif response.status_code == 404:
raise GithubAppUnkownObject(status=response.status_code, data=response.text)
raise Exception(status=response.status_code, data=response.text)

def on(self, event_action):
"""
Decorator routes a GitHub hook to the wrapped function.
"""Decorator routes a GitHub hook to the wrapped function.
Functions decorated as a hook recipient are registered as the function for the given GitHub event.
Expand All @@ -192,7 +256,7 @@ def cruel_closer():
owner = github_app.payload['repository']['owner']['login']
repo = github_app.payload['repository']['name']
num = github_app.payload['issue']['id']
issue = github_app.installation_client.issue(owner, repo, num)
issue = github_app.client.issue(owner, repo, num)
issue.create_comment('Could not replicate.')
issue.close()
Expand All @@ -213,14 +277,41 @@ def decorator(f):

return decorator

def _validate_request(self):
if not request.is_json:
raise GitHubAppValidationError(
"Invalid HTTP Content-Type header for JSON body "
"(must be application/json or application/*+json)."
)
try:
request.json
except BadRequest:
raise GitHubAppValidationError("Invalid HTTP body (must be JSON).")

event = request.headers.get("X-GitHub-Event")

if event is None:
raise GitHubAppValidationError("Missing X-GitHub-Event HTTP header.")

action = request.json.get("action")

return event, action

def _flask_view_func(self):
functions_to_call = []
calls = {}

event = request.headers["X-GitHub-Event"]
action = request.json.get("action")
try:
event, action = self._validate_request()
except GitHubAppValidationError as e:
LOG.error(e)
error_response = make_response(
jsonify(status="ERROR", description=str(e)), 400
)
return abort(error_response)

self._verify_webhook()
if current_app.config["GITHUBAPP_SECRET"] is not False:
self._verify_webhook()

if event in self._hook_mappings:
functions_to_call += self._hook_mappings[event]
Expand All @@ -239,15 +330,24 @@ def _flask_view_func(self):
return jsonify({"status": status, "calls": calls})

def _verify_webhook(self):
hub_signature = "X-HUB-SIGNATURE"
if hub_signature not in request.headers:
LOG.warning("Github Hook Signature not found.")
abort(400)

signature = request.headers[hub_signature].split("=")[1]
signature_header = "X-Hub-Signature-256"
signature_header_legacy = "X-Hub-Signature"

if request.headers.get(signature_header):
signature = request.headers[signature_header].split("=")[1]
digestmod = "sha256"
elif request.headers.get(signature_header_legacy):
signature = request.headers[signature_header_legacy].split("=")[1]
digestmod = "sha1"
else:
LOG.warning(
"Signature header missing. Configure your GitHub App with a secret or set GITHUBAPP_SECRET"
"to False to disable verification."
)
return abort(400)

mac = hmac.new(self.secret, msg=request.data, digestmod="sha1")
mac = hmac.new(self.secret, msg=request.data, digestmod=digestmod)

if not hmac.compare_digest(mac.hexdigest(), signature):
LOG.warning("GitHub hook signature verification failed.")
abort(400)
return abort(400)

0 comments on commit b53e012

Please sign in to comment.