Skip to content

Commit

Permalink
Merge pull request #77 from Transfa/76-create-adapter-for-redis-integ…
Browse files Browse the repository at this point in the history
…ration

♻️ refactoring for adapter
  • Loading branch information
koladev32 authored Jul 6, 2024
2 parents 65f68e4 + 6e166d7 commit 0f7e615
Show file tree
Hide file tree
Showing 12 changed files with 366 additions and 346 deletions.
7 changes: 6 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Godeps/
.fleet/

# binary files
sendhooks/webhook
webhook

# sendhooks api
node_modules/
Expand All @@ -36,3 +36,8 @@ config.json

#sendhooks-engine-api
api/.env

.DS_Store

sendhooks
sendhooks/config.json
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Added

- Create Adapter for Redis Integration [#76](https://github.com/Transfa/sendhooks-engine/issues/76)

### Fixed

## [v0.3.3-beta] - 2024-06-01
Expand Down
50 changes: 50 additions & 0 deletions sendhooks/adapter/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package adapter

import (
"context"
)

// WebhookPayload represents the structure of the data from Redis.
type WebhookPayload struct {
URL string `json:"url"`
WebhookID string `json:"webhookId"`
MessageID string `json:"messageId"`
Data map[string]interface{} `json:"data"`
SecretHash string `json:"secretHash"`
MetaData map[string]interface{} `json:"metaData"`
}

type WebhookDeliveryStatus struct {
WebhookID string `json:"webhook_id"`
Status string `json:"status"`
DeliveryError string `json:"delivery_error"`
URL string `json:"url"`
Created string `json:"created"`
Delivered string `json:"delivered"`
}

type RedisConfig struct {
RedisAddress string `json:"redisAddress"`
RedisPassword string `json:"redisPassword"`
RedisDb string `json:"redisDb"`
RedisSsl string `json:"redisSsl"`
RedisCaCert string `json:"redisCaCert"`
RedisClientCert string `json:"redisClientCert"`
RedisClientKey string `json:"redisClientKey"`
RedisStreamName string `json:"redisStreamName"`
RedisStreamStatusName string `json:"redisStreamStatusName"`
}

type Configuration struct {
Redis RedisConfig `json:"redis"`
SecretHashHeaderName string `json:"secretHashHeaderName"`
Broker string `json:"broker"`
}

// Adapter defines methods for interacting with different queue systems.
type Adapter interface {
Connect() error
SubscribeToQueue(ctx context.Context, queue chan<- WebhookPayload) error
ProcessWebhooks(ctx context.Context, queue chan WebhookPayload, queueAdapter Adapter)
PublishStatus(ctx context.Context, webhookID, url, created, delivered, status, deliveryError string) error
}
65 changes: 65 additions & 0 deletions sendhooks/adapter/adapter_manager/adapter_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package adapter_manager

import (
"encoding/json"
"log"
"os"
"sendhooks/adapter"
"sync"

redisadapter "sendhooks/adapter/redis_adapter"
)

var (
instance adapter.Adapter
once sync.Once
)

var (
config adapter.Configuration
)

// LoadConfiguration loads the configuration from a file
func LoadConfiguration(filename string) {
once.Do(func() {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("Failed to open config file: %v", err)
}
defer file.Close()

decoder := json.NewDecoder(file)
err = decoder.Decode(&config)
if err != nil {
log.Fatalf("Failed to decode config file: %v", err)
}
})
}

// GetConfig returns the loaded configuration
func GetConfig() adapter.Configuration {
return config
}

// Initialize initializes the appropriate adapter based on the configuration.
func Initialize() {
once.Do(func() {
conf := GetConfig()
switch conf.Broker {
case "redis":
instance = redisadapter.NewRedisAdapter(conf)
default:
log.Fatalf("Unsupported broker type: %v", conf.Broker)
}

err := instance.Connect()
if err != nil {
log.Fatalf("Failed to connect to broker: %v", err)
}
})
}

// GetAdapter returns the singleton instance of the adapter.
func GetAdapter() adapter.Adapter {
return instance
}
185 changes: 185 additions & 0 deletions sendhooks/adapter/redis_adapter/redis_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package redisadapter

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

"sendhooks/adapter"
"sendhooks/logging"
worker "sendhooks/queue"
"sendhooks/utils"

"github.com/go-redis/redis/v8"
)

// RedisAdapter implements the Adapter interface for Redis.
type RedisAdapter struct {
client *redis.Client
config adapter.Configuration
queueName string
statusQueue string
lastID string
}

// NewRedisAdapter creates a new RedisAdapter instance.
func NewRedisAdapter(config adapter.Configuration) *RedisAdapter {
return &RedisAdapter{
config: config,
queueName: config.Redis.RedisStreamName,
statusQueue: config.Redis.RedisStreamStatusName,
lastID: "0",
}
}

// Connect initializes the Redis client and establishes a connection.
func (r *RedisAdapter) Connect() error {
redisAddress := r.config.Redis.RedisAddress
if redisAddress == "" {
redisAddress = "localhost:6379" // Default address
}

redisDB := r.config.Redis.RedisDb
if redisDB == "" {
redisDB = "0" // Default database
}

redisDBInt, _ := strconv.Atoi(redisDB)
redisPassword := r.config.Redis.RedisPassword

useSSL := strings.ToLower(r.config.Redis.RedisSsl) == "true"
var tlsConfig *tls.Config

if useSSL {
caCertPath := r.config.Redis.RedisCaCert
clientCertPath := r.config.Redis.RedisClientCert
clientKeyPath := r.config.Redis.RedisClientKey

var err error
tlsConfig, err = utils.CreateTLSConfig(caCertPath, clientCertPath, clientKeyPath)
if err != nil {
return err
}
}

r.client = redis.NewClient(&redis.Options{
Addr: redisAddress,
Password: redisPassword,
DB: redisDBInt,
TLSConfig: tlsConfig,
PoolSize: 50,
})

return nil
}

// SubscribeToQueue subscribes to the specified Redis queue and processes messages.
func (r *RedisAdapter) SubscribeToQueue(ctx context.Context, queue chan<- adapter.WebhookPayload) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := r.processQueueMessages(ctx, queue); err != nil {
return err
}
}
}
}

// processQueueMessages retrieves, decodes, and dispatches messages from the Redis queue.
func (r *RedisAdapter) processQueueMessages(ctx context.Context, queue chan<- adapter.WebhookPayload) error {
messages, err := r.readMessagesFromQueue(ctx)
if err != nil {
return err
}

for _, payload := range messages {
select {
case queue <- payload:
_, delErr := r.client.XDel(ctx, r.queueName, payload.MessageID).Result()
if delErr != nil {
logging.WebhookLogger(logging.ErrorType, fmt.Errorf("failed to delete message %s: %v", payload.MessageID, delErr))
}
case <-ctx.Done():
return ctx.Err()
default:
logging.WebhookLogger(logging.WarningType, fmt.Errorf("dropped webhook due to channel overflow. Webhook ID: %s", payload.WebhookID))
}
}

return nil
}

// readMessagesFromQueue reads messages from the Redis queue.
func (r *RedisAdapter) readMessagesFromQueue(ctx context.Context) ([]adapter.WebhookPayload, error) {
entries, err := r.client.XRead(ctx, &redis.XReadArgs{
Streams: []string{r.queueName, r.lastID},
Count: 5,
}).Result()

if err != nil {
if err == redis.Nil {
time.Sleep(time.Second)
return r.readMessagesFromQueue(ctx)
}
return nil, err
}

var messages []adapter.WebhookPayload
for _, entry := range entries[0].Messages {
var payload adapter.WebhookPayload

if data, ok := entry.Values["data"].(string); ok {
if err := json.Unmarshal([]byte(data), &payload); err != nil {
logging.WebhookLogger(logging.ErrorType, fmt.Errorf("error unmarshalling message data: %w", err))
r.lastID = entry.ID
return nil, err
}
} else {
logging.WebhookLogger(logging.ErrorType, fmt.Errorf("expected string for 'data' field but got %T", entry.Values["data"]))
r.lastID = entry.ID
return nil, err
}

payload.MessageID = entry.ID
messages = append(messages, payload)
r.lastID = entry.ID
}

return messages, nil
}

// ProcessWebhooks processes webhooks from the specified queue.
func (r *RedisAdapter) ProcessWebhooks(ctx context.Context, queue chan adapter.WebhookPayload, queueAdapter adapter.Adapter) {

worker.ProcessWebhooks(ctx, queue, r.config, queueAdapter)
}

// PublishStatus publishes the status of a webhook delivery.
func (r *RedisAdapter) PublishStatus(ctx context.Context, webhookID, url, created, delivered, status, deliveryError string) error {
message := adapter.WebhookDeliveryStatus{
WebhookID: webhookID,
Status: status,
DeliveryError: deliveryError,
URL: url,
Created: created,
Delivered: delivered,
}

jsonString, err := json.Marshal(message)
if err != nil {
return err
}

_, err = r.client.XAdd(ctx, &redis.XAddArgs{
Stream: r.statusQueue,
Values: map[string]interface{}{"data": jsonString},
}).Result()

return err
}
Loading

0 comments on commit 0f7e615

Please sign in to comment.