Skip to content

Commit

Permalink
Add bypass of herokuapp access restriction (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
zupo committed Nov 8, 2024
1 parent 1ba9e62 commit 0d56bb1
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 3 deletions.
5 changes: 4 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ Usage example for tweens::

The ``pyramid_heroku.herokuapp_access`` tween depends on
``pyramid_heroku.client_addr`` tween and it requires you to list allowlisted IPs
in the ``pyramid_heroku.herokuapp_allowlist`` setting.
in the ``pyramid_heroku.herokuapp_allowlist`` setting. A bypass is possible
by setting the `HEROKUAPP_ACCESS_BYPASS` environment variable to a secret value
and then sending a request with the `HEROKUAPP_ACCESS_BYPASS` header set to the
same secret value.

The ``pyramid_heroku.client_addr`` tween sets request.client_addr to an IP we
can trust. It handles IP spoofing via ``X-Forwarded-For`` headers and
Expand Down
25 changes: 24 additions & 1 deletion pyramid_heroku/herokuapp_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
from pyramid.response import Response

import logging
import os


def includeme(config):
config.add_tween("pyramid_heroku.herokuapp_access.HerokuappAccess")
config.add_tween(
"pyramid_heroku.herokuapp_access.HerokuappAccess",
under="pyramid_heroku.client_addr.ClientAddr",
)


class HerokuappAccess(object):
Expand All @@ -20,6 +24,8 @@ class HerokuappAccess(object):
tween.
"""

import os

def __init__(self, handler, registry):
self.handler = handler
self.registry = registry
Expand All @@ -30,6 +36,23 @@ def __call__(self, request):
"pyramid_heroku.herokuapp_allowlist", ""
).split("\n")

if os.environ.get("HEROKUAPP_ACCESS_BYPASS"):
if request.headers.get("HEROKUAPP_ACCESS_BYPASS") == os.environ.get(
"HEROKUAPP_ACCESS_BYPASS"
):
if request.registry.settings.get("pyramid_heroku.structlog"):
import structlog

logger = structlog.getLogger(__name__)
logger.info(
"Herokuapp access bypassed", user_ip=request.client_addr
)
else:
logger = logging.getLogger(__name__)
logger.info(f"Herokuapp access bypassed by {request.client_addr}")

return self.handler(request)

if (
"herokuapp.com" in request.headers["Host"]
and request.client_addr not in allowlisted_ips
Expand Down
46 changes: 45 additions & 1 deletion pyramid_heroku/tests/test_herokuapp_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import logging
import mock
import os
import structlog
import unittest

Expand Down Expand Up @@ -66,7 +67,7 @@ def test_non_allowlisted_ip(self):
assert not self.handler.called, "handler should not be called"
self.assertEqual(len(tweens_handler.records), 1)
self.assertEqual(
"Denied Herokuapp access for Host foo.herokuapp.com and IP 6.6.6.6", # noqa
"Denied Herokuapp access for Host foo.herokuapp.com and IP 6.6.6.6",
tweens_handler.records[0].msg,
)
self.assertEqual(response.status_code, 403)
Expand Down Expand Up @@ -101,3 +102,46 @@ def test_herokuapp_allowlist_empty(self):

HerokuappAccess(self.handler, self.request.registry)(self.request)
assert not self.handler.called, "handler should not be called"

@mock.patch.dict(os.environ, {"HEROKUAPP_ACCESS_BYPASS": "foo"})
def test_herokuapp_access_bypass(self):
"The IP check can be bypassed by setting a correct header."
from pyramid_heroku.herokuapp_access import HerokuappAccess

self.request.client_addr = "6.6.6.6"
self.request.headers = {
"Host": "foo.herokuapp.com",
"HEROKUAPP_ACCESS_BYPASS": "foo",
}

# structlog version
HerokuappAccess(self.handler, self.request.registry)(self.request)
self.handler.assert_called_with(self.request)
self.assertEqual(len(tweens_handler.records), 1)
self.assertEqual("Herokuapp access bypassed", tweens_handler.records[0].msg)

# standard logging version
self.request.registry.settings["pyramid_heroku.structlog"] = False
tweens_handler.clear()
HerokuappAccess(self.handler, self.request.registry)(self.request)
self.handler.assert_called_with(self.request)
self.assertEqual(len(tweens_handler.records), 1)
self.assertEqual(
"Herokuapp access bypassed by 6.6.6.6",
tweens_handler.records[0].msg,
)

@mock.patch.dict(os.environ, {"HEROKUAPP_ACCESS_BYPASS": "foo"})
def test_herokuapp_access_bypass_invalid(self):
"Invalid bypass code is rejected."
from pyramid_heroku.herokuapp_access import HerokuappAccess

self.request.client_addr = "6.6.6.6"
self.request.headers = {
"Host": "foo.herokuapp.com",
"HEROKUAPP_ACCESS_BYPASS": "bar",
}
self.request.registry.settings = {}

HerokuappAccess(self.handler, self.request.registry)(self.request)
assert not self.handler.called, "handler should not be called"

0 comments on commit 0d56bb1

Please sign in to comment.