Skip to content

Commit

Permalink
Improve SQLite3 parser (#25)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Schamper <[email protected]>
  • Loading branch information
JSCU-CNI and Schamper authored Apr 25, 2024
1 parent 1829cfd commit 067f05d
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 5 deletions.
10 changes: 6 additions & 4 deletions dissect/sql/sqlite3.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, fh, wal_fh=None):
if self.header.magic != SQLITE3_HEADER_MAGIC:
raise InvalidDatabase("Invalid header magic")

self.encoding = ENCODING[self.header.text_encoding]
self.encoding = ENCODING.get(self.header.text_encoding, "utf-8")
self.page_size = self.header.page_size
if self.page_size == 1:
self.page_size = 65536
Expand Down Expand Up @@ -75,7 +75,9 @@ def indices(self):
yield Index(self, *cell.values)

def raw_page(self, num):
if num < 1 or num > self.header.page_count:
# Only throw an out of bounds exception if the header contains a page_count.
# Some old versions of SQLite3 do not set/update the page_count correctly.
if (num < 1 or num > self.header.page_count) and self.header.page_count > 0:
raise InvalidPageNumber("Page number exceeds boundaries")
elif num == 1: # Page 1 is root
self.fh.seek(len(c_sqlite3.header))
Expand All @@ -100,8 +102,8 @@ class Column:
"""Describes a column of a sqlite table."""

SPACE = r"\s"
EXPRESSION = r"\(.+\)"
STRING = r"['\"].+['\"]"
EXPRESSION = r"\(.+?\)"
STRING = r"['\"].+?['\"]"
TOKENIZER_EXPRESSION = re.compile(f"({SPACE}|{EXPRESSION}|{STRING})")

def __init__(self, name: str, description: str):
Expand Down
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ def open_data(name: str) -> BinaryIO:
@pytest.fixture
def sqlite_db() -> BinaryIO:
return open_data("data/test.sqlite")


@pytest.fixture
def empty_db() -> BinaryIO:
return open_data("data/empty.sqlite")
Binary file added tests/data/empty.sqlite
Binary file not shown.
6 changes: 5 additions & 1 deletion tests/test_default_values.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ def test_parse_table_defaults():
new_test DEFAULT (1),
extra_test DEFAULT ("hello world"),
all_tests DEFAULT (-1.1),
test DEFAULT (x == NULL)
test DEFAULT (x == NULL),
some_test INTEGER DEFAULT (0) NOT NULL,
another_test VARCHAR(1337) DEFAULT ("test") NOT NULL
);
"""
table = Table(sqlite=None, type_=None, name=None, table_name=None, page=None, sql=table_definition)
Expand All @@ -57,6 +59,8 @@ def test_parse_table_defaults():
("extra_test", "hello world"),
("all_tests", -1.1),
("test", None),
("some_test", 0),
("another_test", "test"),
]
for key, value in assertion_data:
assert column_name_for_key(table.columns, key).default_value == value
Expand Down
7 changes: 7 additions & 0 deletions tests/test_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,10 @@ def test_sqlite(sqlite_db: BinaryIO) -> None:
)
def test_sqlite_read_record(input: bytes, encoding: str, expected_output: tuple[list[int], list[Any]]) -> None:
assert sqlite3.read_record(BytesIO(input), encoding) == expected_output


def test_empty(empty_db: BinaryIO) -> None:
s = sqlite3.SQLite3(empty_db)

assert s.encoding == "utf-8"
assert len(list(s.tables())) == 0

0 comments on commit 067f05d

Please sign in to comment.