Skip to content

Commit

Permalink
Add #274 Process webhooks in goroutine
Browse files Browse the repository at this point in the history
Signed-off-by: George J Padayatti <[email protected]>
  • Loading branch information
albinpa authored and georgepadayatti committed Oct 18, 2023
1 parent 38671dc commit ff10611
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 111 deletions.
182 changes: 79 additions & 103 deletions src/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/bb-consent/api/src/database"
"github.com/bb-consent/api/src/email"
"github.com/bb-consent/api/src/firebaseUtils"
"github.com/bb-consent/api/src/kafkaUtils"
"github.com/bb-consent/api/src/middleware"
"github.com/bb-consent/api/src/notifications"
"github.com/bb-consent/api/src/token"
Expand All @@ -19,129 +18,106 @@ import (
v2Handlers "github.com/bb-consent/api/src/v2/handler"
individualHandler "github.com/bb-consent/api/src/v2/handler/individual"
v2HttpPaths "github.com/bb-consent/api/src/v2/http_path"
"github.com/bb-consent/api/src/webhookdispatcher"
"github.com/bb-consent/api/src/webhooks"
"github.com/casbin/casbin/v2"
"github.com/gorilla/mux"
"github.com/spf13/cobra"
)

func main() {
var configFileName string

var rootCmd = &cobra.Command{Use: "bb-consent-api"}
func startAPICmdHandlerfunc(cmd *cobra.Command, args []string) {

var configFileName string
// Load configuration
configFile := "/opt/bb-consent/api/config/" + configFileName
loadedConfig, err := config.Load(configFile)
if err != nil {
log.Printf("Failed to load config file %s \n", configFile)
panic(err)
}

// Define the "start-api" command
var startAPICmd = &cobra.Command{
Use: "start-api",
Short: "Starts the bb consent api server",
Run: func(cmd *cobra.Command, args []string) {

configFile := "/opt/bb-consent/api/config/" + configFileName
loadedConfig, err := config.Load(configFile)
if err != nil {
log.Printf("Failed to load config file %s \n", configFile)
panic(err)
}

log.Printf("config file: %s loaded\n", configFile)

err = database.Init(loadedConfig)
if err != nil {
panic(err)
}
log.Println("Data base session opened")

webhooks.Init(loadedConfig)
log.Println("Webhooks configuration initialized")

err = kafkaUtils.Init(loadedConfig)
if err != nil {
panic(err)
}
log.Println("Kafka producer client initialised")

v1Handlers.IamInit(loadedConfig)
v2Handlers.IamInit(loadedConfig)
individualHandler.IamInit(loadedConfig)
log.Println("Iam initialized")

email.Init(loadedConfig)
log.Println("Email initialized")

token.Init(loadedConfig)
log.Println("Token initialized")

err = notifications.Init()
if err != nil {
panic(err)
}

firebaseUtils.Init(loadedConfig)
log.Println("Firebase initialized")

middleware.ApplicationModeInit(loadedConfig)
log.Println("Application mode initialized")

// setup casbin auth rules
authEnforcer, err := casbin.NewEnforcer("/opt/bb-consent/api/config/auth_model.conf", "/opt/bb-consent/api/config/rbac_policy.csv")
if err != nil {
panic(err)
}

// If the application starts in single tenant mode then create/update organisation, type, admin logic
switch loadedConfig.ApplicationMode {
case config.SingleTenant:
SingleTenantConfiguration(loadedConfig)
case config.MultiTenant:
default:
panic("Application mode is mandatory. Specify either 'single-tenant' or 'multi-tenant'.")
}

router := mux.NewRouter()
v1HttpPaths.SetRoutes(router, authEnforcer)
v2HttpPaths.SetRoutes(router, authEnforcer)

log.Println("Listening port 80")
http.ListenAndServe(":80", router)
},
log.Printf("config file: %s loaded\n", configFile)

// Database
err = database.Init(loadedConfig)
if err != nil {
panic(err)
}
log.Println("Data base session opened")

// Webhooks
webhooks.Init(loadedConfig)
log.Println("Webhooks configuration initialized")

// IAM
v1Handlers.IamInit(loadedConfig)
v2Handlers.IamInit(loadedConfig)
individualHandler.IamInit(loadedConfig)
log.Println("Iam initialized")

// Email
email.Init(loadedConfig)
log.Println("Email initialized")

// Token
token.Init(loadedConfig)
log.Println("Token initialized")

// Notifications
err = notifications.Init()
if err != nil {
panic(err)
}

// Define the "start-webhook-dispatcher" command
var startWebhookCmd = &cobra.Command{
Use: "start-webhook-dispatcher",
Short: "Starts the webhook dispatcher",
Run: func(cmd *cobra.Command, args []string) {
// Firebase
firebaseUtils.Init(loadedConfig)
log.Println("Firebase initialized")

log.SetFlags(log.LstdFlags | log.Lshortfile)
log.Println("Starting webhook dispatcher")
// Application mode
middleware.ApplicationModeInit(loadedConfig)
log.Println("Application mode initialized")

configFile := "/opt/bb-consent/api/config/" + configFileName
// Setup Casbin auth rules
authEnforcer, err := casbin.NewEnforcer("/opt/bb-consent/api/config/auth_model.conf", "/opt/bb-consent/api/config/rbac_policy.csv")
if err != nil {
panic(err)
}

loadedConfig, err := config.Load(configFile)
if err != nil {
log.Printf("Failed to load config file %s \n", configFile)
panic(err)
}
log.Printf("config file: %s loaded\n", configFile)
// Execute actions based on application mode
switch loadedConfig.ApplicationMode {
case config.SingleTenant:
SingleTenantConfiguration(loadedConfig)
case config.MultiTenant:
default:
panic("Application mode is mandatory. Specify either 'single-tenant' or 'multi-tenant'.")
}

err = database.Init(loadedConfig)
if err != nil {
panic(err)
}
log.Println("Data base session opened")
// Router
router := mux.NewRouter()
v1HttpPaths.SetRoutes(router, authEnforcer)
v2HttpPaths.SetRoutes(router, authEnforcer)

webhookdispatcher.WebhookDispatcherInit(loadedConfig)
},
// Start server and listen in port 80
log.Println("Listening port 80")
http.ListenAndServe(":80", router)
}

func main() {

var rootCmd = &cobra.Command{Use: "bb-consent-api"}

// Define the "start-api" command
var startAPICmd = &cobra.Command{
Use: "start-api",
Short: "Starts the bb consent api server",
Run: startAPICmdHandlerfunc,
}

// Define the "config" flag
startAPICmd.Flags().StringVarP(&configFileName, "config", "c", "config-development.json", "configuration file")
startWebhookCmd.Flags().StringVarP(&configFileName, "config", "c", "config-development.json", "configuration file")

// Add the "start-api," and "start-webhook-dispatcher" commands to the root command
rootCmd.AddCommand(startAPICmd, startWebhookCmd)
// Add the "start-api" commands to the root command
rootCmd.AddCommand(startAPICmd)

// Execute the CLI
if err := rootCmd.Execute(); err != nil {
Expand Down
File renamed without changes.
15 changes: 7 additions & 8 deletions src/v2/webhook/webhooks.go → src/v2/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/bb-consent/api/src/config"
"github.com/bb-consent/api/src/kafkaUtils"
"github.com/bb-consent/api/src/user"
"github.com/bb-consent/api/src/v2/webhook_dispatcher"
"github.com/confluentinc/confluent-kafka-go/kafka"
)

Expand Down Expand Up @@ -278,6 +279,8 @@ func TriggerWebhooks(webhookEventData WebhookEventData, webhookEventType string)
return
}

go webhook_dispatcher.ProcessWebhooks(webhookEventType, b)

// Log webhook calls in webhooks category
aLog := fmt.Sprintf("Organization webhook: %v triggered by user: %v by event: %v", toBeProcessedWebhook.PayloadURL, u.Email, webhookEventType)
actionlog.LogOrgWebhookCalls(u.ID.Hex(), u.Email, webhookEventData.GetOrganisationID(), aLog)
Expand All @@ -289,8 +292,7 @@ func TriggerWebhooks(webhookEventData WebhookEventData, webhookEventType string)
func TriggerOrgSubscriptionWebhookEvent(userID, orgID string, eventType string) {

// Constructing webhook event data attribute
var orgSubscriptionWebhookEvent OrgSubscriptionWebhookEvent
orgSubscriptionWebhookEvent = OrgSubscriptionWebhookEvent{
orgSubscriptionWebhookEvent := OrgSubscriptionWebhookEvent{
OrganisationID: orgID,
UserID: userID,
}
Expand All @@ -303,8 +305,7 @@ func TriggerOrgSubscriptionWebhookEvent(userID, orgID string, eventType string)
func TriggerConsentWebhookEvent(userID, purposeID, consentID, orgID, eventType, timeStamp string, days int, attributes []string) {

// Constructing webhook event data attribute
var consentWebhookEvent ConsentWebhookEvent
consentWebhookEvent = ConsentWebhookEvent{
consentWebhookEvent := ConsentWebhookEvent{
OrganisationID: orgID,
UserID: userID,
ConsentID: consentID,
Expand All @@ -322,8 +323,7 @@ func TriggerConsentWebhookEvent(userID, purposeID, consentID, orgID, eventType,
func TriggerDataRequestWebhookEvent(userID string, orgID string, dataRequestID string, eventType string) {

// Constructing webhook event data attribute
var dataRequestWebhookEvent DataRequestWebhookEvent
dataRequestWebhookEvent = DataRequestWebhookEvent{
dataRequestWebhookEvent := DataRequestWebhookEvent{
OrganisationID: orgID,
UserID: userID,
DataRequestID: dataRequestID,
Expand All @@ -337,8 +337,7 @@ func TriggerDataRequestWebhookEvent(userID string, orgID string, dataRequestID s
func TriggerDataUpdateRequestWebhookEvent(userID, attributeID, purposeID, consentID, orgID, dataRequestID string, eventType string) {

// Constructing webhook event data attribute
var dataUpdateRequestWebhookEvent DataUpdateRequestWebhookEvent
dataUpdateRequestWebhookEvent = DataUpdateRequestWebhookEvent{
dataUpdateRequestWebhookEvent := DataUpdateRequestWebhookEvent{
OrganisationID: orgID,
UserID: userID,
DataRequestID: dataRequestID,
Expand Down
38 changes: 38 additions & 0 deletions src/v2/webhook_dispatcher/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package webhook_dispatcher

import (
"context"

"github.com/bb-consent/api/src/database"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
)

func webhookCollection() *mongo.Collection {
return database.DB.Client.Database(database.DB.Name).Collection("webhooks")
}

func webhookDeliveryCollection() *mongo.Collection {
return database.DB.Client.Database(database.DB.Name).Collection("webhookDeliveries")
}

// GetWebhookByOrgID Gets a webhook by organisation ID and webhook ID
func GetWebhookByOrgID(webhookId, orgID string) (result Webhook, err error) {

err = webhookCollection().FindOne(context.TODO(), bson.M{"_id": webhookId, "orgid": orgID}).Decode(&result)

return result, err
}

// AddWebhookDelivery Adds payload delivery details to database for a webhook event
func AddWebhookDelivery(webhookDelivery WebhookDelivery) (WebhookDelivery, error) {

if webhookDelivery.ID == primitive.NilObjectID {
webhookDelivery.ID = primitive.NewObjectID()
}

_, err := webhookDeliveryCollection().InsertOne(context.TODO(), &webhookDelivery)

return webhookDelivery, err
}
Loading

0 comments on commit ff10611

Please sign in to comment.