Skip to content

Commit

Permalink
MessageWriter Connection lost fix (#140)
Browse files Browse the repository at this point in the history
Fixed WriteEvents conversation to complete on result
  • Loading branch information
epoplavskis authored Apr 26, 2019
1 parent 8fb94cf commit ff0736c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 6 deletions.
11 changes: 8 additions & 3 deletions photonpump/conversations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
import time
from asyncio import Future, Queue
from asyncio.base_futures import InvalidStateError
from enum import IntEnum
from typing import NamedTuple, Optional, Sequence, Union
from uuid import UUID, uuid4
Expand Down Expand Up @@ -127,7 +128,7 @@ async def respond_to(self, response: InboundMessage, output: Queue) -> None:

return await self.reply(response, output)
except Exception as exn:
self._logger.error("Failed to read server response", exc_info=True)
self._logger.exception("Failed to read server response", exc_info=True)
exc_info = sys.exc_info()

return await self.error(
Expand Down Expand Up @@ -310,8 +311,12 @@ async def reply(self, message: InboundMessage, output: Queue) -> None:
self.expect_only(message, TcpCommand.WriteEventsCompleted)
result = proto.WriteEventsCompleted()
result.ParseFromString(message.payload)

self.result.set_result(result)
try:
self.result.set_result(result)
self.is_complete = True
except InvalidStateError as exn:
logging.error(self.result, message, self, exc_info=True)
raise exn


class ReadAllEventsBehaviour:
Expand Down
43 changes: 40 additions & 3 deletions test/conversations/test_write_events_conversation.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from asyncio import Queue
import json
from uuid import uuid4

import pytest
import asyncio

from uuid import uuid4
from asyncio import Queue

import photonpump.messages as msg
import photonpump.messages_pb2 as proto
Expand Down Expand Up @@ -139,6 +140,42 @@ async def test_one_event_response():
assert result.first_event_number == 73
assert result.last_event_number == 73
assert result.result == msg.OperationResult.Success
assert conversation.is_complete


@pytest.mark.asyncio
async def test_completing_write_events_twice():

output = Queue()
conversation = given_a_write_events_message()

await conversation.start(output)

await output.get()
payload = proto.WriteEventsCompleted()
payload.result = msg.OperationResult.Success
payload.first_event_number = 73
payload.last_event_number = 73

await conversation.respond_to(
msg.InboundMessage(
conversation.conversation_id,
msg.TcpCommand.WriteEventsCompleted,
payload.SerializeToString(),
),
output,
)

with pytest.raises(asyncio.base_futures.InvalidStateError) as exn:

await conversation.respond_to(
msg.InboundMessage(
conversation.conversation_id,
msg.TcpCommand.WriteEventsCompleted,
payload.SerializeToString(),
),
output,
)


@pytest.mark.skip(reason="upcoming feature")
Expand Down

0 comments on commit ff0736c

Please sign in to comment.