Skip to content

Commit

Permalink
Make most SubTextIO fields private
Browse files Browse the repository at this point in the history
  • Loading branch information
Donaim committed Aug 6, 2024
1 parent 0284e84 commit f36590f
Showing 1 changed file with 59 additions and 50 deletions.
109 changes: 59 additions & 50 deletions src/multicsv/subtextio.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,91 +117,99 @@ def __init__(self, base_io: TextIO, start: int, end: int):
raise BaseMustBeReadable("Base io must be readable"
" if existing content is to be modified.")

self.base_io = base_io
self.start = start
self.end = end
self.position = 0 # Position within the SubTextIO
self._base_io = base_io
self._start = start
self._end = end
self._position = 0 # Position within the SubTextIO
self._closed = base_io.closed
self._load()
self.initial_length = self.buffer_length
self._initial_length = self.buffer_length

def _load(self) -> None:
"""
Load the relevant part of the base_io into the buffer.
"""

base_initial_position = self.base_io.tell()
base_initial_position = self._base_io.tell()

#
# Below we try to avoid calling `base_io.read()` as much as possible.
#
try:
self.base_io.seek(0, os.SEEK_END)
base_last_position = self.base_io.tell()
self._base_io.seek(0, os.SEEK_END)
base_last_position = self._base_io.tell()

if self.start > base_last_position:
raise StartsBeyondBaseContent(
"Start position is greater than base TextIO length.")

if self.end > self.start:
self.base_io.seek(self.end)
base_final_position = self.base_io.tell()
self._base_io.seek(self.end)
base_final_position = self._base_io.tell()
self.is_at_end = base_final_position == base_last_position

if self.end < base_last_position:
self.base_io.seek(self.start)
self._buffer = self.base_io.read(self.end - self.start)
self._base_io.seek(self.start)
self._buffer = self._base_io.read(self.end - self.start)
else:
self._buffer = ""
else:
self._buffer = ""
base_final_position = self.start
self.base_io.seek(0, os.SEEK_END)
self.is_at_end = base_final_position == self.base_io.tell()
self._base_io.seek(0, os.SEEK_END)
self.is_at_end = base_final_position == self._base_io.tell()
finally:
self.base_io.seek(base_initial_position)
self._base_io.seek(base_initial_position)

@property
def start(self) -> int:
return self._start

@property
def end(self) -> int:
return self._end

@property
def buffer_length(self) -> int:
return len(self._buffer)

@property
def mode(self) -> str:
return self.base_io.mode
return self._base_io.mode

@property
def closed(self) -> bool:
return self._closed

@property
def encoding(self) -> str:
return self.base_io.encoding
return self._base_io.encoding

def read(self, size: int = -1) -> str:
self._check_closed()

if size < 0 or size > self.buffer_length - self.position:
size = self.buffer_length - self.position
if size < 0 or size > self.buffer_length - self._position:
size = self.buffer_length - self._position

result = self._buffer[self.position:self.position + size]
self.position += len(result)
result = self._buffer[self._position:self._position + size]
self._position += len(result)
return result

def readline(self, limit: int = -1) -> str:
self._check_closed()

if self.position >= self.buffer_length:
if self._position >= self.buffer_length:
return ''

newline_pos = self._buffer.find('\n', self.position)
newline_pos = self._buffer.find('\n', self._position)
if newline_pos == -1 or newline_pos >= self.buffer_length:
newline_pos = self.buffer_length

if limit < 0 or limit > newline_pos - self.position:
limit = newline_pos - self.position + 1
if limit < 0 or limit > newline_pos - self._position:
limit = newline_pos - self._position + 1

result = self._buffer[self.position:self.position + limit]
self.position += len(result)
result = self._buffer[self._position:self._position + limit]
self._position += len(result)
return result

def readlines(self, hint: int = -1) -> List[str]:
Expand All @@ -216,7 +224,7 @@ def readlines(self, hint: int = -1) -> List[str]:

self._check_closed()

remaining_buffer = self._buffer[self.position:]
remaining_buffer = self._buffer[self._position:]
lines = remaining_buffer.splitlines(keepends=True)
read_size = 0
result = []
Expand All @@ -227,18 +235,18 @@ def readlines(self, hint: int = -1) -> List[str]:
if 0 <= hint <= read_size:
break

self.position += read_size
self._position += read_size
return result

def write(self, s: str) -> int:
self._check_closed()

pre = self._buffer[:self.position]
post = self._buffer[self.position + len(s):]
pre = self._buffer[:self._position]
post = self._buffer[self._position + len(s):]
self._buffer = pre + s + post

written = len(s)
self.position += written
self._position += written

return written

Expand All @@ -250,7 +258,7 @@ def truncate(self, size: Optional[int] = None) -> int:
self._check_closed()

if size is None:
end = self.position
end = self._position
else:
end = size

Expand All @@ -270,37 +278,38 @@ def seek(self, offset: int, whence: int = os.SEEK_SET) -> int:
if whence == os.SEEK_SET: # Absolute positioning
target = offset
elif whence == os.SEEK_CUR: # Relative to current position
target = self.position + offset
target = self._position + offset
elif whence == os.SEEK_END: # Relative to the end
target = self.buffer_length + offset
else:
raise InvalidWhenceError(
f"Invalid value for whence: {repr(whence)}")

self.position = max(0, min(target, self.buffer_length))
return self.position
self._position = max(0, min(target, self.buffer_length))
return self._position

def tell(self) -> int:
self._check_closed()
return self.position
return self._position

def flush(self) -> None:
if not self._closed:
base_initial_position = self.base_io.tell()
base_initial_position = self._base_io.tell()
try:
if self.buffer_length == self.initial_length or self.is_at_end:
self.base_io.seek(self.start)
self.base_io.write(self._buffer)
if self.buffer_length == self._initial_length \
or self.is_at_end:
self._base_io.seek(self.start)
self._base_io.write(self._buffer)
else:
self.base_io.seek(self.end)
content_after = self.base_io.read()
self._base_io.seek(self.end)
content_after = self._base_io.read()

self.base_io.seek(self.start)
self.base_io.write(self._buffer + content_after)
self._base_io.seek(self.start)
self._base_io.write(self._buffer + content_after)

self.base_io.flush()
self._base_io.flush()
finally:
self.base_io.seek(base_initial_position)
self._base_io.seek(base_initial_position)

def isatty(self) -> bool:
return False
Expand All @@ -309,10 +318,10 @@ def fileno(self) -> int:
raise io.UnsupportedOperation("Not a filesystem descriptor.")

def readable(self) -> bool:
return self.base_io.readable()
return self._base_io.readable()

def writable(self) -> bool:
return self.base_io.writable()
return self._base_io.writable()

def seekable(self) -> bool:
return True
Expand All @@ -329,7 +338,7 @@ def __iter__(self) -> 'SubTextIO':
return self

def __next__(self) -> str:
if self.position < self.buffer_length:
if self._position < self.buffer_length:
return self.readline()
else:
raise StopIteration
Expand Down

0 comments on commit f36590f

Please sign in to comment.