Skip to content

Commit

Permalink
feat(notifications): add grpc server
Browse files Browse the repository at this point in the history
  • Loading branch information
thevaibhav-dixit committed Jan 15, 2024
1 parent b3c03f3 commit a83ca78
Show file tree
Hide file tree
Showing 18 changed files with 494 additions and 207 deletions.
50 changes: 32 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ opentelemetry-http = "0.9.0"
http = "0.2.9"
tonic = "0.10.2"
tonic-build = { version = "0.10.2", features = ["prost"] }
tonic-health = "0.10.2"
prost = "0.12"
1 change: 1 addition & 0 deletions core/notifications/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ rust_protobuf_library(
"//third-party/rust:uuid",
"//third-party/rust:serde_with",
"//third-party/rust:tonic",
"//third-party/rust:tonic-health",
],
srcs = glob([
"src/**/*.rs",
Expand Down
1 change: 1 addition & 0 deletions core/notifications/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ rand = { workspace = true }
uuid = { workspace = true }
serde_with = { workspace = true }
tonic = { workspace = true }
tonic-health = { workspace = true }
prost = { workspace = true }

[build-dependencies]
Expand Down
31 changes: 20 additions & 11 deletions core/notifications/proto/notifications.proto
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
syntax = "proto3";

package helloworld;
import "google/protobuf/struct.proto";

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
package services.notifications.v1;

service NotificationsService {
rpc ShouldSendNotification (ShouldSendNotificationRequest) returns (ShouldSendNotificationResponse) {}
}

enum NotificationChannel {
PUSH = 0;
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
enum NotificationCategory {
CIRCLES = 0;
PAYMENTS = 1;
}

// The response message containing the greetings.
message HelloReply {
string message = 1;
message ShouldSendNotificationRequest {
string user_id = 1;
NotificationChannel channel = 2;
NotificationCategory category = 3;
}

message ShouldSendNotificationResponse {
string user_id = 1;
bool should_send = 2;
}
2 changes: 2 additions & 0 deletions core/notifications/src/app/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ use crate::user_notification_settings::error::*;
pub enum ApplicationError {
#[error("{0}")]
UserNotificationSettingsError(#[from] UserNotificationSettingsError),
#[error("{0}")]
GrpcServerError(#[from] tonic::transport::Error),
}
14 changes: 14 additions & 0 deletions core/notifications/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ impl NotificationsApp {
}
}

pub async fn should_send_notification(
&self,
user_id: GaloyUserId,
channel: UserNotificationChannel,
category: UserNotificationCategory,
) -> Result<bool, ApplicationError> {
let user_settings = self
.settings
.find_for_user_id(&user_id)
.await?
.unwrap_or_else(|| UserNotificationSettings::new(user_id.clone()));
Ok(user_settings.should_send_notification(channel, category))
}

pub async fn notification_settings_for_user(
&self,
user_id: GaloyUserId,
Expand Down
6 changes: 4 additions & 2 deletions core/notifications/src/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tracing::TracingConfig;
use std::path::Path;

use super::db::*;
use crate::{app::AppConfig, graphql::server::ServerConfig};
use crate::{app::AppConfig, graphql::server::ServerConfig, grpc::GrpcServerConfig};

#[derive(Clone, Default, Serialize, Deserialize)]
pub struct Config {
Expand All @@ -15,13 +15,15 @@ pub struct Config {
pub app: AppConfig,
#[serde(default)]
pub server: ServerConfig,
#[serde(default)]
pub grpc_server: GrpcServerConfig,
#[serde(default = "default_tracing_config")]
pub tracing: TracingConfig,
}

fn default_tracing_config() -> TracingConfig {
TracingConfig {
service_name: "api-keys".to_string(),
service_name: "notifications".to_string(),
}
}

Expand Down
33 changes: 32 additions & 1 deletion core/notifications/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod config;
mod db;

use anyhow::Context;
use clap::Parser;
use std::path::PathBuf;

Expand Down Expand Up @@ -33,8 +34,38 @@ pub async fn run() -> anyhow::Result<()> {

async fn run_cmd(config: Config) -> anyhow::Result<()> {
tracing::init_tracer(config.tracing)?;
let (send, mut receive) = tokio::sync::mpsc::channel(1);
let mut handles = vec![];
let pool = db::init_pool(&config.db).await?;
let app = crate::app::NotificationsApp::new(pool, config.app);

println!("Starting notifications graphql server");
crate::graphql::server::run_server(config.server, app).await
let graphql_send = send.clone();
let graphql_config = config.server;
let graphql_app = app.clone();
handles.push(tokio::spawn(async move {
let _ = graphql_send.try_send(
crate::graphql::server::run_server(graphql_config, graphql_app)
.await
.context("graphql server error"),
);
}));

println!("Starting notifications grpc server");
let grpc_send = send.clone();
let grpc_config = config.grpc_server;
handles.push(tokio::spawn(async move {
let _ = grpc_send.try_send(
crate::grpc::run_server(grpc_config, app)
.await
.context("grpc server error"),
);
}));

let reason = receive.recv().await.expect("Didn't receive msg");
for handle in handles {
handle.abort();
}

reason
}
19 changes: 19 additions & 0 deletions core/notifications/src/grpc/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct GrpcServerConfig {
#[serde(default = "default_port")]
pub listen_port: u16,
}

impl Default for GrpcServerConfig {
fn default() -> Self {
Self {
listen_port: default_port(),
}
}
}

fn default_port() -> u16 {
2478
}
12 changes: 12 additions & 0 deletions core/notifications/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
mod config;
mod server;

use crate::app::*;

pub use config::*;
pub use server::*;

pub async fn run_server(config: GrpcServerConfig, app: NotificationsApp) -> anyhow::Result<()> {
server::start(config, app).await?;
Ok(())
}
29 changes: 29 additions & 0 deletions core/notifications/src/grpc/server/convert.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use crate::app::ApplicationError;
use crate::primitives::{UserNotificationCategory, UserNotificationChannel};

impl From<i32> for UserNotificationCategory {
fn from(category: i32) -> Self {
match category {
0 => UserNotificationCategory::Circles,
1 => UserNotificationCategory::Payments,
_ => unimplemented!(),
}
}
}

impl From<i32> for UserNotificationChannel {
fn from(channel: i32) -> Self {
match channel {
0 => UserNotificationChannel::Push,
_ => unimplemented!(),
}
}
}

impl From<ApplicationError> for tonic::Status {
fn from(err: ApplicationError) -> Self {
match err {
_ => tonic::Status::internal(err.to_string()),
}
}
}
Loading

0 comments on commit a83ca78

Please sign in to comment.