Integration of subscriptions #13

Open
WizMik opened this issue Feb 19, 2019 · 23 comments
Labels
enhancement New feature or request

Comments

@WizMik

WizMik commented Feb 19, 2019

This is F*CKING awesome !
I wanted to tell you that while writing the first issue, which is more a feature request.
This package only misses the integration of subscriptions requests to be perfect.

Meantime I still have a lot of new toys to play with, so thanks!

@moufmouf
Member

Subscriptions could actually be added!

My current plan is to try the Mercure protocol for real-time communication.

@moufmouf moufmouf added this to the v4.1 milestone Dec 27, 2019
lsimeonov pushed a commit to lsimeonov/graphqlite that referenced this issue Feb 4, 2020
Adding special type for Paginator and FixedLengthPaginator classes
@oojacoboo oojacoboo added the enhancement New feature or request label Mar 29, 2021
@oojacoboo oojacoboo removed this from the v4.1 milestone Mar 29, 2021
@oprypkhantc
Contributor

I'm interested in this as well. WebSockets require a separate web server, separate configuration for proxying servers (like Nginx).

What about SSE? Stream responses are already supported by Laravel, Symfony, Swoole and are as easy to use on the client as WebSockets are. Their performance under stress conditions is lower than that of WebSocket, but the conclusion of TimePlus' article is that they're "close enough".

This might be a less painful option for graphqlite's users than WS, thanks to everything being in one app, on one server and in PHP.
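For context, an SSE response is a long-lived HTTP response served as `text/event-stream`, where each message is a set of `field: value` lines terminated by a blank line. A minimal sketch of that framing in plain PHP follows; the helper name `formatSseEvent` is an assumption made up for illustration and is not part of GraphQLite or any framework.

```php
<?php

/**
 * Formats one Server-Sent Events message.
 * Hypothetical helper for illustration; not a GraphQLite API.
 */
function formatSseEvent(mixed $payload, ?string $event = null, ?string $id = null): string
{
    $lines = [];

    if ($id !== null) {
        $lines[] = 'id: ' . $id;
    }
    if ($event !== null) {
        $lines[] = 'event: ' . $event;
    }

    // Every line of the payload needs its own "data:" prefix.
    $json = json_encode($payload, JSON_THROW_ON_ERROR);
    foreach (explode("\n", $json) as $line) {
        $lines[] = 'data: ' . $line;
    }

    // A blank line terminates the message.
    return implode("\n", $lines) . "\n\n";
}
```

A streaming endpoint would typically send a `Content-Type: text/event-stream` header once, then echo one formatted message per update and flush the output buffer after each.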

@oojacoboo
Collaborator

oojacoboo commented Oct 7, 2023

@oprypkhantc it'd be nice to see some action around this for sure. I agree that sticking with SSE and the HTTP protocol is more fitting for GraphQLite. Bi-directional communication over WS isn't really necessary for GraphQL.

API Platform has implemented Mercure support for subscriptions
api-platform/core#3321

Here is a small SSE proof of concept app using Mercure and symfony/mercure
https://github.com/AlessandroMinoccheri/sse-poc

Here is a topic with webonyx on subscription implementation
webonyx/graphql-php#9

Webonyx already supports the subscription property on the GraphQL\Type\Schema::__construct(array $config). So, I'm assuming we could just introduce a new GraphQLite\Subscription attribute to annotate the "controllers"/resolvers. We'd likely want to have some abstractions around symfony/mercure, for publishing updates, as that'll require building out the types/fields for the payload, based on the initial client's subscription query.

What I'm unsure about is, when a subscription is made, presumably with an accompanying query (output fields needed - don't see how input fields could be allowed) - where is that query stored for later use? I'm assuming that'd require a custom userland persistence implementation, which is fine.

Maybe as a first step we could outline the API and clear up some of the unknowns with how an implementation might be done.

@oojacoboo oojacoboo pinned this issue Oct 7, 2023
@oprypkhantc
Contributor

I believe the initial effort should be focused on providing a stupid-simple contract that users could then use to build more sophisticated solutions. For example, ExpediaGroup/graphql-kotlin uses the existing Java ecosystem to handle subscriptions on the most basic level: https://github.com/ExpediaGroup/graphql-kotlin/blob/master/examples/server/spring-server/src/main/kotlin/com/expediagroup/graphql/examples/server/spring/subscriptions/SimpleSubscription.kt#L39

The way it works is they have two interfaces: Publisher and Subscriber:

/** @template TItem */
interface Subscriber {
    public function onSubscribe(Subscription $subscription): void;
    /** @param TItem $item */
    public function onNext(mixed $item): void;
    public function onError(Throwable $error): void;
    public function onComplete(): void;
}

/** @template TItem */
interface Publisher {
    /** @param Subscriber<TItem> $subscriber */
    public function subscribe(Subscriber $subscriber): void;
}

And this is how an example implementation might look:

/** @implements Publisher<int> */
class EverySecondPublisher implements Publisher {
    public function subscribe(Subscriber $subscriber): void
    {
        $subscriber->onSubscribe();
        
        for ($i = 0; $i < 1000; $i++) {
            $subscriber->onNext($i);
            sleep(1); // emit one value per second
        }
        
        $subscriber->onComplete();
    }
}

class SomeController {
    /** @return Publisher<int> */
    #[Subscription]
    public function intAddedEverySecond(): EverySecondPublisher {
        return new EverySecondPublisher();
    }
}

Then all that's left is to implement Subscriber for SSE and use it:

// Graphqlite internals...

// open(), send() and close() are placeholders here, not PSR-7 methods
class SSESubscriptionFromPsrRequest implements Subscriber {
    public function __construct(private readonly RequestInterface $request) {}
    
    public function onSubscribe(): void {
        // send HTTP headers ?
        $this->request->open();
    }
    
    public function onNext($item): void {
        $serialized = json_encode($item);
        
        $this->request->send($serialized);
    }
    
    public function onComplete(): void {
        $this->request->close();
    }
}

$publisher = $controller->callSubscriptionMethod();
$publisher->subscribe(new SSESubscriptionFromPsrRequest($request));

Then we could implement first-party support for symfony/mercure on top of those interfaces.

But I advocate against it. That can be easily included in the graphqlite/symfony-bundle if needed. I'd rather leave that to be framework-specific; for example, in Laravel the standard practice is to use broadcastable events:

class MaintenanceModeEvent {
    public function broadcastChannel(): string {
        return 'maintenance';
    }
}

// Broadcast to every subscriber of `maintenance` channel
$broadcaster->broadcast(new MaintenanceModeEvent());

So instead of changing events throughout the project or trying to work around symfony/mercure, I'd much rather just implement an adapter to use Laravel's existing system:

class LaravelBroadcasterPublisher implements Publisher {
    public function __construct(private readonly string $channel, private readonly Broadcaster $broadcaster) {}
    
    public function subscribe(Subscriber $subscriber): void 
    {
        $subscriber->onSubscribe();
        
        // example code, not a real API
        $this->broadcaster->listen($this->channel, function ($event) use ($subscriber) {
            $subscriber->onNext($event);
        });
    }
}

class SomeController {
    #[Subscription]
    public function maintenanceMode(#[Inject] Broadcaster $broadcaster): Publisher {
        return new LaravelBroadcasterPublisher('maintenance', $broadcaster);
    }
}

Of course this will also have to account for async use cases (through the use of graphql-php promises I guess).
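To make the contract above concrete, here is a self-contained sketch that can be run without any framework. It simplifies `onSubscribe()` to take no argument, as the example implementations above do, and `RangePublisher` and `RecordingSubscriber` are names made up for illustration. PHP has no native generics, so the item type is expressed through PHPDoc templates.

```php
<?php

/** @template TItem */
interface Subscriber
{
    public function onSubscribe(): void;
    /** @param TItem $item */
    public function onNext(mixed $item): void;
    public function onError(Throwable $error): void;
    public function onComplete(): void;
}

/** @template TItem */
interface Publisher
{
    /** @param Subscriber<TItem> $subscriber */
    public function subscribe(Subscriber $subscriber): void;
}

/** @implements Publisher<int> */
final class RangePublisher implements Publisher
{
    public function __construct(private readonly int $count) {}

    public function subscribe(Subscriber $subscriber): void
    {
        $subscriber->onSubscribe();
        try {
            for ($i = 0; $i < $this->count; $i++) {
                $subscriber->onNext($i);
            }
            $subscriber->onComplete();
        } catch (Throwable $e) {
            // Any failure while emitting is reported through the error signal.
            $subscriber->onError($e);
        }
    }
}

/** Test double that records every signal it receives. */
final class RecordingSubscriber implements Subscriber
{
    /** @var list<string> */
    public array $log = [];

    public function onSubscribe(): void { $this->log[] = 'subscribe'; }
    public function onNext(mixed $item): void { $this->log[] = 'next:' . json_encode($item); }
    public function onError(Throwable $error): void { $this->log[] = 'error:' . $error->getMessage(); }
    public function onComplete(): void { $this->log[] = 'complete'; }
}

$subscriber = new RecordingSubscriber();
(new RangePublisher(3))->subscribe($subscriber);
```

An SSE-backed subscriber would implement the same four methods but write each item to the HTTP response instead of an array.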

@oojacoboo
Collaborator

oojacoboo commented Dec 3, 2023

@oprypkhantc Something I'm unsure about is the return type of the subscription resolver. Currently we're using this for the output type and fields. We might be able to override this by setting the outputType on the Subscription attribute, but I don't love that. In this example, what does returning the Publisher even do, if we're to assume that it's opening a stream? Also, have to think about the input types, which would also need to be used for building the schema definition for the subscription.

Another way we could look at this is that each subscription resolver would look like our query and mutation resolvers, with input type arguments and proper output types. Then, this method/operation/subscription would be executed when broadcasting. The actual subscription/pub-sub logic is handled as a separate service. As it stands, I don't see how we're going to define the definition schema for the subscription. And it's way more likely that the pub-sub logic is going to be uniform.

@oprypkhantc
Contributor

@oprypkhantc Something I'm unsure about is the return type of the subscription resolver. Currently we're using this for the output type and fields. We might be able to override this by setting the outputType on the Subscription attribute, but I don't love that. In this example, what does returning the Publisher even do, if we're to assume that it's opening a stream? Also, have to think about the input types, which would also need to be used for building the schema definition for the subscription.

Output types are there, they're just nested as a generic type of Publisher. In one of the examples the return type is specified as /** @return Publisher<int> */, and that's how GraphQLite will be able to map the output type automatically. As for the input types, they should simply work the same way they do for regular queries and mutations:

class SomeController {
    /** @return Publisher<String> */
    #[Subscription]
    public function stringEveryNSeconds(int $numberOfSeconds): Publisher {}
}

type Subscription {
    stringEveryNSeconds(numberOfSeconds: Int): String
}

Regarding the Publisher type, it's not immediately returning a stream. It's returning a wrapper that can open a stream whenever you (graphqlite) are ready to open it, which might depend on the implementation, by calling the subscribe method. Think of it as a "OpenableStream". We cannot immediately return a stream because we need access to the Subscriber object, and exposing it through the Publisher return type is the only type-safe way of doing so. It's a good design to steal from the Java ecosystem 😆

And to clarify, this API is more of something to be built upon rather than something to be used by end users. It lets us build anything we want on top of it, but something of this kind should be the foundation of subscriptions so users can implement their own annotations, middleware and whatnot to interact with subscriptions the way they want.
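The "OpenableStream" idea can be sketched in a few lines of plain PHP. `CallbackPublisher` is a hypothetical name, and the subscriber is reduced to a single callable to keep the sketch short. The point is only that returning the wrapper does no work; the producer runs when `subscribe()` is called.

```php
<?php

/** Hypothetical callback-wrapping publisher; nothing runs until subscribe() is called. */
final class CallbackPublisher
{
    /** @param Closure(callable(mixed): void): void $producer */
    public function __construct(private readonly Closure $producer) {}

    /** @param callable(mixed): void $onNext */
    public function subscribe(callable $onNext): void
    {
        ($this->producer)($onNext);
    }
}

$started = false;
$received = [];

// "Controller" side: returns the wrapper without producing anything yet.
$publisher = new CallbackPublisher(function (callable $emit) use (&$started): void {
    $started = true;
    foreach (['a', 'b'] as $item) {
        $emit($item);
    }
});

$startedBeforeSubscribe = $started; // still false: the stream has not been opened

// "Framework" side: opens the stream once a transport is ready.
$publisher->subscribe(function (mixed $item) use (&$received): void {
    $received[] = $item;
});
```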

The actual subscription/pub-sub logic is handled as a separate service.

That we can do on top of the publisher/subscriber easily:

class SomeController {
    /** @return Publisher<Post> */
    #[Subscription]
    public function postUpdated(#[UseInputType('ID!')] string $post): Publisher {
        return new LaravelBroadcasterPublisher(
            PostUpdatedEvent::class, 
            // filter for posts by input arguments
            fn (PostUpdatedEvent $event) => $event->post->id === $post
        );
    }
}

If you then want something even more abstract, it could be done with middleware and annotations:

class SomeController {
    #[Subscription]
    #[SubscribeToBroadcaster(PostUpdatedEvent::class)]
    public function postUpdated(#[UseInputType('ID!')] string $post): ?callable {
        return fn (PostUpdatedEvent $event) => $event->post->id === $post;
    }
}

class SubscribeToBroadcaster {
    public function __construct(public readonly string $event) {}
}

class SubscribeToBroadcasterMiddleware {
    public function process($fieldDescriptor, $fieldHandler): FieldDefinition|null
    {
        $subscribeToBroadcasterAttribute = $fieldDescriptor->getMiddlewareAnnotations()->getAnnotationByType(SubscribeToBroadcaster::class);

        if (! $subscribeToBroadcasterAttribute) {
            return $fieldHandler->handle($fieldDescriptor);
        }
        
        // yada yada, map return type from PostUpdatedEvent::class
        $returnType = mapOutputType(reflectionReturnType($subscribeToBroadcasterAttribute->event, 'broadcastAs'));
        
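        // capture what the closure needs; it cannot see local variables otherwise
        $originalResolver = $fieldDescriptor->getResolver();
        $event = $subscribeToBroadcasterAttribute->event;

        $fieldDescriptor = $fieldDescriptor
            ->withType($returnType)
            ->withResolver(function (...$args) use ($originalResolver, $event) {
                $filter = $originalResolver(...$args);
                
                return new LaravelBroadcasterPublisher($event, $filter);
            });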
            
        return $fieldHandler->handle($fieldDescriptor);
    }
}

@oojacoboo
Collaborator

This really feels like it's being attacked from the wrong angle - almost like trying to shoehorn something into an existing design.

An annotation for the return type is certainly undesirable. I realize we have to do that with iterables currently, but that's more of a PHP limitation than a design decision.

What is the benefit of trying to make this subscription so coupled with this pub/sub implementation?

Taking a more pure approach to this, where does this limit implementation with a pub/sub design like you're proposing?

class Controller
{
    
    #[Subscription(
        subscriber: Subscriber::class, // Optional - can have a main config global subscription handler service registered
    )]
    public function articlePublished(ArticleFilterInput $filter): ?Article
    {
        // Some code that returns the published article based on the filters
    }
}

The subscriber argument of the Subscription attribute would implement a subscriber interface, the same one registered globally that would apply to all controller/resolver methods annotated with the Subscription attribute.

And that's basically it. The Subscription attribute and associated method would be used for definition schema building. The subscriber would be responsible for managing the subscription entirely. The whole pub/sub behavior can be contained there. The controller/resolver method just serves to be a pure representation of each execution/broadcast.

@oprypkhantc
Contributor

@oojacoboo What we do with iterables is a PHP limitation, but so is this. Generics are part of the language now, even with the ugly PHPDoc syntax.

I'm not sure I'm following. How do I replicate my example that emits a counter every second (without global state, global broadcaster or other services running)?

Also articlePublished return type is specified as ?Article, but how do I return multiple "events" (i.e. multiple broadcasts of the new Article)? Also, how would this code wait for the new article to arrive?

Let's say we have this code:

class ArticleController {
    #[Mutation]
    public function createArticle(ArticleInput $data): Article {
        $article = Article::fromInput($data);
        
        $this->save($article);

        $this->events->dispatch(new ArticleCreatedEvent($article)); // let's say it's also published into Redis

        return $article;
    }

    #[Subscription()]
    public function articlePublished(ArticlePublishedInput $filters): ?Article
    {
        // how do I subscribe to ArticleCreatedEvent here, given that I can subscribe to Redis channel and get  an ArticleCreatedEvent instance with article inside of it?
        // how do I return all subsequently created articles, not just the first one?
    }
}

@oojacoboo
Collaborator

oojacoboo commented Dec 4, 2023

So, there would be a global Subscriber service, which would implement our SubscriberInterface, that'd be registered with the GraphQLite config. Also, the Subscription attribute would take an optional subscriber argument to specify an optional custom subscriber per subscription.

class CustomSubscriber implements SubscriberInterface {}

class ArticleController {
    #[Subscription(
        subscriber: CustomSubscriber::class, // Guess this could be an object as well implementing the SubscriberInterface
    )]
    public function articlePublished(ArticlePublishedInput $filters): ?Article
    {
        // how do I subscribe to ArticleCreatedEvent here, given that I can subscribe to Redis channel and get  an ArticleCreatedEvent instance with article inside of it?
        // how do I return all subsequently created articles, not just the first one?
    }
}

how do I subscribe to ArticleCreatedEvent here, given that I can subscribe to Redis channel and get an ArticleCreatedEvent instance with article inside of it?

You can handle this in your Subscriber, if this is the design you prefer.

how do I return all subsequently created articles, not just the first one?

Your Subscriber would determine whatever is/how you are calling your Subscription annotated controller method each time the event is fired. You can, of course, also return an iterable from your subscription if needed, just like any other operation.

So, basically the flow is this...

Whenever a subscription request is made, GraphQLite recognizes the operation as a subscription, validates the request/GQL against the schema, and checks the Subscription attribute associated with the controller method matching the field (articlePublished), to see if there is a custom Subscriber defined (in this case there is). If so, it'll use that custom Subscriber (CustomSubscriber), else it'll use the default Subscriber defined in the global GraphQLite config.

At this point, your Subscriber can do whatever it likes basically. In your case, you're just pushing your global state to Redis. Someone else might push to a database, or if they're using a long running server, maybe even keep it in memory. Then, whenever an event is fired in your system, it'd just execute the controller method and return/push the result to the appropriate client/channel/stream/endpoint/etc.

As for what the Subscriber would receive as a payload - I assume it'd need the AST/GQL and variables. What I'm unsure about is subscription operations with multiple top-level fields, like:

subscription ArticlesAdded ($type: String!) {
    articlePublished(filter: {
        type: $type
    }) {
        ...fields
    }

    articlePublished(filter: {
        type: $type
    }) {
        ...fields
    }
}

We could limit subscription operations to a single field. That'd then allow us to add the articlePublished value to the payload reliably, if that's even useful. The payload could also potentially include a callable, or some other serialized object. I think the raw GQL is going to be the best though.
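The selection step in that flow could look roughly like the sketch below. Every name in it (`SubscriberInterface` with this signature, `InMemorySubscriber`, `resolveSubscriber`) is an assumption made up for illustration, and the payload is reduced to the raw GQL string plus variables.

```php
<?php

interface SubscriberInterface
{
    /** @param array<string, mixed> $variables */
    public function subscribe(string $query, array $variables): void;
}

/** Stores subscriptions in memory; a real one might use Redis or a database. */
final class InMemorySubscriber implements SubscriberInterface
{
    /** @var list<array{query: string, variables: array<string, mixed>}> */
    public array $stored = [];

    public function subscribe(string $query, array $variables): void
    {
        $this->stored[] = ['query' => $query, 'variables' => $variables];
    }
}

/**
 * Picks the per-field subscriber if one is configured, else the global default.
 *
 * @param array<string, SubscriberInterface> $perField keyed by subscription field name
 */
function resolveSubscriber(string $field, array $perField, SubscriberInterface $default): SubscriberInterface
{
    return $perField[$field] ?? $default;
}

$default = new InMemorySubscriber();
$custom = new InMemorySubscriber();

$query = 'subscription ($type: String!) { articlePublished(filter: {type: $type}) { id } }';
resolveSubscriber('articlePublished', ['articlePublished' => $custom], $default)
    ->subscribe($query, ['type' => 'news']);
resolveSubscriber('commentAdded', ['articlePublished' => $custom], $default)
    ->subscribe('subscription { commentAdded { id } }', []);
```

Keying the lookup by field name only works if each subscription operation is limited to a single top-level field, which is the restriction suggested above.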

@oprypkhantc
Contributor

oprypkhantc commented Dec 5, 2023

So, there would be a global Subscriber service, which would implement our SubscriberInterface, that'd be registered with the GraphQLite config. Also, the Subscription attribute would take an optional subscriber argument to specify an optional custom subscriber per subscription.

I'm still not getting it though. Can you implement the SubscriberInterface interface and controller methods for both examples (a ticking counter and article publish)?

You can handle this in your Subscriber, if this is the design you prefer.
Your Subscriber would determine whatever is/how you are calling your Subscription annotated controller method each time the event is fired. You can, of course, also return an iterable from your subscription if needed, just like any other operation.

I don't quite understand. If I have to pull the article from Redis in my subscriber class, then why do I need the controller at all and when/why do I call it?

class PublishedArticleRedisSubscriber implements Subscriber {
    public function subscribe(ArticleFilterInput $input, GraphQLiteSubscription $subscription): void {
        // I need deserialized input here to be able to subscribe to the correct channel
        $this->redis->sub("article.published.{$input->type}", function ($payload) use ($subscription) {
            $event = deserialize($payload);
        
            // When and where do I need to call the controller?
            // Also take into account that this might get called a thousand times 
            // before the subscription's over, so calling the controller method with 
            // its return type specified as ?Article isn't an option
            $subscription->onNext($event->article);
        });
    }
}

@oojacoboo
Collaborator

oojacoboo commented Dec 6, 2023

To start, I'm not familiar with Redis' pub/sub design. So, I cannot speak to how Redis dispatches/notifies subscribers about a particular channel/message from a publisher. I assume each redis client would keep a channel open and simply listen. How you tell Redis what to do after that, I don't know. And I don't know how PHP is involved in that process either, since PHP isn't async. Maybe a thread is kept open for this purpose with Redis - not sure. The particulars of that though really shouldn't matter, as that seems more like an implementation detail.

That said, I want to point out that the subscriber controller method body is the logic of getting the entity (Article in this case) and returning it. That's it, it's your business logic. The subscriber is your middleware and is flexible in how you implement it. Pub/sub might not even be an ideal implementation for someone implementing subscriptions. I think it's best to not make assumptions there.

Here is an example. I'm not sure how perfect it is, but hopefully it gives you a better idea.

use GraphQL\GraphQL;
use GraphQL\Language\Printer;

class GraphQLiteSubscription
{
    /**
     * These params are passed from the initial subscription request.
     * 
     * We could possibly include the actual subscription operation string name (articlePublished) as 
     * a param, if we limit to only one subscription per request.  Otherwise, we're not guaranteed
     * that the subscription operation won't include multiple subscriptions.  In that case, I think
     * we'd have to break the AST apart into individual operations, so that events that are fired
     * only trigger the relevant subscriptions.
     *
     * @param mixed[] $variables
     */
    public function __construct(
        public readonly DocumentNode $ast,
        public readonly array $variables,
    ) {}
}

interface Subscriber {

    /**
     * This is called on the initial subscription request and is passed the GraphQLiteSubscription
     * object, populated through all of the information we have available at request time.
     */
    public function subscribe(GraphQLiteSubscription $subscription): void;
}

class PublishedArticleRedisSubscriber implements Subscriber {
    public function subscribe(GraphQLiteSubscription $subscription): void 
    {
        // I have no idea where this is stored or how this works with Redis.  I guess Redis is storing
        // a serialized version of the anonymous function argument.
        $this->redis->sub('article.published', function ($subscription) {
            GraphQL::executeQuery(
                $this->schema,
                $subscription->ast,
                null,
                null,
                $subscription->variables,
            );           
        });
    }
}

class EventDispatcher {
    public function __construct(
        private readonly Publisher $publisher,
    ) {}

    public function dispatch(object $event): void 
    {
        $this->publisher->publish($event->getChannel(), $event);
    }

}

class Publisher {
    public function publish(string $channel, object $payload): void 
    {
        $this->redis->dispatch($channel, serialize($payload));
    }
}

class ArticleController {

    public function __construct(
        private readonly EventDispatcher $events,
    ) {}

    #[Mutation]
    public function createArticle(ArticleInput $data): Article 
    {
        $article = Article::fromInput($data);
        
        $this->save($article);

        $this->events->dispatch(new ArticleCreatedEvent($article)); // let's say it's also published into Redis

        return $article;
    }

    #[Subscription(subscriber: PublishedArticleRedisSubscriber::class)]
    public function articlePublished(ArticlePublishedInput $filters): ?Article
    {
        return Article::fromFilters($filters);
    }
}

@oprypkhantc
Contributor

oprypkhantc commented Dec 6, 2023

Thank you. Some notes on Redis - yes you assumed correctly. You may subscribe to any channel and publish to any channel by passing a string payload:

// Process 1
$redis->subscribe('channel.name', function (string $payload) {
    echo $payload;
});

// Process 2
$redis->publish('channel.name', 'hello');

It is blocking though and there's no internal thread magic, so using async libraries/code here is a must. You really cannot get around it.

That said, I want to point out that the subscriber controller method body is the logic of getting the entity (Article in this case) and returning it. That's it, it's your business logic.

The problem with the above implementation is that it's not a subscription; it's just a Query that you trigger whenever you receive some outside event. This isn't exactly right because subscriptions don't end after receiving the payload. Instead of having the subscription logic in the controller method, it's partly contained in the subscription class and partly in the controller method. But this is not very flexible or efficient.

Let me elaborate:

First of all, you cannot access deserialized input in the subscriber class. That is, you cannot do $redis->sub("article.published.{$input->type}") in the subscriber, because the input hasn't been validated/deserialized at this point. Of course you can work around it, but I'd really expect a nice API as with #[Query] and #[Mutation] where input is parsed and deserialized automatically.

Second, you cannot build "stateful" logic inside of the controller method:

/** @return Publisher<int> */
#[Subscription]
public function counter(): Publisher
{
    return new Publisher(function (Subscription $sub) {
        $i = 0;
    
        while (true) {
            $sub->onNext($i++);
    
            sleep(1); // sleep() takes seconds
        }
    });
}

This would not be possible with the above implementation, as the subscription method is expected to have the "int" return type. This is, again, not very flexible, but it also affects potential optimizations. Expanding on your example:

#[Input]
class ArticlePublishedInput {
    public function __construct(
        /** @var array<ID> */
        #[Field]
        #[Security('complex_logic')]
        public readonly array $categories,
        #[Field]
        public readonly string $contents,
    ) {}
}

class ArticleController {
    #[Subscription(subscriber: PublishedArticleRedisSubscriber::class)]
    public function articlePublished(ArticlePublishedInput $filters): ?Article
    {
        $categories = $this->categoryRepository->findAll($filters->categories);
        $tags = $this->tagRepository->attachedToCategories($categories);
    
        return $this->articleRepository->firstByTagsAndContents(tags: $tags, contents: $filters->contents);
    }
}

With your implementation, every article that comes in would trigger the articlePublished query. That would execute input deserialization and validation, which triggers the #[Security] annotation (with supposedly complex logic inside of it). Then the controller method is executed, which does some additional stuff based on the input (that is - get all tags by input categories). If 10 articles come in, all of this would have to be executed for every article, even though it's the same subscription, with the same input.

Sure, you could also move part of the logic to the subscriber class, but you don't have access to the input object there. Also, it just feels wrong to have half of the logic in one place, and half in the other, considering all of it is just business logic of articlePublished subscription. It's not like you'll be able to use any of that logic elsewhere.

All this is unnecessary. With "my" stolen implementation:

#[Input]
class ArticlePublishedInput {
    public function __construct(
        /** @var array<ID> */
        #[Field]
        #[Security('complex_logic')]
        public readonly array $categories,
        #[Field]
        public readonly string $contents,
    ) {}
}

class ArticleController {
    #[Subscription]
    public function articlePublished(ArticlePublishedInput $filters): Publisher<Article>
    {
        $categories = $this->categoryRepository->findAll($filters->categories);
        $tags = $this->tagRepository->attachedToCategories($categories);
        
        return Publisher::fromGenerator(function () {
            $this->redis->sub('article.published', function (string $payload) {
                yield $this->articleRepository->firstByTagsAndContents(tags: $tags, contents: $filters->contents);
            });
        });
    }
}

In this case, the input is only deserialized once, categories & tags are also only found once. When 10 articles come in, only the firstByTagsAndContents method would be executed 10 times.
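The difference in how often each part runs can be checked with a small self-contained sketch. It is written as a plain generator function rather than with the Publisher wrapper, a plain array stands in for messages arriving from Redis, and all names and the `$calls` counter are assumptions for illustration only.

```php
<?php

/**
 * Hypothetical subscription resolver: the setup runs once, the loop body once per event.
 *
 * @param list<string> $categories
 * @param iterable<string> $events stand-in for messages arriving from a broker
 * @param array{setup: int, perEvent: int} $calls counters, for illustration only
 * @return Generator<int, string>
 */
function articlePublished(array $categories, iterable $events, array &$calls): Generator
{
    // One-off work: input validation, security checks, tag lookup...
    $calls['setup']++;
    $tags = array_map(static fn (string $c): string => 'tag-for-' . $c, $categories);

    foreach ($events as $articleTag) {
        // Per-event work: only the final lookup/filter.
        $calls['perEvent']++;
        if (in_array($articleTag, $tags, true)) {
            yield $articleTag;
        }
    }
}

$calls = ['setup' => 0, 'perEvent' => 0];
$events = ['tag-for-php', 'tag-for-go', 'tag-for-php'];
$delivered = iterator_to_array(articlePublished(['php'], $events, $calls), false);
```

With three incoming events the setup counter ends at 1 and the per-event counter at 3, whereas re-executing a query per event would repeat the setup each time.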

If you understand the benefits but don't like the design, maybe instead of Publisher/Subscriber interfaces being exposed to user, have it so that it's built upon generators? Maybe something like this:

class ArticleController {
    #[Subscription]
    public function articlePublished(ArticlePublishedInput $filters): Generator<Article>
    {
        $categories = $this->categoryRepository->findAll($filters->categories);
        $tags = $this->tagRepository->attachedToCategories($categories);
        
        $this->redis->sub('article.published', function (string $payload) {
            yield $this->articleRepository->firstByTagsAndContents(tags: $tags, contents: $filters->contents);
        });
    }
}

@oojacoboo
Collaborator

oojacoboo commented Dec 8, 2023

So, I should mention that I'd like to see subscriptions support webhooks as well. That means the assumption that a subscription is a long-running process remains a userland implementation detail and not a requirement. I realize that GraphQL subscriptions were primarily designed for stream/channel communication, which requires long-running processes that are blocking. However, there is no reason why a REST style webhook implementation cannot be used with subscriptions. And, in fact, I'd argue that it's a perfect use case.

The problem with the above implementation is that it's not a subscription; it's just a Query that you trigger whenever you receive some outside event. This isn't exactly right because subscriptions don't end after receiving the payload.

So, in the case of a "webhook" style subscription, the request/response would end. But yes, that's exactly right, the subscription would serve to register it and then execute it later when there is an event/publish for that subscription.

Instead of having the subscription logic in the controller method, it's partly contained in the subscription class and partly in the controller method. But this is not very flexible or efficient.

Actually, there wouldn't be any subscription specific logic necessary to be included in the subscription class, unless you're doing something strange. Most of your subscriptions are going to follow a pattern. In the design I proposed, I don't see much, if any, need to have a custom subscriber or any subscription specific logic in a subscriber (I just used one in the example to support your initial example). I'm sure there would be a number of cases. However, I think you could have one global subscriber that handles all the pub/sub logic, agnostically - at least for most cases.

In fact, one of my main complaints about this pub/sub design is that you must have a separate Publisher class for every subscription, and you're separating the logic between two places. There is clearly defined separation though, which helps.

That all said, you make a very valid point regarding optimization. And I agree with you that the separation there could provide some benefits, even at the cost of some additional complexity.

Assuming we went with a Generator return type for the subscription, I'm a little confused by your example. So the middleware would pass the payload from the Publisher as the first argument for a generator function? The filters and the payload from the publisher have to be used in conjunction to get the proper Article, in this case. And the yield is in an anonymous function here. Wouldn't the return on the subscription method be void? Can you update the generator example for use without the redis library? It's not clear what that method even returns.
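For reference, the PHP behaviour behind the question about the yield can be shown in isolation. In the sketch below (function names are made up), a `yield` inside a nested closure makes the closure a generator, not the enclosing function. Only a `yield` in the function's own body makes the function itself return a Generator.

```php
<?php

// `yield` inside a nested closure turns the closure into a generator,
// not the enclosing function.
function withNestedYield(): mixed
{
    $callback = function (string $payload) {
        yield $payload;
    };

    // The callback returns a Generator object; nothing is yielded from withNestedYield().
    return $callback('article');
}

// `yield` in the function body makes the function itself a generator.
function withDirectYield(iterable $payloads): Generator
{
    foreach ($payloads as $payload) {
        yield $payload;
    }
}

$outerIsGenerator = (new ReflectionFunction('withNestedYield'))->isGenerator();
$directIsGenerator = (new ReflectionFunction('withDirectYield'))->isGenerator();
$items = iterator_to_array(withDirectYield(['a', 'b']), false);
```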

@oprypkhantc
Contributor

So, I should mention that I'd like to see subscriptions support webhooks as well. That means, the assumption that a subscription is a long running process remains a userland implementation detail and not a requirement. I realize that GraphQL subscriptions were primarily designed for stream/channel communication, which requires long-running processes that are non-blocking. However, there is no reason why a REST style webhook implementation cannot be used with subscriptions. And, in fact, I'd argue that it's a perfect use case.

I'm not sure GraphQL subscriptions were designed for, or even allow, anything other than long-running connections.

Looking at GraphQL spec itself, there are some places indicating that subscriptions are indeed meant to be long running requests:

So, in the case of a "webhook" style subscription, the request/response would end.

Well, sure, but it's the first time I've heard an idea of using GraphQL subscriptions for webhooks, so I think it's safe to assume that's not the most common use case.

Actually, there wouldn't be any subscription specific logic necessary to be included in the subscription class, unless you're doing something strange.

I wasn't doing anything particularly strange in my example above, yet it did indeed require a custom subscriber. I anticipate that being the case for most subscription endpoints in our app.

In fact, one of my main complaints about this pub/sub design is that you must have a separate Publisher class for every subscription, and you're separating the logic between two places. There is a clearly defined separation, though, which helps.

Not really, they are just interfaces. You could have a callback-wrapping implementation as I did with Publisher::fromGenerator, or an anonymous class which I'd be happy to use too. I agree it's not the fanciest looking though, but that can be solved by wrapping generators into those interfaces automatically.

So the middleware would pass the payload from the Publisher as the first argument for a generator function? The filters and the payload from the publisher have to be used in conjunction to get the proper Article, in this case.

The filters and other variables are already accessible, I just forgot to add function() use ($filters, $tags) to the anonymous function.

And the yield is in an anonymous function here. Wouldn't the return on the subscription method be void?

You're right, there's a bug. I'll provide a different example:

class ArticleController {
    #[Subscription]
    public function articlePublished(ArticlePublishedInput $filters): Generator<Article>
    {
        $categories = $this->categoryRepository->findAll($filters->categories);
        $tags = $this->tagRepository->attachedToCategories($categories);
        
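        // microtime(true) returns a float; without the argument it returns a string
        $lastChecked = microtime(true);
        
        while (true) {
            // Poll roughly once per second (this blocks the process)
            sleep(1);
            
            yield from $this->articleRepository->firstByFilters(
                tags: $tags, 
                contents: $filters->contents,
                publishedAfter: $lastChecked,
            );
            
            $lastChecked = microtime(true);
        }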
    }
}

Again, this is blocking, so accounting for async here is a must.
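
To make the polling idea concrete, here is a minimal self-contained sketch. Everything in it is hypothetical: `InMemoryArticleRepository` and `pollArticles` stand in for the repository and controller method above, and the clock and wait callables are injected only so the loop can run without real sleeping. It is not how graphqlite would drive a subscription, just the shape of a blocking poll loop written as a generator.

```php
<?php

// Hypothetical in-memory stand-in for the article repository used above.
final class InMemoryArticleRepository
{
    /** @var list<array{title: string, publishedAt: float}> */
    private array $articles = [];

    public function add(string $title, float $publishedAt): void
    {
        $this->articles[] = ['title' => $title, 'publishedAt' => $publishedAt];
    }

    /** @return list<string> Titles published in the window (after, upTo]. */
    public function publishedBetween(float $after, float $upTo): array
    {
        $titles = [];
        foreach ($this->articles as $article) {
            if ($article['publishedAt'] > $after && $article['publishedAt'] <= $upTo) {
                $titles[] = $article['title'];
            }
        }

        return $titles;
    }
}

/**
 * Polls the repository and yields every newly published title.
 * $maxPolls bounds the loop so the sketch terminates; a real
 * subscription would loop until the client disconnects.
 */
function pollArticles(
    InMemoryArticleRepository $repo,
    callable $clock,
    callable $wait,
    int $maxPolls
): Generator {
    $lastChecked = $clock();

    for ($i = 0; $i < $maxPolls; $i++) {
        $wait(); // in a long-lived process this is where sleep(1) would block

        $now = $clock();
        yield from $repo->publishedBetween($lastChecked, $now);
        $lastChecked = $now;
    }
}
```

Because `yield from` re-uses the array keys of each batch, a consumer collecting results should call `iterator_to_array($generator, false)` or simply `foreach` over the generator.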

@oojacoboo
Collaborator

oojacoboo commented Dec 10, 2023

From the specification on subscriptions:

Delivery Agnostic
GraphQL subscriptions do not require any specific serialization format or transport mechanism. Subscriptions specifies algorithms for the creation of a stream, the content of each payload on that stream, and the closing of that stream. There are intentionally no specifications for message acknowledgement, buffering, resend requests, or any other quality of service (QoS) details. Message serialization, transport mechanisms, and quality of service details should be chosen by the implementing service.

It does call the response a "response stream". It does explicitly define the response stream's contract:

Let response be the result of running ExecuteSubscriptionEvent(subscription, schema, variableValues, event).

This is also defined in other places using varying language, but essentially, the idea is that it's to be executed like a query.

The ExecuteSubscriptionEvent() algorithm is intentionally similar to ExecuteQuery() since this is how each event result is produced.
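
As a rough illustration of that contract, the sketch below maps a source stream of events onto a response stream by running an executor once per event. The function name `mapSourceToResponseStream` and the `$executeEvent` callable are my own stand-ins for the spec's algorithms, not part of graphqlite or webonyx/graphql-php.

```php
<?php

/**
 * Every event from the source stream is executed like a query, and each
 * result becomes one payload on the response stream.
 *
 * @param iterable<mixed> $sourceStream events from the underlying pub/sub system
 * @param callable $executeEvent        stand-in for ExecuteSubscriptionEvent()
 */
function mapSourceToResponseStream(iterable $sourceStream, callable $executeEvent): Generator
{
    foreach ($sourceStream as $event) {
        yield $executeEvent($event);
    }
}
```

Nothing here says how the payloads reach the client: the consumer of the generator could write them to an open connection or POST each one to a stored URL.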

Apollo was responsible, in large part, for the RFC on subscriptions. Here is a blog article around the RFC proposal and a section specifically around webhooks.

Here is a nice article that goes into some similar details that I've already outlined, but adds some extra color to the topic.

Here is a video from someone else discussing webhooks and GraphQL subscriptions. He goes into some of the challenges and benefits of using GraphQL subscriptions. Ultimately, they ended up doing some client side string manipulation to pass as a mutation, presumably due to server-side lib limitations (using Python). Obviously this is possible, but doesn't provide for a clean/clear schema and even results in duplicate operation definitions, should you want to support a subscription as a long-lived connection and a destination defined http request (webhook).

As I mentioned previously, it's most common for subscriptions to be used with long-lived connections. But that doesn't need to preclude the implementation from also supporting webhook style subscriptions, where the only difference is state stored for the location of the client, and that each "communication" terminates after successful completion. That detail is merely a transport detail and doesn't have any violations with the spec, despite some language that mentions the most commonly used transport style as more "common speak".
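
A minimal sketch of what "state stored for the location of the client" could look like, assuming a hypothetical `WebhookSubscriptionRegistry` and an injected delivery callable (in a real system that callable would make the HTTP request to the client's URL):

```php
<?php

// Hypothetical registry: remembers where each webhook subscriber wants deliveries sent.
final class WebhookSubscriptionRegistry
{
    /** @var array<string, list<string>> event name => callback URLs */
    private array $callbacks = [];

    /** @var callable */
    private $deliver;

    public function __construct(callable $deliver)
    {
        $this->deliver = $deliver;
    }

    // Called when the subscription operation arrives; the request can then end.
    public function register(string $eventName, string $callbackUrl): void
    {
        $this->callbacks[$eventName][] = $callbackUrl;
    }

    // Called later, whenever something publishes the event.
    // Returns the number of deliveries attempted.
    public function publish(string $eventName, array $payload): int
    {
        $delivered = 0;
        foreach ($this->callbacks[$eventName] ?? [] as $url) {
            ($this->deliver)($url, $payload);
            $delivered++;
        }

        return $delivered;
    }
}
```

Each delivery is a separate, short-lived exchange, which is the only real difference from the long-lived case.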

I don't think we necessarily need to go above and beyond to support webhook style subscriptions. But we also shouldn't close the door on them.



So the middleware would pass the payload from the Publisher as the first argument for a generator function? The filters and the payload from the publisher have to be used in conjunction to get the proper Article, in this case.

The filters and other variables are already accessible, I just forgot to add function() use ($filters, $tags) to the anonymous function.

What I'm actually trying to get more clarity around is the publisher payload, not the subscription input fields. By publisher payload, I mean the payload associated with the event that's fired. Maybe you're serializing an object, or primitives with your event payload. But how is that payload shared with the actual subscription method? Typically you're not just getting the last record added, as that's possibly inaccurate. Additionally, maybe other data in the payload is necessary to ensure the correct subscription is pushed.

In Redis, it looks like the publish method has this signature:

publish (string $channel, string $message)

I'm guessing maybe a serialized object is passed for the $message in many cases. Again, I don't have a lot of experience with using Redis for pub/sub. We have a more durable implementation.

Nonetheless, that $message string is going to be needed by the subscription.
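
One way to think about that string is as an envelope carrying the event type plus its data. The sketch below uses JSON; the `encodeEvent`/`decodeEvent` helpers and the envelope shape are assumptions for illustration only, and a serialized PHP object would work in much the same way.

```php
<?php

// Hypothetical envelope format for the string passed as $message.
function encodeEvent(string $type, array $data): string
{
    return json_encode(['type' => $type, 'data' => $data], JSON_THROW_ON_ERROR);
}

/** @return array{type: string, data: array} */
function decodeEvent(string $message): array
{
    $decoded = json_decode($message, true, 512, JSON_THROW_ON_ERROR);

    // Reject anything that is not the envelope we expect.
    if (
        !is_array($decoded)
        || !isset($decoded['type'], $decoded['data'])
        || !is_string($decoded['type'])
        || !is_array($decoded['data'])
    ) {
        throw new InvalidArgumentException('Unexpected event envelope');
    }

    return $decoded;
}
```

The subscription side would call `decodeEvent()` on each message and combine the decoded data with its own input fields to decide what to push.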

@oprypkhantc
Contributor

I don't think we necessarily need to go above and beyond to support webhook style subscriptions. But we also shouldn't close the door on them.

Agreed. Supporting webhooks through GraphQL actually sounds awesome, albeit with some difficulties on the way. Thanks for the links.

Maybe you're serializing an object, or primitives with your event payload.
I'm guessing maybe a serialized object is passed for the $message in many cases. Again, I don't have a lot of experience with using Redis for pub/sub. We have a more durable implementation.

We don't use Redis pub/sub either - it was a guess at what users might end up doing, based on what's currently popular and the new capabilities opened up by GraphQL subscriptions.

We specifically may end up serializing whole events and just publishing them to Redis:

interface ShouldBroadcastGlobally {}

class ArticlePublishedEvent implements ShouldBroadcastGlobally {
    public function __construct(
        public readonly Article $article,
    ) {}
}

class ArticleController {
    #[Mutation]
    public function publishArticle(ArticleInput $input): Article {
        $article = Article::fromInput($input);
        
        $this->articleRepository->save($article);
        $this->eventDispatcher->dispatch(new ArticlePublishedEvent($article));
        
        return $article;
    }
}

class EventDispatcher {
    public function dispatch(object $event): void
    {
        foreach ($this->listenersFor($event) as $listener) {
            $listener($event);
        }
        
        if ($event instanceof ShouldBroadcastGlobally) {
            $payload = serialize($event);
            
            $this->redis->publish('app.events', $payload);
        }
    }
}

Whenever an event implements ShouldBroadcastGlobally, it'd be serialized as-is and published into Redis.

But how is that payload shared with the actual subscription method?

We subscribe to that channel, unserialize the event, check if it's one we wanted and yield that back to graphqlite right from the subscription method:

class ArticleController {
    #[Subscription]
    public function articlePublished(ArticleInput $filters): Generator<Article> {
        while (true) {
            $event = null;
            
            $this->redis->subscribe('app.events', function (string $payload) use (&$event) {
                $candidate = unserialize($payload);
            
                // Only keep events of the type we're interested in
                if ($candidate instanceof ArticlePublishedEvent) {
                    $event = $candidate;
                }
            }, timeout: 10 * 1000);
            
            // Nothing relevant arrived before the timeout
            if (!$event) {
                continue;
            }
            
            // Ignore some articles
            if (!$event->article->hasTags($filters->tags)) {
                continue;
            }
            
            yield $event->article;
        }
    }
}

Additionally, maybe other data in the payload is necessary to ensure the correct subscription is pushed.

I'm not sure what you're referring to with the "correct subscription". In this case, a subscription is a long-lived request (and a long-lived process). Everything you yield from the controller method is pushed to that subscription, and once you close the connection - the method execution would also stop.

For the case of webhook subscriptions, this function would have to continue running indefinitely if we want to use the same controller method for both a keep-alive and webhook subscription.

@oojacoboo
Collaborator

oojacoboo commented Dec 11, 2023

I'm not sure what you're referring to with the "correct subscription". In this case, a subscription is a long-lived request (and a long-lived process). Everything you yield from the controller method is pushed to that subscription, and once you close the connection - the method execution would also stop.

That was just an extension of my previous statement around the payload. Your previous example didn't incorporate a payload from the publisher. You addressed this.

Unfortunately, the flow of that controller is meh. It would seem to me that moving the keep-alive/infinite loop out of the subscription method, entirely, would be the best design, and the Redis subscription as well. What's going to determine how the generator is executed/called? Is this middleware going to be userland? I think we need to define the middleware contract/interface. Then we need to define how that middleware will be used and how it's customizable in userland.

I think this middleware should be able to determine the transport and keep-alive logic. The controller needs to be responsible for the execution of business logic. This is where I was going with the subscriber argument on the Subscription attribute, opening up customization per controller/subscription:

#[Subscription(subscriber: (new WhateverCustomSubscriber)->forEvent('article.published'))]

Assuming that we did that and have the following:

#[Subscription(
    subscriber: (new WhateverCustomSubscriber)->forEvent('article.published'),
)]
public function articlePublished(ArticleInput $filters): Generator<Article> 
{
    $tags = Tags::findAll($filters->tags);

    yield function ($payload) use ($tags) {
        foreach ($tags as $tag) {
            if (in_array($tag, $payload->article->tags)) {
                return $payload->article;
            }
         }
    };
}

Where does this fail to allow the subscription and keep-alive logic outside of the controller method? The subscriber argument is optional on the attribute, of course. We'd support a global default "subscriber". I don't actually see any reason to need this, unless there is contextual data that should be passed to the subscriber for a specific controller. In many cases, this can probably be generalized.

But, assuming the above "subscription" method, returning a generator, can we define an interface/contract for the middleware that supports the yield function signature, while still allowing a custom subscriber to satisfy any additional dependency concerns?

@oprypkhantc
Contributor

Unfortunately, the flow of that controller is meh. It would seem to me that moving the keep-alive/infinite loop out of the subscription method, entirely, would be the best design, and the Redis subscription as well.

The loop example was just that - an example. I would never actually deploy anything like that; it needs a better, fully async design, one that ideally won't have any loops at all.

What's going to determine how the generator is executed/called?

Not sure what you mean. The controller method is simply called once through a resolver, the same way as with #[Query] and #[Mutation].

Is this middleware going to be userland? I think we need to define the middleware contract/interface.

If users need custom implementations, they can provide them through custom attributes and the existing field middleware (the one used for regular #[Query] and #[Mutation] methods) by overwriting the resolver in middleware. I don't think there's a need for a custom interface here?

I think this middleware should be able to determine the transport and keep-alive logic.

Specifically, the transport part needs to be separate from the rest so that graphqlite can handle it on its side without intersecting with the userland code.

The controller needs to be responsible for the execution of business logic.

I'd argue Redis subscription is also business logic. Or, rather, a userland logic, that graphqlite should abstract away from as much as possible. Having one subscriber interface is good, but we have an entire field middleware mechanism that can be used instead, which is more powerful.

#[Subscription(subscriber: (new WhateverCustomSubscriber)->forEvent('article.published'))]

Assuming that we did that and have the following:

#[Subscription(
    subscriber: (new WhateverCustomSubscriber)->forEvent('article.published'),
)]
public function articlePublished(ArticleInput $filters): Generator<Article> 
{
    $tags = Tags::findAll($filters->tags);

    yield function ($payload) use ($tags) {
        foreach ($tags as $tag) {
            if (in_array($tag, $payload->article->tags)) {
                return $payload->article;
            }
         }
    };
}

Where does this fail to allow the subscription and keep-alive logic outside of the controller method?

Nowhere, that's a different and more efficient design. There's still a concern though:

yield simply yields an anonymous function once (return type Generator<callable(): Article>), while the declared return type is Generator<Article>. But if you try to change the Generator<Article> return type into the actual return type Generator<callable(): Article>, graphqlite would not be able to automatically map the return type in the schema.
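
The mismatch is easy to see in isolation. In this small sketch (the `Article` class and `articlePublished` function are simplified stand-ins for the ones above), iterating the generator produces exactly one value, and that value is a closure rather than an article:

```php
<?php

// Simplified stand-in for the Article type discussed above.
final class Article
{
    public function __construct(public string $title) {}
}

// Mirrors the shape of the quoted method: the body yields a closure, not an Article.
function articlePublished(): Generator
{
    yield function (Article $payload): Article {
        return $payload;
    };
}

// Collect everything the generator produces.
$yielded = iterator_to_array(articlePublished(), false);
```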

The generator idea only works under the assumption that all of the logic is contained inside the controller method itself. But I get what you mean. To fix the return type, two designs can be used, and neither needs a custom subscriber interface:

This implementation is practically identical to yours in terms of amount of code and its style:

class WhateverCustomSubscriber implements IteratorAggregate {
    public function __construct(private readonly string $channel, private readonly Closure $mapEvent) {}
    
    public function getIterator(): Traversable {
        while (true) {
            // yada yada
            $event = receive_event($this->channel);
            $event = ($this->mapEvent)($event);
            
            yield $event;
        }
    }
    }
}

#[Subscription]
public function articlePublished(ArticleInput $filters): Generator<Article> 
{
    $tags = Tags::findAll($filters->tags);
    
    return new WhateverCustomSubscriber('article.published', function ($payload) use ($tags) {
        foreach ($tags as $tag) {
            if (in_array($tag, $payload->article->tags)) {
                return $payload->article;
            }
         }
    });
}
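
For a runnable feel of this first design, here is a sketch that swaps the real channel for any iterable of events. `FilteringSubscriber` is a hypothetical name, and skipping `null` results is my own assumption: the mapping closure above returns nothing when no tag matches, so something has to decide what happens to those events.

```php
<?php

// Hypothetical subscriber: wraps any iterable event source instead of a real pub/sub channel.
final class FilteringSubscriber implements IteratorAggregate
{
    private Closure $mapEvent;

    /** @param iterable<mixed> $source */
    public function __construct(private iterable $source, callable $mapEvent)
    {
        $this->mapEvent = Closure::fromCallable($mapEvent);
    }

    public function getIterator(): Generator
    {
        foreach ($this->source as $event) {
            $mapped = ($this->mapEvent)($event);

            // The mapper returns null for events that should not reach the client.
            if ($mapped !== null) {
                yield $mapped;
            }
        }
    }
}
```

A controller method would return an instance of this class, and whatever drives the subscription only needs to `foreach` over it.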

This implementation uses the subscription middleware, but through the existing system/interfaces. By doing so, it allows us to fix the return type on-the-fly to be the one we need, as we now have access to $fieldDescriptor:

#[Attribute(Attribute::TARGET_METHOD)]
class WhateverCustomSubscriber implements FieldMiddlewareAnnotation {
    public function __construct(public readonly string $channel) {}
}

class WhateverCustomSubscriberMiddleware implements FieldMiddlewareInterface {
    public function handle(FieldDescriptor $fieldDescriptor, FieldHandler $fieldHandler): FieldDefinition {
        $whateverCustomSubscriber = $fieldDescriptor->getMiddlewareAnnotations()->getAnnotationByType(WhateverCustomSubscriber::class);
        
        // Not one of our subscriptions: leave the field untouched
        if (!$whateverCustomSubscriber) {
            return $fieldHandler->handle($fieldDescriptor);
        }
    
        $controllerResolver = $fieldDescriptor->getResolver();
    
        // Replace the resolver
        $fieldDescriptor = $fieldDescriptor->withResolver(function (...$args) use ($controllerResolver, $whateverCustomSubscriber): Generator {
            // Get the anonymous function from the controller
            $mapEvent = $controllerResolver(...$args);
            
            while (true) {
                // yada yada
                $event = receive_event($whateverCustomSubscriber->channel);
                $event = ($mapEvent)($event);
            
                yield $event;
            }
        });
        
        // Fix the return type
        $nativeReturnType = $fieldDescriptor->getNativeType();
        
        // Get the "Article" part from the "callable(): Article" type
        $valueType = $nativeReturnType->getValueType();
        
        $fieldDescriptor = $fieldDescriptor->withType(
            $this->typeMapper->mapOutputType($valueType)
        );
        
        return $fieldHandler->handle($fieldDescriptor);
    }
}

#[Subscription]
#[WhateverCustomSubscriber('article.published')]
public function articlePublished(ArticleInput $filters): (callable(): Article)
{
    $tags = Tags::findAll($filters->tags);
    
    return function ($payload) use ($tags) {
        foreach ($tags as $tag) {
            if (in_array($tag, $payload->article->tags)) {
                return $payload->article;
            }
         }
    };
}

@oojacoboo
Collaborator

oojacoboo commented Dec 11, 2023

The first one is certainly more desirable. I think this is all looking good now 👍. It's flexible enough for most any use case and the separation of logic looks good to me. You want to work on a PR for this?

@oprypkhantc
Contributor

Good :)

Up until recently we were using a custom Pusher implementation, written in async PHP using ReactPHP, acting as our WebSocket server. It was able to outperform (both in RPS and CPU load) some known NodeJS implementations thanks to fully async code.

This subscriptions implementation needs to support async to be viable to use. The problem is that I have practically zero experience with async in PHP, and we really do want to think this through before starting the implementation :/

You want to work on a PR for this?

Sure, but I can do so only in my free time for now, so it might take a while.

@oojacoboo
Collaborator

Cool. So, something to keep in mind is that async PHP might not be the way many people choose to implement subscriptions. In fact, I'd argue that your subscriptions should probably be managed by another node optimized for that. That's where Mercure comes into play, but really any long-lived channel stream solution would do.

I do think we need to support long-lived PHP processes, much like some of the examples you've provided. However, in a webhook, Mercure, or most any other offloaded system, the response will be returned following the subscription request. In the case of Mercure, you return a Link header with the location of the subscription channel endpoint. Other solutions will likely have similar implementations, letting the client know where to connect for the channel stream, instead of the subscription request actually being the channel stream.

In Mercure:

The publisher sends updates by issuing POST HTTPS requests on the [Mercure] hub URL. When it receives an update, the hub dispatches it to subscribers using the established server-sent events connections.

The subscriber subscribes to a URL exposed by a hub to receive updates from one or many topics. To subscribe to updates, the client opens an HTTPS connection following the Server-Sent Events specification (W3C.REC-eventsource-20150203) to the hub's subscription URL advertised by the publisher.
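
To show what the publishing side amounts to, here is a sketch that builds (but does not send) a Mercure publish request. The function name, hub URL and JWT are placeholders of mine; the form-encoded `topic` and `data` fields and the bearer token follow the Mercure protocol as I understand it, so double-check against the spec before relying on it.

```php
<?php

/**
 * Builds the pieces of a Mercure publish request without sending it.
 * The hub URL and JWT are placeholders supplied by the caller.
 *
 * @return array{url: string, headers: list<string>, body: string}
 */
function buildMercurePublishRequest(string $hubUrl, string $jwt, string $topic, array $data): array
{
    return [
        'url' => $hubUrl,
        'headers' => [
            'Authorization: Bearer ' . $jwt,
            'Content-Type: application/x-www-form-urlencoded',
        ],
        // The hub expects form-encoded "topic" and "data" fields.
        'body' => http_build_query([
            'topic' => $topic,
            'data' => json_encode($data, JSON_THROW_ON_ERROR),
        ]),
    ];
}
```

The result can be handed to any HTTP client as a POST. The PHP process never holds the subscriber's connection, since the hub does that.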

@oprypkhantc
Contributor

Yeah, I agree. PHP definitely isn't in a state yet where a lot of people are using async. Having lightweight long-lived connections right in the same app would be much simpler than offloading connections to another node, though, so I'm hoping we'll get there eventually.

The problem is that it requires entirely different code to handle it, and so far I don't see a way around it:

class ArticleCommentController
{
    #[Subscription(outputType: 'ArticleComment!')]
    public function articleCommentPublished(string $articleId): string
    {
        return "articles.{$articleId}.comments.published";
    }
    
    #[Mutation]
    public function publishComment(string $articleId, string $text): void
    {
        // needs to be a GraphQLite provided publisher so it can serialize the `ArticleComment` here..
        $this->publisher->publish("articles.{$articleId}.comments.published", new ArticleComment($text));
    }
}

We can no longer specify the "correct" return type; we have to save the input fields somewhere, somehow (in this case - encoding it into the "channel name"); we have to have a way of identifying that subscription, so we can later publish to it.
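
To illustrate the "encode the input into the channel name" part, here is a small sketch with two hypothetical helpers. They assume article IDs only contain letters, digits, `_` and `-`, so that an ID can never be confused with the `.` separators. Anything richer than a single ID would need a different encoding.

```php
<?php

// Hypothetical helper: build the channel name for one article's comments.
function commentChannel(string $articleId): string
{
    if (preg_match('/^[A-Za-z0-9_-]+\z/', $articleId) !== 1) {
        throw new InvalidArgumentException('Article ID cannot be embedded in a channel name');
    }

    return "articles.{$articleId}.comments.published";
}

// Hypothetical helper: recover the article ID from a channel name, or null if it does not match.
function articleIdFromChannel(string $channel): ?string
{
    if (preg_match('/^articles\.([A-Za-z0-9_-]+)\.comments\.published\z/', $channel, $matches) === 1) {
        return $matches[1];
    }

    return null;
}
```

Both the subscription and the mutation would call `commentChannel()` so the two sides cannot drift apart.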

@oojacoboo
Collaborator

@oprypkhantc I've added #649 for basic subscription operation support.
