forked from zodb/relstorage
-
Notifications
You must be signed in to change notification settings - Fork 0
/
poll-invalidation-zodb-3-7.patch
96 lines (88 loc) · 3.58 KB
/
poll-invalidation-zodb-3-7.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
Index: Connection.py
===================================================================
--- Connection.py (revision 104552)
+++ Connection.py (working copy)
@@ -75,8 +75,14 @@
"""Create a new Connection."""
self._db = db
- self._normal_storage = self._storage = db._storage
- self.new_oid = db._storage.new_oid
+ storage = db._storage
+ m = getattr(storage, 'bind_connection', None)
+ if m is not None:
+ # Use a storage instance bound to this connection.
+ storage = m(self)
+
+ self._normal_storage = self._storage = storage
+ self.new_oid = storage.new_oid
self._savepoint_storage = None
self.transaction_manager = self._synch = self._mvcc = None
@@ -170,6 +176,12 @@
# Multi-database support
self.connections = {self._db.database_name: self}
+ # Allow the storage to decide whether invalidations should
+ # propagate between connections. If the storage provides MVCC
+ # semantics, it is better to not propagate invalidations between
+ # connections.
+ self._propagate_invalidations = getattr(
+ self._storage, 'propagate_invalidations', True)
def add(self, obj):
"""Add a new object 'obj' to the database and assign it an oid."""
@@ -267,6 +279,11 @@
self.transaction_manager.unregisterSynch(self)
self._synch = None
+ # If the storage wants to know, tell it this connection is closing.
+ m = getattr(self._storage, 'connection_closing', None)
+ if m is not None:
+ m()
+
if primary:
for connection in self.connections.values():
if connection is not self:
@@ -438,8 +455,23 @@
self._registered_objects = []
self._creating.clear()
+ def _poll_invalidations(self):
+ """Poll and process object invalidations provided by the storage.
+ """
+ m = getattr(self._storage, 'poll_invalidations', None)
+ if m is not None:
+ # Poll the storage for invalidations.
+ invalidated = m()
+ if invalidated is None:
+ # special value: the transaction is so old that
+ # we need to flush the whole cache.
+ self._cache.invalidate(self._cache.cache_data.keys())
+ elif invalidated:
+ self._cache.invalidate(invalidated)
+
# Process pending invalidations.
def _flush_invalidations(self):
+ self._poll_invalidations()
self._inv_lock.acquire()
try:
# Non-ghostifiable objects may need to read when they are
@@ -698,6 +730,10 @@
"""Indicate confirmation that the transaction is done."""
def callback(tid):
+ if not self._propagate_invalidations:
+ # The storage disabled inter-connection invalidation.
+ return
+
d = dict.fromkeys(self._modified)
self._db.invalidate(tid, d, self)
# It's important that the storage calls the passed function
Index: DB.py
===================================================================
--- DB.py (revision 104552)
+++ DB.py (working copy)
@@ -260,6 +260,10 @@
storage.store(z64, None, file.getvalue(), '', t)
storage.tpc_vote(t)
storage.tpc_finish(t)
+ if hasattr(storage, 'connection_closing'):
+ # Let the storage release whatever resources it used for loading
+ # the root object.
+ storage.connection_closing()
# Multi-database setup.
if databases is None: