Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: pickle shelffile -> standard_address_space.sql #763

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions examples/server-persistent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import sys
sys.path.insert(0, "..")
import time
import random

from opcua import Server
from opcua.common.sqlite3_backend import SQLite3Backend
from opcua.server.address_space_sqlite import StandardAddressSpaceSQLite, AddressSpaceSQLite

# Example node names used to populate the demo address space.
ITEMS = ('Pump', 'Motor', 'Fan', 'Gearbox', 'Filter', 'Building', 'Ventilation')

if __name__ == "__main__":

    print('\nStart and stop this server multiple times and\n'
          'verify that address space is persisted in SQL.\n'
          'Values written from any opc-ua client into the\n'
          'server are also stored.\n')

    # The backing store is an sqlite3 database, so give it a .sql extension
    # (was 'my_address_space.py', which is misleading for an sqlite file).
    with SQLite3Backend(sqlFile='my_address_space.sql', readonly=False) as backend, \
         StandardAddressSpaceSQLite() as stdAspace, \
         AddressSpaceSQLite(backend=backend, cache=stdAspace) as myAspace:

        # setup our server, backed by the SQL-persisted address space
        server = Server(aspace=myAspace)
        server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")

        # setup our own namespace, not really necessary but should as spec
        uri = "http://examples.freeopcua.github.io"
        idx = server.register_namespace(uri)

        # get Objects node, this is where we should put our nodes
        objects = server.get_objects_node()

        # populating our address space: a randomly-named object so that each
        # run adds a new node, making persistence across restarts visible
        myobj = objects.add_object(idx, "{:s}-{:d}".format(random.choice(ITEMS), random.randint(1, 100)))
        myvar = myobj.add_variable(idx, "MyVariable", 42)
        myvar.set_writable()    # Set MyVariable to be writable by clients

        # starting!
        server.start()

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            pass
        finally:
            # close connection, remove subscriptions, etc
            server.stop()
136 changes: 136 additions & 0 deletions opcua/common/sqlite3_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@

import os
import sys
import time
import sqlite3
import threading
from multiprocessing import Lock

class SQLite3Backend(object):
    """sqlite3 database wrapper that maintains one connection per thread.

    All access is serialized by a single lock. In WAL mode, commits only
    become durable after a checkpoint; checkpoints are throttled to at most
    one per CHECKP_INTERVAL seconds.
    """
    PY2 = sys.version_info < (3, 0)
    CHECKP_INTERVAL = 90  # [sec] minimum interval between WAL checkpoints

    def __init__(self, sqlFile=None, readonly=True):
        assert(isinstance(sqlFile, str))
        assert(isinstance(readonly, bool))
        self._sqlFile = sqlFile        # Path to database file.
        self._readonly = bool(readonly)
        self._lock = Lock()            # Database lock.
        self._conn = {}                # Per-thread connections, keyed by thread ident.
        self._lastCheckP = int(0)      # Epoch of last checkpoint.

    def __enter__(self):
        self._lastCheckP = time.time()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._db_disconnect()

    def __str__(self):
        return self._sqlFile

    @property
    def readonly(self):
        return self._readonly

    # PUBLIC METHODS
    def execute_read(self, dbCmd=None, params=(), CB=None):
        """Run a SELECT-style command and invoke CB(row) for every row."""
        with self._lock:
            c = self._get_conn().cursor()
            try:
                for row in c.execute(dbCmd, params):
                    CB(row)
            finally:
                # Always release the cursor, even if CB or execute raises.
                c.close()

    def execute_write(self, dbCmd=None, params=(), commit=True):
        """Run a write command; commit (and maybe checkpoint) when asked."""
        with self._lock:
            conn = self._get_conn()
            if dbCmd is not None:
                c = conn.cursor()
                try:
                    c.execute(dbCmd, params)
                finally:
                    # Close the cursor even when the statement fails.
                    c.close()
            if bool(commit) is True:
                conn.commit()
                self._wal_throttled()

    def wal_throttled_threadsafe(self):
        with self._lock:
            self._wal_throttled()

    def wal_checkpoint_threadsafe(self):
        """
        Store checkpoint: forces database modifications to be persistent.
        Automatically done when sqlite cache runs over the 1000 pages threshold.
        IMPORTANT: slow operation, manual syncs are only useful for sporadic
        transactions that you really want to survive a power loss.
        """
        with self._lock:
            self._wal_checkpoint()

    # PRIVATE METHODS
    def _wal_throttled(self):
        # commits still require a wal_checkpoint to become persistent.
        if abs(time.time() - self._lastCheckP) < self.CHECKP_INTERVAL:
            return
        self._wal_checkpoint()

    def _wal_checkpoint(self):
        self._lastCheckP = time.time()
        c = self._get_conn().cursor()
        try:
            c.execute('PRAGMA wal_checkpoint')
        finally:
            c.close()

    def _db_connect(self):
        """Open a connection for the calling thread and set its PRAGMAs."""
        CID = SQLite3Backend._getCID()
        assert CID not in self._conn
        if SQLite3Backend.PY2:
            self._db_connect_py2(CID)
        else:
            # immutable=1 tells sqlite the file cannot change, enabling
            # lock-free reads; only valid for a read-only database.
            self._conn[CID] = sqlite3.connect(
                'file:{:s}?immutable={:s}'.format(self._sqlFile, '1' if self.readonly else '0'),
                detect_types=sqlite3.PARSE_DECLTYPES,  # so datetimes won't be BLOBs
                check_same_thread=False,
                uri=True,
            )
        c = self._get_conn().cursor()
        try:
            if self.readonly is True:
                c.execute('PRAGMA query_only=1')
            else:
                c.execute('PRAGMA journal_mode=WAL')
                c.execute('PRAGMA synchronous=NORMAL')
        finally:
            # The PRAGMA cursor was previously leaked; always close it.
            c.close()

    # Legacy support for Python<3.x.
    def _db_connect_py2(self, CID):
        # Python 2's sqlite3 has no URI support, so immutable (read-only)
        # databases cannot be opened; require a writable file instead.
        if os.access(self._sqlFile, os.W_OK) is False:
            raise Exception('Python>=3.x is required for immutable sqlite3 database.')
        self._conn[CID] = sqlite3.connect(
            self._sqlFile,
            detect_types=sqlite3.PARSE_DECLTYPES,
            check_same_thread=False
        )
        self._conn[CID].text_factory = bytes

    def _db_disconnect(self):
        """Checkpoint (if writable) and close all per-thread connections."""
        # Commit, checkpoint.
        if self.readonly is False:
            self.wal_checkpoint_threadsafe()
        # Close all connections to database.
        for CID in self._conn:
            self._conn[CID].close()
        # Remove all items from dict.
        self._conn.clear()

    def _get_conn(self):
        """Return the calling thread's connection; callers must hold the lock."""
        if self._lock.acquire(False) is True:
            self._lock.release()
            raise Exception('Forgot to lock?')
        # sqlite3 multithreading: http://beets.io/blog/sqlite-nightmare.html
        CID = SQLite3Backend._getCID()
        try:
            return self._conn[CID]
        except KeyError:
            self._db_connect()
            return self._conn[CID]

    @staticmethod
    def _getCID():
        # Thread ident serves as the per-thread connection key.
        return threading.current_thread().ident
50 changes: 50 additions & 0 deletions opcua/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import functools
import threading
from socket import error as SocketError
from collections import MutableMapping

try:
import asyncio
Expand Down Expand Up @@ -210,4 +211,53 @@ def run_until_complete(self, coro):
return future.result()


class ThreadSafeDict(MutableMapping):
    """Mapping whose accessors are serialized by a re-entrant lock.

    When *cache* is another ThreadSafeDict, its lock is shared so that
    nested wrappers synchronize on the same lock.
    """

    def __init__(self, cache=None):
        # FIXME: should use multiple reader, one writer pattern
        self._lock = cache._lock if hasattr(cache, '_lock') else threading.RLock()
        if cache is None:
            self._cache = {}
        else:
            assert(isinstance(cache, (dict, ThreadSafeDict)))
            self._cache = cache

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Drop the reference; further access raises, by design.
        self._cache = None

    def __getitem__(self, key):
        with self._lock:
            return self._cache.__getitem__(key)

    def get(self, key, value=None):
        with self._lock:
            return self._cache.get(key, value)

    def __setitem__(self, key, value):
        with self._lock:
            return self._cache.__setitem__(key, value)

    def __contains__(self, key):
        with self._lock:
            return self._cache.__contains__(key)

    def __delitem__(self, key):
        with self._lock:
            del self._cache[key]

    def __iter__(self):
        with self._lock:
            return self._cache.__iter__()

    def __len__(self):
        # Take the lock for consistency with every other accessor
        # (previously this was the only unguarded method).
        with self._lock:
            return len(self._cache)

    def keys(self):
        with self._lock:
            return self._cache.keys()

    def empty(self):
        """Discard all items by swapping in a fresh dict."""
        with self._lock:
            self._cache = {}
Loading