From c810d8f27533c2074da3ff7ca5ca89fe4521dbe2 Mon Sep 17 00:00:00 2001 From: rishtigupta Date: Wed, 3 Jan 2024 15:11:40 -0800 Subject: [PATCH] fix: add streaming call --- src/Topic/Internal/ScsTopicClient.php | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/Topic/Internal/ScsTopicClient.php b/src/Topic/Internal/ScsTopicClient.php index f03fb314..28b2084d 100644 --- a/src/Topic/Internal/ScsTopicClient.php +++ b/src/Topic/Internal/ScsTopicClient.php @@ -39,9 +39,11 @@ class ScsTopicClient implements LoggerAwareInterface private TopicGrpcManager $grpcManager; private LoggerInterface $logger; private int $timeout; + private $authToken; public function __construct(IConfiguration $configuration, ICredentialProvider $authProvider) { + $authToken = $authProvider->getAuthToken(); $operationTimeoutMs = $configuration ->getTransportStrategy() ->getGrpcConfig() @@ -146,11 +148,15 @@ public function subscribe(string $cacheName, string $topicName, callable $onMess try { validateCacheName($cacheName); + + $authToken = $this->authToken; + + $request = new _SubscriptionRequest(); $request->setCacheName($cacheName); $request->setTopic($topicName); - $call = $this->grpcManager->client->Subscribe($request); + $call = $this->grpcManager->client->Subscribe($request, ['authorization' => ['Bearer ' . $authToken]]); $this->processStreamingCall($call); foreach ($call->responses() as $response) {