diff --git a/src/main/main.go b/src/main/main.go index 6194ab7..b4552d3 100644 --- a/src/main/main.go +++ b/src/main/main.go @@ -10,7 +10,6 @@ import ( "github.com/bb-consent/api/src/database" "github.com/bb-consent/api/src/email" "github.com/bb-consent/api/src/firebaseUtils" - "github.com/bb-consent/api/src/kafkaUtils" "github.com/bb-consent/api/src/middleware" "github.com/bb-consent/api/src/notifications" "github.com/bb-consent/api/src/token" @@ -19,129 +18,106 @@ import ( v2Handlers "github.com/bb-consent/api/src/v2/handler" individualHandler "github.com/bb-consent/api/src/v2/handler/individual" v2HttpPaths "github.com/bb-consent/api/src/v2/http_path" - "github.com/bb-consent/api/src/webhookdispatcher" "github.com/bb-consent/api/src/webhooks" "github.com/casbin/casbin/v2" "github.com/gorilla/mux" "github.com/spf13/cobra" ) -func main() { +var configFileName string - var rootCmd = &cobra.Command{Use: "bb-consent-api"} +func startAPICmdHandlerfunc(cmd *cobra.Command, args []string) { - var configFileName string + // Load configuration + configFile := "/opt/bb-consent/api/config/" + configFileName + loadedConfig, err := config.Load(configFile) + if err != nil { + log.Printf("Failed to load config file %s \n", configFile) + panic(err) + } - // Define the "start-api" command - var startAPICmd = &cobra.Command{ - Use: "start-api", - Short: "Starts the bb consent api server", - Run: func(cmd *cobra.Command, args []string) { - - configFile := "/opt/bb-consent/api/config/" + configFileName - loadedConfig, err := config.Load(configFile) - if err != nil { - log.Printf("Failed to load config file %s \n", configFile) - panic(err) - } - - log.Printf("config file: %s loaded\n", configFile) - - err = database.Init(loadedConfig) - if err != nil { - panic(err) - } - log.Println("Data base session opened") - - webhooks.Init(loadedConfig) - log.Println("Webhooks configuration initialized") - - err = kafkaUtils.Init(loadedConfig) - if err != nil { - panic(err) - } - log.Println("Kafka producer client initialised") - - v1Handlers.IamInit(loadedConfig) - v2Handlers.IamInit(loadedConfig) - individualHandler.IamInit(loadedConfig) - log.Println("Iam initialized") - - email.Init(loadedConfig) - log.Println("Email initialized") - - token.Init(loadedConfig) - log.Println("Token initialized") - - err = notifications.Init() - if err != nil { - panic(err) - } - - firebaseUtils.Init(loadedConfig) - log.Println("Firebase initialized") - - middleware.ApplicationModeInit(loadedConfig) - log.Println("Application mode initialized") - - // setup casbin auth rules - authEnforcer, err := casbin.NewEnforcer("/opt/bb-consent/api/config/auth_model.conf", "/opt/bb-consent/api/config/rbac_policy.csv") - if err != nil { - panic(err) - } - - // If the application starts in single tenant mode then create/update organisation, type, admin logic - switch loadedConfig.ApplicationMode { - case config.SingleTenant: - SingleTenantConfiguration(loadedConfig) - case config.MultiTenant: - default: - panic("Application mode is mandatory. Specify either 'single-tenant' or 'multi-tenant'.") - } - - router := mux.NewRouter() - v1HttpPaths.SetRoutes(router, authEnforcer) - v2HttpPaths.SetRoutes(router, authEnforcer) - - log.Println("Listening port 80") - http.ListenAndServe(":80", router) - }, + log.Printf("config file: %s loaded\n", configFile) + + // Database + err = database.Init(loadedConfig) + if err != nil { + panic(err) + } + log.Println("Data base session opened") + + // Webhooks + webhooks.Init(loadedConfig) + log.Println("Webhooks configuration initialized") + + // IAM + v1Handlers.IamInit(loadedConfig) + v2Handlers.IamInit(loadedConfig) + individualHandler.IamInit(loadedConfig) + log.Println("Iam initialized") + + // Email + email.Init(loadedConfig) + log.Println("Email initialized") + + // Token + token.Init(loadedConfig) + log.Println("Token initialized") + + // Notifications + err = notifications.Init() + if err != nil { + panic(err) } - // Define the "start-webhook-dispatcher" command - var startWebhookCmd = &cobra.Command{ - Use: "start-webhook-dispatcher", - Short: "Starts the webhook dispatcher", - Run: func(cmd *cobra.Command, args []string) { + // Firebase + firebaseUtils.Init(loadedConfig) + log.Println("Firebase initialized") - log.SetFlags(log.LstdFlags | log.Lshortfile) - log.Println("Starting webhook dispatcher") + // Application mode + middleware.ApplicationModeInit(loadedConfig) + log.Println("Application mode initialized") - configFile := "/opt/bb-consent/api/config/" + configFileName + // Setup Casbin auth rules + authEnforcer, err := casbin.NewEnforcer("/opt/bb-consent/api/config/auth_model.conf", "/opt/bb-consent/api/config/rbac_policy.csv") + if err != nil { + panic(err) + } - loadedConfig, err := config.Load(configFile) - if err != nil { - log.Printf("Failed to load config file %s \n", configFile) - panic(err) - } - log.Printf("config file: %s loaded\n", configFile) + // Execute actions based on application mode + switch loadedConfig.ApplicationMode { + case config.SingleTenant: + SingleTenantConfiguration(loadedConfig) + case config.MultiTenant: + default: + panic("Application mode is mandatory. Specify either 'single-tenant' or 'multi-tenant'.") + } - err = database.Init(loadedConfig) - if err != nil { - panic(err) - } - log.Println("Data base session opened") + // Router + router := mux.NewRouter() + v1HttpPaths.SetRoutes(router, authEnforcer) + v2HttpPaths.SetRoutes(router, authEnforcer) - webhookdispatcher.WebhookDispatcherInit(loadedConfig) - }, + // Start server and listen in port 80 + log.Println("Listening port 80") + http.ListenAndServe(":80", router) +} + +func main() { + + var rootCmd = &cobra.Command{Use: "bb-consent-api"} + + // Define the "start-api" command + var startAPICmd = &cobra.Command{ + Use: "start-api", + Short: "Starts the bb consent api server", + Run: startAPICmdHandlerfunc, } // Define the "config" flag startAPICmd.Flags().StringVarP(&configFileName, "config", "c", "config-development.json", "configuration file") - startWebhookCmd.Flags().StringVarP(&configFileName, "config", "c", "config-development.json", "configuration file") - // Add the "start-api," and "start-webhook-dispatcher" commands to the root command - rootCmd.AddCommand(startAPICmd, startWebhookCmd) + // Add the "start-api" commands to the root command + rootCmd.AddCommand(startAPICmd) // Execute the CLI if err := rootCmd.Execute(); err != nil { diff --git a/src/v2/webhook/webhooks_db.go b/src/v2/webhook/db.go similarity index 100% rename from src/v2/webhook/webhooks_db.go rename to src/v2/webhook/db.go diff --git a/src/v2/webhook/webhooks.go b/src/v2/webhook/webhook.go similarity index 96% rename from src/v2/webhook/webhooks.go rename to src/v2/webhook/webhook.go index 5cc0e0e..c737151 100644 --- a/src/v2/webhook/webhooks.go +++ b/src/v2/webhook/webhook.go @@ -13,6 +13,7 @@ import ( "github.com/bb-consent/api/src/config" "github.com/bb-consent/api/src/kafkaUtils" "github.com/bb-consent/api/src/user" + "github.com/bb-consent/api/src/v2/webhook_dispatcher" "github.com/confluentinc/confluent-kafka-go/kafka" ) @@ -278,6 +279,8 @@ func TriggerWebhooks(webhookEventData WebhookEventData, webhookEventType string) return } + go webhook_dispatcher.ProcessWebhooks(webhookEventType, b) + // Log webhook calls in webhooks category aLog := fmt.Sprintf("Organization webhook: %v triggered by user: %v by event: %v", toBeProcessedWebhook.PayloadURL, u.Email, webhookEventType) actionlog.LogOrgWebhookCalls(u.ID.Hex(), u.Email, webhookEventData.GetOrganisationID(), aLog) @@ -289,8 +292,7 @@ func TriggerWebhooks(webhookEventData WebhookEventData, webhookEventType string) func TriggerOrgSubscriptionWebhookEvent(userID, orgID string, eventType string) { // Constructing webhook event data attribute - var orgSubscriptionWebhookEvent OrgSubscriptionWebhookEvent - orgSubscriptionWebhookEvent = OrgSubscriptionWebhookEvent{ + orgSubscriptionWebhookEvent := OrgSubscriptionWebhookEvent{ OrganisationID: orgID, UserID: userID, } @@ -303,8 +305,7 @@ func TriggerOrgSubscriptionWebhookEvent(userID, orgID string, eventType string) func TriggerConsentWebhookEvent(userID, purposeID, consentID, orgID, eventType, timeStamp string, days int, attributes []string) { // Constructing webhook event data attribute - var consentWebhookEvent ConsentWebhookEvent - consentWebhookEvent = ConsentWebhookEvent{ + consentWebhookEvent := ConsentWebhookEvent{ OrganisationID: orgID, UserID: userID, ConsentID: consentID, @@ -322,8 +323,7 @@ func TriggerConsentWebhookEvent(userID, purposeID, consentID, orgID, eventType, func TriggerDataRequestWebhookEvent(userID string, orgID string, dataRequestID string, eventType string) { // Constructing webhook event data attribute - var dataRequestWebhookEvent DataRequestWebhookEvent - dataRequestWebhookEvent = DataRequestWebhookEvent{ + dataRequestWebhookEvent := DataRequestWebhookEvent{ OrganisationID: orgID, UserID: userID, DataRequestID: dataRequestID, @@ -337,8 +337,7 @@ func TriggerDataRequestWebhookEvent(userID string, orgID string, dataRequestID s func TriggerDataUpdateRequestWebhookEvent(userID, attributeID, purposeID, consentID, orgID, dataRequestID string, eventType string) { // Constructing webhook event data attribute - var dataUpdateRequestWebhookEvent DataUpdateRequestWebhookEvent - dataUpdateRequestWebhookEvent = DataUpdateRequestWebhookEvent{ + dataUpdateRequestWebhookEvent := DataUpdateRequestWebhookEvent{ OrganisationID: orgID, UserID: userID, DataRequestID: dataRequestID, diff --git a/src/v2/webhook_dispatcher/db.go b/src/v2/webhook_dispatcher/db.go new file mode 100644 index 0000000..b68e356 --- /dev/null +++ b/src/v2/webhook_dispatcher/db.go @@ -0,0 +1,38 @@ +package webhook_dispatcher + +import ( + "context" + + "github.com/bb-consent/api/src/database" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" +) + +func webhookCollection() *mongo.Collection { + return database.DB.Client.Database(database.DB.Name).Collection("webhooks") +} + +func webhookDeliveryCollection() *mongo.Collection { + return database.DB.Client.Database(database.DB.Name).Collection("webhookDeliveries") +} + +// GetWebhookByOrgID Gets a webhook by organisation ID and webhook ID +func GetWebhookByOrgID(webhookId, orgID string) (result Webhook, err error) { + + err = webhookCollection().FindOne(context.TODO(), bson.M{"_id": webhookId, "orgid": orgID}).Decode(&result) + + return result, err +} + +// AddWebhookDelivery Adds payload delivery details to database for a webhook event +func AddWebhookDelivery(webhookDelivery WebhookDelivery) (WebhookDelivery, error) { + + if webhookDelivery.ID == primitive.NilObjectID { + webhookDelivery.ID = primitive.NewObjectID() + } + + _, err := webhookDeliveryCollection().InsertOne(context.TODO(), &webhookDelivery) + + return webhookDelivery, err +} diff --git a/src/v2/webhook_dispatcher/webhookdispatcher.go b/src/v2/webhook_dispatcher/webhookdispatcher.go new file mode 100644 index 0000000..a9d2684 --- /dev/null +++ b/src/v2/webhook_dispatcher/webhookdispatcher.go @@ -0,0 +1,267 @@ +package webhook_dispatcher + +import ( + "bytes" + "crypto/hmac" + "crypto/sha256" + "crypto/tls" + "encoding/hex" + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "go.mongodb.org/mongo-driver/bson/primitive" +) + +// WebhookEvent Webhook event wrapper +type WebhookEvent struct { + DeliveryID string `json:"deliveryID"` // Webhook delivery ID + WebhookID string `json:"webhookID"` // Webhook endpoint ID + Timestamp string `json:"timestamp"` // UTC timestamp of webhook triggered data time + Data interface{} `json:"data"` // Event data attribute + Type string `json:"type"` // Event type for e.g. data.delete.initiated +} + +// Payload content type const +const ( + // Payload will be posted as json body + PayloadContentTypeJSON = 112 + + // Payload will be stringified and posted as form under `payload` key + PayloadContentTypeFormURLEncoded = 113 +) + +// PayloadContentTypes Available data format for payload to be posted to webhook +var PayloadContentTypes = map[int]string{ + PayloadContentTypeJSON: "application/json", + PayloadContentTypeFormURLEncoded: "application/x-www-form-urlencoded", +} + +// Delivery status const +const ( + DeliveryStatusCompleted = 212 + DeliveryStatusFailed = 213 +) + +// DeliveryStatus Indicating the payload delivery status to webhook +var DeliveryStatus = map[int]string{ + DeliveryStatusCompleted: "completed", + DeliveryStatusFailed: "failed", +} + +type Webhook struct { + ID string `json:"id" bson:"_id,omitempty"` // Webhook ID + OrganisationId string `json:"orgId" bson:"orgid"` // Organisation ID + PayloadURL string `json:"payloadUrl" valid:"required"` // Webhook payload URL + ContentType string `json:"contentType" valid:"required"` // Webhook payload content type for e.g application/json + SubscribedEvents []string `json:"subscribedEvents" valid:"required"` // Events subscribed for e.g. user.data.delete + Disabled bool `json:"disabled"` // Disabled or not + SecretKey string `json:"secretKey" valid:"required"` // For calculating SHA256 HMAC to verify data integrity and authenticity + SkipSSLVerification bool `json:"skipSslVerification"` // Skip SSL certificate verification or not (expiry is checked) + TimeStamp string `json:"timestamp" valid:"required"` // UTC timestamp + IsDeleted bool `json:"-"` +} + +// WebhookDelivery Details of payload delivery to webhook endpoint +type WebhookDelivery struct { + ID primitive.ObjectID `bson:"_id,omitempty"` // Webhook delivery ID + WebhookID string // Webhook ID + UserID string // ID of user who triggered the webhook event + WebhookEventType string // Webhook event type for e.g. data.delete.initiated + RequestHeaders map[string][]string // HTTP headers posted to webhook endpoint + RequestPayload interface{} // JSON payload posted to webhook endpoint + ResponseHeaders map[string][]string // HTTP response headers received from webhook endpoint + ResponseBody string // HTTP response body received from webhook endpoint in bytes + ResponseStatusCode int // HTTP response status code + ResponseStatusStr string // HTTP response status string + ExecutionStartTimeStamp string // UTC timestamp when webhook execution started + ExecutionEndTimeStamp string // UTC timestamp when webhook execution ended + Status string // Status of webhook delivery for e.g. failed or completed + StatusDescription string // Describe the status for e.g. Reason for failure +} + +func ProcessWebhooks(webhookEventType string, value []byte) { + // For recording execution times + var executionStartTimeStamp string + var executionEndTimeStamp string + + // For storing webhook payload delivery details to db + var webhookDelivery WebhookDelivery + + // Recording webhook processing start timestamp + executionStartTimeStamp = strconv.FormatInt(time.Now().UTC().Unix(), 10) + + // To store incoming webhook events + var webhookEvent WebhookEvent + + // Unmarshalling the incoming message value bytes to webhook event struct + err := json.Unmarshal([]byte(value), &webhookEvent) + if err != nil { + log.Printf("Invalid incoming webhook recieved !") + return + } + + // Webhook event data attribute + // Converting data attribute to appropriate webhook event struct + + webhookEventData, ok := webhookEvent.Data.(map[string]interface{}) + if !ok { + log.Printf("Invalid incoming webhook recieved !") + return + } + + // Quick fix + // Retrieving user and organisation ID from webhook data attribute + userID := webhookEventData["userID"].(string) + orgID := webhookEventData["organisationID"].(string) + + log.Printf("Processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) + + // Instantiating webhook delivery + webhookDelivery = WebhookDelivery{ + ID: primitive.NewObjectID(), + WebhookID: webhookEvent.WebhookID, + UserID: userID, + WebhookEventType: webhookEventType, + ExecutionStartTimeStamp: executionStartTimeStamp, + } + + // Fetch webhook by ID + webhook, err := GetWebhookByOrgID(webhookEvent.WebhookID, orgID) + if err != nil { + log.Printf("Failed to fetch by webhook from db;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) + return + } + + // Checking if the webhook is disabled or not + if webhook.Disabled { + log.Printf("Webhook is disabled;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) + return + } + + // Getting the webhook secret key + secretKey := webhook.SecretKey + + // Updating webhook event payload with delivery ID + webhookEvent.DeliveryID = webhookDelivery.ID.Hex() + + // Constructing webhook payload bytes + requestPayload, _ := json.Marshal(&webhookEvent) + + // Current UTC timestamp + timestamp := strconv.FormatInt(time.Now().UTC().Unix(), 10) + + // Constructing SHA256 payload + sha256Payload := timestamp + "." + string(requestPayload) + + // Create a new HMAC by defining the hash type and the key (as byte array) + h := hmac.New(sha256.New, []byte(secretKey)) + + // Write Data to it + h.Write([]byte(sha256Payload)) + + // Get result and encode as hexadecimal string + sha := hex.EncodeToString(h.Sum(nil)) + + // Constructing HTTP request instance based payload content type + var req *http.Request + if webhook.ContentType == PayloadContentTypes[PayloadContentTypeFormURLEncoded] { + // x-www-form-urlencoded payload + data := url.Values{} + data.Set("payload", string(requestPayload)) + + req, _ = http.NewRequest("POST", webhook.PayloadURL, strings.NewReader(data.Encode())) + } else { + req, _ = http.NewRequest("POST", webhook.PayloadURL, bytes.NewBuffer(requestPayload)) + } + + // Adding HTTP headers + // If secret key is defined, then add X-IGrant-Signature header for checking data integrity and authenticity + if len(strings.TrimSpace(secretKey)) > 0 { + req.Header.Set("X-IGrant-Signature", fmt.Sprintf("t=%s,sig=%s", timestamp, sha)) + } + + req.Header.Set("Content-Type", webhook.ContentType) + req.Header.Set("User-Agent", "IGrant-Hookshot/1.0") + req.Header.Set("Accept", "*/*") + + // Skip SSL certificate verification or not + transCfg := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: webhook.SkipSSLVerification}, + } + + client := &http.Client{Transport: transCfg} + resp, err := client.Do(req) + if err != nil { + log.Printf("HTTP POST request failed err:%v;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", err.Error(), webhookEvent.WebhookID, userID, orgID, webhookEventType) + + // Recording webhook processing end timestamp + executionEndTimeStamp = strconv.FormatInt(time.Now().UTC().Unix(), 10) + + // Recording webhook delivery details to db + webhookDelivery.RequestHeaders = req.Header + webhookDelivery.RequestPayload = webhookEvent + webhookDelivery.StatusDescription = fmt.Sprintf("Error performing HTTP POST for the webhook endpoint:%s", webhook.PayloadURL) + webhookDelivery.Status = DeliveryStatus[DeliveryStatusFailed] + webhookDelivery.ExecutionEndTimeStamp = executionEndTimeStamp + + _, err = AddWebhookDelivery(webhookDelivery) + if err != nil { + log.Printf("Failed to save webhook delivery details to db;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) + return + } + + return + } + defer resp.Body.Close() + + respBodyBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + + // Recording webhook processing end timestamp + executionEndTimeStamp = strconv.FormatInt(time.Now().UTC().Unix(), 10) + + // Recording webhook delivery details to db + webhookDelivery.RequestHeaders = req.Header + webhookDelivery.RequestPayload = webhookEvent + webhookDelivery.ResponseHeaders = resp.Header + webhookDelivery.ResponseStatusCode = resp.StatusCode + webhookDelivery.ResponseStatusStr = resp.Status + webhookDelivery.ExecutionEndTimeStamp = executionEndTimeStamp + webhookDelivery.Status = DeliveryStatus[DeliveryStatusCompleted] + + _, err = AddWebhookDelivery(webhookDelivery) + if err != nil { + log.Printf("Failed to save webhook delivery details to db;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) + return + } + + log.Printf("Failed to read webhook endpoint response;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) + return + } + + // Recording webhook processing end timestamp + executionEndTimeStamp = strconv.FormatInt(time.Now().UTC().Unix(), 10) + + // Recording webhook delivery details to db + webhookDelivery.RequestHeaders = req.Header + webhookDelivery.RequestPayload = webhookEvent + webhookDelivery.ResponseHeaders = resp.Header + webhookDelivery.ResponseBody = string(respBodyBytes) + webhookDelivery.ResponseStatusCode = resp.StatusCode + webhookDelivery.ResponseStatusStr = resp.Status + webhookDelivery.ExecutionEndTimeStamp = executionEndTimeStamp + webhookDelivery.Status = DeliveryStatus[DeliveryStatusCompleted] + + _, err = AddWebhookDelivery(webhookDelivery) + if err != nil { + log.Printf("Failed to save webhook delivery details to db;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) + return + } +}