Skip to content

Commit

Permalink
fix: subscribe method update
Browse files Browse the repository at this point in the history
  • Loading branch information
rishtigupta committed Jan 3, 2024
1 parent a709eab commit 6afb495
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
4 changes: 2 additions & 2 deletions examples/topic-example.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ function printBanner(string $message, LoggerInterface $logger): void

// Subscribe to topic
$logger->info("Subscribing to topic: $TOPIC_NAME\n");
$callback = function ($message) use ($logger) {
$onMessage = function ($message) use ($logger) {
$logger->info("Received message: " . json_encode($message));
file_put_contents('message_received.flag', '1');
};
$response = $topicClient->subscribe($CACHE_NAME, $TOPIC_NAME, $callback);
$response = $topicClient->subscribe($CACHE_NAME, $TOPIC_NAME, $onMessage);
if ($response->asSuccess()) {
$logger->info("SUCCESS: Subscribed to topic: " . $TOPIC_NAME . "\n");
} elseif ($response->asError()) {
Expand Down
16 changes: 12 additions & 4 deletions src/Topic/Internal/ScsTopicClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ public function publish(string $cacheName, string $topicName, string $value): To
*
* @param string $cacheName The name of the cache.
* @param string $topicName The name of the topic to subscribe to.
* @param callable $callback The callback function to handle incoming messages.
* @param callable $onMessage Callback for handling incoming messages.
* @return TopicSubscribeResponse
*/
public function subscribe(string $cacheName, string $topicName, callable $callback): TopicSubscribeResponse
public function subscribe(string $cacheName, string $topicName, callable $onMessage): TopicSubscribeResponse
{
$this->logger->info("Subscribing to topic: $topicName in cache $cacheName\n");

Expand All @@ -128,16 +129,23 @@ public function subscribe(string $cacheName, string $topicName, callable $callba
$call = $this->grpcManager->client->Subscribe($request);

foreach ($call->responses() as $response) {
$this->logger->info("Received message from topic $topicName in cache $cacheName\n");
$callback($response);
try {
$this->logger->info("Received message from topic $topicName in cache $cacheName\n");
$onMessage($response);
} catch (\Exception $e) {
$this->logger->error("Error processing message: " . $e->getMessage());
}
}


} catch (SdkError $e) {
$this->logger->debug("Failed to subscribe to topic $topicName in cache $cacheName: {$e->getMessage()}");
return new TopicSubscribeResponseError($e);
} catch (\Exception $e) {
$this->logger->debug("Failed to subscribe to topic $topicName in cache $cacheName: {$e->getMessage()}");
return new TopicSubscribeResponseError(new UnknownError($e->getMessage()));
}

return new TopicSubscribeResponseSubscription();
}

Expand Down
4 changes: 2 additions & 2 deletions src/Topic/TopicClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ public function publish(string $cacheName, string $topicName, string $message):
return $this->topicClient->publish($cacheName, $topicName, $message);
}

public function subscribe(string $cacheName, string $topicName, callable $callback): TopicSubscribeResponse
public function subscribe(string $cacheName, string $topicName, callable $onMessage): TopicSubscribeResponse
{
$this->logger->info("Subscribing to topic: $topicName\n");
return $this->topicClient->subscribe($cacheName, $topicName, $callback);
return $this->topicClient->subscribe($cacheName, $topicName, $onMessage);
}
}

0 comments on commit 6afb495

Please sign in to comment.