Skip to content

Commit

Permalink
[#299] Started exploring different ways to read encrypted sqlite data…
Browse files Browse the repository at this point in the history
…base OR encrypted ZIP-file (unfinished)
  • Loading branch information
casaout committed Jun 3, 2024
1 parent 41d5f9d commit d537073
Showing 1 changed file with 370 additions and 0 deletions.
370 changes: 370 additions & 0 deletions documentation/analysis/ReadExportedDatabase.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,370 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## DRAFT: Reading Exported PA Databases"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#%pip install pdfquery\n",
"from pdfquery import PDFQuery\n",
"\n",
"path = 'C:\\\\Users\\\\casao\\Downloads\\\\Meyer_Peter_19590107_20240327094816.pdf'\n",
"\n",
"pdf = PDFQuery(path)\n",
"pdf.load()\n",
"\n",
"# Use CSS-like selectors to locate the elements\n",
"text_elements = pdf.pq('LTTextLineHorizontal')\n",
"\n",
"# Extract the text from the elements\n",
"text = [t.text for t in text_elements]\n",
"\n",
"print(text)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# %pip install camelot-py\n",
"path = 'C:\\\\Users\\\\casao\\Downloads\\\\Meyer_Peter_19590107_20240327094816.pdf'\n",
"\n",
"import camelot.io as camelot\n",
"camelot.read_pdf(path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import ctypes\n",
"from ctypes.util import find_library\n",
"find_library(\"\".join((\"gsdll\", str(ctypes.sizeof(ctypes.c_voidp) * 8), \".dll\")))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install libsqlcipher \n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install sqlcipher3 \n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# from: https://stackoverflow.com/questions/986403/encrypted-database-file-in-python\n",
"\n",
"from cryptography.fernet import Fernet\n",
"from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes\n",
"from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC\n",
"from cryptography.hazmat.primitives import hashes\n",
"from cryptography.hazmat.backends import default_backend\n",
"import base64\n",
"# from os import getcwd\n",
"import sqlite3\n",
"import gzip\n",
"\n",
"def key_creation(password):\n",
" password_bytes = password.encode('utf-8')\n",
" kdf=PBKDF2HMAC(algorithm=hashes.SHA256(), salt=b'\\xfaz\\xb5\\xf2|\\xa1z\\xa9\\xfe\\xd1F@1\\xaa\\x8a\\xc2', iterations=1024, length=32, backend=default_backend())\n",
" key=Fernet(base64.urlsafe_b64encode(kdf.derive(password_bytes)))\n",
" return key\n",
"\n",
"def key_creation2(password):\n",
" password_bytes = password.encode('utf-8')\n",
" kdf=PBKDF2HMAC(algorithm=hashes.SHA256(), salt=b'\\xfaz\\xb5\\xf2|\\xa1z\\xa9\\xfe\\xd1F@1\\xaa\\x8a\\xc2', iterations=1024, length=32, backend=default_backend())\n",
" key=kdf.derive(password_bytes)\n",
" return key\n",
"\n",
"def encryption(b, password):\n",
" f=key_creation(password)\n",
" safe=f.encrypt(b)\n",
" return safe\n",
"\n",
"def decryption(safe, password):\n",
" f=key_creation(password)\n",
" b=f.decrypt(safe)\n",
" return b\n",
"\n",
"def decryption2(ciphertext, password):\n",
" key = key_creation2(password) # Derive the key from the password\n",
" cipher = Cipher(algorithms.AES(key), modes.CBC(b'\\x00' * 16), backend=default_backend()) # Create a cipher object using AES in CBC mode\n",
" decryptor = cipher.decryptor() # Create a decryptor object\n",
" plaintext = decryptor.update(ciphertext) + decryptor.finalize() # Decrypt the ciphertext\n",
" return plaintext\n",
"\n",
"def open_cdb(path,password):\n",
" with open(path, 'rb') as f:\n",
" encrypted_content = f.read()\n",
" # decrypted_content=decryption(encrypted_content,password)\n",
" decrypted_content=decryption2(encrypted_content,password)\n",
" content=decrypted_content.decode('utf-8')\n",
" # content = content.decode('latin-1')\n",
" con=sqlite3.connect(':memory:')\n",
" con.executescript(content)\n",
" return con\n",
"\n",
"def save_cdb(con,path,password):\n",
" fp=gzip.open(path,'wb')\n",
" b=b''\n",
" for line in con.iterdump():\n",
" b+=bytes('%s\\n','utf8') % bytes(line,'utf8')\n",
" b=encryption(b,password)\n",
" fp.write(b)\n",
" fp.close()\n",
"\n",
"\n",
"\n",
"## trying it out\n",
"\n",
"path = \"C:\\\\Users\\\\casao\\\\AppData\\\\Roaming\\\\personal-analytics\\\\exports\\\\PA_32XSB1_2024-03-04_10-41.sqlite\"\n",
"pwd = \"PersonalAnalytics_32XSB1\"\n",
"\n",
"\n",
"conn = sqlite3.connect(':memory:')\n",
"# conn.execute('CREATE TABLE PRODUCTS (ID INT PRIMARY KEY NOT NULL,\\nNAME TEXT NOT NULL,\\nPRICE REAL NOT NULL,\\nTAXES REAL NOT NULL);')\n",
"# save_cdb(conn,name,password)\n",
"# conn.close()\n",
"conn = open_cdb(path, pwd)\n",
"cursor = conn.execute('select * from settings;')\n",
"headers = list(map(lambda x: x[0], cursor.description))\n",
"print(headers)\n",
"for x in cursor:\n",
" for j in range(len(x)):\n",
" print(headers[j]+' ',x[j])\n",
" print('\\n')\n",
"conn.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install pycryptodome"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# temp ZIP\n",
"\n",
"# %pip install pycryptodome\n",
"\n",
"path = \"C:\\\\TEMP\\\\PA\\\\PA_32XSB1.encrypted.zip\"\n",
"pwd = \"PersonalAnalytics_32XSB1\"\n",
"\n",
"\n",
"import zipfile\n",
"from Crypto.Cipher import AES\n",
"from Crypto.Util.Padding import unpad\n",
"import io\n",
"\n",
"def decrypt_aes(key, ciphertext):\n",
" cipher = AES.new(key, AES.MODE_CBC, iv=ciphertext[:16])\n",
" plaintext = cipher.decrypt(ciphertext[16:])\n",
" return unpad(plaintext, AES.block_size)\n",
"\n",
"def extract_encrypted_zip(zip_path, password):\n",
" with open(zip_path, 'rb') as f:\n",
" encrypted_zip = f.read()\n",
"\n",
" # Assume the first 32 bytes of the encrypted zip file contains the AES key\n",
" key = encrypted_zip[:32]\n",
" encrypted_data = encrypted_zip[32:]\n",
"\n",
" decrypted_data = decrypt_aes(key, encrypted_data)\n",
"\n",
" # Use io.BytesIO to create a file-like object from the decrypted data\n",
" decrypted_zip = io.BytesIO(decrypted_data)\n",
"\n",
" with zipfile.ZipFile(decrypted_zip, 'r') as z:\n",
" z.extractall(pwd=password.encode())\n",
"\n",
"# Usage example:\n",
"\n",
"extract_encrypted_zip(path, pwd)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install pyzipper"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import sqlite3\n",
"import pyzipper\n",
"import io\n",
"\n",
"subject = \"32XSB1\"\n",
"zipPath = \"C:\\\\TEMP\\\\PA\\\\PA_32XSB1.encrypted.zip\"\n",
"pwd = \"PersonalAnalytics_32XSB1\"\n",
"dbfile = \"database.sqlite\"\n",
"tableName = \"user_input\"\n",
"\n",
"charset = \"iso-8859-1\" # error: 'near \"SQLite\": syntax error'\n",
"# charset = \"utf-8\" # error: 'utf-8' codec can't decode byte 0xe7 in position 27: invalid continuation byte\n",
"\n",
"with pyzipper.AESZipFile(zipPath) as zf:\n",
" # try 1: extracting then opening the file (in explorer) works\n",
" # zf.extractall(path='C:\\\\TEMP\\\\PA\\\\PA_32XSB1_decrypted', pwd = bytes(pwd, 'utf-8'))\n",
" # print(zf.infolist())\n",
"\n",
" # try 2: read log/txt file from encrypted zip file works\n",
"\n",
" # try 3: read sqlite file from encrypted zip file .... :( \n",
" for file_info in zf.infolist():\n",
" print(file_info)\n",
" if file_info.filename == dbfile:\n",
" with zf.open(file_info, pwd=bytes(pwd, charset)) as sqlite_file:\n",
" \n",
" sqlite_data = sqlite_file.read()\n",
" conn = sqlite3.connect(':memory:')\n",
" cursor = conn.cursor()\n",
" sqlite_data_str = sqlite_data.decode(charset)\n",
"\n",
" # print(sqlite_data_str) # printing the string actually shows decrypted content\n",
"\n",
" # bug: 'near \"SQLite\": syntax error'\n",
" cursor.executescript(sqlite_data_str) # conn.executescript(sqlite_data_str) # same issue\n",
"\n",
" # Query to list all tables in the database\n",
" cursor.execute(\"SELECT name FROM external.sqlite_master WHERE type='table';\")\n",
"\n",
" # Fetch and print the table names\n",
" tables = cursor.fetchall()\n",
" for table in tables:\n",
" print(f\"Table: {table[0]}\")\n",
" cursor.execute(f\"PRAGMA table_info(external.{table[0]});\")\n",
" columns = cursor.fetchall()\n",
" print(\"Columns:\")\n",
" for column in columns:\n",
" print(column)\n",
"\n",
" conn.close()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install pysqlcipher3"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# melih's approach\n",
"\n",
"import pysqlcipher3.dbapi2 as sqlcipher\n",
"\n",
"subject = \"32XSB1\"\n",
"path = \"C:\\\\TEMP\\\\database.sqlite\"\n",
"pwd = \"PersonalAnalytics_32XSB1\"\n",
"dbfile = \"database.sqlite\"\n",
"tableName = \"user_input\"\n",
"\n",
"# Create a new database connection and cursor\n",
"conn = sqlcipher.connect(path)\n",
"cursor = conn.cursor()\n",
"\n",
"# Decrypt the database with the passphrase\n",
"cursor.execute(\"PRAGMA key = '%s'\" % pwd)\n",
"\n",
"\n",
"# Create a test table if it doesn't exist\n",
"is_table_exists = cursor.execute(\"SELECT name FROM sqlite_master WHERE type='table' AND name='test'\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"interpreter": {
"hash": "17818a3c040c2ba0e718928cdc05366fda2dbb7844910ae1d6f355a610437351"
},
"kernelspec": {
"display_name": "Python 3.8.8 ('base')",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.8"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

0 comments on commit d537073

Please sign in to comment.