Skip to content

Commit

Permalink
Validate transaction log periodically automatically
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Apr 28, 2023
1 parent 370ff1a commit c936fdb
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 15 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1612,6 +1612,13 @@ Oops! So, within a transaction we may actually observe different locations
having values from different committed transactions. This is something that
needs to be kept in mind when writing transactions.

To mitigate issues due to torn reads and to also avoid problems with long
running transactions, the **kcas** transaction mechanism automatically validates
the transaction log periodically when an access is made to the transaction log.
Therefore an important guideline for writing transactions is that loops inside a
transaction should always include an access of some shared memory location
through the transaction log or should otherwise be guaranteed to be bounded.

## Scheduler interop

The blocking mechanism in **kcas** is based on a
Expand Down
66 changes: 52 additions & 14 deletions src/kcas.ml
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,33 @@ end

module Xt = struct
type 'x t = {
casn : casn;
mutable casn : casn;
mutable cass : cass;
mutable validate_countdown : int;
mutable validate_period : int;
mutable post_commit : Action.t;
}

let rec validate casn = function
| NIL -> ()
| CASN { loc; state; lt; gt; _ } ->
if lt != NIL then validate casn lt;
let before = if is_cmp casn state then eval state else state.before in
if before != eval (fenceless_get (as_atomic loc)) then Retry.later ();
validate casn gt

let validate xt =
let p = xt.validate_period * 2 in
xt.validate_countdown <- p;
xt.validate_period <- p;
validate xt.casn xt.cass
[@@inline never]

let maybe_validate xt =
let c = xt.validate_countdown - 1 in
if 0 < c then xt.validate_countdown <- c else validate xt
[@@inline]

let update0 loc f xt lt gt =
let state = fenceless_get (as_atomic loc) in
let before = eval state in
Expand Down Expand Up @@ -510,6 +532,7 @@ module Xt = struct
[@@inline]

let update loc f xt =
maybe_validate xt;
let x = loc.id in
match xt.cass with
| NIL -> update0 loc f xt NIL NIL
Expand Down Expand Up @@ -598,13 +621,21 @@ module Xt = struct
awaiter;
remove_awaiters awaiter casn stop gt)

let rec commit backoff (mode : Mode.t) tx =
let xt =
let casn = Atomic.make (mode :> status)
and cass = NIL
and post_commit = Action.noop in
{ casn; cass; post_commit }
in
let initial_validate_period = 16

let reset_quick xt =
xt.cass <- NIL;
xt.validate_countdown <- initial_validate_period;
xt.validate_period <- initial_validate_period;
xt.post_commit <- Action.noop;
xt
[@@inline]

let reset mode xt =
xt.casn <- Atomic.make (mode :> status);
reset_quick xt

let rec commit backoff mode xt tx =
match tx ~xt with
| result -> (
match xt.cass with
Expand All @@ -616,13 +647,14 @@ module Xt = struct
state.before <- state.after;
state.casn <- casn_after;
if cas loc before state then Action.run xt.post_commit result
else commit (Backoff.once backoff) mode tx
else commit (Backoff.once backoff) mode (reset_quick xt) tx
| cass -> (
match determine_for_owner xt.casn cass with
| true -> Action.run xt.post_commit result
| false -> commit (Backoff.once backoff) mode tx
| false -> commit (Backoff.once backoff) mode (reset mode xt) tx
| exception Mode.Interference ->
commit (Backoff.once backoff) Mode.lock_free tx))
commit (Backoff.once backoff) Mode.lock_free
(reset Mode.lock_free xt) tx))
| exception Retry.Later -> (
if xt.cass == NIL then invalid_retry ();
let t = Domain_local_await.prepare_for_await () in
Expand All @@ -631,16 +663,22 @@ module Xt = struct
match t.await () with
| () ->
remove_awaiters t.release xt.casn NIL xt.cass;
commit (Backoff.reset backoff) mode tx
commit (Backoff.reset backoff) mode (reset_quick xt) tx
| exception cancellation_exn ->
remove_awaiters t.release xt.casn NIL xt.cass;
raise cancellation_exn)
| CASN _ as stop ->
remove_awaiters t.release xt.casn stop xt.cass;
commit (Backoff.once backoff) mode tx)
commit (Backoff.once backoff) mode (reset_quick xt) tx)

let commit ?(backoff = Backoff.default) ?(mode = Mode.obstruction_free) { tx }
=
commit backoff mode tx
let casn = Atomic.make (mode :> status)
and cass = NIL
and validate_countdown = initial_validate_period
and validate_period = initial_validate_period
and post_commit = Action.noop in
let xt = { casn; cass; validate_countdown; post_commit; validate_period } in
commit backoff mode xt tx
[@@inline]
end
17 changes: 16 additions & 1 deletion src/kcas.mli
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,22 @@ module Xt : sig
Accesses of shared memory locations using an explicit transaction log
first ensure that the initial value of the shared memory location is
recorded in the log and then act on the current value of the shared memory
location as recorded in the log. *)
location as recorded in the log.
It is important to understand that it is possible for a transaction to
observe the contents of two (or more) different shared memory locations
from two (or more) different committed updates. This means that
invariants that hold between two (or more) different shared memory
locations may be seen as broken inside the transaction function. However,
it is not possible to commit a transaction after it has seen such an
inconsistent view of the shared memory locations.
To mitigate potential issues due to this torn read anomaly and due to very
long running transactions, all of the access recording operations in this
section periodically validate the entire transaction log. An important
guideline for writing transactions is that loops inside a transaction
should always include an access of some shared memory location through the
transaction log or should otherwise be guaranteed to be bounded. *)

val get : xt:'x t -> 'a Loc.t -> 'a
(** [get ~xt r] returns the current value of the shared memory location [r] in
Expand Down
27 changes: 27 additions & 0 deletions test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,32 @@ let test_blocking () =

(* *)

let test_validation () =
let a = Loc.make 0 and b = Loc.make 0 and looping = ref false in
let non_zero_difference_domain =
Domain.spawn @@ fun () ->
let rec tx ~xt =
let d = Xt.get ~xt a - Xt.get ~xt b in
if d <> 0 then d
else (
(* We explicitly want this tx to go into infinite loop! Without
validation this would never finish. *)
looping := true;
tx ~xt)
in
Xt.commit { tx }
in

while not !looping do
Domain.cpu_relax ()
done;

Loc.set a 1;

assert (1 = Domain.join non_zero_difference_domain)

(* *)

type _ _loc_is_injective =
| Int : int _loc_is_injective
| Loc : 'a _loc_is_injective -> 'a Loc.t _loc_is_injective
Expand Down Expand Up @@ -437,6 +463,7 @@ let () =
test_post_commit ();
test_backoff ();
test_blocking ();
test_validation ();
test_xt ()

(*
Expand Down

0 comments on commit c936fdb

Please sign in to comment.