Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap compilation in a cross-os-process lock #13860

Merged
merged 20 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 246 additions & 0 deletions lib/mix/lib/mix/lock.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
defmodule Mix.Lock do
@moduledoc false

# Lock implementation working across multiple OS processes.
#
# The lock is implemented using TCP sockets and hard links.
#
# A process holds the lock if it owns a TCP socket, whose port is
# written in the lock_0 file. We need to create such lock files
# atomically, so the process first writes its port to a port_P
# file and then attempts to create a hard link to it at lock_0.
#
# An inherent problem with lock files is that the lock owner may
# terminate abruptly, leaving a "stale" file. Other processes can
# detect a stale file by reading the port written in that file,
# trying to connect to that port and failing. In order for another
# process to link to the same path, the file needs to be replaced.
# However, we need to guarantee that only a single process can
# remove or replace the file, otherwise a concurrent process may
# end up removing a newly linked file.
#
# To address this problem we employ a chained locking procedure.
# Specifically, we attempt to link our port to lock_0, if that
# fails, we try to connect to the lock_0 port. If we manage to
# connect, it means the lock is taken, so we wait for it to close
# and start over. If we fail to connect, it means the lock is stale,
# so we want to replace it. In order to do that, we try to obtain
# lock_1. Again, we try to link and connect. Eventually, we should
# successfully link to lock_N. At that point we can clean up all
# the files, so we perform these steps:
#
# * move our port_P to lock_0
# * remove all the other port_P files
# * remove all lock_1+ files
#
# It is important to perform these steps in this order, to avoid
# race conditions. By moving to lock_0, we make sure that all new
# processes trying to lock will connect to our port. By removing
# all port_P files we make sure that currently paused processes
# that are about to link port_P at lock_N will fail to link, since
# the port_P file will no longer exist (once lock_N is removed).
#
# Finally, note that we do not remove the lock file in `unlock/1`.
# If we did that, another process could try to connect and fail
# because the file would not exist, in such case the process would
# assume the file is stale and needs to be replaced, therefore
# possibly replacing another process who successfully links at the
# empty spot. This means we effectively always leave a stale file,
# however, in order to shortcut the port check for future processes,
# we atomically replace the file content with port 0, to indicate
# the file is stale.
#
# The main caveat of using ephemeral TCP ports is that they are not
# unique. This creates a theoretical scenario where the lock holder
# terminates abruptly and leaves its port in lock_0, then the port
# is assigned to a unrelated process (unaware of the locking). To
# handle this scenario, when we connect to a lock_N port, we expect
# it to immediately send us `@probe_data`. If this does not happen
# within `@probe_timeout_ms`, we assume the port is taken by an
# unrelated process and the lock file is stale. Note that it is ok
# to use a long timeout, because this scenario is very unlikely.
# Theoretically, if an actual lock owner is not able to send the
# probe data within the timeout, the lock will fail, however with
# a high enough timeout, this should not be a problem in practice.

@loopback {127, 0, 0, 1}
@listen_opts [:binary, ip: @loopback, packet: :raw, nodelay: true, backlog: 128, active: false]
@connect_opts [:binary, packet: :raw, nodelay: true, active: false]
@probe_data "elixirlock"
jonatanklosko marked this conversation as resolved.
Show resolved Hide resolved
@probe_timeout_ms 5_000

@doc """
Acquires a lock identified by the given key.

This function blocks until the lock is acquired by this process,
and then executes `fun`, returning its return value.

This function can also be called if this process already has the
lock. In such case the function is executed immediately.
"""
@spec lock(iodata(), (-> term())) :: :ok
def lock(key, fun) do
jonatanklosko marked this conversation as resolved.
Show resolved Hide resolved
key = key |> :erlang.md5() |> Base.url_encode64(padding: false)
path = Path.join([System.tmp_dir!(), "mix_lock", key])

pdict_key = {__MODULE__, path}
has_lock? = Process.get(pdict_key)

if has_lock? do
fun.()
else
lock = lock(path)
Process.put(pdict_key, true)

try do
fun.()
after
Process.delete(pdict_key)
jonatanklosko marked this conversation as resolved.
Show resolved Hide resolved
unlock(lock)
end
end
end

defp lock(path) do
File.mkdir_p!(path)

with {:ok, socket} <- :gen_tcp.listen(0, @listen_opts),
{:ok, port} <- :inet.port(socket) do
spawn_link(fn -> accept_loop(socket) end)

try do
try_lock(path, socket, port)
rescue
exception ->
# Close the socket to make sure we don't block the lock
:gen_tcp.close(socket)
reraise exception, __STACKTRACE__
end
else
{:error, reason} ->
raise Mix.Error,
jonatanklosko marked this conversation as resolved.
Show resolved Hide resolved
jonatanklosko marked this conversation as resolved.
Show resolved Hide resolved
"failed to open a TCP socket while acquiring a lock, reason: #{inspect(reason)}"
end
end

defp try_lock(path, socket, port) do
port_path = Path.join(path, "port_#{port}")

File.write!(port_path, <<port::unsigned-integer-32>>, [:raw])

case grab_lock(path, port_path, 0) do
{:ok, 0} ->
# We grabbed lock_0, so all good
%{socket: socket, path: path}

{:ok, _n} ->
# We grabbed lock_1+, so we need to replace lock_0 and clean up
take_over(path, port_path)
jonatanklosko marked this conversation as resolved.
Show resolved Hide resolved
%{socket: socket, path: path}

{:taken, probe_socket} ->
# Another process has the lock, wait for close and start over
await_close(probe_socket)
try_lock(path, socket, port)

:invalidated ->
try_lock(path, socket, port)
end
end

defp grab_lock(path, port_path, n) do
lock_path = Path.join(path, "lock_#{n}")

case File.ln(port_path, lock_path) do
jonatanklosko marked this conversation as resolved.
Show resolved Hide resolved
:ok ->
{:ok, n}

{:error, :eexist} ->
case probe(lock_path) do
{:ok, probe_socket} ->
{:taken, probe_socket}

:error ->
grab_lock(path, port_path, n + 1)
end

{:error, :enoent} ->
:invalidated
end
end

defp accept_loop(listen_socket) do
case :gen_tcp.accept(listen_socket) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If any other error happens here, we'll get a nasty CaseClauseError. Some of those are transient (:system_limit?), some are not. Maybe we do a simple algorithm of:

  1. Accept
  2. If {:error, reason} and we don't know if reason is transient or not (basically catch all), then sleep for like a second and go back to 1.

With a max number of like 2 or 3 retries for now? I know that just rerunning mix compile would likely achieve the same, but it seems like a small price to pay for a potentially-better UX. Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit concerned that if we fail to accept, it means the client fails to connect and assumes the socket is stale. So it's probably safer to crash the lock owner to avoid corruption? We could raise a more specific error to avoid CaseClauseError.

When we reach system_limit on open files/sockets it usually blows everything up anyway? Are there any other transient errors you worry about? I'm looking at the list in linux manual and I'm not sure if it makes sense to retry any of them (other than EINTR).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The accepting process is sort of holding its own little lock, so this confuses me a bit:

it means the client fails to connect and assumes the socket is stale

The client would fail to connect with :closed if the socket is closed, not with other errors, no?

Copy link
Member Author

@jonatanklosko jonatanklosko Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah you're right, I think these are not tied. The client is put in the accept queue and at that point they connect successfully, whether :gen_tcp.accept grabs it from the queue is separate.

But if the idea is to sleep and retry accepting, this heavily increases the chance that we don't send the probe data to the client on time, which is critical.

{:ok, socket} ->
_ = :gen_tcp.send(socket, @probe_data)
accept_loop(listen_socket)

{:error, reason} when reason in [:closed, :einval] ->
:ok
end
end

defp probe(port_path) do
with {:ok, <<port::unsigned-integer-32>>} when port > 0 <- File.read(port_path),
{:ok, socket} <- connect(port) do
case :gen_tcp.recv(socket, 0, @probe_timeout_ms) do
{:ok, @probe_data} ->
{:ok, socket}

{:error, _reason} ->
jonatanklosko marked this conversation as resolved.
Show resolved Hide resolved
:gen_tcp.close(socket)
:error
end
else
_other -> :error
jonatanklosko marked this conversation as resolved.
Show resolved Hide resolved
end
end

defp connect(port) do
# On Windows connecting to an unbound port takes a few seconds to
# fail, so instead we shortcut the check by attempting a listen,
# which succeeds or fails immediately
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, nice trick!

case :gen_tcp.listen(port, [reuseaddr: true] ++ @listen_opts) do
{:ok, socket} ->
:gen_tcp.close(socket)
# The port is free, so connecting would fail
{:error, :econnrefused}

{:error, _reason} ->
:gen_tcp.connect(@loopback, port, @connect_opts)
end
end

defp take_over(path, port_path) do
lock_path = Path.join(path, "lock_0")

# We linked to lock_N successfully, so port_path should exist
File.rename!(port_path, lock_path)

names = File.ls!(path)

for "port_" <> _ = name <- names do
File.rm!(Path.join(path, name))
end

for "lock_" <> _ = name <- names, name != "lock_0" do
File.rm!(Path.join(path, name))
end
Comment on lines +285 to +293
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this work? Nitpick, because your version works too 🙃

for file <- path |> Path.join("{port_,lock_}*") |> Path.wildcard(),
    Path.basename(file) != "lock_0",
    do: File.rm!(file)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to remove all port_* files first, only then all lock_* files. Not sure if wildcard guarantees the order, but even if it does, I'd rather be explicit about the order of operations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't guarantee that as far as I know. Why do we need to delete all port files first? Might be worth leaving a comment.

Copy link
Member Author

@jonatanklosko jonatanklosko Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sidenote: we actually want all files in the directory, so semantically there's no need for wildcard.

Why do we need to delete all port files first?

This part from the doc comment:

# [...] By removing
# all port_P files we make sure that currently paused processes
# that are about to link port_P at lock_N will fail to link, since
# the port_P file will no longer exist (once lock_N is removed).

This is best seen on an example, so imagine this:

port_100 (A)
port_200 (B)
lock_0 (stale)
lock_1 (stale)
lock_2 (owned by A)

Process A has port 100 and owns lock_2.
Process B has port 200 and is paused right before File.ln("port_200", "lock_1").

Process A first moves port_100 to lock_0. And now wants to remove files.

Let's say process A were to remove lock_1 first. Then process B wakes up and successfully links. At this point Process B will overtake A. Contrarily, if we remove port_200 first, when process B wakes up, it will fail to link.

In other words, once we are sure we are the new owner, we remove port_* files to invalidate any peers attempting to also get the lock.

Does that make sense? :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, makes sense. My head hurts, but it makes sense 😉 Let's maybe leave a comment here too saying that we need to delete these files in the specific order then, wdyt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW the module comment already says they need to be deleted in order. There was also comment where take_over is called, but I moved it here, since it's more relevant here :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfecto!

end

defp await_close(socket) do
{:error, _reason} = :gen_tcp.recv(socket, 0)
jonatanklosko marked this conversation as resolved.
Show resolved Hide resolved
end

defp unlock(lock) do
port_path = Path.join(lock.path, "port_0")
lock_path = Path.join(lock.path, "lock_0")

File.write!(port_path, <<0::unsigned-integer-32>>, [:raw])
File.rename!(port_path, lock_path)
after
# Closing the socket will cause the accepting process to finish
# and all accepted sockets (tied to that process) will get closed
:gen_tcp.close(lock.socket)
end
end
74 changes: 0 additions & 74 deletions lib/mix/lib/mix/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,6 @@ defmodule Mix.State do
GenServer.start_link(__MODULE__, :ok, name: @name)
end

def lock(key, fun) do
try do
GenServer.call(@name, {:lock, key}, @timeout)
fun.()
after
GenServer.call(@name, {:unlock, key}, @timeout)
end
end

def builtin_apps do
GenServer.call(@name, :builtin_apps, @timeout)
end
Expand Down Expand Up @@ -83,8 +74,6 @@ defmodule Mix.State do
)

state = %{
key_to_waiting: %{},
pid_to_key: %{},
builtin_apps: :code.get_path()
}

Expand Down Expand Up @@ -114,69 +103,6 @@ defmodule Mix.State do
end
end

@impl true
def handle_call({:lock, key}, {pid, _} = from, state) do
%{key_to_waiting: key_to_waiting, pid_to_key: pid_to_key} = state

key_to_waiting =
case key_to_waiting do
%{^key => {locked, waiting}} ->
Map.put(key_to_waiting, key, {locked, :queue.in(from, waiting)})

%{} ->
go!(from)
Map.put(key_to_waiting, key, {pid, :queue.new()})
end

ref = Process.monitor(pid)
pid_to_key = Map.put(pid_to_key, pid, {key, ref})
{:noreply, %{state | key_to_waiting: key_to_waiting, pid_to_key: pid_to_key}}
end

@impl true
def handle_call({:unlock, key}, {pid, _}, state) do
%{key_to_waiting: key_to_waiting, pid_to_key: pid_to_key} = state
{{^key, ref}, pid_to_key} = Map.pop(pid_to_key, pid)
Process.demonitor(ref, [:flush])
key_to_waiting = unlock(key_to_waiting, pid_to_key, key)
{:reply, :ok, %{state | key_to_waiting: key_to_waiting, pid_to_key: pid_to_key}}
end

@impl true
def handle_info({:DOWN, ref, _type, pid, _reason}, state) do
%{key_to_waiting: key_to_waiting, pid_to_key: pid_to_key} = state
{{key, ^ref}, pid_to_key} = Map.pop(pid_to_key, pid)

key_to_waiting =
case key_to_waiting do
%{^key => {^pid, _}} ->
unlock(key_to_waiting, pid_to_key, key)

%{^key => {locked, waiting}} ->
waiting = :queue.delete_with(fn {qpid, _qref} -> qpid == pid end, waiting)
Map.put(key_to_waiting, key, {locked, waiting})
end

{:noreply, %{state | key_to_waiting: key_to_waiting, pid_to_key: pid_to_key}}
end

defp unlock(key_to_waiting, pid_to_key, key) do
%{^key => {_locked, waiting}} = key_to_waiting

case :queue.out(waiting) do
{{:value, {pid, _} = from}, waiting} ->
# Assert that we still know this PID
_ = Map.fetch!(pid_to_key, pid)
go!(from)
Map.put(key_to_waiting, key, {pid, waiting})

{:empty, _waiting} ->
Map.delete(key_to_waiting, key)
end
end

defp go!(from), do: GenServer.reply(from, :ok)

# ../elixir/ebin -> elixir
# ../ssl-9.6/ebin -> ssl
defp app_from_code_path(path) do
Expand Down
7 changes: 7 additions & 0 deletions lib/mix/lib/mix/tasks/compile.all.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ defmodule Mix.Tasks.Compile.All do
@impl true
def run(args) do
Mix.Project.get!()

Mix.Lock.lock(Mix.Tasks.Compile.compile_lock_key(), fn ->
do_run(args)
end)
end

defp do_run(args) do
config = Mix.Project.config()

# Compute the app cache if it is stale and we are
Expand Down
5 changes: 2 additions & 3 deletions lib/mix/lib/mix/tasks/compile.elixir.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,10 @@ defmodule Mix.Tasks.Compile.Elixir do
|> profile_opts()

# Having compilations racing with other is most undesired,
# so we wrap the compiler in a lock. Ideally we would use
# flock in the future.
# so we wrap the compiler in a lock.

with_logger_app(project, fn ->
Mix.State.lock(__MODULE__, fn ->
Mix.Lock.lock(Mix.Tasks.Compile.compile_lock_key(), fn ->
Mix.Compilers.Elixir.compile(
manifest,
srcs,
Expand Down
13 changes: 8 additions & 5 deletions lib/mix/lib/mix/tasks/compile.erlang.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,14 @@ defmodule Mix.Tasks.Compile.Erlang do
@impl true
def run(args) do
{opts, _, _} = OptionParser.parse(args, switches: @switches)
project = Mix.Project.config()
source_paths = project[:erlc_paths]
Mix.Compilers.Erlang.assert_valid_erlc_paths(source_paths)
files = Mix.Utils.extract_files(source_paths, [:erl])
do_run(files, opts, project, source_paths)

Mix.Lock.lock(Mix.Tasks.Compile.compile_lock_key(), fn ->
project = Mix.Project.config()
source_paths = project[:erlc_paths]
Mix.Compilers.Erlang.assert_valid_erlc_paths(source_paths)
files = Mix.Utils.extract_files(source_paths, [:erl])
do_run(files, opts, project, source_paths)
end)
end

defp do_run([], _, _, _), do: {:noop, []}
Expand Down
Loading
Loading