
Commit

Merge pull request #10 from spapa013/main
v0.1.9
spapa013 authored Jan 9, 2024
2 parents 0bcf320 + e8284c6 commit 7faacd7
Showing 5 changed files with 224 additions and 2 deletions.
92 changes: 92 additions & 0 deletions datajoint_plus/blob.py
@@ -0,0 +1,92 @@
"""
(De)serialization methods for basic datatypes and numpy.ndarrays with provisions for mutual
compatibility with Matlab-based serialization implemented by mYm.
"""

import collections
from decimal import Decimal
import datetime
import uuid
import numpy as np
from .errors import DataJointError
from datajoint.blob import (
    mxClassID,
    rev_class_id,
    dtype_list,
    type_names,
    compression,
    bypass_serialization,
    len_u64,
    len_u32,
    MatCell,
    MatStruct,
    Blob as DJBlob,
)


class Blob(DJBlob):
    def pack_blob(self, obj):
        # original mYm-based serialization from datajoint-matlab
        if isinstance(obj, MatCell):
            return self.pack_cell_array(obj)
        if isinstance(obj, MatStruct):
            return self.pack_struct(obj)
        if isinstance(obj, np.ndarray) and obj.dtype.fields is None:
            return self.pack_array(obj)

        # blob types in the expanded dj0 blob format
        self.set_dj0()
        if not isinstance(obj, (np.ndarray, np.number)):
            # python built-in data types
            if isinstance(obj, bool):
                return self.pack_bool(obj)
            if isinstance(obj, int):
                return self.pack_int(obj)
            if isinstance(obj, complex):
                return self.pack_complex(obj)
            if isinstance(obj, float):
                return self.pack_float(obj)
        if isinstance(obj, np.ndarray) and obj.dtype.fields:
            return self.pack_recarray(np.array(obj))
        if isinstance(obj, np.number):
            return self.pack_array(np.array(obj))
        if isinstance(obj, np.bool_):
            return self.pack_array(np.array(obj))
        if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
            return self.pack_datetime(obj)
        if isinstance(obj, Decimal):
            return self.pack_decimal(obj)
        if isinstance(obj, uuid.UUID):
            return self.pack_uuid(obj)
        if isinstance(obj, collections.abc.Mapping):
            return self.pack_dict(obj)
        if isinstance(obj, str):
            return self.pack_string(obj)
        if isinstance(obj, collections.abc.ByteString):
            return self.pack_bytes(obj)
        if isinstance(obj, collections.abc.MutableSequence):
            return self.pack_list(obj)
        if isinstance(obj, collections.abc.Sequence):
            return self.pack_tuple(obj)
        if isinstance(obj, collections.abc.Set):
            return self.pack_set(obj)
        if obj is None:
            return self.pack_none()
        raise DataJointError("Packing object of type %s currently not supported!" % type(obj))


def pack(obj, compress=True):
    if bypass_serialization:
        # provide a way to move blobs quickly without de/serialization
        assert isinstance(obj, bytes) and obj.startswith((b'ZL123\0', b'mYm\0', b'dj0\0'))
        return obj
    return Blob().pack(obj, compress=compress)


def unpack(blob, squeeze=False):
    if bypass_serialization:
        # provide a way to move blobs quickly without de/serialization
        assert isinstance(blob, bytes) and blob.startswith((b'ZL123\0', b'mYm\0', b'dj0\0'))
        return blob
    if blob is not None:
        return Blob(squeeze=squeeze).unpack(blob)
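
For reference, the module-level helpers above give the usual round trip: pack serializes a Python object into a blob (compressed by default) and unpack restores it. The sketch below is illustrative only and not part of the commit; it assumes datajoint_plus is installed and that the expanded dj0 behavior matches datajoint's Blob.

import numpy as np
from datajoint_plus.blob import pack, unpack

payload = {"weights": np.arange(6, dtype=np.float64).reshape(2, 3), "label": "trial-1"}

serialized = pack(payload)      # bytes; packing a dict uses the expanded dj0 blob format
restored = unpack(serialized)   # rebuilds the dict, including the ndarray inside it

assert restored["label"] == payload["label"]
assert np.array_equal(restored["weights"], payload["weights"])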
119 changes: 119 additions & 0 deletions datajoint_plus/jobs.py
@@ -0,0 +1,119 @@
import os
from datajoint.hash import key_hash
import platform
from .table import Table
from datajoint.settings import config
from datajoint.errors import DuplicateError

ERROR_MESSAGE_LENGTH = 2047
TRUNCATION_APPENDIX = '...truncated'


class JobTable(Table):
"""
A base relation with no definition. Allows reserving jobs
"""
def __init__(self, arg, database=None):
if isinstance(arg, JobTable):
super().__init__(arg)
# copy constructor
self.database = arg.database
self._connection = arg._connection
self._definition = arg._definition
self._user = arg._user
return
super().__init__()
self.database = database
self._connection = arg
self._definition = """ # job reservation table for `{database}`
table_name :varchar(255) # className of the table
key_hash :char(32) # key hash
---
status :enum('reserved','error','ignore') # if tuple is missing, the job is available
key=null :blob # structure containing the key
error_message="" :varchar({error_message_length}) # error message returned if failed
error_stack=null :blob # error stack if failed
user="" :varchar(255) # database user
host="" :varchar(255) # system hostname
pid=0 :int unsigned # system process id
connection_id = 0 : bigint unsigned # connection_id()
timestamp=CURRENT_TIMESTAMP :timestamp # automatic timestamp
""".format(database=database, error_message_length=ERROR_MESSAGE_LENGTH)
if not self.is_declared:
self.declare()
self._user = self.connection.get_user()

    @property
    def definition(self):
        return self._definition

    @property
    def table_name(self):
        return '~jobs'

    def delete(self):
        """bypass interactive prompts and dependencies"""
        self.delete_quick()

    def drop(self):
        """bypass interactive prompts and dependencies"""
        self.drop_quick()

    def reserve(self, table_name, key):
        """
        Reserve a job for computation. When a job is reserved, the job table contains an entry for the
        job key, identified by its hash. When jobs are completed, the entry is removed.
        :param table_name: `database`.`table_name`
        :param key: the dict of the job's primary key
        :return: True if the job was reserved successfully; False if the job is already taken
        """
        job = dict(
            table_name=table_name,
            key_hash=key_hash(key),
            status='reserved',
            host=platform.node(),
            pid=os.getpid(),
            connection_id=self.connection.connection_id,
            key=key,
            user=self._user)
        try:
            with config(enable_python_native_blobs=True):
                self.insert1(job, ignore_extra_fields=True)
        except DuplicateError:
            return False
        return True

    def complete(self, table_name, key):
        """
        Log a completed job. When a job is completed, its reservation entry is deleted.
        :param table_name: `database`.`table_name`
        :param key: the dict of the job's primary key
        """
        job_key = dict(table_name=table_name, key_hash=key_hash(key))
        (self & job_key).delete_quick()

    def error(self, table_name, key, error_message, error_stack=None):
        """
        Log an error message. If an error occurs, the job reservation is replaced with an error
        entry describing the problem.
        :param table_name: `database`.`table_name`
        :param key: the dict of the job's primary key
        :param error_message: string error message
        :param error_stack: stack trace
        """
        if len(error_message) > ERROR_MESSAGE_LENGTH:
            error_message = error_message[:ERROR_MESSAGE_LENGTH-len(TRUNCATION_APPENDIX)] + TRUNCATION_APPENDIX
        with config(enable_python_native_blobs=True):
            self.insert1(
                dict(
                    table_name=table_name,
                    key_hash=key_hash(key),
                    status="error",
                    host=platform.node(),
                    pid=os.getpid(),
                    connection_id=self.connection.connection_id,
                    user=self._user,
                    key=key,
                    error_message=error_message,
                    error_stack=error_stack),
                replace=True, ignore_extra_fields=True)
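
As a usage note (illustrative, not part of the commit): reserve, complete, and error are meant to coordinate concurrent workers. A worker reserves a key, runs its computation, then either clears the reservation on success or records the failure. In the sketch below, schema, key, and run_computation are hypothetical stand-ins.

import traceback

def run_computation(key):
    """Hypothetical stand-in for the actual per-key computation."""
    ...

jobs = schema.jobs                            # JobTable for this schema (see schema.py below)
table_name = '`my_database`.`__my_table`'     # hypothetical `database`.`table_name`
key = {'subject_id': 1, 'session': 3}         # hypothetical primary key

if jobs.reserve(table_name, key):             # False means another worker already holds the job
    try:
        run_computation(key)
    except Exception as e:
        jobs.error(table_name, key, error_message=str(e), error_stack=traceback.format_exc())
    else:
        jobs.complete(table_name, key)        # success removes the reservation entry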
11 changes: 11 additions & 0 deletions datajoint_plus/schema.py
@@ -12,6 +12,7 @@
from .hash import generate_table_id
from .utils import classproperty
from .table import FreeTable
from .jobs import JobTable

logger = getLogger(__name__)

@@ -82,6 +83,16 @@ def load_dependencies(self, force=True):
"""
load_dependencies(self.connection, force=force)

@property
def jobs(self):
"""
schema.jobs provides a view of the job reservation table for the schema
:return: jobs table
"""
if self._jobs is None:
self._jobs = JobTable(self.connection, self.database)
return self._jobs


class VirtualModule(types.ModuleType):
"""
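The jobs property added above exposes the schema's ~jobs table as an ordinary DataJoint table, so failed or stale reservations can be inspected and cleared with the usual restriction syntax. The snippet below is illustrative only; schema is assumed to be an existing datajoint_plus schema object.

error_jobs = schema.jobs & 'status = "error"'   # reservations recorded by JobTable.error
print(len(error_jobs), 'failed jobs')
error_jobs.delete()                             # JobTable.delete bypasses prompts (delete_quick)
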
2 changes: 1 addition & 1 deletion datajoint_plus/table.py
@@ -18,7 +18,7 @@
UnknownAttributeError
)
from datajoint.expression import QueryExpression
-from datajoint import blob
+from datajoint_plus import blob
from datajoint_plus.hash import generate_table_id

from .utils import classproperty, goto
2 changes: 1 addition & 1 deletion datajoint_plus/version.py
@@ -1 +1 @@
__version__ = "0.1.8"
__version__ = "0.1.9"
