forked from zodb/relstorage
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpoll-invalidation-zodb-3-8.patch
96 lines (88 loc) · 3.63 KB
/
poll-invalidation-zodb-3-8.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
Index: Connection.py
===================================================================
--- Connection.py (revision 104546)
+++ Connection.py (working copy)
@@ -90,8 +90,15 @@
self.connections = {self._db.database_name: self}
self._version = version
- self._normal_storage = self._storage = db._storage
- self.new_oid = db._storage.new_oid
+
+ storage = db._storage
+ m = getattr(storage, 'bind_connection', None)
+ if m is not None:
+ # Use a storage instance bound to this connection.
+ storage = m(self)
+ self._normal_storage = self._storage = storage
+ self.new_oid = storage.new_oid
+
self._savepoint_storage = None
# Do we need to join a txn manager?
@@ -151,6 +158,12 @@
# in the cache on abort and in other connections on finish.
self._modified = []
+ # Allow the storage to decide whether invalidations should
+ # propagate between connections. If the storage provides MVCC
+ # semantics, it is better to not propagate invalidations between
+ # connections.
+ self._propagate_invalidations = getattr(
+ self._storage, 'propagate_invalidations', True)
# _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while
@@ -297,6 +310,11 @@
if self._opened:
self.transaction_manager.unregisterSynch(self)
+ # If the storage wants to know, tell it this connection is closing.
+ m = getattr(self._storage, 'connection_closing', None)
+ if m is not None:
+ m()
+
if primary:
for connection in self.connections.values():
if connection is not self:
@@ -469,8 +487,23 @@
self._registered_objects = []
self._creating.clear()
+ def _poll_invalidations(self):
+ """Poll and process object invalidations provided by the storage.
+ """
+ m = getattr(self._storage, 'poll_invalidations', None)
+ if m is not None:
+ # Poll the storage for invalidations.
+ invalidated = m()
+ if invalidated is None:
+ # special value: the transaction is so old that
+ # we need to flush the whole cache.
+ self._cache.invalidate(self._cache.cache_data.keys())
+ elif invalidated:
+ self._cache.invalidate(invalidated)
+
# Process pending invalidations.
def _flush_invalidations(self):
+ self._poll_invalidations()
self._inv_lock.acquire()
try:
# Non-ghostifiable objects may need to read when they are
@@ -748,6 +781,9 @@
"""Indicate confirmation that the transaction is done."""
def callback(tid):
+ if not self._propagate_invalidations:
+ # The storage disabled inter-connection invalidation.
+ return
d = dict.fromkeys(self._modified)
self._db.invalidate(tid, d, self)
# It's important that the storage calls the passed function
Index: DB.py
===================================================================
--- DB.py (revision 104546)
+++ DB.py (working copy)
@@ -284,6 +284,10 @@
storage.store(z64, None, file.getvalue(), '', t)
storage.tpc_vote(t)
storage.tpc_finish(t)
+ if hasattr(storage, 'connection_closing'):
+ # Let the storage release whatever resources it used for loading
+ # the root object.
+ storage.connection_closing()
# Multi-database setup.
if databases is None: