Connection.from_dataloader/5 helper? #128

Open
milmazz opened this issue Dec 18, 2018 · 6 comments

@milmazz

milmazz commented Dec 18, 2018

I recently started using this module, and I was a bit surprised when I didn't find a helper using Dataloader, something analogous to Absinthe.Relay.Connection.from_query/4.

I made some basic adaptations and it seems to work, but I'm probably missing something here:

  def from_dataloader(loader, source_name, resource, parent, options) do
    # NOTE: The user should provide the `max` field, but this is just for demonstration purposes.
    opts = [max: options[:max] || 100]
    # NOTE: The user must provide the `sort` field, but this is just for demonstration purposes.
    sort = options[:sort] || [%{asc: :inserted_at}]
    pagination_args = Keyword.fetch!(options, :pagination_args)

    with {:ok, offset, limit} <- Connection.offset_and_limit_for_query(pagination_args, opts) do
      args = %{sort: sort, limit: limit, offset: offset}

      loader
      |> Dataloader.load(source_name, {resource, args}, parent)
      |> on_load(fn loader ->
        records = Dataloader.get(loader, source_name, {resource, args}, parent)

        opts =
          opts
          |> Keyword.put(:has_previous_page, offset > 0)
          |> Keyword.put(:has_next_page, length(records) > limit)

        Connection.from_slice(Enum.take(records, limit), offset, opts)
      end)
    end
  end

Please let me know if you're interested in something like this. If you think this helper would be useful, I can start working on a proper PR.
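The `has_next_page` check in the helper above can only be true if the source returns more than `limit` rows, so whatever query serves `{resource, args}` would need to fetch `limit + 1` rows starting at `offset` (the windowed query later in this thread does this with `end_offset + 1`). A minimal pure-Elixir sketch of that trimming step, using a hypothetical `PageInfoSketch` module that is not part of absinthe_relay:

```elixir
defmodule PageInfoSketch do
  # Hypothetical helper, not part of absinthe_relay. `records` is assumed to be
  # the result of fetching `limit + 1` rows starting at `offset`; the extra row
  # only signals that another page exists and is dropped from the page itself.
  def slice_page(records, offset, limit)
      when is_list(records) and is_integer(offset) and is_integer(limit) do
    page_info = %{
      has_previous_page: offset > 0,
      has_next_page: length(records) > limit
    }

    {Enum.take(records, limit), page_info}
  end
end

# Fetched limit + 1 = 3 rows, so a next page exists:
PageInfoSketch.slice_page([:a, :b, :c], 0, 2)
# => {[:a, :b], %{has_next_page: true, has_previous_page: false}}

# Fetched only `limit` rows: has_next_page can never become true.
PageInfoSketch.slice_page([:a, :b], 0, 2)
# => {[:a, :b], %{has_next_page: false, has_previous_page: false}}
```

The trimmed list and the two flags correspond to what the helper hands to `Connection.from_slice/3`: `Enum.take(records, limit)` plus the `:has_previous_page` and `:has_next_page` options.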

@bgentry
Contributor

bgentry commented Dec 20, 2018

I'm also really interested in using Dataloader for my Relay connections. This was discussed a while ago in #100, and at the time @benwilson512 said:

There is not any way to use dataloader with relay from connection at this time, for the same reason that it wasn't possible with absinthe_ecto. The only way to generate sensible SQL is via a window function or lateral joins, neither of which are well supported by Ecto yet.

However, in the interim Ecto gained significant new functionality, including window functions: elixir-ecto/ecto#2618

It also appears that Ecto now has at least some support for lateral joins.

It seems pretty clear there's a lot of interest in using Dataloader with this library. My particular interest is in being able to easily add connections to my types, not just to the top-level query in my schema.

I can open a separate issue if you'd like 😄 I think there should be somewhere to at least track the general issue of compatibility between this library and dataloader.

@benwilson512
Contributor

benwilson512 commented Dec 20, 2018 via email

@bgentry
Contributor

bgentry commented Dec 21, 2018

@milmazz as I understand it, your solution breaks when loading child associations for many parents at the same time, because the limit and offset aren’t applied per parent; they’re applied to the combined results instead.

Imagine you’re loading Articles along with up to 10 Comments on each article (limit: 10). You don’t want 10 Comments in total; you want up to 10 for each Article. This is the kind of thing that requires something like a window function to pull off.
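To make the difference concrete, here is an in-memory sketch (hypothetical `PerParentLimitSketch` module, with plain maps standing in for Ecto rows) of one shared limit versus a limit applied per parent, which is what a window function partitioned by the parent key would do in SQL:

```elixir
defmodule PerParentLimitSketch do
  # Hypothetical module for illustration; rows are plain maps, assumed to be
  # sorted already in the order the connection should page through.

  # One LIMIT over the whole batched query: `limit` rows in total.
  def global_limit(rows, limit), do: Enum.take(rows, limit)

  # What each connection actually needs: skip `offset` and keep up to `limit`
  # rows *per parent*. Enum.group_by/2 preserves the order within each group.
  def per_parent_limit(rows, parent_key, offset, limit) do
    rows
    |> Enum.group_by(&Map.fetch!(&1, parent_key))
    |> Map.new(fn {parent_id, children} ->
      {parent_id, children |> Enum.drop(offset) |> Enum.take(limit)}
    end)
  end
end

comments = [
  %{id: 1, article_id: 1},
  %{id: 2, article_id: 1},
  %{id: 3, article_id: 1},
  %{id: 4, article_id: 2},
  %{id: 5, article_id: 2}
]

# A shared limit of 2 returns nothing at all for article 2:
comments |> PerParentLimitSketch.global_limit(2) |> Enum.map(& &1.id)
# => [1, 2]

# Up to 2 comments for each article:
comments
|> PerParentLimitSketch.per_parent_limit(:article_id, 0, 2)
|> Map.new(fn {article_id, cs} -> {article_id, Enum.map(cs, & &1.id)} end)
# => %{1 => [1, 2], 2 => [4, 5]}
```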

I’m super interested in seeing this work. Hoping I can take a crack at it soon, but don’t let that stop you from getting to it first @milmazz 🙂

@bgentry
Contributor

bgentry commented Jan 2, 2019

I was able to get a working implementation of Relay Connections with Dataloader that uses window functions to paginate individually for each parent. While it works and already allows some fairly simple reuse of a connection across multiple GraphQL objects, it's pretty ugly and tangled. I'm not sure whether this repo or the Dataloader repo is the best place to post this, so let me know if I should move it.

I encountered a few challenges along the way:

  • I couldn't find a way to easily use Dataloader.Ecto to load all rows from a table (top-level, not from an assoc). In the example below, how could I easily load all Packages at the root level via Dataloader?
  • I had to apply ordering + windowing / pagination as the last step in my source.query/2 functions as there is no other place to intercept/alter the query before it's executed. I'd love to be able to do this somewhere else (maybe it'd become part of Dataloader.Ecto.run_batches?)
  • I had to preserve the un-interpolated order_bys so I could use them both to sort the overall query and in the windowing subquery. My solution was to define an order_by_fn that returns these values based on the query args; it is called by dataloader_relay_paginate, and its result is used to sort both the query and the window.
  • There's no way to expose the raw Ecto.Query that was used, either before or after windowing. The pre-window query would be very useful outside the result set, for example to easily calculate the total count for the connection (while preserving any filters applied in query/2).
  • As-is, I couldn't figure out how to define a totalCount field on the connection type that works everywhere I use the connection, because there's no apparent way to access the parent of the connection from within a connection field. How could a field resolver determine which context it's running in so it can, e.g., group by a parent column (or not, at the root level)?
  • I didn't yet make the primary key field dynamic -- you can see several places in relay_connection_paginate/2 where it is hardcoded to id.

Here's some example code extracted from the branch where I got this working. The schema examples show a Product which has many Packages. The Packages connection is available both at the top level, and on the Product object.

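defmodule MyApp.GraphQL.Schema do
  use Absinthe.Schema
  use Absinthe.Relay.Schema, :modern

  import MyApp.GraphQL.Helpers, only: [dataloader_relay_paginate: 5]
  alias MyApp.GraphQL.Helpers
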
  connection node_type: :package do
    # As far as I can tell, there's no way to make this work for both root-level and child scenarios:
    field(:total_count, non_null(:integer), resolve: &Helpers.resolve_total_count/2)

    edge do
    end
  end

  object :product do
    field(:id, non_null(:id))
    
    connection field(:packages, node_type: :package) do
      # a macro I had to define in order to make the same args available
      # everywhere I have a `PackageConnection`:
      package_connection_args()

      resolve(
        dataloader_relay_paginate(
          MyApp.WebSource,
          MyApp.Package,
          :product_id,
          &Helpers.sort_from_args(&1, desc: :updated_at),
          max: 100
        )
      )
    end
  end
  
  # Root-level fields have to live inside the schema's `query` block.
  query do
    connection field(:packages, node_type: :package) do
      package_connection_args()

      resolve(
        dataloader_relay_paginate(
          MyApp.WebSource,
          MyApp.Package,
          nil,
          &Helpers.sort_from_args(&1, desc: :updated_at),
          max: 100
        )
      )
    end
  end
end


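defmodule MyApp.WebSource do
  import Ecto.Query, only: [where: 2]
  alias MyApp.Package

  def data, do: Dataloader.Ecto.new(MyApp.Repo, query: &query/2)

  def query(Package, %{current_user: user} = params) do
    Package
    |> where(company_id: ^user.company_id)  # runtime values must be pinned with ^
    |> MyApp.GraphQL.Helpers.relay_connection_paginate(params)
  end

  def query(queryable, _params), do: queryable
end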


defmodule MyApp.GraphQL.Helpers do
  import Ecto.Query, only: :macros
  import Absinthe.Resolution.Helpers

  alias MyApp.Repo

  @doc """
  Takes a function which returns a query. That query is executed with `Connection.from_query`.
  The result is also augmented with a field `total_count_query`, which can lazily be used to
  resolve the total count of records in the unpaginated result set.
  """
  def all_with_total_count(args, resolution, query_fn) when is_function(query_fn) do
    query = query_fn.(args, resolution)

    total_count_query = from(item in query, select: count()) |> Ecto.Query.exclude(:order_by)

    # hardcoded limit of 100 records at a time, for now:
    case Absinthe.Relay.Connection.from_query(query, &Repo.all/1, args, max: 100) do
      {:ok, data} -> {:ok, Map.put(data, :total_count_query, total_count_query)}
      {:error, _} = err -> err
    end
  end

  def args_with_pagination(args, partition_field, order_by, opts \\ []) do
    {:ok, offset, limit} = Absinthe.Relay.Connection.offset_and_limit_for_query(args, opts)

    conn_pagination = %{
      offset: offset,
      limit: limit,
      partition_field: partition_field,
      order_by: order_by
    }

    Map.merge(args, %{_connection_paginate: conn_pagination})
  end

  def dataloader_relay_paginate(
        source_module,
        queryable,
        partition_field,
        order_by_fn,
        opts \\ []
      ) do
    fn parent, args, %{context: %{loader: loader}} = res ->
      order_by = order_by_fn.(args)
      args = args_with_pagination(args, partition_field, order_by, opts)

      resource = opts[:resource] || res.definition.schema_node.identifier

      if partition_field do
        loader
        |> Dataloader.load(source_module, {resource, args}, parent)
        |> on_load(fn loader ->
          records = Dataloader.get(loader, source_module, {resource, args}, parent)
          %{_connection_paginate: %{offset: offset, limit: limit}} = args

          opts =
            opts
            |> Keyword.put(:has_previous_page, offset > 0)
            |> Keyword.put(:has_next_page, length(records) > limit)

          Absinthe.Relay.Connection.from_slice(Enum.take(records, limit), offset, opts)
        end)
      else
        query_fn = fn params, _ ->
          # TODO: this is kind of cheating -- can we let Dataloader.Ecto do this work instead of duplicating parts of it?
          source = loader.sources[source_module]

          queryable
          |> source_module.query(Enum.into(params, source.default_params))
        end

        all_with_total_count(args, res, query_fn)
      end
    end
  end

  def relay_connection_paginate(query, %{_connection_paginate: pagination_args}) do
    %{limit: limit, offset: offset, partition_field: partition_field, order_by: order_by} =
      pagination_args

    query =
      query
      |> Ecto.Queryable.to_query()
      |> Ecto.Query.order_by(^order_by)

    if partition_field do
      windowed_query =
        from(q in query,
          # TODO: dynamic primary key
          # row_number() rather than dense_rank(): rows that tie on order_by still
          # get distinct positions, so a page holds at most limit + 1 rows.
          select: %{id: q.id, row_number: over(row_number(), :paginate)},
          windows: [paginate: [partition_by: ^partition_field, order_by: ^order_by]]
        )

      start_offset = offset
      end_offset = offset + limit

      query =
        from(q in query,
          join: windowed in subquery(windowed_query),
          on: q.id == windowed.id,
          where: windowed.row_number > ^start_offset,
          # fetch an additional record so we can see if there's a next page:
          where: windowed.row_number <= ^(end_offset + 1)
        )

      query
    else
      query
      |> Ecto.Query.limit(^limit)
      |> Ecto.Query.offset(^offset)
    end
  end

  def relay_connection_paginate(query, _args), do: query

  def resolve_total_count(_, %{source: conn}), do: {:ok, Repo.one!(conn.total_count_query)}

  @doc """
  Extracts the sorting param (for order_by) from the pagination arguments. If
  no sort is provided, falls back to the provided default_sort.

  For the 1st argument, you can provide either just the :order_by portion of
  the connection arguments, or the entire set of connection arguments (from
  which :order_by will be pulled).

  This function intentionally does not handle partial sort instructions (i.e.
  only a field w/ no direction). Use default_value on the GraphQL order input
  objects' fields to make this work.
  """
  def sort_from_args(%{direction: dir, field: field} = _sort_arg, _default_sort)
      when dir in [:asc, :desc] do
    Keyword.put([], dir, field)
  end

  def sort_from_args(%{sort_by: sort_arg} = _all_args, default_sort) do
    sort_from_args(sort_arg, default_sort)
  end

  def sort_from_args(_args, default_sort), do: default_sort
end
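The windowed subquery above numbers rows within each partition and keeps positions greater than `offset` and at most `offset + limit + 1`. Which ranking function does the numbering matters when the sort key has ties (for example, several packages sharing `updated_at`): `dense_rank()` gives tied rows the same rank, so one page can return more than `limit + 1` rows and later offsets skip rows, while `row_number()` assigns distinct positions. Adding a unique column such as the primary key as a final tiebreaker in `order_by` is another common fix. This pure-Elixir model (hypothetical `WindowRankSketch` module, plain maps instead of Ecto rows) only mimics the SQL semantics and does not touch Ecto:

```elixir
defmodule WindowRankSketch do
  # Hypothetical in-memory model of the windowed pagination: number rows within
  # each partition, then keep positions in (offset, offset + limit + 1].
  # `rows` must already be sorted by partition key, then by sort key.

  def rank(rows, partition_key, sort_key, ranking) do
    rows
    |> Enum.chunk_by(&Map.fetch!(&1, partition_key))
    |> Enum.flat_map(&rank_partition(&1, sort_key, ranking))
  end

  # row_number(): 1, 2, 3, ... even when the sort key ties.
  defp rank_partition(partition, _sort_key, :row_number),
    do: Enum.with_index(partition, 1)

  # dense_rank(): tied sort keys share a rank; the next distinct key gets rank + 1.
  defp rank_partition(partition, sort_key, :dense_rank) do
    {ranked, _acc} =
      Enum.map_reduce(partition, {make_ref(), 0}, fn row, {prev, rank} ->
        value = Map.fetch!(row, sort_key)
        rank = if value == prev, do: rank, else: rank + 1
        {{row, rank}, {value, rank}}
      end)

    ranked
  end

  # Mirrors the two `where` clauses on the windowed subquery; returns row ids.
  def page(rows, partition_key, sort_key, ranking, offset, limit) do
    rows
    |> rank(partition_key, sort_key, ranking)
    |> Enum.filter(fn {_row, rank} -> rank > offset and rank <= offset + limit + 1 end)
    |> Enum.map(fn {row, _rank} -> row.id end)
  end
end

# Three packages of product 7 share the same updated_at value.
packages = [
  %{id: 1, product_id: 7, updated_at: 1},
  %{id: 2, product_id: 7, updated_at: 1},
  %{id: 3, product_id: 7, updated_at: 1},
  %{id: 4, product_id: 7, updated_at: 2}
]

# first: 1 (offset 0, limit 1) should fetch limit + 1 = 2 rows:
WindowRankSketch.page(packages, :product_id, :updated_at, :row_number, 0, 1)
# => [1, 2]
WindowRankSketch.page(packages, :product_id, :updated_at, :dense_rank, 0, 1)
# => [1, 2, 3, 4]

# Second page (offset 1). With dense_rank, Enum.take(records, 1) kept package 1
# on page one and page two jumps to package 4, so packages 2 and 3 never appear.
WindowRankSketch.page(packages, :product_id, :updated_at, :dense_rank, 1, 1)
# => [4]
```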

@gullitmiranda

@bgentry I made some changes to your example to get it working for me; maybe they'll be of some help to you:

  # replaces all_with_total_count
  def put_total_count_query(
        {:ok, data},
        queryable,
        source_module,
        query_params,
        assoc_field,
        %{source: %schema{} = parent} = _res
      ) do
    query =
      queryable
      |> source_module.query(query_params |> Map.put(:_total_count, true))
      |> Ecto.Query.exclude(:order_by)
      |> Ecto.Query.exclude(:limit)

    assoc = Ecto.Association.association_from_schema!(schema, assoc_field)
    %{owner_key: owner_key} = assoc

    id = Map.fetch!(parent, owner_key)

    total_count_query =
      Ecto.Association.assoc_query(assoc, [], query, [id])
      |> select(count())

    {:ok, Map.put(data, :total_count_query, total_count_query)}
  end

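  # Fallback with the same arity (6) for errors and for sources that are not
  # schema structs, such as the root query.
  def put_total_count_query(result, _, _, _, _, _), do: result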

  def dataloader_relay_paginate(
        source_module,
        queryable,
        partition_field,
        order_by_fn,
        opts \\ []
      ) do
    fn parent, args, %{context: %{loader: loader}} = res ->
      order_by = order_by_fn.(args)
      args = args_with_pagination(args, partition_field, order_by, opts)

      resource = opts[:resource] || res.definition.schema_node.identifier

      source = loader.sources[source_module]

      query_params = Enum.into(args, source.default_params)

      if partition_field do
        loader
        |> Dataloader.load(source_module, {resource, args}, parent)
        |> on_load(fn loader ->
          records = Dataloader.get(loader, source_module, {resource, args}, parent)
          %{_connection_paginate: %{offset: offset, limit: limit}} = args

          opts =
            opts
            |> Keyword.put(:has_previous_page, offset > 0)
            |> Keyword.put(:has_next_page, length(records) > limit)

          Absinthe.Relay.Connection.from_slice(Enum.take(records, limit), offset, opts)
          |> put_total_count_query(queryable, source_module, query_params, resource, res)
        end)
      else
        query = queryable |> source_module.query(query_params)

        Absinthe.Relay.Connection.from_query(query, &Repo.all/1, args, opts)
        |> put_total_count_query(queryable, source_module, query_params, resource, res)
      end
    end
  end

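  # This clause keeps window queries out of `total_count_query`. Because those
  # params still contain `_connection_paginate`, it must be defined before the
  # `%{_connection_paginate: ...}` clause so that it matches first.
  def relay_connection_paginate(query, %{_total_count: true}) do
    query
  end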
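As written, `put_total_count_query/6` builds a count query for a single parent id, so resolving `totalCount` on N parents would run N count queries. One way to batch them would be a single grouped count over all requested parent ids (in SQL, a `GROUP BY` on the owner key). The sketch below models only that grouping step in plain Elixir (1.10+ for `Enum.frequencies_by/2`); `BatchedCountSketch` is hypothetical and not part of Dataloader or absinthe_relay:

```elixir
defmodule BatchedCountSketch do
  # Hypothetical sketch: count children for many parents in one pass, the
  # in-memory analogue of `SELECT owner_id, count(*) ... GROUP BY owner_id`.

  # Returns %{parent_id => count}; parents with no children get 0.
  def total_counts(children, foreign_key, parent_ids) do
    counts = Enum.frequencies_by(children, &Map.fetch!(&1, foreign_key))
    Map.new(parent_ids, fn id -> {id, Map.get(counts, id, 0)} end)
  end
end

packages = [
  %{id: 1, product_id: 10},
  %{id: 2, product_id: 10},
  %{id: 3, product_id: 11}
]

BatchedCountSketch.total_counts(packages, :product_id, [10, 11, 12])
# => %{10 => 2, 11 => 1, 12 => 0}
```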

@cospin

cospin commented Aug 21, 2024

Hey there! Does anyone know if there has been any progress on the Ecto side?
@bgentry, is your solution still working? Have you improved or changed anything over the years?

I'm just about to start a new project and I've come across this :(
