Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription broker #274

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 50 additions & 5 deletions lib/absinthe/plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,32 @@ defmodule Absinthe.Plug do
end
```

## Pipeline

GraphQL documents undergo a series of transformations when they are executed.
This happens in a pipeline. The pipeline is a series of phases, each of which
is responsible for e.g. parsing the document, validating it, or executing it.

The pipeline is configurable per document, and the pipeline callback can be
used to change it.

E.g. to skip the SubscribeSelf phase you can do:

```elixir
plug Absinthe.Plug,
schema: MyApp.Schema,
pipeline: {__MODULE__, :document_pipeline}


def document_pipeline(config, pipeline_opts) do
config
|> Absinthe.Plug.default_pipeline(opts)
|> Absinthe.Pipeline.without(Absinthe.Phase.Subscription.SubscribeSelf)
end
```

See `Absinthe.Pipeline` for more information.

## Included GraphQL Types

This package includes additional types for use in Absinthe GraphQL schema and
Expand Down Expand Up @@ -130,6 +156,7 @@ defmodule Absinthe.Plug do
:no_query_message,
:json_codec,
:pipeline,
:subscription_broker,
:document_providers,
:schema,
:serializer,
Expand All @@ -153,8 +180,9 @@ defmodule Absinthe.Plug do
- `:context` -- (Optional) Initial value for the Absinthe context, available to resolvers. (default: `%{}`).
- `:no_query_message` -- (Optional) Message to return to the client if no query is provided (default: "No query document supplied").
- `:json_codec` -- (Optional) A `module` or `{module, Keyword.t}` dictating which JSON codec should be used (default: `Jason`). The codec module should implement `encode!/2` (e.g., `module.encode!(body, opts)`).
- `:pipeline` -- (Optional) `{module, atom}` reference to a 2-arity function that will be called to generate the processing pipeline. (default: `{Absinthe.Plug, :default_pipeline}`).
- `:document_providers` -- (Optional) A `{module, atom}` reference to a 1-arity function that will be called to determine the document providers that will be used to process the request. (default: `{Absinthe.Plug, :default_document_providers}`, which configures `Absinthe.Plug.DocumentProvider.Default` as the lone document provider). A simple list of document providers can also be given. See `Absinthe.Plug.DocumentProvider` for more information about document providers, their role in processing requests, and how you can define and configure your own.
- `:pipeline` -- (Optional) `{module, atom}` reference to a 2-arity function that will be called to generate the processing pipeline. (default: [`{Absinthe.Plug, :default_pipeline}`](`default_pipeline/2`)).
- `:subscription_broker` -- (Optional) `{module, atom}` reference to a 3-arity function that will be called when `subscription` documents are submitted. (default: [`{Absinthe.Plug, :default_subscription_broker}`](`default_subscription_broker/3`)).
- `:document_providers` -- (Optional) A `{module, atom}` reference to a 1-arity function that will be called to determine the document providers that will be used to process the request. (default: [`{Absinthe.Plug, :default_document_providers}`](`default_document_providers/1`), which configures `Absinthe.Plug.DocumentProvider.Default` as the lone document provider). A simple list of document providers can also be given. See `Absinthe.Plug.DocumentProvider` for more information about document providers, their role in processing requests, and how you can define and configure your own.
- `:schema` -- (Required, if not handled by Mix.Config) The Absinthe schema to use. If a module name is not provided, `Application.get_env(:absinthe, :schema)` will be attempt to find one.
- `:serializer` -- (Optional) Similar to `:json_codec` but allows the use of serialization formats other than JSON, like MessagePack or Erlang Term Format. Defaults to whatever is set in `:json_codec`.
- `:content_type` -- (Optional) The content type of the response. Should probably be set if `:serializer` option is used. Defaults to `"application/json"`.
Expand All @@ -172,6 +200,7 @@ defmodule Absinthe.Plug do
context: map,
json_codec: module | {module, Keyword.t()},
pipeline: {module, atom},
subscription_broker: {module, atom},
no_query_message: String.t(),
document_providers:
[Absinthe.Plug.DocumentProvider.t(), ...]
Expand Down Expand Up @@ -203,6 +232,9 @@ defmodule Absinthe.Plug do

pipeline = Keyword.get(opts, :pipeline, {__MODULE__, :default_pipeline})

subscription_broker =
Keyword.get(opts, :subscription_broker, {__MODULE__, :default_subscription_broker})

document_providers =
Keyword.get(opts, :document_providers, {__MODULE__, :default_document_providers})

Expand Down Expand Up @@ -239,6 +271,7 @@ defmodule Absinthe.Plug do
json_codec: json_codec,
no_query_message: no_query_message,
pipeline: pipeline,
subscription_broker: subscription_broker,
raw_options: raw_options,
schema_mod: schema_mod,
serializer: serializer,
Expand Down Expand Up @@ -294,9 +327,10 @@ defmodule Absinthe.Plug do
conn
|> encode(400, error_result(msg), config)

{:ok, %{"subscribed" => topic}} ->
conn
|> subscribe(topic, config)
{:ok, %{"subscribed" => _topic} = result} ->
{module, fun} = config.subscription_broker

apply(module, fun, [conn, config, result])

{:ok, %{data: _} = result} ->
conn
Expand Down Expand Up @@ -566,6 +600,17 @@ defmodule Absinthe.Plug do
)
end

@doc """
The default subscription broker used to process GraphQL subscription documents.

It will start the subscription and any updates will be sent over the connection
using Server Side Events (SSE).
"""
def default_subscription_broker(conn, config, %{"subscribed" => topic}) do
conn
|> subscribe(topic, config)
end

#
# DOCUMENT PROVIDERS
#
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ defmodule Absinthe.Plug.Mixfile do

defp deps do
[
{:absinthe, "~> 1.5"},
{:absinthe, "~> 1.7"},
{:plug, "~> 1.4"},
{:jason, ">= 0.0.0", only: [:dev, :test]},
{:ex_doc, "~> 0.20", only: :dev}
Expand Down
15 changes: 8 additions & 7 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
%{
"absinthe": {:hex, :absinthe, "1.5.4", "e84820f770149ada718e2bd58939322965089a5badd9a0bfe632e05b27248dd0", [:mix], [{:dataloader, "~> 1.0.0", [hex: :dataloader, repo: "hexpm", optional: true]}, {:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6df329a071865065edd13a5a1da9981dfe8dc22c5d6729218ca9023999e9a2ff"},
"absinthe": {:hex, :absinthe, "1.7.0", "36819e7b1fd5046c9c734f27fe7e564aed3bda59f0354c37cd2df88fd32dd014", [:mix], [{:dataloader, "~> 1.0.0", [hex: :dataloader, repo: "hexpm", optional: true]}, {:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0 or ~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "566a5b5519afc9b29c4d367f0c6768162de3ec03e9bf9916f9dc2bcbe7c09643"},
"earmark": {:hex, :earmark, "1.4.4", "4821b8d05cda507189d51f2caeef370cf1e18ca5d7dfb7d31e9cafe6688106a4", [:mix], [], "hexpm", "1f93aba7340574847c0f609da787f0d79efcab51b044bb6e242cae5aca9d264d"},
"earmark_parser": {:hex, :earmark_parser, "1.4.10", "6603d7a603b9c18d3d20db69921527f82ef09990885ed7525003c7fe7dc86c56", [:mix], [], "hexpm", "8e2d5370b732385db2c9b22215c3f59c84ac7dda7ed7e544d7c459496ae519c0"},
"ex_doc": {:hex, :ex_doc, "0.23.0", "a069bc9b0bf8efe323ecde8c0d62afc13d308b1fa3d228b65bca5cf8703a529d", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f5e2c4702468b2fd11b10d39416ddadd2fcdd173ba2a0285ebd92c39827a5a16"},
"earmark_parser": {:hex, :earmark_parser, "1.4.26", "f4291134583f373c7d8755566122908eb9662df4c4b63caa66a0eabe06569b0a", [:mix], [], "hexpm", "48d460899f8a0c52c5470676611c01f64f3337bad0b26ddab43648428d94aabc"},
"ex_doc": {:hex, :ex_doc, "0.28.5", "3e52a6d2130ce74d096859e477b97080c156d0926701c13870a4e1f752363279", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d2c4b07133113e9aa3e9ba27efb9088ba900e9e51caa383919676afdf09ab181"},
"jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.1", "4f0e96847c63c17841d42c08107405a005a2680eb9c7ccadfd757bd31dabccfb", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f2438b1a80eaec9ede832b5c41cd4f373b38fd7aa33e3b22d9db79e640cbde11"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"mime": {:hex, :mime, "1.4.0", "5066f14944b470286146047d2f73518cf5cca82f8e4815cf35d196b58cf07c47", [:mix], [], "hexpm", "75fa42c4228ea9a23f70f123c74ba7cece6a03b1fd474fe13f6a7a85c6ea4ff6"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"plug": {:hex, :plug, "1.11.0", "f17217525597628298998bc3baed9f8ea1fa3f1160aa9871aee6df47a6e4d38e", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2d9c633f0499f9dc5c2fd069161af4e2e7756890b81adcbb2ceaa074e8308876"},
"plug_crypto": {:hex, :plug_crypto, "1.2.0", "1cb20793aa63a6c619dd18bb33d7a3aa94818e5fd39ad357051a67f26dfa2df6", [:mix], [], "hexpm", "a48b538ae8bf381ffac344520755f3007cc10bd8e90b240af98ea29b69683fc2"},
"telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
}
2 changes: 1 addition & 1 deletion test/lib/absinthe/plug/transport_batching_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ defmodule Absinthe.Plug.TransportBatchingTest do
%{
id: "2",
query:
"query Upload($file: Upload) {uploadTest(fileA: $file)}",
"query Upload($file: Upload!) {uploadTest(fileA: $file)}",
variables: %{"file" => "a"}
}
]
Expand Down
76 changes: 52 additions & 24 deletions test/lib/absinthe/plug_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -449,37 +449,61 @@ defmodule Absinthe.PlugTest do
assert expected == resp_body
end

test "Subscriptions over HTTP with Server Sent Events chunked response" do
TestPubSub.start_link()
Absinthe.Subscription.start_link(TestPubSub)
describe "Subscriptions" do
setup do
start_supervised!(TestPubSub)
start_supervised!({Absinthe.Subscription, TestPubSub})
:ok
end

test "(default broker) over HTTP with Server Sent Events chunked response" do
query = "subscription {update}"
opts = Absinthe.Plug.init(schema: TestSchema, pubsub: TestPubSub)

request =
Task.async(fn ->
conn(:post, "/", query: query)
|> put_req_header("content-type", "application/json")
|> plug_parser
|> Absinthe.Plug.call(opts)
end)

Process.sleep(200)
Absinthe.Subscription.publish(TestPubSub, "FOO", update: "*")
Absinthe.Subscription.publish(TestPubSub, "BAR", update: "*")
send(request.pid, :close)

conn = Task.await(request)
{_module, state} = conn.adapter

events =
state.chunks
|> String.split()
|> Enum.map(&Jason.decode!/1)

assert length(events) == 2
assert Enum.member?(events, %{"data" => %{"update" => "FOO"}})
assert Enum.member?(events, %{"data" => %{"update" => "BAR"}})
end

query = "subscription {update}"
opts = Absinthe.Plug.init(schema: TestSchema, pubsub: TestPubSub)
test "(custom broker) sends topic id in header" do
query = "subscription {update}"

request =
Task.async(fn ->
opts =
Absinthe.Plug.init(
schema: TestSchema,
pubsub: TestPubSub,
subscription_broker: {__MODULE__, :test_subscription_broker}
)

conn =
conn(:post, "/", query: query)
|> put_req_header("content-type", "application/json")
|> plug_parser
|> Absinthe.Plug.call(opts)
end)

Process.sleep(200)
Absinthe.Subscription.publish(TestPubSub, "FOO", update: "*")
Absinthe.Subscription.publish(TestPubSub, "BAR", update: "*")
send(request.pid, :close)

conn = Task.await(request)
{_module, state} = conn.adapter

events =
state.chunks
|> String.split()
|> Enum.map(&Jason.decode!/1)

assert length(events) == 2
assert Enum.member?(events, %{"data" => %{"update" => "FOO"}})
assert Enum.member?(events, %{"data" => %{"update" => "BAR"}})
assert ["__absinthe__:doc:" <> _] = get_resp_header(conn, "x-subscription-id")
end
end

@query """
Expand Down Expand Up @@ -715,6 +739,10 @@ defmodule Absinthe.PlugTest do
|> put_private(:user_id, 1)
end

def test_subscription_broker(conn, _config, res) do
conn |> Plug.Conn.put_resp_header("x-subscription-id", res["subscribed"])
end

defp basic_opts(context) do
Map.put(context, :opts, Absinthe.Plug.init(schema: TestSchema))
end
Expand Down
7 changes: 7 additions & 0 deletions test/support/test_pubsub.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
defmodule Absinthe.Plug.TestPubSub do
@behaviour Absinthe.Subscription.Pubsub

def child_spec(_) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, []}
}
end

def node_name() do
to_string(node())
end
Expand Down