Skip to content

Commit

Permalink
Add #183 Event lists in webhook should be configured
Browse files Browse the repository at this point in the history
  • Loading branch information
albinpa authored and georgepadayatti committed Oct 24, 2023
1 parent d610eb9 commit efc1a67
Show file tree
Hide file tree
Showing 9 changed files with 17 additions and 355 deletions.
2 changes: 1 addition & 1 deletion src/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type KafkaConfig struct {

// WebhooksConfig webhooks configuration (kafka broker cluster, topic e.t.c)
type WebhooksConfig struct {
KafkaConfig KafkaConfig
Events []string `json:"events"`
}

// Organization organization data type
Expand Down
28 changes: 0 additions & 28 deletions src/kafkaUtils/kafka.go

This file was deleted.

2 changes: 2 additions & 0 deletions src/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/bb-consent/api/src/v2/middleware"
"github.com/bb-consent/api/src/v2/sms"
v2token "github.com/bb-consent/api/src/v2/token"
v2wh "github.com/bb-consent/api/src/v2/webhook"
"github.com/bb-consent/api/src/webhooks"
"github.com/casbin/casbin/v2"
"github.com/gorilla/mux"
Expand Down Expand Up @@ -49,6 +50,7 @@ func startAPICmdHandlerfunc(cmd *cobra.Command, args []string) {

// Webhooks
webhooks.Init(loadedConfig)
v2wh.Init(loadedConfig)
log.Println("Webhooks configuration initialized")

// IAM
Expand Down
4 changes: 2 additions & 2 deletions src/v1/handler/webhooks_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ func ReDeliverWebhook(w http.ResponseWriter, r *http.Request) {
}

// Converting the webhook payload to bytes
webhookPayloadBytes, err := json.Marshal(webhookDelivery.RequestPayload)
_, err = json.Marshal(webhookDelivery.RequestPayload)
if err != nil {
m := fmt.Sprintf("Failed to convert webhook event data to bytes, error:%v; Failed to redeliver payload for webhook for event:<%s>, user:<%s>, org:<%s>", err.Error(), webhookDelivery.WebhookEventType, webhookDelivery.UserID, organizationID)
common.HandleError(w, http.StatusInternalServerError, m, err)
Expand All @@ -763,7 +763,7 @@ func ReDeliverWebhook(w http.ResponseWriter, r *http.Request) {
return
}

go wh.PushWebhookEventToKafkaTopic(webhookDelivery.WebhookEventType, webhookPayloadBytes, wh.WebhooksConfiguration.KafkaConfig.Topic)
// go wh.PushWebhookEventToKafkaTopic(webhookDelivery.WebhookEventType, webhookPayloadBytes, wh.WebhooksConfiguration.KafkaConfig.Topic)

// Log webhook calls in webhooks category
aLog := fmt.Sprintf("Organization webhook: %v triggered by user: %v by event: %v", webhook.PayloadURL, u.Email, webhookDelivery.WebhookEventType)
Expand Down
9 changes: 4 additions & 5 deletions src/v2/handler/webhook/config_list_webhook_event_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,17 @@ import (

// WebhookEventTypesResp Define response structure for webhook event types
type WebhookEventTypesResp struct {
EventTypes []string
EventTypes []string `json:"eventTypes"`
}

// ConfigListWebhookEventTypes List available webhook event types
func ConfigListWebhookEventTypes(w http.ResponseWriter, r *http.Request) {
var webhookEventTypesResp WebhookEventTypesResp

for _, eventType := range wh.EventTypes {
webhookEventTypesResp.EventTypes = append(webhookEventTypesResp.EventTypes, eventType)
resp := WebhookEventTypesResp{
wh.WebhooksConfiguration.Events,
}

response, _ := json.Marshal(webhookEventTypesResp)
response, _ := json.Marshal(resp)
w.Header().Set(config.ContentTypeHeader, config.ContentTypeJSON)
w.WriteHeader(http.StatusOK)
w.Write(response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/bb-consent/api/src/user"
"github.com/bb-consent/api/src/v2/actionlog"
wh "github.com/bb-consent/api/src/v2/webhook"
"github.com/bb-consent/api/src/v2/webhook_dispatcher"
"github.com/gorilla/mux"
)

Expand Down Expand Up @@ -61,7 +62,7 @@ func ConfigRedeliverWebhookPayloadByDeliveryID(w http.ResponseWriter, r *http.Re
return
}

go wh.PushWebhookEventToKafkaTopic(webhookDelivery.WebhookEventType, webhookPayloadBytes, wh.WebhooksConfiguration.KafkaConfig.Topic)
go webhook_dispatcher.ProcessWebhooks(webhookDelivery.WebhookEventType, webhookPayloadBytes)

// Log webhook calls in webhooks category
aLog := fmt.Sprintf("Organization webhook: %v triggered by user: %v by event: %v", webhook.PayloadURL, u.Email, webhookDelivery.WebhookEventType)
Expand Down
39 changes: 0 additions & 39 deletions src/v2/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ import (
"time"

"github.com/bb-consent/api/src/config"
"github.com/bb-consent/api/src/kafkaUtils"
"github.com/bb-consent/api/src/user"
"github.com/bb-consent/api/src/v2/actionlog"
"github.com/bb-consent/api/src/v2/webhook_dispatcher"
"github.com/confluentinc/confluent-kafka-go/kafka"
)

// Event type const
Expand Down Expand Up @@ -175,36 +173,6 @@ func (e DataUpdateRequestWebhookEvent) GetUserID() string {
return e.UserID
}

func PushWebhookEventToKafkaTopic(webhookEventType string, webhookPayload []byte, kafkaTopicName string) error {

// Creating a delivery report channel
deliveryChan := make(chan kafka.Event)

// Kafka producer emits messages to producer channel queue and then librdkafka queue, so double queuing
// Pushing the webhook event payload to given topic
err := kafkaUtils.KafkaProducerClient.Producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kafkaTopicName, Partition: kafka.PartitionAny},
Key: []byte(webhookEventType), // webhook event type ID
Value: webhookPayload,
}, deliveryChan)
if err != nil {
return err
}

e := <-deliveryChan
m := e.(*kafka.Message)

// If the message was not delivered successfully to the kafka topic
if m.TopicPartition.Error != nil {
return m.TopicPartition.Error
}

close(deliveryChan)

return nil

}

// PingWebhook Pings webhook payload URL to check the status
func PingWebhook(webhook Webhook) (req *http.Request, resp *http.Response, executionStartTimeStamp string, executionEndTimeStamp string, err error) {
executionStartTimeStamp = strconv.FormatInt(time.Now().UTC().Unix(), 10)
Expand Down Expand Up @@ -272,13 +240,6 @@ func TriggerWebhooks(webhookEventData WebhookEventData, webhookEventType string)
return
}

// Push the webhook event payload to the kafka topic
err = PushWebhookEventToKafkaTopic(webhookEventType, b, WebhooksConfiguration.KafkaConfig.Topic)
if err != nil {
log.Printf("Failed to push the webhook event to kafka, error:%v, Failed to trigger webhook for event:<%s>, user:<%s>, org:<%s>", err.Error(), webhookEventType, u.ID.Hex(), webhookEventData.GetOrganisationID())
return
}

go webhook_dispatcher.ProcessWebhooks(webhookEventType, b)

// Log webhook calls in webhooks category
Expand Down
Loading

0 comments on commit efc1a67

Please sign in to comment.