diff --git a/src/config/config.go b/src/config/config.go index ac96710..2819670 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -69,7 +69,7 @@ type KafkaConfig struct { // WebhooksConfig webhooks configuration (kafka broker cluster, topic e.t.c) type WebhooksConfig struct { - KafkaConfig KafkaConfig + Events []string `json:"events"` } // Organization organization data type diff --git a/src/kafkaUtils/kafka.go b/src/kafkaUtils/kafka.go deleted file mode 100644 index cf77f82..0000000 --- a/src/kafkaUtils/kafka.go +++ /dev/null @@ -1,28 +0,0 @@ -package kafkaUtils - -import ( - "github.com/bb-consent/api/src/config" - "github.com/confluentinc/confluent-kafka-go/kafka" -) - -type kafkaProducerClient struct { - Producer *kafka.Producer -} - -var KafkaProducerClient kafkaProducerClient - -// Init Initialises the kafka producer client -func Init(config *config.Configuration) error { - // Creating a high level apache kafka producer instance - // https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md - producer, err := kafka.NewProducer(&kafka.ConfigMap{ - "bootstrap.servers": config.Webhooks.KafkaConfig.Broker.URL, - }) - KafkaProducerClient = kafkaProducerClient{Producer: producer} - - if err != nil { - return err - } - - return nil -} diff --git a/src/main/main.go b/src/main/main.go index 8ad5b43..f60be1d 100644 --- a/src/main/main.go +++ b/src/main/main.go @@ -20,6 +20,7 @@ import ( "github.com/bb-consent/api/src/v2/middleware" "github.com/bb-consent/api/src/v2/sms" v2token "github.com/bb-consent/api/src/v2/token" + v2wh "github.com/bb-consent/api/src/v2/webhook" "github.com/bb-consent/api/src/webhooks" "github.com/casbin/casbin/v2" "github.com/gorilla/mux" @@ -49,6 +50,7 @@ func startAPICmdHandlerfunc(cmd *cobra.Command, args []string) { // Webhooks webhooks.Init(loadedConfig) + v2wh.Init(loadedConfig) log.Println("Webhooks configuration initialized") // IAM diff --git a/src/v1/handler/webhooks_handler.go b/src/v1/handler/webhooks_handler.go index ef7d938..4b8ee19 100644 --- a/src/v1/handler/webhooks_handler.go +++ b/src/v1/handler/webhooks_handler.go @@ -748,7 +748,7 @@ func ReDeliverWebhook(w http.ResponseWriter, r *http.Request) { } // Converting the webhook payload to bytes - webhookPayloadBytes, err := json.Marshal(webhookDelivery.RequestPayload) + _, err = json.Marshal(webhookDelivery.RequestPayload) if err != nil { m := fmt.Sprintf("Failed to convert webhook event data to bytes, error:%v; Failed to redeliver payload for webhook for event:<%s>, user:<%s>, org:<%s>", err.Error(), webhookDelivery.WebhookEventType, webhookDelivery.UserID, organizationID) common.HandleError(w, http.StatusInternalServerError, m, err) @@ -763,7 +763,7 @@ func ReDeliverWebhook(w http.ResponseWriter, r *http.Request) { return } - go wh.PushWebhookEventToKafkaTopic(webhookDelivery.WebhookEventType, webhookPayloadBytes, wh.WebhooksConfiguration.KafkaConfig.Topic) + // go wh.PushWebhookEventToKafkaTopic(webhookDelivery.WebhookEventType, webhookPayloadBytes, wh.WebhooksConfiguration.KafkaConfig.Topic) // Log webhook calls in webhooks category aLog := fmt.Sprintf("Organization webhook: %v triggered by user: %v by event: %v", webhook.PayloadURL, u.Email, webhookDelivery.WebhookEventType) diff --git a/src/v2/handler/webhook/config_list_webhook_event_types.go b/src/v2/handler/webhook/config_list_webhook_event_types.go index 700f61e..781c218 100644 --- a/src/v2/handler/webhook/config_list_webhook_event_types.go +++ b/src/v2/handler/webhook/config_list_webhook_event_types.go @@ -10,18 +10,17 @@ import ( // WebhookEventTypesResp Define response structure for webhook event types type WebhookEventTypesResp struct { - EventTypes []string + EventTypes []string `json:"eventTypes"` } // ConfigListWebhookEventTypes List available webhook event types func ConfigListWebhookEventTypes(w http.ResponseWriter, r *http.Request) { - var webhookEventTypesResp WebhookEventTypesResp - for _, eventType := range wh.EventTypes { - webhookEventTypesResp.EventTypes = append(webhookEventTypesResp.EventTypes, eventType) + resp := WebhookEventTypesResp{ + wh.WebhooksConfiguration.Events, } - response, _ := json.Marshal(webhookEventTypesResp) + response, _ := json.Marshal(resp) w.Header().Set(config.ContentTypeHeader, config.ContentTypeJSON) w.WriteHeader(http.StatusOK) w.Write(response) diff --git a/src/v2/handler/webhook/config_redeliver_webhook_payload_by_deliveryid.go b/src/v2/handler/webhook/config_redeliver_webhook_payload_by_deliveryid.go index d24abe1..2b02958 100644 --- a/src/v2/handler/webhook/config_redeliver_webhook_payload_by_deliveryid.go +++ b/src/v2/handler/webhook/config_redeliver_webhook_payload_by_deliveryid.go @@ -10,6 +10,7 @@ import ( "github.com/bb-consent/api/src/user" "github.com/bb-consent/api/src/v2/actionlog" wh "github.com/bb-consent/api/src/v2/webhook" + "github.com/bb-consent/api/src/v2/webhook_dispatcher" "github.com/gorilla/mux" ) @@ -61,7 +62,7 @@ func ConfigRedeliverWebhookPayloadByDeliveryID(w http.ResponseWriter, r *http.Re return } - go wh.PushWebhookEventToKafkaTopic(webhookDelivery.WebhookEventType, webhookPayloadBytes, wh.WebhooksConfiguration.KafkaConfig.Topic) + go webhook_dispatcher.ProcessWebhooks(webhookDelivery.WebhookEventType, webhookPayloadBytes) // Log webhook calls in webhooks category aLog := fmt.Sprintf("Organization webhook: %v triggered by user: %v by event: %v", webhook.PayloadURL, u.Email, webhookDelivery.WebhookEventType) diff --git a/src/v2/webhook/webhook.go b/src/v2/webhook/webhook.go index dba5817..663bfaf 100644 --- a/src/v2/webhook/webhook.go +++ b/src/v2/webhook/webhook.go @@ -10,11 +10,9 @@ import ( "time" "github.com/bb-consent/api/src/config" - "github.com/bb-consent/api/src/kafkaUtils" "github.com/bb-consent/api/src/user" "github.com/bb-consent/api/src/v2/actionlog" "github.com/bb-consent/api/src/v2/webhook_dispatcher" - "github.com/confluentinc/confluent-kafka-go/kafka" ) // Event type const @@ -175,36 +173,6 @@ func (e DataUpdateRequestWebhookEvent) GetUserID() string { return e.UserID } -func PushWebhookEventToKafkaTopic(webhookEventType string, webhookPayload []byte, kafkaTopicName string) error { - - // Creating a delivery report channel - deliveryChan := make(chan kafka.Event) - - // Kafka producer emits messages to producer channel queue and then librdkafka queue, so double queuing - // Pushing the webhook event payload to given topic - err := kafkaUtils.KafkaProducerClient.Producer.Produce(&kafka.Message{ - TopicPartition: kafka.TopicPartition{Topic: &kafkaTopicName, Partition: kafka.PartitionAny}, - Key: []byte(webhookEventType), // webhook event type ID - Value: webhookPayload, - }, deliveryChan) - if err != nil { - return err - } - - e := <-deliveryChan - m := e.(*kafka.Message) - - // If the message was not delivered successfully to the kafka topic - if m.TopicPartition.Error != nil { - return m.TopicPartition.Error - } - - close(deliveryChan) - - return nil - -} - // PingWebhook Pings webhook payload URL to check the status func PingWebhook(webhook Webhook) (req *http.Request, resp *http.Response, executionStartTimeStamp string, executionEndTimeStamp string, err error) { executionStartTimeStamp = strconv.FormatInt(time.Now().UTC().Unix(), 10) @@ -272,13 +240,6 @@ func TriggerWebhooks(webhookEventData WebhookEventData, webhookEventType string) return } - // Push the webhook event payload to the kafka topic - err = PushWebhookEventToKafkaTopic(webhookEventType, b, WebhooksConfiguration.KafkaConfig.Topic) - if err != nil { - log.Printf("Failed to push the webhook event to kafka, error:%v, Failed to trigger webhook for event:<%s>, user:<%s>, org:<%s>", err.Error(), webhookEventType, u.ID.Hex(), webhookEventData.GetOrganisationID()) - return - } - go webhook_dispatcher.ProcessWebhooks(webhookEventType, b) // Log webhook calls in webhooks category diff --git a/src/webhookdispatcher/webhookdispatcher.go b/src/webhookdispatcher/webhookdispatcher.go index 49ec355..0b34025 100644 --- a/src/webhookdispatcher/webhookdispatcher.go +++ b/src/webhookdispatcher/webhookdispatcher.go @@ -1,23 +1,6 @@ package webhookdispatcher import ( - "bytes" - "crypto/hmac" - "crypto/sha256" - "crypto/tls" - "encoding/hex" - "encoding/json" - "fmt" - "io/ioutil" - "log" - "net/http" - "net/url" - "strconv" - "strings" - "time" - - "github.com/bb-consent/api/src/config" - "github.com/confluentinc/confluent-kafka-go/kafka" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -87,227 +70,3 @@ type WebhookDelivery struct { Status string // Status of webhook delivery for e.g. failed or completed StatusDescription string // Describe the status for e.g. Reason for failure } - -func WebhookDispatcherInit(webhookConfig *config.Configuration) { - - // Creating a kafka consumer instance - // https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md - c, err := kafka.NewConsumer(&kafka.ConfigMap{ - "bootstrap.servers": webhookConfig.Webhooks.KafkaConfig.Broker.URL, - "group.id": webhookConfig.Webhooks.KafkaConfig.Broker.GroupID, // - "auto.offset.reset": "earliest", - "enable.auto.commit": true, - }) - - if err != nil { - panic(err) - } - - // Subscribing to kafka topics - err = c.SubscribeTopics([]string{webhookConfig.Webhooks.KafkaConfig.Topic}, nil) - if err != nil { - log.Printf("Failed to subscribe to kafka topic:%s; Error: %v", webhookConfig.Webhooks.KafkaConfig.Topic, err) - panic(err) - } - - for { - msg, err := c.ReadMessage(-1) - - if err == nil { - - // Processing webhooks asynchronously - go func() { - - // For recording execution times - var executionStartTimeStamp string - var executionEndTimeStamp string - - // For storing webhook payload delivery details to db - var webhookDelivery WebhookDelivery - - // Recording webhook processing start timestamp - executionStartTimeStamp = strconv.FormatInt(time.Now().UTC().Unix(), 10) - - // To store incoming webhook events - var webhookEvent WebhookEvent - - // To store incoming webhook event type - var webhookEventType string - - // Unmarshalling the incoming message value bytes to webhook event struct - err := json.Unmarshal([]byte(msg.Value), &webhookEvent) - if err != nil { - log.Printf("Invalid incoming webhook recieved !") - return - } - - // Webhook event type - webhookEventType = string(msg.Key) - - // Webhook event data attribute - // Converting data attribute to appropriate webhook event struct - - webhookEventData, ok := webhookEvent.Data.(map[string]interface{}) - if !ok { - log.Printf("Invalid incoming webhook recieved !") - return - } - - // Quick fix - // Retrieving user and organisation ID from webhook data attribute - userID := webhookEventData["userID"].(string) - orgID := webhookEventData["organisationID"].(string) - - log.Printf("Processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) - - // Instantiating webhook delivery - webhookDelivery = WebhookDelivery{ - ID: primitive.NewObjectID(), - WebhookID: webhookEvent.WebhookID, - UserID: userID, - WebhookEventType: webhookEventType, - ExecutionStartTimeStamp: executionStartTimeStamp, - } - - // Fetch webhook by ID - webhook, err := GetWebhookByOrgID(webhookEvent.WebhookID, orgID) - if err != nil { - log.Printf("Failed to fetch by webhook from db;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) - return - } - - // Checking if the webhook is disabled or not - if webhook.Disabled { - log.Printf("Webhook is disabled;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) - return - } - - // Getting the webhook secret key - secretKey := webhook.SecretKey - - // Updating webhook event payload with delivery ID - webhookEvent.DeliveryID = webhookDelivery.ID.Hex() - - // Constructing webhook payload bytes - requestPayload, _ := json.Marshal(&webhookEvent) - - // Current UTC timestamp - timestamp := strconv.FormatInt(time.Now().UTC().Unix(), 10) - - // Constructing SHA256 payload - sha256Payload := timestamp + "." + string(requestPayload) - - // Create a new HMAC by defining the hash type and the key (as byte array) - h := hmac.New(sha256.New, []byte(secretKey)) - - // Write Data to it - h.Write([]byte(sha256Payload)) - - // Get result and encode as hexadecimal string - sha := hex.EncodeToString(h.Sum(nil)) - - // Constructing HTTP request instance based payload content type - var req *http.Request - if webhook.ContentType == PayloadContentTypes[PayloadContentTypeFormURLEncoded] { - // x-www-form-urlencoded payload - data := url.Values{} - data.Set("payload", string(requestPayload)) - - req, _ = http.NewRequest("POST", webhook.PayloadURL, strings.NewReader(data.Encode())) - } else { - req, _ = http.NewRequest("POST", webhook.PayloadURL, bytes.NewBuffer(requestPayload)) - } - - // Adding HTTP headers - // If secret key is defined, then add X-IGrant-Signature header for checking data integrity and authenticity - if len(strings.TrimSpace(secretKey)) > 0 { - req.Header.Set("X-IGrant-Signature", fmt.Sprintf("t=%s,sig=%s", timestamp, sha)) - } - - req.Header.Set("Content-Type", webhook.ContentType) - req.Header.Set("User-Agent", "IGrant-Hookshot/1.0") - req.Header.Set("Accept", "*/*") - - // Skip SSL certificate verification or not - transCfg := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: webhook.SkipSSLVerification}, - } - - client := &http.Client{Transport: transCfg} - resp, err := client.Do(req) - if err != nil { - log.Printf("HTTP POST request failed err:%v;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", err.Error(), webhookEvent.WebhookID, userID, orgID, webhookEventType) - - // Recording webhook processing end timestamp - executionEndTimeStamp = strconv.FormatInt(time.Now().UTC().Unix(), 10) - - // Recording webhook delivery details to db - webhookDelivery.RequestHeaders = req.Header - webhookDelivery.RequestPayload = webhookEvent - webhookDelivery.StatusDescription = fmt.Sprintf("Error performing HTTP POST for the webhook endpoint:%s", webhook.PayloadURL) - webhookDelivery.Status = DeliveryStatus[DeliveryStatusFailed] - webhookDelivery.ExecutionEndTimeStamp = executionEndTimeStamp - - _, err = AddWebhookDelivery(webhookDelivery) - if err != nil { - log.Printf("Failed to save webhook delivery details to db;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) - return - } - - return - } - defer resp.Body.Close() - - respBodyBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - - // Recording webhook processing end timestamp - executionEndTimeStamp = strconv.FormatInt(time.Now().UTC().Unix(), 10) - - // Recording webhook delivery details to db - webhookDelivery.RequestHeaders = req.Header - webhookDelivery.RequestPayload = webhookEvent - webhookDelivery.ResponseHeaders = resp.Header - webhookDelivery.ResponseStatusCode = resp.StatusCode - webhookDelivery.ResponseStatusStr = resp.Status - webhookDelivery.ExecutionEndTimeStamp = executionEndTimeStamp - webhookDelivery.Status = DeliveryStatus[DeliveryStatusCompleted] - - _, err = AddWebhookDelivery(webhookDelivery) - if err != nil { - log.Printf("Failed to save webhook delivery details to db;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) - return - } - - log.Printf("Failed to read webhook endpoint response;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) - return - } - - // Recording webhook processing end timestamp - executionEndTimeStamp = strconv.FormatInt(time.Now().UTC().Unix(), 10) - - // Recording webhook delivery details to db - webhookDelivery.RequestHeaders = req.Header - webhookDelivery.RequestPayload = webhookEvent - webhookDelivery.ResponseHeaders = resp.Header - webhookDelivery.ResponseBody = string(respBodyBytes) - webhookDelivery.ResponseStatusCode = resp.StatusCode - webhookDelivery.ResponseStatusStr = resp.Status - webhookDelivery.ExecutionEndTimeStamp = executionEndTimeStamp - webhookDelivery.Status = DeliveryStatus[DeliveryStatusCompleted] - - _, err = AddWebhookDelivery(webhookDelivery) - if err != nil { - log.Printf("Failed to save webhook delivery details to db;Failed processing webhook:%s triggered by user:%s of org:%s for event:%s", webhookEvent.WebhookID, userID, orgID, webhookEventType) - return - } - }() - - } else { - // The client will automatically try to recover from all errors. - fmt.Printf("Webhook dispatcher(Kafka consumer) error: %v (%v)\n", err, msg) - } - } - - c.Close() -} diff --git a/src/webhooks/webhooks.go b/src/webhooks/webhooks.go index e5c407d..ce25924 100644 --- a/src/webhooks/webhooks.go +++ b/src/webhooks/webhooks.go @@ -11,9 +11,7 @@ import ( "github.com/bb-consent/api/src/actionlog" "github.com/bb-consent/api/src/config" - "github.com/bb-consent/api/src/kafkaUtils" "github.com/bb-consent/api/src/user" - "github.com/confluentinc/confluent-kafka-go/kafka" ) // Event type const @@ -174,36 +172,6 @@ func (e DataUpdateRequestWebhookEvent) GetUserID() string { return e.UserID } -func PushWebhookEventToKafkaTopic(webhookEventType string, webhookPayload []byte, kafkaTopicName string) error { - - // Creating a delivery report channel - deliveryChan := make(chan kafka.Event) - - // Kafka producer emits messages to producer channel queue and then librdkafka queue, so double queuing - // Pushing the webhook event payload to given topic - err := kafkaUtils.KafkaProducerClient.Producer.Produce(&kafka.Message{ - TopicPartition: kafka.TopicPartition{Topic: &kafkaTopicName, Partition: kafka.PartitionAny}, - Key: []byte(webhookEventType), // webhook event type ID - Value: webhookPayload, - }, deliveryChan) - if err != nil { - return err - } - - e := <-deliveryChan - m := e.(*kafka.Message) - - // If the message was not delivered successfully to the kafka topic - if m.TopicPartition.Error != nil { - return m.TopicPartition.Error - } - - close(deliveryChan) - - return nil - -} - // PingWebhook Pings webhook payload URL to check the status func PingWebhook(webhook Webhook) (req *http.Request, resp *http.Response, executionStartTimeStamp string, executionEndTimeStamp string, err error) { executionStartTimeStamp = strconv.FormatInt(time.Now().UTC().Unix(), 10) @@ -265,18 +233,18 @@ func TriggerWebhooks(webhookEventData WebhookEventData, webhookEventType string) } // Converting the webhook event data to bytes - b, err := json.Marshal(we) + _, err := json.Marshal(we) if err != nil { log.Printf("Failed to convert webhook event data to bytes, error:%v, Failed to trigger webhook for event:<%s>, user:<%s>, org:<%s>", err.Error(), webhookEventType, u.ID.Hex(), webhookEventData.GetOrganisationID()) return } // Push the webhook event payload to the kafka topic - err = PushWebhookEventToKafkaTopic(webhookEventType, b, WebhooksConfiguration.KafkaConfig.Topic) - if err != nil { - log.Printf("Failed to push the webhook event to kafka, error:%v, Failed to trigger webhook for event:<%s>, user:<%s>, org:<%s>", err.Error(), webhookEventType, u.ID.Hex(), webhookEventData.GetOrganisationID()) - return - } + // err = PushWebhookEventToKafkaTopic(webhookEventType, b, WebhooksConfiguration.KafkaConfig.Topic) + // if err != nil { + // log.Printf("Failed to push the webhook event to kafka, error:%v, Failed to trigger webhook for event:<%s>, user:<%s>, org:<%s>", err.Error(), webhookEventType, u.ID.Hex(), webhookEventData.GetOrganisationID()) + // return + // } // Log webhook calls in webhooks category aLog := fmt.Sprintf("Organization webhook: %v triggered by user: %v by event: %v", toBeProcessedWebhook.PayloadURL, u.Email, webhookEventType)