From 9571b3042f289a8b1a87869f22183ba890354b4e Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 30 May 2024 01:23:13 -0700 Subject: [PATCH] Better code structure + comments --- codalab/lib/beam/MultiReaderFileStream.py | 32 ++++++++----------- codalab/lib/upload_manager.py | 1 + tests/unit/beam/multireaderfilestream_test.py | 4 +-- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index f4fa8ef92..0d985ffaf 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -35,7 +35,7 @@ def seek(s, offset, whence=SEEK_SET): self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)] - def _fill_buf_bytes(self, num_bytes=None): + def _fill_buf_bytes(self, num_bytes=0): """ Fills the buffer with bytes from the fileobj """ @@ -44,24 +44,17 @@ def _fill_buf_bytes(self, num_bytes=None): return self._buffer += s - def read(self, index: int, num_bytes=0): # type: ignore + + def read(self, index: int, num_bytes=None): # type: ignore """Read the specified number of bytes from the associated file. index: index that specifies which reader is reading. """ - while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: - time.sleep(.1) # 100 ms - - with self._lock: - # Calculate how many new bytes need to be read - new_pos = self._pos[index] + num_bytes - new_bytes_needed = new_pos - max(self._pos) - if new_bytes_needed > 0: - self._fill_buf_bytes(new_bytes_needed) - - # Get the bytes in the buffer that correspond to the read function call - buffer_index = self._pos[index] - self._buffer_pos - s = self._buffer[buffer_index:buffer_index + num_bytes] + if num_bytes == None: + # Read remaining in buffer + num_bytes = (self._buffer_pos + len(self._buffer)) - self._pos[index] + s = self.peek(index, num_bytes) + with self._lock: # Modify reader position in fileobj self._pos[index] += len(s) @@ -74,20 +67,21 @@ def read(self, index: int, num_bytes=0): # type: ignore self._buffer_pos += diff return s - def peek(self, index: int, num_bytes): # type: ignore - while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: + def peek(self, index: int, num_bytes: int): # type: ignore + new_pos = self._pos[index] + num_bytes + while (new_pos) - self._buffer_pos > self.MAX_THRESHOLD: time.sleep(.1) # 100 ms with self._lock: # Calculate how many new bytes need to be read - new_pos = self._pos[index] + num_bytes new_bytes_needed = new_pos - max(self._pos) if new_bytes_needed > 0: self._fill_buf_bytes(new_bytes_needed) - + # Get the bytes in the buffer that correspond to the read function call buffer_index = self._pos[index] - self._buffer_pos s = self._buffer[buffer_index:buffer_index + num_bytes] + return s def seek(self, index: int, offset: int, whence=SEEK_SET): diff --git a/codalab/lib/upload_manager.py b/codalab/lib/upload_manager.py index 4efb1d358..b45f3389e 100644 --- a/codalab/lib/upload_manager.py +++ b/codalab/lib/upload_manager.py @@ -255,6 +255,7 @@ def write_fileobj( conn_str = os.environ.get('AZURE_STORAGE_CONNECTION_STRING', '') os.environ['AZURE_STORAGE_CONNECTION_STRING'] = bundle_conn_str try: + # Chunk size set to 1MiB for performance CHUNK_SIZE = 1024 * 1024 def upload_file_content(): diff --git a/tests/unit/beam/multireaderfilestream_test.py b/tests/unit/beam/multireaderfilestream_test.py index 41f0b59e4..5369ec09e 100644 --- a/tests/unit/beam/multireaderfilestream_test.py +++ b/tests/unit/beam/multireaderfilestream_test.py @@ -40,7 +40,7 @@ def thread2(): t1.start() # Sleep a little for thread 1 to start reading - time.sleep(3) + time.sleep(.5) # Assert that the first reader has not read past the maximum threshold self.assertGreater(70000000, m_stream._pos[0]) @@ -48,7 +48,7 @@ def thread2(): t2.start() # Sleep a little for thread 2 to start reading - time.sleep(1) + time.sleep(.5) # Assert that the first reader is at 100000000, second reader is at 40000000 self.assertEqual(100000000, m_stream._pos[0])