Skip to content

Commit

Permalink
Better code structure + comments
Browse files (browse the repository at this point in the history)
  • Loading branch information
dma1dma1 committed May 30, 2024
1 parent fae4335 commit 9571b30
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 21 deletions.
32 changes: 13 additions & 19 deletions codalab/lib/beam/MultiReaderFileStream.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -35,7 +35,7 @@ def seek(s, offset, whence=SEEK_SET):

self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)]

def _fill_buf_bytes(self, num_bytes=None):
def _fill_buf_bytes(self, num_bytes=0):
"""
Fills the buffer with bytes from the fileobj
"""
Expand All @@ -44,24 +44,17 @@ def _fill_buf_bytes(self, num_bytes=None):
return
self._buffer += s

def read(self, index: int, num_bytes=0): # type: ignore

def read(self, index: int, num_bytes=None): # type: ignore
"""Read the specified number of bytes from the associated file.
index: index that specifies which reader is reading.
"""
while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD:
time.sleep(.1) # 100 ms

with self._lock:
# Calculate how many new bytes need to be read
new_pos = self._pos[index] + num_bytes
new_bytes_needed = new_pos - max(self._pos)
if new_bytes_needed > 0:
self._fill_buf_bytes(new_bytes_needed)

# Get the bytes in the buffer that correspond to the read function call
buffer_index = self._pos[index] - self._buffer_pos
s = self._buffer[buffer_index:buffer_index + num_bytes]
if num_bytes == None:
# Read remaining in buffer
num_bytes = (self._buffer_pos + len(self._buffer)) - self._pos[index]

s = self.peek(index, num_bytes)
with self._lock:
# Modify reader position in fileobj
self._pos[index] += len(s)

Expand All @@ -74,20 +67,21 @@ def read(self, index: int, num_bytes=0): # type: ignore
self._buffer_pos += diff
return s

def peek(self, index: int, num_bytes): # type: ignore
while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD:
def peek(self, index: int, num_bytes: int): # type: ignore
new_pos = self._pos[index] + num_bytes
while (new_pos) - self._buffer_pos > self.MAX_THRESHOLD:
time.sleep(.1) # 100 ms

with self._lock:
# Calculate how many new bytes need to be read
new_pos = self._pos[index] + num_bytes
new_bytes_needed = new_pos - max(self._pos)
if new_bytes_needed > 0:
self._fill_buf_bytes(new_bytes_needed)

# Get the bytes in the buffer that correspond to the read function call
buffer_index = self._pos[index] - self._buffer_pos
s = self._buffer[buffer_index:buffer_index + num_bytes]

return s

def seek(self, index: int, offset: int, whence=SEEK_SET):
Expand Down
1 change: 1 addition & 0 deletions codalab/lib/upload_manager.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -255,6 +255,7 @@ def write_fileobj(
conn_str = os.environ.get('AZURE_STORAGE_CONNECTION_STRING', '')
os.environ['AZURE_STORAGE_CONNECTION_STRING'] = bundle_conn_str
try:
# Chunk size set to 1MiB for performance
CHUNK_SIZE = 1024 * 1024

def upload_file_content():
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/beam/multireaderfilestream_test.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -40,15 +40,15 @@ def thread2():
t1.start()

# Sleep a little for thread 1 to start reading
time.sleep(3)
time.sleep(.5)

# Assert that the first reader has not read past the maximum threshold
self.assertGreater(70000000, m_stream._pos[0])

t2.start()

# Sleep a little for thread 2 to start reading
time.sleep(1)
time.sleep(.5)

# Assert that the first reader is at 100000000, second reader is at 40000000
self.assertEqual(100000000, m_stream._pos[0])
Expand Down

0 comments on commit 9571b30

Please sign in to comment.