Skip to content

Commit

Permalink
BOS-4363 fix credentials (#229)
Browse files Browse the repository at this point in the history
* [BOS-4364] Add credentials to Event Store requests

* [BOS-4364] Raise exception if Client.publish() is not authenticated

* black

* a couple of makefile tweaks

* progress on getting credentials in, some minor test refactors

* fix write tests, lint

* found out why a thing

* fix travis docker run

* standardise on 'credential' not 'credentials'

* get rid of some oldfashioned dunderinit things

* remove unnecessary test, restore old two-streams-two-events thingie

* actually remove unnecessary test

* move a sanity-check assert around

* tidyup

* Linting 🧹

* add a test that unauthenticated access still works

* Add blank lines

Co-authored-by: Shaunick Mistry <[email protected]>
  • Loading branch information
hjwp and shaunickmistry authored May 5, 2020
1 parent 628784f commit 50a0bf5
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 119 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ matrix:
services:
- docker
before_install:
- docker run -d -p 127.0.0.1:1113:1113 eventstore/eventstore
- make eventstore_docker
install:
- pip install tox black
script:
Expand Down
18 changes: 17 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
BLACK_EXCLUSION=photonpump/__init__.py|photonpump/_version.py|versioneer.py|.tox|.venv
SHELL = /bin/bash
default: fast_tests
travis: check_lint tox

Expand All @@ -24,5 +25,20 @@ check_lint:
continous_test:
PYASYNCIODEBUG=1 ptw

cleanup:
- docker rm -f eventstore_local
- docker rm -f eventstore_local_noauth

eventstore_docker:
docker run -d -p 2113:2113 -p 1113:1113 eventstore/eventstore
docker run -d --name eventstore_local -p 2113:2113 -p 1113:1113 eventstore/eventstore
docker run -d --name eventstore_local_noauth -p 22113:2113 -p 11113:1113 eventstore/eventstore
for i in {1..10}; do curl -f -i "http://127.0.0.1:2113/users" --user admin:changeit && break || sleep 1; done
for i in {1..10}; do curl -f -i "http://127.0.0.1:22113/users" --user admin:changeit && break || sleep 1; done
curl -f -i "http://127.0.0.1:2113/streams/%24settings" \
--user admin:changeit \
-H "Content-Type: application/vnd.eventstore.events+json" \
-d @default-acl.json
curl -f -i "http://127.0.0.1:2113/users" \
--user admin:changeit \
-H "Content-Type: application/json" \
-d @test-user.json
20 changes: 20 additions & 0 deletions default-acl.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[{
"eventId": "7c314750-05e1-439f-b2eb-f5b0e019be72",
"eventType": "update-default-acl",
"data": {
"$userStreamAcl" : {
"$r" : ["$admin", "$ops", "test-user"],
"$w" : ["$admin", "$ops", "test-user"],
"$d" : ["$admin", "$ops"],
"$mr" : ["$admin", "$ops"],
"$mw" : ["$admin", "$ops"]
},
"$systemStreamAcl" : {
"$r" : "$admins",
"$w" : "$admins",
"$d" : "$admins",
"$mr" : "$admins",
"$mw" : "$admins"
}
}
}]
17 changes: 10 additions & 7 deletions photonpump/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ async def publish_event(
[event],
expected_version=expected_version,
require_master=require_master,
credential=self.credential,
)
result = await self.dispatcher.start_conversation(conversation)

Expand All @@ -679,9 +680,9 @@ async def publish(
events,
expected_version=expected_version,
require_master=require_master,
credential=self.credential,
)
result = await self.dispatcher.start_conversation(cmd)

return await result

async def get_event(
Expand Down Expand Up @@ -722,6 +723,7 @@ async def get_event(
resolve_links,
require_master,
conversation_id=correlation_id,
credential=self.credential,
)

result = await self.dispatcher.start_conversation(cmd)
Expand Down Expand Up @@ -785,6 +787,7 @@ async def get(
resolve_links,
require_master,
direction=direction,
credential=self.credential,
)
result = await self.dispatcher.start_conversation(cmd)

Expand Down Expand Up @@ -838,7 +841,7 @@ async def get_all(
resolve_links,
require_master,
direction=direction,
credentials=self.credential,
credential=self.credential,
)
result = await self.dispatcher.start_conversation(cmd)

Expand Down Expand Up @@ -899,7 +902,7 @@ async def iter(
batch_size,
resolve_links,
direction=direction,
credentials=self.credential,
credential=self.credential,
)
result = await self.dispatcher.start_conversation(cmd)
iterator = await result
Expand Down Expand Up @@ -986,7 +989,7 @@ async def create_subscription(
checkpoint_max_count: int = 1000,
checkpoint_min_count: int = 10,
subscriber_max_count: int = 10,
credentials: msg.Credential = None,
credential: msg.Credential = None,
conversation_id: uuid.UUID = None,
consumer_strategy: str = msg.ROUND_ROBIN,
):
Expand All @@ -1006,7 +1009,7 @@ async def create_subscription(
checkpoint_max_count=checkpoint_max_count,
checkpoint_min_count=checkpoint_min_count,
subscriber_max_count=subscriber_max_count,
credentials=credentials or self.credential,
credential=credential or self.credential,
conversation_id=conversation_id,
consumer_strategy=consumer_strategy,
)
Expand All @@ -1024,7 +1027,7 @@ async def connect_subscription(
cmd = convo.ConnectPersistentSubscription(
subscription,
stream,
credentials=self.credential,
credential=self.credential,
conversation_id=conversation_id,
)
future = await self.dispatcher.start_conversation(cmd)
Expand Down Expand Up @@ -1079,7 +1082,7 @@ async def subscribe_to(

if start_from == -1:
cmd: convo.Conversation = convo.SubscribeToStream(
stream, resolve_link_tos, credentials=self.credential
stream, resolve_link_tos, credential=self.credential
)
else:
cmd = convo.CatchupSubscription(
Expand Down
46 changes: 26 additions & 20 deletions photonpump/conversations.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
except ImportError:
from asyncio.futures import InvalidStateError
from enum import IntEnum
from typing import NamedTuple, Optional, Sequence, Union
from typing import Optional, Sequence, Union
from uuid import UUID, uuid4

from photonpump import exceptions
from photonpump import messages as messages
from photonpump import messages
from photonpump import messages_pb2 as proto
from photonpump.messages import (
AllStreamSlice,
Expand Down Expand Up @@ -315,6 +315,12 @@ async def reply(self, message: InboundMessage, output: Queue) -> None:
self.expect_only(message, TcpCommand.WriteEventsCompleted)
result = proto.WriteEventsCompleted()
result.ParseFromString(message.payload)
if result.result == proto.AccessDenied:
await self.error(
exceptions.AccessDenied(
self.conversation_id, type(self).__name__, result.message
)
)
try:
self.result.set_result(result)
self.is_complete = True
Expand Down Expand Up @@ -451,10 +457,10 @@ def __init__(
resolve_links: bool = True,
require_master: bool = False,
conversation_id: Optional[UUID] = None,
credentials=None,
credential=None,
) -> None:

Conversation.__init__(self, conversation_id, credential=credentials)
super().__init__(conversation_id, credential=credential)
self.stream = stream
self.event_number = event_number
self.require_master = require_master
Expand Down Expand Up @@ -550,11 +556,11 @@ def __init__(
resolve_links: bool = True,
require_master: bool = False,
direction: StreamDirection = StreamDirection.Forward,
credentials=None,
credential=None,
conversation_id: UUID = None,
) -> None:

Conversation.__init__(self, conversation_id, credential=credentials)
super().__init__(conversation_id, credential=credential)
self.has_first_page = False
self.direction = direction
self.from_position = from_position
Expand Down Expand Up @@ -604,11 +610,11 @@ def __init__(
resolve_links: bool = True,
require_master: bool = False,
direction: StreamDirection = StreamDirection.Forward,
credentials=None,
credential=None,
conversation_id: UUID = None,
) -> None:

Conversation.__init__(self, conversation_id, credential=credentials)
super().__init__(conversation_id, credential=credential)
self.has_first_page = False
self.stream = stream
self.direction = direction
Expand Down Expand Up @@ -673,11 +679,11 @@ def __init__(
resolve_links: bool = True,
require_master: bool = False,
direction: StreamDirection = StreamDirection.Forward,
credentials=None,
credential=None,
conversation_id: UUID = None,
):

Conversation.__init__(self, conversation_id, credentials)
super().__init__(conversation_id, credential)
self.batch_size = batch_size
self.has_first_page = False
self.resolve_link_tos = resolve_links
Expand Down Expand Up @@ -754,11 +760,11 @@ def __init__(
resolve_links: bool = True,
require_master: bool = False,
direction: StreamDirection = StreamDirection.Forward,
credentials=None,
credential=None,
conversation_id: UUID = None,
):

Conversation.__init__(self, conversation_id, credentials)
super().__init__(conversation_id, credential)
self.batch_size = batch_size
self.has_first_page = False
self.stream = stream
Expand Down Expand Up @@ -874,11 +880,11 @@ def __init__(
checkpoint_max_count=1024,
checkpoint_min_count=10,
subscriber_max_count=10,
credentials=None,
credential=None,
conversation_id=None,
consumer_strategy=messages.ROUND_ROBIN,
) -> None:
super().__init__(conversation_id, credentials)
super().__init__(conversation_id, credential)
self.stream = stream
self.name = name
self.resolve_links = resolve_links
Expand Down Expand Up @@ -965,11 +971,11 @@ def __init__(
name,
stream,
max_in_flight=10,
credentials=None,
credential=None,
conversation_id=None,
auto_ack=False,
) -> None:
super().__init__(conversation_id, credentials)
super().__init__(conversation_id, credential)
self.stream = stream
self.max_in_flight = max_in_flight
self.name = name
Expand Down Expand Up @@ -1067,12 +1073,12 @@ async def reply(self, message: InboundMessage, output: Queue) -> None:

class SubscribeToStream(Conversation):
def __init__(
self, stream, resolve_link_tos=True, conversation_id=None, credentials=None
self, stream, resolve_link_tos=True, conversation_id=None, credential=None
):
self.stream = stream
self.resolve_link_tos = resolve_link_tos
self.is_live = False
super().__init__(conversation_id, credentials)
super().__init__(conversation_id, credential)

async def start(self, output: Queue) -> None:
msg = proto.SubscribeToStream()
Expand Down Expand Up @@ -1327,7 +1333,7 @@ def __init__(
self.subscribe_from = -1
self.next_event_number = self.from_event
self.last_event_number = -1
Conversation.__init__(self, conversation_id, credential)
super().__init__(conversation_id, credential)

async def start(self, output):
if self.phase > CatchupSubscriptionPhase.READ_HISTORICAL:
Expand Down Expand Up @@ -1441,7 +1447,7 @@ def __init__(
self.buffer = []
self.next_position = self.from_position
self.last_position = Position.min
Conversation.__init__(self, conversation_id, credential)
super().__init__(conversation_id, credential)

async def _yield_events(self, events):
for event in events:
Expand Down
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ versionfile_source = photonpump/_version.py
versionfile_build = photonpump/_version.py
tag_prefix = v
parentdir_prefix =

[mypy]
ignore_missing_imports = True
6 changes: 6 additions & 0 deletions test-user.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"LoginName": "test-user",
"FullName": "Test User",
"Groups": [],
"Password": "test-password"
}
4 changes: 2 additions & 2 deletions test/conversations/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ async def test_authenticated_request():
output = TeeQueue()

conversation_id = uuid4()
credentials = Credential("username", "password")
convo = Ping(conversation_id, credentials)
credential = Credential("username", "password")
convo = Ping(conversation_id, credential)

await convo.start(output)

Expand Down
9 changes: 5 additions & 4 deletions test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


async def given_a_stream_with_three_events(c, stream_name):
await c.publish(
result = await c.publish(
stream_name,
[
messages.NewEvent(
Expand All @@ -19,11 +19,12 @@ async def given_a_stream_with_three_events(c, stream_name):
),
],
)
assert "denied" not in str(result).lower() # this should now never happen


async def given_two_streams_with_two_events(c, id):
async def given_two_streams_with_two_events(c, unique_id):
await c.publish(
"stream_one_{}".format(id),
f"stream_one_{unique_id}",
[
messages.NewEvent(
"pony_splits",
Expand All @@ -36,7 +37,7 @@ async def given_two_streams_with_two_events(c, id):
],
)
await c.publish(
"stream_two_{}".format(id),
f"stream_two_{unique_id}",
[
messages.NewEvent(
"pony_splits",
Expand Down
Loading

0 comments on commit 50a0bf5

Please sign in to comment.