Pathological Eventual Consistency in Cluster Metadata
The following fell out of a discussion amongst Jordon West, Engel Sanchez and Andrew Thompson. This document is an attempt to discuss this case, generally, but the discussion was initially focused on Riak's new Cluster Metadata subsystem and Riak itself. So I'm sure some assumptions and details about the two will leak through.
Assume we have a static set of nodes, `n1,n2,..,nN`, each of which is
responsible for storing a replica, `r1,r2,...,rN`, of the value for
key `k`. Each replica is a list of values (siblings) and a logical
clock. In the case of Riak, the set of nodes may be the owners in
the preference list (primary or otherwise, but for now we consider static
members). In Cluster Metadata, the set of nodes is equivalent to all
nodes in the cluster.
When an update to `k` is received by a node, it increments its counter
in the logical clock and writes it along with the new value, locally,
before returning to the client. All further replication is done
asynchronously. This is the only option in Cluster Metadata and
corresponds to w=1 in Riak.
Siblings may be generated when concurrent updates are received by different nodes or when two updates occur on different sides of a network partition.
When reading `k`, I'm going to assume for now that we only wait for a
value from a single replica -- r=1 in Riak land and the way Cluster
Metadata works. I suspect this applies no matter how many replicas we
read but I haven't had time to think it through. Either way, the
siblings are resolved and the new value is written back to the
cluster. This write may or may not occur on the same node as the
previous read and is treated like any other update. Specifically, the
logical clock is incremented -- the new value is a descendant of the
siblings that were resolved.
In addition to the assumptions above, we start with each replica
`r1,r2,..,rN` having the same value. The value has `S` siblings,
`[s1,s2,..,sS]`, and logical clock `[c1,c2,...,cN]` -- one counter per
node, like a version vector, where the identifier is per node in the
cluster, not per client (Riak's `vnode_vclocks`). The process of getting
into this state is not a concern here; how difficult it is to get there
is a separate conversation. What's important for this example is
that we have siblings on a subset of nodes that, to start, all have
the same logical clock. To keep this simple, among other reasons, I'll
assume the same siblings are on all nodes to start and that all nodes
have these siblings. Neither of these assumptions is a requirement.
The guts of the problem stem from what happens when we read these siblings concurrently. As outlined earlier, when we resolve and produce a new value we perform an update. This can lead to the further generation of siblings.
Take the following progression, assuming 3 nodes, `n1,n2,n3`, and 3
siblings, `s1,s2,s3`, stored on each with clock `[c1,c2,c3]`
(as outlined above):
- a client reads `k` from `n1` and resolves siblings, `[s1,s2,s3]`, producing `s4`, which is written back to `n1`. The resulting value stored at `n1` is `[s4]` with logical clock `[c1+1,c2,c3]`.
- a client reads `k` from `n2` and resolves siblings, `[s1,s2,s3]`, producing `s5`, which is written back to `n2`. The resulting value stored at `n2` is `[s5]` with logical clock `[c1,c2+1,c3]`.
- a client reads `k` from `n3` and resolves siblings, `[s1,s2,s3]`, producing `s6`, which is written back to `n3`. The resulting value stored at `n3` is `[s6]` with logical clock `[c1,c2,c3+1]`.
- async. replication of the value written to `n1` reaches `n2`. After merging the replicated data the value at `n2` is `[s4,s5]` and the clock is `[c1+1,c2+1,c3]`. The values at `n1` and `n3` remain unchanged. The merge of the data at `n2` does not result in a new round of replication.
- async. replication of the value written to `n2` reaches `n3`. After merging the replicated data the value at `n3` is `[s5,s6]` with the resulting clock being `[c1,c2+1,c3+1]`. `n2` still stores `[s4,s5]` and the associated clock, while `n1`'s value remains unchanged. The merge of the data at `n3` does not result in a new round of replication.
- async. replication of the value written to `n1` reaches `n3`. After merging the replicated data the value at `n3` is `[s4,s5,s6]` with the resulting clock being `[c1+1,c2+1,c3+1]`. The values at `n1` and `n2` do not change from their previous values. The merge of the data at `n3` does not result in a new round of replication.
- async. replication of the value written to `n3` reaches `n1`. After merging the replicated data the value at `n1` is `[s4,s6]` with the resulting clock being `[c1+1,c2,c3+1]`. The values at `n2` and `n3` do not change. The merge of the data at `n1` does not result in a new round of replication.
- async. replication of the value written to `n3` reaches `n2`. After merging the replicated data the value at `n2` is `[s4,s5,s6]` with the resulting clock being `[c1+1,c2+1,c3+1]`. The values at `n1` and `n3` do not change. The merge of the data at `n2` does not result in a new round of replication.
- async. replication of the value written to `n2` reaches `n1`. After merging the replicated data the value at `n1` is `[s4,s5,s6]` with the resulting clock being `[c1+1,c2+1,c3+1]`. The values at `n2` and `n3` do not change. The merge of the data at `n1` does not result in a new round of replication.
At this point, if we assume no more reads or writes are performed
during the interleaving above, we have reached a consistent state and
no more replication messages will be flowing through the system. The
final state is `r1,r2,r3` all having the value `[s4,s5,s6]` with
logical clock `[c1+1,c2+1,c3+1]`. This final state and the initial state
are the same in shape: every replica again holds three siblings, and one
state is just a causal descendant of the other. We could
imagine a case where the above or a similar interleaving continues to
occur over and over, leaving us in our initial state indefinitely.
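
The interleaving above can be replayed in a small model. What follows is a minimal sketch in Python, not Riak or Cluster Metadata code. The names `Versioned`, `descends`, `merge` and `resolve_and_write` are hypothetical, the starting counters `c1,c2,c3` are taken to be 0, and the merge rule (keep the descendant, otherwise union the siblings and take the pointwise maximum of the clocks) is a simplification that is sufficient for this example only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Versioned:
    siblings: frozenset  # sibling values, e.g. {"s1", "s2", "s3"}
    clock: tuple         # one counter per node, e.g. (c1, c2, c3)


def descends(a, b):
    """True if clock a has seen everything clock b has seen."""
    return all(x >= y for x, y in zip(a, b))


def merge(local, incoming):
    """Simplified merge: a descendant replaces its ancestor; concurrent values become siblings."""
    if descends(incoming.clock, local.clock):
        return incoming
    if descends(local.clock, incoming.clock):
        return local
    merged_clock = tuple(max(x, y) for x, y in zip(local.clock, incoming.clock))
    return Versioned(local.siblings | incoming.siblings, merged_clock)


def resolve_and_write(replicas, node, new_value):
    """A client resolves the siblings read from `node` and writes the result back there."""
    old = replicas[node]
    clock = tuple(c + 1 if i == node else c for i, c in enumerate(old.clock))
    replicas[node] = Versioned(frozenset({new_value}), clock)
    return replicas[node]  # this is the value that gets replicated asynchronously


def run_pathological():
    start = Versioned(frozenset({"s1", "s2", "s3"}), (0, 0, 0))
    replicas = [start, start, start]  # index 0 is n1, 1 is n2, 2 is n3
    w1 = resolve_and_write(replicas, 0, "s4")
    w2 = resolve_and_write(replicas, 1, "s5")
    w3 = resolve_and_write(replicas, 2, "s6")
    # Deliveries in the order listed above: (replicated write, receiving node).
    for write, target in [(w1, 1), (w2, 2), (w1, 2), (w3, 0), (w3, 1), (w2, 0)]:
        replicas[target] = merge(replicas[target], write)
    return replicas
```

Under these assumptions, `run_pathological()` leaves every replica holding the siblings `s4`, `s5` and `s6` with clock `(1, 1, 1)`, which mirrors the final state described above.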
If we can persist in this state indefinitely, the obvious question is
whether we can escape it. An interleaving where the final state of
`r1,r2,..,rN` is the same value with a single sibling would be one
such way.
One such interleaving, assuming the same initial state, is shown below. In short, a round of replication completing before any other update of a resolved value will result in the desired end state.
- a client reads `k` from `n1` and resolves siblings, `[s1,s2,s3]`, producing `s4`, which is written back to `n1`. The resulting value stored at `n1` is `[s4]` with logical clock `[c1+1,c2,c3]`.
- async. replication of the value written to `n1` reaches `n2`. After merging the replicated data the value at `n2` is `[s4]` with logical clock `[c1+1,c2,c3]`. The values at `n1` and `n3` remain the same and the merge of the data at `n2` does not result in a new round of replication.
- async. replication of the value written to `n1` reaches `n3`. After merging the replicated data the value at `n3` is `[s4]` with logical clock `[c1+1,c2,c3]`. The values at `n1` and `n2` do not change and the merge of the data at `n3` does not result in a new round of replication.
- a client reads `k` from `n2` and has no need to resolve siblings since the only one is `s4`.
The last step is ancillary but I mention it to round out the example. At the end of this interleaving, if no other operations are performed on the store, there will be no more messages flowing through the system.
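
The escaping interleaving can be checked the same way. This is again a hedged sketch rather than real Riak code: replicas are plain `(siblings, clock)` tuples, counters start at 0, and `descends`, `merge` and `read_and_resolve` are hypothetical helpers using a simplified merge rule.

```python
def descends(a, b):
    """True if clock a has seen everything clock b has seen."""
    return all(x >= y for x, y in zip(a, b))


def merge(local, incoming):
    """Simplified merge of two (siblings, clock) tuples."""
    if descends(incoming[1], local[1]):
        return incoming
    if descends(local[1], incoming[1]):
        return local
    clock = tuple(max(x, y) for x, y in zip(local[1], incoming[1]))
    return (local[0] | incoming[0], clock)


def read_and_resolve(replicas, node, new_value):
    """Resolve and write back only when the read actually returns siblings."""
    siblings, clock = replicas[node]
    if len(siblings) <= 1:
        return None  # nothing to resolve, so no update is generated
    bumped = tuple(c + 1 if i == node else c for i, c in enumerate(clock))
    replicas[node] = (frozenset({new_value}), bumped)
    return replicas[node]


def run_escape():
    start = (frozenset({"s1", "s2", "s3"}), (0, 0, 0))
    replicas = [start, start, start]  # index 0 is n1, 1 is n2, 2 is n3
    w1 = read_and_resolve(replicas, 0, "s4")
    replicas[1] = merge(replicas[1], w1)  # replication reaches n2
    replicas[2] = merge(replicas[2], w1)  # replication reaches n3
    second = read_and_resolve(replicas, 1, "s5")  # read at n2 finds only s4
    return replicas, second
```

Because the write from `n1` descends the initial value, each merge simply replaces the old siblings, and the later read at `n2` generates no new update.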
Since we can escape this undesirable state with some interleavings, persisting in it indefinitely requires that only the interleavings that do not meet the termination condition flow through the system. Of course, this assumes you believe the above. I'd like to prove it formally, but for now that will have to wait. So assuming you believe, it then becomes interesting to analyze how likely this pathological case is, and I'm going to try and do some math now.
Let's make some more assumptions about our cluster. I'm going to evolve these as we go along with this analysis but let's start with the following:
- We have `a1,a2,...aM` application servers that act as clients for the store. In this case I'm thinking more about Riak, but for the Cluster Metadata curious, let's say these clients may also invoke work that makes the store perform these read and update requests within the cluster (e.g. security storing some data in the store based on a request from one of the clients).
- The network between the clients and the store is as fast as the network between the nodes -- they are in the same rack or whatever. This assumption should raise the likelihood of the pathological case.
- Since we assume the networks are equally fast, let's assume it takes constant time to communicate between any client and any node in the store and between any two nodes. I'll refer to this as `MessageTime`.
- Other work done by a node to facilitate an update or replicate a value is considered negligible -- we have perfect and infinitely fast disks/CPUs/memory/etc.
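We can then define the time it takes for a client to send an update to a node and receive a successful response as:
UpdateTime = MessageTime + WriteTime + MessageTime
= 2*MessageTime + WriteTime
Here `WriteTime` is the local write on the node, which is negligible under the assumptions above, so `UpdateTime` is effectively `2*MessageTime`.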
Additionally, we can define the time it takes for an update to replicate to some or all nodes in the cluster. If we assume no concurrency while replicating asynchronously then we can define this as:
ReplicationTime = (N-1)*MessageTime
If we choose, say, `2 milliseconds` as a value for `UpdateTime`, then the
`MessageTime` is `1 millisecond` and the `ReplicationTime` is
`N-1` milliseconds. If we assume a single client and two nodes to
replicate to (`N = 3`), then the time it takes to update and the time it
takes to replicate are equal. Now, if we throw away
some of the above assumptions and consider the overhead in the client in
addition to variability in networks, the likelihood of always seeing a
pathological interleaving indefinitely seems small (seems is not real
numbers...).
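
The arithmetic above can be written out as a short sketch. The function names are hypothetical, times are in milliseconds, and `write_time_ms` defaults to 0 to reflect the assumption that work on the node is negligible.

```python
def update_time(message_time_ms, write_time_ms=0.0):
    """Client -> node request plus node -> client response, plus the local write."""
    return 2 * message_time_ms + write_time_ms


def replication_time(n_nodes, message_time_ms):
    """Sequential (non-concurrent) replication to the other n_nodes - 1 replicas."""
    return (n_nodes - 1) * message_time_ms


def message_time_from_update(update_time_ms, write_time_ms=0.0):
    """Invert update_time: recover MessageTime from a chosen UpdateTime."""
    return (update_time_ms - write_time_ms) / 2


message_time = message_time_from_update(2.0)  # UpdateTime of 2 ms
for n in (2, 3, 10):
    print(n, update_time(message_time), replication_time(n, message_time))
```

With these definitions, the update and replication times coincide when there are two other replicas to reach, and replication falls behind as `N` grows.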
With 160 nodes or replicas in Cluster Metadata, the time to replicate
to all of them would be about `7 milliseconds` using the numbers
above. I calculate this by considering the number of hops needed
to fully replicate (measured as the "Last Distance Hop"). This number
grows linearly with each doubling of the cluster size (for now; it's
known how we can make the curve better): for 10 nodes it was `3`, for
20 nodes it was `4` and for 40 nodes it was `5`, so extrapolating we
get `7` for 160 nodes, which is the value for `N` in our
`ReplicationTime` equation. Since we replicate
concurrently between nodes, interchanging the node count for this hop
count is safe given my assumptions.
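
The extrapolation can be sketched as follows. The three measured points come from the text. Everything else is an assumption for illustration: that one hop keeps being added per doubling beyond 40 nodes, that each hop costs one `MessageTime`, and the helper name `extrapolated_hops`.

```python
import math

# Measured "Last Distance Hop" values quoted in the text: node count -> hops.
MEASURED_HOPS = {10: 3, 20: 4, 40: 5}


def extrapolated_hops(n_nodes, base_nodes=10):
    """Assume one extra hop per doubling of the cluster, anchored at the 10-node point."""
    return MEASURED_HOPS[base_nodes] + math.log2(n_nodes / base_nodes)


message_time_ms = 1.0
hops_160 = extrapolated_hops(160)
print(hops_160, hops_160 * message_time_ms)  # hops, and time if each hop costs one MessageTime
```

The fit reproduces the three measured points exactly, which is all that supports the extrapolation. A different cluster size or replication topology could behave differently.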
If we assume a constant request rate and a 99% chance of hitting distinct nodes for each concurrent request, then yes, it's possible in this cluster to never reach the termination condition.
The question then becomes whether this is actually an issue or just an interesting exercise that helps all of us better understand eventual consistency. I see it as the latter, although that's no more than conjecture without trying to hit the case in an existing code base or attempting to explore it more formally. However, I'll try to outline my reasoning since it's pretty short.
Applications that write data to an eventually consistent store, possibly resulting in siblings, need to account for conflict resolution. A smart conflict resolver is deterministic. Our pathological example did not result in something like sibling explosion (in fact the number of siblings is well bounded). If our concurrent resolutions each resolve to the same value the siblings potentially produced from it are in turn simple to resolve and so forth.
This conversation came up in the context of Riak's security
enable/disable feature, whose state is stored in Cluster Metadata.
If the resolver were to use last-write-wins (the one built
into Cluster Metadata), which we will call deterministic, ignoring the
potential randomness of clocks, and the initial siblings were
`{true, time1}` and `{false, time1+10seconds}`, then we would always pick the
latter no matter which node we read from. That in turn might generate
`{false, time1+20seconds}` and `{false, time1+30seconds}` on the two
different nodes, which no matter what resolves to false (even if there
are more siblings). So we can create siblings galore and still resolve
to a correct value. If a user comes along and re-enables security and
generates `{true, time1+50seconds}`, it may or may not take, however, given
the pathological case. Again, the odds of persisting in that state
forever are debatable, but it's possible, no matter how minuscule.
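
A deterministic last-write-wins resolver of the kind described here might look like the sketch below. It is not Cluster Metadata's actual resolver: `lww_resolve` is a hypothetical name, siblings are modelled as `(value, timestamp_seconds)` pairs with `time1` taken as 0, and the tie-break on the value is an added assumption so that equal timestamps still resolve the same way everywhere.

```python
from itertools import permutations


def lww_resolve(siblings):
    """Pick the sibling with the latest timestamp; break ties on the value itself."""
    return max(siblings, key=lambda s: (s[1], s[0]))


initial = [(True, 0), (False, 10)]

# Every node sees the same winner, whatever order it holds the siblings in.
winners = {lww_resolve(order) for order in permutations(initial)}
print(winners)

# Concurrent resolutions written back at later times still agree on the value.
second_round = [(False, 20), (False, 30)]
print(lww_resolve(second_round)[0])
```

The resolver depends only on the set of siblings and not on which node performs the read, which is why the extra siblings generated by concurrent resolution stay easy to resolve.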