Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add feature for waiting on subscribe request #53

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

jonasoreland
Copy link

No description provided.

Comment on lines 78 to +104
async def __socket_subscribe(
self,
subscription_string,
callback: Callable[[str, dict], Any]
callback: Callable[[str, dict], Any],
wait_for_reply_timeout_seconds
):
if self._subscribe_event is None:
self._subscribe_event = asyncio.Event()

self._subscriptions[subscription_string] = {
'callback': callback
}

self._subscribe_event.clear()
await self.__send({
'channel': '/meta/subscribe',
'clientId': self._client_id,
'subscription': subscription_string
})

# Wait for subscription ack message.
if wait_for_reply_timeout_seconds is not None:
try:
await asyncio.wait_for(self._subscribe_event.wait(), timeout=wait_for_reply_timeout_seconds)
except asyncio.TimeoutError:
logger.warning('timeout waiting for subscription reply!')

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of complicating __socket_subscribe what do you think about this solution?

async def __socket_subscribe(
    self,
    subscription_string,
    callback: Callable[[str, dict], Any],
):
    self._subscriptions[subscription_string] = {
        'callback': callback
    }

    await self.__send({
        'channel': '/meta/subscribe',
        'clientId': self._client_id,
        'subscription': subscription_string
    })

# This function calls __socket_subscribe and adds the timeout logic on top
async def __socket_subscribe_with_timeout(
    self,
    subscription_string,
    callback: Callable[[str, dict], Any],
    wait_for_reply_timeout_seconds: int # Now this parameter is required
):
    if self._subscribe_event is None:
        self._subscribe_event = asyncio.Event()
    else:
        self._subscribe_event.clear()

    await self.__socket_subscribe(
        subscription_string,
        callback
    )

    try:
        await asyncio.wait_for(self._subscribe_event.wait(), timeout=wait_for_reply_timeout_seconds)
    except asyncio.TimeoutError:
        logger.warning('timeout waiting for subscription reply!')

And then in subscribe_to_id(s):

async def subscribe_to_id(
    self,
    channel: ChannelType,
    id: str,
    callback: Callable[[str, dict], Any],
    wait_for_reply_timeout_seconds: Union[None, int] = None,
):
    return await self.subscribe_to_ids(channel, [id], callback, wait_for_reply_timeout_seconds)

async def subscribe_to_ids(
    self,
    channel: ChannelType,
    ids: Sequence[str],
    callback: Callable[[str, dict], Any],
    wait_for_reply_timeout_seconds: Union[None, int] = None
):
    valid_channels_for_multiple_ids = [
        ChannelType.ORDERS,
        ChannelType.DEALS,
        ChannelType.POSITIONS
    ]

    if (
        len(ids) > 1 and
        channel not in valid_channels_for_multiple_ids
    ):
        raise ValueError(
            f'Multiple ids is not supported for channels other than {valid_channels_for_multiple_ids}')

    subscription_string = f'/{channel.value}/{",".join(ids)}'

    if type(wait_for_reply_timeout_seconds) is int:
        await self.__socket_subscribe_with_timeout(subscription_string, callback, wait_for_reply_timeout_seconds)
    else:
        await self.__socket_subscribe(subscription_string, callback)

And then __register_subscription would look like this:

async def __register_subscription(self, message):
    subscription = message.get('subscription')
    if subscription is None:
        raise ValueError(
            'No subscription channel found on subscription message')

    self._subscriptions[subscription]['client_id'] = self._client_id

    if type(self._subscribe_event) is asyncio.Event:
        self._subscribe_event.set()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi,

sorry for super late reply!
i've been using the version I proposed w/o problem, and did not watch closely.
But today I needed to update to new version of avanzi-api...and found your comment too.

I think you proposal look great. Shall I create a new version based on that ?
Or will/can you add it ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants