forked from CCExtractor/sample-platform
-
Notifications
You must be signed in to change notification settings - Fork 0
/
database.py
executable file
·136 lines (103 loc) · 3.6 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import re
from abc import ABCMeta
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.sql.sqltypes import SchemaType, Enum, TypeDecorator
class DeclarativeABCMeta(DeclarativeMeta, ABCMeta):
    """
    Mixin metaclass that combines SQLAlchemy's DeclarativeMeta with ABCMeta.

    It adds no behaviour of its own; it only merges the two metaclasses.
    """
# Declarative base class, created with DeclarativeMeta as its metaclass.
# NOTE(review): DeclarativeABCMeta is defined above but is not the metaclass
# used here - confirm that this is intentional.
Base = declarative_base(metaclass=DeclarativeMeta)
# Placeholder; create_session() replaces it with the session's query property.
Base.query = None
# Module-level engine; stays None until create_session() creates one.
db_engine = None
def create_session(db_string, drop_tables=False):
    """
    Build a scoped SQLAlchemy session bound to the module-level engine.

    A new engine is created whenever none exists yet, or whenever the
    ``TESTING`` environment variable is unset or equal to ``'False'``.
    Otherwise the engine stored in ``db_engine`` is reused. ``Base.query``
    is pointed at the new session's query property, and all tables known to
    ``Base.metadata`` are created (after being dropped first if requested).

    :param db_string: The connection string.
    :type db_string: str
    :param drop_tables: Drop existing tables?
    :type drop_tables: bool
    :return: A SQLAlchemy session object
    :rtype: sqlalchemy.orm.scoped_session
    """
    import os

    global db_engine

    # While testing we want to keep working with the same engine object.
    testing_enabled = os.environ.get('TESTING', 'False') != 'False'
    if db_engine is None or not testing_enabled:
        # NOTE(review): convert_unicode appears to be deprecated in newer
        # SQLAlchemy releases - confirm against the pinned version.
        db_engine = create_engine(db_string, convert_unicode=True)

    session_factory = sessionmaker(bind=db_engine)
    db_session = scoped_session(session_factory)
    Base.query = db_session.query_property()

    metadata = Base.metadata
    if drop_tables:
        metadata.drop_all(bind=db_engine)
    metadata.create_all(bind=db_engine)

    return db_session
class EnumSymbol:
    """
    One fixed symbol that belongs to a parent enumeration class.

    A symbol stores the class it is attached to, the attribute name it is
    registered under, its value and a description.
    """

    def __init__(self, cls_, name, value, description):
        """
        Store the owning class, attribute name, value and description.
        """
        self.cls_, self.name = cls_, name
        self.value, self.description = value, description

    def __reduce__(self):
        """
        Pickle as ``getattr(cls_, name)`` so that unpickling looks the
        symbol up on its owning class instead of building a new instance.
        """
        lookup_args = (self.cls_, self.name)
        return (getattr, lookup_args)

    def __iter__(self):
        """
        Iterate over the value followed by the description.
        """
        return iter((self.value, self.description))

    def __repr__(self):
        """
        Return the symbol name wrapped in angle brackets.
        """
        return "<{name}>".format(name=self.name)
class EnumMeta(type):
    """
    Metaclass that builds the symbol registry of DeclEnum classes.

    Every tuple-valued attribute in the class body is turned into an
    EnumSymbol, stored in the class registry ``_reg`` under the tuple's
    first element, and re-bound on the class under its attribute name.
    """

    def __init__(cls, classname, bases, dict_):
        """
        Populate ``cls._reg`` from the tuple attributes found in ``dict_``.
        """
        # Work on a copy so this class does not mutate the registry object
        # it found through attribute lookup (e.g. the one on a base class).
        registry = cls._reg.copy()
        cls._reg = registry

        for attr_name, attr_value in dict_.items():
            if not isinstance(attr_value, tuple):
                continue
            symbol = EnumSymbol(cls, attr_name, *attr_value)
            registry[attr_value[0]] = symbol
            setattr(cls, attr_name, symbol)

        return type.__init__(cls, classname, bases, dict_)

    def __iter__(cls):
        """
        Iterate over the symbols registered on the class.
        """
        registered = cls._reg.values()
        return iter(registered)
class DeclEnum(metaclass=EnumMeta):
    """
    Base class for declarative enumerations.

    Subclasses declare their members as tuple-valued class attributes; the
    EnumMeta metaclass records them in ``_reg``, keyed by the first element
    of each tuple.
    """

    _reg = {}

    @classmethod
    def from_string(cls, value):
        """
        Return the symbol registered under ``value``.

        :raises ValueError: if no symbol is registered under ``value``.
        """
        try:
            symbol = cls._reg[value]
        except KeyError:
            message = "Invalid value for {name}: {value}".format(name=cls.__name__, value=value)
            raise ValueError(message)
        return symbol

    @classmethod
    def values(cls):
        """
        Return the keys of the registry, i.e. the registered values.
        """
        registry = cls._reg
        return registry.keys()

    @classmethod
    def db_type(cls):
        """
        Return a DeclEnumType column type wrapping this enumeration.
        """
        return DeclEnumType(cls)
class DeclEnumType(SchemaType, TypeDecorator):
    """
    Column type that maps a DeclEnum class onto a SQLAlchemy ``Enum``.

    Symbols are written as their ``value`` and read back through the
    enumeration's ``from_string`` lookup.
    """

    def __init__(self, enum):
        """
        Build the underlying ``Enum`` from the values of ``enum``.

        The ``Enum`` is named ``ck`` followed by the class name with every
        uppercase letter replaced by an underscore and its lowercase form
        (e.g. ``MyEnum`` becomes ``ck_my_enum``).
        """
        self.enum = enum
        snake_name = re.sub('([A-Z])', lambda m: "_" + m.group(1).lower(), enum.__name__)
        self.impl = Enum(
            *enum.values(),
            name="ck{0}".format(snake_name)
        )

    def _set_table(self, table, column):
        """
        Forward the table/column attachment to the wrapped ``Enum``.
        """
        self.impl._set_table(table, column)

    def copy(self):
        """
        Return a new DeclEnumType for the same enumeration.
        """
        return DeclEnumType(self.enum)

    def process_bind_param(self, value, dialect):
        """
        Convert a symbol to its ``value``; ``None`` is passed through.
        """
        return None if value is None else value.value

    def process_result_value(self, value, dialect):
        """
        Convert a stored string back to its symbol; ``None`` is passed
        through.
        """
        # Surrounding whitespace is stripped before the lookup
        # (presumably to cope with padded column values - TODO confirm).
        return None if value is None else self.enum.from_string(value.strip())