Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to Erlang/OTP 26 and Elixir 1.16, remove deprecated warnings and fix dialyzer issue. #27

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Changelog
=========

## Version 0.11.0

- [Enhancement] Update to Erlang/OTP 26 and Elixir 1.16, remove deprecated warnings



## Version 0.10.1

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Add `que` to your project dependencies in `mix.exs`:

```elixir
def deps do
[{:que, "~> 0.10.1"}]
[{:que, "~> 0.11.0"}]
end
```

Expand Down
10 changes: 3 additions & 7 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This file is responsible for configuring your application
# and its dependencies with the aid of the Mix.Config module.
use Mix.Config
import Config

# This configuration is loaded before any dependency and is restricted
# to this project. If another project depends on this project, this
Expand All @@ -21,17 +21,13 @@ use Mix.Config
# config :logger, level: :info
#


config :mnesia,
dir: '.mnesia/#{Mix.env}/#{node()}'

dir: ~c".mnesia/#{Mix.env()}/#{node()}"


if Mix.env == :test do
if Mix.env() == :test do
config :que, log_level: :medium
end


# It is also possible to import configuration files, relative to this
# directory. For example, you can emulate configuration per environment
# by uncommenting the line below and defining dev.exs, test.exs and such.
Expand Down
12 changes: 2 additions & 10 deletions lib/que.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule Que do

```
defp deps do
[{:que, "~> #{Que.Mixfile.project[:version]}"}]
[{:que, "~> #{Que.Mixfile.project()[:version]}"}]
end
```

Expand Down Expand Up @@ -80,20 +80,14 @@ defmodule Que do

"""




@doc """
Starts the Que Application (and its Supervision Tree)
"""
def start(_type, _args) do
Que.Helpers.log("Booting Que", :low)
Que.Supervisor.start_link
Que.Supervisor.start_link()
end




@doc """
Enqueues a Job to be processed by Que.

Expand All @@ -112,6 +106,4 @@ defmodule Que do
"""
@spec add(worker :: module, arguments :: term) :: {:ok, %Que.Job{}}
defdelegate add(worker, arguments), to: Que.ServerSupervisor

end

16 changes: 4 additions & 12 deletions lib/que/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,35 @@ defmodule Que.Helpers do
@moduledoc false

@log_levels [low: 0, medium: 1, high: 2]
@min_level Application.get_env(:que, :log_level, :low)

@min_level Application.compile_env(:que, :log_level, :low)

## Helper Module for `Que`. Exports methods that are used in the
## project internally. Not meant to be used as part of the Public
## API.



@doc """
Logger wrapper for internal Que use.

Doesn't log messages if the specified level is `:low` and the
enviroment is `:test`.
"""
@spec log(message :: String.t, level :: atom) :: :ok
@spec log(message :: String.t(), level :: atom) :: :ok
def log(message, level \\ :medium) do
if (level_value(level) >= level_value(@min_level)) do
if level_value(level) >= level_value(@min_level) do
Logger.info("#{@prefix} #{message}")
end
end



@doc """
Off-loads tasks to custom `Que.TaskSupervisor`
"""
@spec do_task((() -> any)) :: {:ok, pid}
@spec do_task((-> any)) :: {:ok, pid}
def do_task(fun) do
Task.Supervisor.start_child(Que.TaskSupervisor, fun)
end



# Convert Log Level to Integer
defp level_value(level) when is_atom(level) do
@log_levels[level] || 0
end

end
53 changes: 16 additions & 37 deletions lib/que/job.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
defmodule Que.Job do
require Logger

defstruct [:id, :arguments, :worker, :status, :ref, :pid, :created_at, :updated_at]
defstruct [:id, :arguments, :worker, :status, :ref, :pid, :created_at, :updated_at]
## Note: Update Que.Persistence.Mnesia after changing these values


@moduledoc """
Module to manage a Job's state and execute the worker's callbacks.

Expand All @@ -13,49 +12,39 @@ defmodule Que.Job do
unless you absolutely know what you're doing.
"""


@statuses [:queued, :started, :failed, :completed]
@typedoc "One of the atoms in `#{inspect(@statuses)}`"
@type status :: atom

@typedoc "A `Que.Job` struct"
@type t :: %Que.Job{}


@typedoc "One of the atoms in `#{inspect(@statuses)}`"
@type status :: atom

@typedoc "A `Que.Job` struct"
@type t :: %Que.Job{}

@doc """
Returns a new Job struct with defaults
"""
@spec new(worker :: Que.Worker.t, args :: list) :: Que.Job.t
@spec new(worker :: Que.Worker.t(), args :: list) :: Que.Job.t()
def new(worker, args \\ nil) do
%Que.Job{
status: :queued,
worker: worker,
status: :queued,
worker: worker,
arguments: args
}
end




@doc """
Update the Job status to one of the predefined values in `@statuses`
"""
@spec set_status(job :: Que.Job.t, status :: status) :: Que.Job.t
@spec set_status(job :: Que.Job.t(), status :: status) :: Que.Job.t()
def set_status(job, status) when status in @statuses do
%{ job | status: status }
%{job | status: status}
end




@doc """
Updates the Job struct with new status and spawns & monitors a new Task
under the TaskSupervisor which executes the perform method with supplied
arguments
"""
@spec perform(job :: Que.Job.t) :: Que.Job.t
@spec perform(job :: Que.Job.t()) :: Que.Job.t()
def perform(job) do
Que.Helpers.log("Starting #{job}")

Expand All @@ -66,17 +55,14 @@ defmodule Que.Job do
job.worker.perform(job.arguments)
end)

%{ job | status: :started, pid: pid, ref: Process.monitor(pid) }
%{job | status: :started, pid: pid, ref: Process.monitor(pid)}
end




@doc """
Handles Job Success, Calls appropriate worker method and updates the job
status to :completed
"""
@spec handle_success(job :: Que.Job.t) :: Que.Job.t
@spec handle_success(job :: Que.Job.t()) :: Que.Job.t()
def handle_success(job) do
Que.Helpers.log("Completed #{job}")

Expand All @@ -86,17 +72,14 @@ defmodule Que.Job do
job.worker.on_teardown(job)
end)

%{ job | status: :completed, pid: nil, ref: nil }
%{job | status: :completed, pid: nil, ref: nil}
end




@doc """
Handles Job Failure, Calls appropriate worker method and updates the job
status to :failed
"""
@spec handle_failure(job :: Que.Job.t, error :: term) :: Que.Job.t
@spec handle_failure(job :: Que.Job.t(), error :: term) :: Que.Job.t()
def handle_failure(job, error) do
Que.Helpers.log("Failed #{job}")

Expand All @@ -106,18 +89,14 @@ defmodule Que.Job do
job.worker.on_teardown(job)
end)

%{ job | status: :failed, pid: nil, ref: nil }
%{job | status: :failed, pid: nil, ref: nil}
end
end




## Implementing the String.Chars protocol for Que.Job structs

defimpl String.Chars, for: Que.Job do
def to_string(job) do
"Job # #{job.id} with #{ExUtils.Module.name(job.worker)}"
end
end

42 changes: 11 additions & 31 deletions lib/que/server_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Que.ServerSupervisor do
use Supervisor
use DynamicSupervisor

@module __MODULE__

Expand All @@ -9,37 +9,29 @@ defmodule Que.ServerSupervisor do
you absolutely know what you're doing.
"""




@doc """
Starts the Supervision Tree
"""
@spec start_link() :: Supervisor.on_start
def start_link do
@spec start_link(:ok) :: Supervisor.on_start()
def start_link(_) do
Que.Helpers.log("Booting Server Supervisor for Workers", :low)
pid = Supervisor.start_link(@module, :ok, name: @module)
pid = DynamicSupervisor.start_link(@module, :ok, name: @module)

# Resume Pending Jobs
resume_queued_jobs()

pid
end




@doc """
Starts a `Que.Server` for the given worker
"""
@spec start_server(worker :: Que.Worker.t) :: Supervisor.on_start_child | no_return
@spec start_server(worker :: Que.Worker.t()) :: Supervisor.on_start_child() | no_return
def start_server(worker) do
Que.Worker.validate!(worker)
Supervisor.start_child(@module, [worker])
DynamicSupervisor.start_child(@module, {Que.Server, worker})
end




# If the server for the worker is running, add job to it.
# If not, spawn a new server first and then add it.
@doc false
Expand All @@ -51,27 +43,17 @@ defmodule Que.ServerSupervisor do
Que.Server.add(worker, args)
end




@doc false
def init(:ok) do
children = [
worker(Que.Server, [])
]

supervise(children, strategy: :simple_one_for_one)
DynamicSupervisor.init(strategy: :one_for_one)
end




# Spawn all (valid) Workers with queued jobs
defp resume_queued_jobs do
{valid, invalid} =
Que.Persistence.incomplete
|> Enum.map(&(&1.worker))
|> Enum.uniq
Que.Persistence.incomplete()
|> Enum.map(& &1.worker)
|> Enum.uniq()
|> Enum.split_with(&Que.Worker.valid?/1)

# Notify user about pending jobs for Invalid Workers
Expand All @@ -85,6 +67,4 @@ defmodule Que.ServerSupervisor do
Enum.map(valid, &start_server/1)
end
end

end

17 changes: 5 additions & 12 deletions lib/que/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,25 @@ defmodule Que.Supervisor do
you absolutely know what you're doing.
"""




@doc """
Starts the Supervision Tree for `Que`
"""
@spec start_link() :: Supervisor.on_start
@spec start_link() :: Supervisor.on_start()
def start_link do
# Initialize Mnesia DB for Jobs
Que.Persistence.initialize
Que.Persistence.initialize()

# Start Supervision Tree
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end




@doc false
def init(:ok) do
children = [
supervisor(Task.Supervisor, [[name: Que.TaskSupervisor]]),
supervisor(Que.ServerSupervisor, [])
{Task.Supervisor, name: Que.TaskSupervisor},
{Que.ServerSupervisor, []}
]

supervise(children, strategy: :one_for_one)
Supervisor.init(children, strategy: :one_for_one)
end

end
Loading