
Idea: Is it possible to wait for a given/ hardcoded custom line ending "end_of_field" vs chunk_size #59

Open
clownfishnemoGitHub opened this issue Nov 16, 2022 · 2 comments


clownfishnemoGitHub commented Nov 16, 2022

Hello, a newbie question.

Idea: is it possible to wait for a given (hardcoded) custom line ending, "end_of_field", instead of relying on chunk_size?
For example, this one:
end_of_field = re.compile(r'\n\n\r\n\x27|\r\n\x27') # \n\n\r\n' or \r\n'

My Signify Hue Bridge sends event message data of variable length, which ends with \n\n\r\n'
or with \r\n'. For long event message data, the Hue Bridge splits it into chunks of around 4096 bytes, each followed by \r\n',
and the end of any event message data is always indicated by \n\n\r\n'.

Maybe chunk_size could be set to None to skip chunking completely, or be treated as a maximum safeguard value such as 10000.
For now I set chunk_size to 9 \n\n\r\n'
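A minimal sketch of the idea above: keep appending whatever arrives to a buffer, cut events at a delimiter regex, and use a maximum size only as a safeguard. The function name `feed` and the `max_buffer` parameter are hypothetical and not part of sseclient; the delimiter is the blank-line pattern from the commented-out regex in this thread.

```python
import re

# Blank-line delimiter, as in the commented-out pattern in this thread.
END_OF_EVENT = re.compile(r'\r\n\r\n|\r\r|\n\n')


def feed(buf, chunk, max_buffer=10000):
    """Append chunk to buf and return (complete_events, remaining_buf).

    max_buffer is a hypothetical safeguard: it only limits how much
    unterminated text may pile up, it does not decide where events end.
    """
    buf += chunk
    events = []
    while True:
        match = END_OF_EVENT.search(buf)
        if match is None:
            break
        events.append(buf[:match.start()])
        buf = buf[match.end():]
    if len(buf) > max_buffer:
        raise ValueError("no event delimiter within %d characters" % max_buffer)
    return events, buf


# Chunks of any size can be fed; events are cut at the delimiter only.
events, rest = feed("", "data: a\n\ndata: b")
more_events, rest_after = feed(rest, "\n\n")
```

With this shape the read size only affects how often `feed` is called, not where one event ends and the next begins.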


clownfishnemoGitHub commented Nov 16, 2022

#end_of_field = re.compile(r'\r\n\r\n|\r\r|\n\n')
#end_of_field = re.compile(str('\x5C''n''\x5C''n''\x5C''r''\x5C''n''\x27'))  # \n\n\r\n'
#end_of_field = re.compile(str('\x5C''r''\x5C''n'))  # \r\n
#end_of_field = re.compile(str('$''\x5C''r''\x5C''n'), re.MULTILINE)  # <MatchesEndOfString_$>\r\n
end_of_field = re.compile(str('$''\x5C''r''\x5C''n''\x27'), re.MULTILINE)  # <MatchesEndOfString_$>\r\n'
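A small check of what the escaped strings above actually compile to. It assumes, purely for illustration, a sample text that really contains a literal trailing single quote; that sample is hypothetical. In Python's `re`, `$` with `re.MULTILINE` matches only at the end of the string or directly before a `\n`, so a pattern that demands `\r` immediately after `$` cannot match.

```python
import re

# The concatenated literals build the 6-character regex source  $\r\n'
escaped = str('$''\x5C''r''\x5C''n''\x27')

with_dollar = re.compile(escaped, re.MULTILINE)
without_dollar = re.compile(r"\r\n'")

# Hypothetical sample ending in CR LF followed by a literal single quote.
sample = ": hi\n\n\r\n'"

dollar_match = with_dollar.search(sample)      # $ cannot sit before '\r'
plain_match = without_dollar.search(sample)    # matches the trailing \r\n'
```

Under these assumptions the variant without `$` is the one that finds the terminator.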

https://pynative.com/python-regex-flags/

I just edited sseclient.py, which I found in the appdata folder under Python\Python38\site-packages.
I built and ran my test code and it seems to work. I use a chunk_size of 8192, and when I hardcode chunk_size to 1
in sseclient.py as a test, that value is used and works, so I know the change I made is picked up.

https://docs.python.org/3/library/io.html#io.BufferedReader

https://stackoverflow.com/questions/57726771/what-the-difference-between-read-and-read1-in-python

If size is omitted or < 0, then the size of the available buffer is used, so no read() call is performed on the raw stream in this case.
Hmm, I think I should set chunk_size to -1; that seems to work.
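A small sketch of `read1` on an `io.BufferedReader`, using an in-memory `io.BytesIO` as a stand-in for the socket (an assumption for the demo; a real socket returns whatever has arrived so far). `read1(n)` returns at most `n` bytes using at most one read on the raw stream, and a negative size lets the reader pick the amount.

```python
import io

# In-memory stand-in for the network stream (assumption for this demo).
raw = io.BytesIO(b"abcdefgh")
reader = io.BufferedReader(raw, buffer_size=8192)

# Bounded short read: at most 4 bytes are returned.
first = reader.read1(4)

# Negative size: keep taking whatever read1 hands back until EOF.
rest = b""
while True:
    piece = reader.read1(-1)
    if not piece:
        break
    rest += piece
```

Either way, `read1` does not wait to fill a fixed chunk, which is why the value of chunk_size matters less once events are cut at a delimiter.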

b'6\r\n: hi\n\n\r\n' I assume the Hue Bridge adds the b'6\r\n prefix and appends the single quote character '
to the event message data, but <MatchesEndOfString_$>\r\n versus <MatchesEndOfString_$>\r\n'
seems to make no difference. I prefer the last regex for my particular usage.
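One possible reading of the bytes above, offered as an interpretation rather than something confirmed in this thread: they look like HTTP/1.1 chunked transfer framing, where each chunk is a hexadecimal size line, CRLF, the data, and another CRLF. In that reading `6` is the length of `: hi\n\n`, the final `\r\n` closes the chunk, and the trailing `'` is just the closing quote of Python's bytes repr, not data sent by the bridge. The helper `decode_chunked` is hypothetical and only handles complete frames.

```python
def decode_chunked(raw):
    """Strip chunked-transfer framing from a complete byte string."""
    body = b""
    pos = 0
    while pos < len(raw):
        line_end = raw.index(b"\r\n", pos)
        # The size line is hexadecimal; ignore any ';extension' suffix.
        size_text = raw[pos:line_end].split(b";")[0].decode("ascii")
        size = int(size_text, 16)
        if size == 0:          # a zero-sized chunk ends the body
            break
        start = line_end + 2
        body += raw[start:start + size]
        pos = start + size + 2  # skip the CRLF that closes the chunk
    return body


frame = b'6\r\n: hi\n\n\r\n'
payload = decode_chunked(frame)
```

If this reading is right, the framing becomes visible because the modified `iter_content` reads from `resp.raw._fp.fp` directly, below the layer that would normally decode chunks.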


clownfishnemoGitHub commented Nov 16, 2022

For testing, I stripped out the parsing. From my perspective, the Signify Hue Bridge
already sends valid event message data, so there is no need to parse it. But I am a newbie, so I may be overlooking something;
I have almost no knowledge yet, but I intend to learn.

import codecs
import re
import time

import requests
import six

end_of_field = re.compile(str('$''\x5C''r''\x5C''n''\x27'), re.MULTILINE)  # <MatchesEndOfString_$>\r\n'


class SSEClient(object):
    def __init__(self, url, last_id=None, retry=3000, session=None, chunk_size=1024, **kwargs):
        self.url = url
        self.last_id = last_id
        self.retry = retry
        self.chunk_size = chunk_size

        # Optional support for passing in a requests.Session()
        self.session = session

        # Any extra kwargs will be fed into the requests.get call later.
        self.requests_kwargs = kwargs

        # The SSE spec requires making requests with Cache-Control: no-cache
        if 'headers' not in self.requests_kwargs:
            self.requests_kwargs['headers'] = {}
        self.requests_kwargs['headers']['Cache-Control'] = 'no-cache'

        # The 'Accept' header is not required, but explicit > implicit
        self.requests_kwargs['headers']['Accept'] = 'text/event-stream'

        # Keep data here as it streams in
        self.buf = ''

        self._connect()

    def _connect(self):
        if self.last_id:
            self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id

        # Use session if set.  Otherwise fall back to requests module.
        requester = self.session or requests
        self.resp = requester.get(self.url, stream=True, **self.requests_kwargs)
        self.resp_iterator = self.iter_content()
        encoding = self.resp.encoding or self.resp.apparent_encoding
        self.decoder = codecs.getincrementaldecoder(encoding)(errors='replace')

        # TODO: Ensure we're handling redirects.  Might also stick the 'origin'
        # attribute on Events like the Javascript spec requires.
        self.resp.raise_for_status()

    def iter_content(self):
        def generate():
            while True:
                if hasattr(self.resp.raw, '_fp') and \
                        hasattr(self.resp.raw._fp, 'fp') and \
                        hasattr(self.resp.raw._fp.fp, 'read1'):
                    chunk = self.resp.raw._fp.fp.read1(self.chunk_size)
                else:
                    # _fp is not available, this means that we cannot use short
                    # reads and this will block until the full chunk size is
                    # actually read
                    chunk = self.resp.raw.read(self.chunk_size)
                if not chunk:
                    break
                yield chunk

        return generate()

    def _event_complete(self):
        return re.search(end_of_field, self.buf) is not None

    def __iter__(self):
        return self

    def __next__(self):
        while not self._event_complete():
            try:
                next_chunk = next(self.resp_iterator)
                if not next_chunk:
                    raise EOFError()
                self.buf += self.decoder.decode(next_chunk)

            except (StopIteration, requests.RequestException, EOFError, six.moves.http_client.IncompleteRead) as e:
                print(e)
                time.sleep(self.retry / 1000.0)
                self._connect()

                # The SSE spec only supports resuming from a whole message, so
                # if we have half a message we should throw it out.
                head, sep, tail = self.buf.rpartition('\n')
                self.buf = head + sep
                continue

        # Split the complete event (up to the end_of_field) into event_string,
        # and retain anything after the current complete event in self.buf
        # for next time.
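        (event_string, self.buf) = re.split(end_of_field, self.buf, maxsplit=1)
        # Parsing was stripped out, so wrap the raw text in an Event: a bare
        # str has no retry/id attributes and the checks below would fail.
        msg = Event(data=event_string)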

        # If the server requests a specific retry delay, we need to honor it.
        if msg.retry:
            self.retry = msg.retry

        # last_id should only be set if included in the message.  It's not
        # forgotten if a message omits it.
        if msg.id:
            self.last_id = msg.id

        return msg

    if six.PY2:
        next = __next__


class Event(object):

    sse_line_pattern = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')

    def __init__(self, data='', event='message', id=None, retry=None):
        assert isinstance(data, six.string_types), "Data must be text"
        self.data = data
        self.event = event
        self.id = id
        self.retry = retry

    def __str__(self):
        return self.data
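For comparison with the stripped-out parsing, here is a minimal sketch of how the `sse_line_pattern` regex shown above could turn a raw event string into fields. The function `parse_event` and the dict it returns are hypothetical illustrations, not the library's own parser.

```python
import re

# Same pattern as Event.sse_line_pattern above.
SSE_LINE = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')


def parse_event(raw):
    """Collect SSE fields from one event's text into a plain dict."""
    fields = {"data": [], "event": "message", "id": None, "retry": None}
    for line in raw.splitlines():
        match = SSE_LINE.match(line)
        name = match.group("name")
        value = match.group("value") or ""
        if name == "":
            continue                      # ": hi" style comment or blank line
        if name == "data":
            fields["data"].append(value)  # repeated data lines accumulate
        elif name == "retry":
            if value.isdigit():
                fields["retry"] = int(value)
        elif name in ("event", "id"):
            fields[name] = value
    fields["data"] = "\n".join(fields["data"])
    return fields


parsed = parse_event(": hi\ndata: a\ndata: b\nid: 7\nretry: 3000")
```

Lines starting with a colon, such as the `: hi` keep-alive seen earlier in the thread, are treated as comments and skipped in this sketch.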
 
