This repository has been archived by the owner on Apr 4, 2024. It is now read-only.
forked from diffplug/selfie
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
SnapshotReader - Edwin - Not completed
- Loading branch information
1 parent
d04a06d
commit b66e44e
Showing
8 changed files
with
358 additions
and
50 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
class ConvertToWindowsNewlines: | ||
def __init__(self, sink): | ||
self.sink = sink | ||
|
||
def append(self, value, start_index=None, end_index=None): | ||
# If value is a single character | ||
if isinstance(value, str) and len(value) == 1: | ||
if value != '\n': | ||
self.sink.write(value) | ||
else: | ||
self.sink.write('\r\n') | ||
# If value is a CharSequence (in Python, a str) | ||
elif isinstance(value, str): | ||
# If start_index and end_index are provided, use the slice of the string | ||
if start_index is not None and end_index is not None: | ||
value_to_append = value[start_index:end_index] | ||
else: | ||
value_to_append = value | ||
self.sink.write(value_to_append.replace("\n", "\r\n")) | ||
return self |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
from .SnapshotValue import SnapshotValue | ||
|
||
class Snapshot: | ||
def __init__(self, subject, facet_data=None): | ||
if facet_data is None: | ||
facet_data = {} | ||
self.subject = subject | ||
self._facet_data = facet_data | ||
|
||
@property | ||
def facets(self): | ||
return self._facet_data | ||
|
||
def plus_facet(self, key, value): | ||
if not key: | ||
raise ValueError("The empty string is reserved for the subject.") | ||
new_facet_data = self._facet_data.copy() | ||
new_facet_data[self._unix_newlines(key)] = SnapshotValue.of(value) | ||
return Snapshot(self.subject, new_facet_data) | ||
|
||
def plus_or_replace(self, key, value): | ||
if not key: | ||
return Snapshot(value, self._facet_data) | ||
new_facet_data = self._facet_data.copy() | ||
new_facet_data[self._unix_newlines(key)] = value | ||
return Snapshot(self.subject, new_facet_data) | ||
|
||
def subject_or_facet_maybe(self, key): | ||
if not key: | ||
return self.subject | ||
return self._facet_data.get(key) | ||
|
||
def subject_or_facet(self, key): | ||
result = self.subject_or_facet_maybe(key) | ||
if result is None: | ||
raise KeyError(f"'{key}' not found in {list(self._facet_data.keys())}") | ||
return result | ||
|
||
def all_entries(self): | ||
return {**{"": self.subject}, **self._facet_data} | ||
|
||
def __str__(self): | ||
return f"[{self.subject} {self._facet_data}]" | ||
|
||
@staticmethod | ||
def _unix_newlines(key): | ||
return key.replace("\r\n", "\n") | ||
|
||
@classmethod | ||
def of(cls, value): | ||
return cls(SnapshotValue.of(value)) | ||
|
||
@classmethod | ||
def of_entries(cls, entries): | ||
root = None | ||
facets = {} | ||
for key, value in entries: | ||
if not key: | ||
if root is not None: | ||
raise ValueError("Duplicate root snapshot.") | ||
root = value | ||
else: | ||
facets[key] = value | ||
return cls(root or SnapshotValue.of(""), facets) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
import threading | ||
from typing import List | ||
import base64 | ||
|
||
from .SnapshotValue import SnapshotValue | ||
from .ConvertToWindowsNewlines import ConvertToWindowsNewlines | ||
from .ParseException import ParseException | ||
from .SnapshotReader import SnapshotReader | ||
from .SnapshotValueReader import SnapshotValueReader | ||
|
||
|
||
|
||
class SnapshotFile: | ||
HEADER_PREFIX = "📷 " | ||
END_OF_FILE = "[end of file]" | ||
|
||
def __init__(self): | ||
self.unix_newlines = True | ||
self.metadata = None | ||
self._snapshots = {} | ||
self._lock = threading.Lock() | ||
self._was_set_at_test_time = False | ||
|
||
@property | ||
def snapshots(self): | ||
return self._snapshots | ||
|
||
@snapshots.setter | ||
def snapshots(self, value): | ||
with self._lock: | ||
self._snapshots = value | ||
|
||
@property | ||
def was_set_at_test_time(self): | ||
return self._was_set_at_test_time | ||
|
||
def set_at_test_time(self, key, snapshot): | ||
with self._lock: | ||
old_snapshots = self._snapshots.copy() | ||
self._snapshots[key] = snapshot | ||
self._was_set_at_test_time = True if self._snapshots != old_snapshots else self._was_set_at_test_time | ||
|
||
def serialize(self, value_writer_raw): | ||
value_writer = value_writer_raw if self.unix_newlines else ConvertToWindowsNewlines(value_writer_raw) | ||
if self.metadata: | ||
self.write_entry(value_writer, f"📷 {self.metadata[0]}", None, SnapshotValue.of(self.metadata[1])) | ||
for key, snapshot in self._snapshots.items(): | ||
self.write_entry(value_writer, key, None, snapshot.subject) | ||
for facet_key, facet_value in snapshot.facets.items(): | ||
self.write_entry(value_writer, key, facet_key, facet_value) | ||
self.write_entry(value_writer, "", "end of file", SnapshotValue.of("")) | ||
|
||
def write_entry(value_writer, key, facet, value): | ||
value_writer.write("╔═ ") | ||
value_writer.write(SnapshotValueReader.nameEsc.escape(key)) | ||
if facet is not None: | ||
value_writer.write("[") | ||
value_writer.write(SnapshotValueReader.nameEsc.escape(facet)) | ||
value_writer.write("]") | ||
value_writer.write(" ═╗") | ||
if value.is_binary: | ||
binary_length = len(value.value_binary()) | ||
value_writer.write(f" base64 length {binary_length} bytes") | ||
value_writer.write("\n") | ||
|
||
if key == "" and facet == "end of file": | ||
return | ||
|
||
if value.is_binary: | ||
# Base64 encoding and replacing \r with an empty string | ||
binary_data = value.value_binary() | ||
encoded = base64.b64encode(binary_data).decode('utf-8') | ||
# Assuming efficientReplace is a more efficient method for replacing characters | ||
# Here, we just use the regular replace method for simplicity | ||
escaped = encoded.replace("\r", "") | ||
value_writer.write(escaped) | ||
else: | ||
# For string values, applying specific escape logic and then replacing "\n╔" with a special sequence | ||
text_data = value.value_string() | ||
# Assuming body_escape function handles the escaping as in SnapshotValueReader.bodyEsc.escape | ||
escaped = body_escape(text_data).replace("\n╔", "\n\uDF41") | ||
value_writer.write(escaped) | ||
value_writer.write("\n") | ||
|
||
@staticmethod | ||
def parse(value_reader): | ||
try: | ||
result = SnapshotFile() | ||
result.unix_newlines = value_reader.unix_newlines | ||
reader = SnapshotReader(value_reader) | ||
|
||
# Check if the first value starts with 📷 | ||
if reader.peek_key() and reader.peek_key().startswith(SnapshotFile.HEADER_PREFIX): | ||
metadata_name = reader.peek_key()[len(SnapshotFile.HEADER_PREFIX):] | ||
metadata_value = reader.value_reader.next_value().value_string() | ||
# Assuming 'entry' function creates a dictionary entry in Python | ||
result.metadata = (metadata_name, metadata_value) | ||
|
||
while reader.peek_key() is not None: | ||
key = reader.peek_key() | ||
snapshot = reader.next_snapshot() | ||
# Update snapshots dictionary with new key-value pair | ||
result.snapshots.update({key: snapshot}) | ||
|
||
return result | ||
|
||
except ValueError as e: | ||
if isinstance(e, ParseException): | ||
raise e | ||
else: | ||
raise ParseException(value_reader.line_reader, e) from None | ||
|
||
|
||
@staticmethod | ||
def create_empty_with_unix_newlines(unix_newlines): | ||
result = SnapshotFile() | ||
result.unix_newlines = unix_newlines | ||
return result | ||
|
||
def remove_all_indices(self, indices: List[int]): | ||
if not indices: | ||
return | ||
self._was_set_at_test_time = True | ||
self.snapshots = self.snapshots.minus_sorted_indices(indices) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
from .Snapshot import Snapshot | ||
from .SnapshotFile import SnapshotFile | ||
|
||
|
||
class SnapshotReader: | ||
def __init__(self, value_reader): | ||
self.value_reader = value_reader | ||
|
||
def peek_key(self): | ||
next_key = self.value_reader.peek_key() | ||
if next_key is None or next_key == SnapshotFile.END_OF_FILE: | ||
return None | ||
if '[' in next_key: | ||
raise ValueError(f"Missing root snapshot, square brackets not allowed: '{next_key}'") | ||
return next_key | ||
|
||
def next_snapshot(self): | ||
root_name = self.peek_key() | ||
snapshot = Snapshot.of(self.value_reader.next_value()) | ||
while True: | ||
next_key = self.value_reader.peek_key() | ||
if next_key is None: | ||
return snapshot | ||
facet_idx = next_key.find('[') | ||
if facet_idx == -1 or (facet_idx == 0 and next_key == SnapshotFile.END_OF_FILE): | ||
return snapshot | ||
facet_root = next_key[:facet_idx] | ||
if facet_root != root_name: | ||
raise ValueError(f"Expected '{next_key}' to come after '{facet_root}', not '{root_name}'") | ||
facet_end_idx = next_key.find(']', facet_idx + 1) | ||
if facet_end_idx == -1: | ||
raise ValueError(f"Missing ] in {next_key}") | ||
facet_name = next_key[facet_idx + 1:facet_end_idx] | ||
snapshot = snapshot.plus_facet(facet_name, self.value_reader.next_value()) | ||
|
||
def skip_snapshot(self): | ||
root_name = self.peek_key() | ||
if root_name is None: | ||
raise ValueError("No snapshot to skip") | ||
self.value_reader.skip_value() | ||
while True: | ||
next_key = self.peek_key() | ||
if next_key is None or not next_key.startswith(f"{root_name}["): | ||
break | ||
self.value_reader.skip_value() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
from abc import ABC, abstractmethod | ||
from typing import Union | ||
|
||
def unix_newlines(string: str) -> str: | ||
return string.replace("\r\n", "\n") | ||
|
||
class SnapshotValue(ABC): | ||
@property | ||
def is_binary(self) -> bool: | ||
return isinstance(self, SnapshotValueBinary) | ||
|
||
@abstractmethod | ||
def value_binary(self) -> bytes: | ||
pass | ||
|
||
@abstractmethod | ||
def value_string(self) -> str: | ||
pass | ||
|
||
@staticmethod | ||
def of(value: Union[bytes, str]) -> "SnapshotValue": | ||
if isinstance(value, bytes): | ||
return SnapshotValueBinary(value) | ||
elif isinstance(value, str): | ||
return SnapshotValueString(unix_newlines(value)) | ||
else: | ||
raise TypeError("Value must be either bytes or str") | ||
|
||
class SnapshotValueBinary(SnapshotValue): | ||
def __init__(self, value: bytes): | ||
self._value = value | ||
|
||
def value_binary(self) -> bytes: | ||
return self._value | ||
|
||
def value_string(self) -> str: | ||
raise NotImplementedError("This is a binary value.") | ||
|
||
class SnapshotValueString(SnapshotValue): | ||
def __init__(self, value: str): | ||
self._value = value | ||
|
||
def value_binary(self) -> bytes: | ||
raise NotImplementedError("This is a string value.") | ||
|
||
def value_string(self) -> str: | ||
return self._value |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.