Skip to content

Commit

Permalink
♻️ refactoring for adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
koladev32 committed Jul 6, 2024
1 parent 65f68e4 commit 35e0999
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 121 deletions.
Binary file added .DS_Store
Binary file not shown.
41 changes: 41 additions & 0 deletions sendhooks/adapter/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package adapter

import (
"context"
)

// WebhookPayload represents the structure of the data from Redis.
type WebhookPayload struct {
URL string `json:"url"`
WebhookID string `json:"webhookId"`
MessageID string `json:"messageId"`
Data map[string]interface{} `json:"data"`
SecretHash string `json:"secretHash"`
MetaData map[string]interface{} `json:"metaData"`
}

type RedisConfig struct {
RedisAddress string `json:"redisAddress"`
RedisPassword string `json:"redisPassword"`
RedisDb string `json:"redisDb"`
RedisSsl string `json:"redisSsl"`
RedisCaCert string `json:"redisCaCert"`
RedisClientCert string `json:"redisClientCert"`
RedisClientKey string `json:"redisClientKey"`
RedisStreamName string `json:"redisStreamName"`
RedisStreamStatusName string `json:"redisStreamStatusName"`
}

type Configuration struct {
Redis RedisConfig `json:"redis"`
SecretHashHeaderName string `json:"secretHashHeaderName"`
Broker string `json:"broker"`
}

// Adapter defines methods for interacting with different queue systems.
type Adapter interface {
Connect() error
SubscribeToQueue(ctx context.Context, queue chan<- WebhookPayload) error
ProcessWebhooks(ctx context.Context, queue <-chan WebhookPayload) error
PublishStatus(ctx context.Context, webhookID, url, created, delivered, status, deliveryError string) error
}
65 changes: 65 additions & 0 deletions sendhooks/adapter/adapter_manager/adapter_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package adapter_manager

import (
"encoding/json"
"log"
"os"
"sendhooks/adapter"
"sync"

redisadapter "sendhooks/adapter/redis_adapter"
)

var (
instance adapter.Adapter
once sync.Once
)

var (
config adapter.Configuration
)

// LoadConfiguration loads the configuration from a file
func LoadConfiguration(filename string) {
once.Do(func() {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("Failed to open config file: %v", err)
}
defer file.Close()

decoder := json.NewDecoder(file)
err = decoder.Decode(&config)
if err != nil {
log.Fatalf("Failed to decode config file: %v", err)
}
})
}

// GetConfig returns the loaded configuration
func GetConfig() adapter.Configuration {
return config
}

// Initialize initializes the appropriate adapter based on the configuration.
func Initialize() {
once.Do(func() {
conf := GetConfig()
switch conf.Broker {
case "redis":
instance = redisadapter.NewRedisAdapter(conf)
default:
log.Fatalf("Unsupported broker type: %v", conf.Broker)
}

err := instance.Connect()
if err != nil {
log.Fatalf("Failed to connect to broker: %v", err)
}
})
}

// GetAdapter returns the singleton instance of the adapter.
func GetAdapter() adapter.Adapter {
return instance
}
105 changes: 105 additions & 0 deletions sendhooks/adapter/redis_adapter/redis_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package redisadapter

import (
"context"
"crypto/tls"
"strconv"
"strings"

"sendhooks/adapter"
"sendhooks/utils"

"github.com/go-redis/redis/v8"
)

type RedisAdapter struct {
client *redis.Client
config adapter.Configuration
streamName string
statusStream string
}

// PublishStatus implements adapter.Adapter.
func (r *RedisAdapter) PublishStatus(ctx context.Context, webhookID string, url string, created string, delivered string, status string, deliveryError string) error {
panic("unimplemented")
}

// SubscribeToQueue implements adapter.Adapter.
func (r *RedisAdapter) SubscribeToQueue(ctx context.Context, queue chan<- adapter.WebhookPayload) error {
panic("unimplemented")
}

func NewRedisAdapter(config adapter.Configuration) *RedisAdapter {
return &RedisAdapter{
config: config,
streamName: config.Redis.RedisStreamName,
statusStream: config.Redis.RedisStreamStatusName,
}
}

func (r *RedisAdapter) Connect() error {
redisAddress := r.config.Redis.RedisAddress
if redisAddress == "" {
redisAddress = "localhost:6379" // Default address
}

redisDB := r.config.Redis.RedisDb
if redisDB == "" {
redisDB = "0" // Default database
}

redisDBInt, _ := strconv.Atoi(redisDB)
redisPassword := r.config.Redis.RedisPassword

useSSL := strings.ToLower(r.config.Redis.RedisSsl) == "true"
var tlsConfig *tls.Config

if useSSL {
caCertPath := r.config.Redis.RedisCaCert
clientCertPath := r.config.Redis.RedisClientCert
clientKeyPath := r.config.Redis.RedisClientKey

var err error
tlsConfig, err = utils.CreateTLSConfig(caCertPath, clientCertPath, clientKeyPath)
if err != nil {
return err
}
}

r.client = redis.NewClient(&redis.Options{
Addr: redisAddress,
Password: redisPassword,
DB: redisDBInt,
TLSConfig: tlsConfig,
})

return nil
}

func (r *RedisAdapter) SubscribeToStream(ctx context.Context, streamName string, queue chan<- adapter.WebhookPayload) error {
stream := streamName
if stream == "" {
stream = r.streamName
}
// Implement subscription logic here
// Example:
// for {
// streams, err := r.client.XRead(&redis.XReadArgs{
// Streams: []string{stream, "0"},
// Count: 1,
// Block: 0,
// }).Result()
// if err != nil {
// return err
// }
// for _, message := range streams[0].Messages {
// queue <- adapter.WebhookPayload{ID: message.ID, Payload: message.Values["data"].(string)}
// }
// }
return nil
}

func (r *RedisAdapter) ProcessWebhooks(ctx context.Context, queue <-chan adapter.WebhookPayload) error {
// Implement webhook processing logic here
return nil
}
120 changes: 20 additions & 100 deletions sendhooks/main.go
Original file line number Diff line number Diff line change
@@ -1,85 +1,46 @@
// main.go
package main

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"strings"

"sendhooks/logging"
"sendhooks/queue"

redisClient "sendhooks/redis"
redistlsconfig "sendhooks/utils"
"sendhooks/adapter"
"sendhooks/adapter/adapter_manager"

"github.com/go-redis/redis/v8"
redisadapter "sendhooks/adapter/redis_adapter"
"sendhooks/logging"
)

// Config Declare a variable 'config' of type 'redisClient.Configuration'.
var Config redisClient.Configuration

// LoadConfiguration is a function that takes a filename as a string and returns an error.
// It's used to load and parse a configuration file for the Redis client.
func LoadConfiguration(filename string) error {
func main() {
adapter_manager.LoadConfiguration("config.json")

// Open the file specified by 'filename'. If an error occurs (e.g., file not found),
file, err := os.Open(filename)
if err != nil {
return err
}
// Defer the closing of the file until the end of the function's execution.
// This ensures the file is closed once the function finishes, even if an error occurs.
defer func(file *os.File) {
err := file.Close()
if err != nil {
// Currently, this error is ignored.
}
}(file)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a JSON decoder that reads from the opened file.
decoder := json.NewDecoder(file)
// Use the decoder to decode the JSON content into the 'config' variable.
// If an error occurs during decoding (e.g., JSON format issue), return the error.
err = decoder.Decode(&Config)
err := logging.WebhookLogger(logging.EventType, "starting sendhooks engine")
if err != nil {
return err
log.Fatalf("Failed to log sendhooks event: %v", err)
}

// If everything is successful, return nil indicating no error occurred.
return nil
}
var queueAdapter adapter.Adapter
conf := adapter_manager.GetConfig()

func main() {
// Load configuration file
err := LoadConfiguration("config.json")
if err != nil {
log.Printf("%v. anyway, using default configuration to the project\n", err)
}
// Create a context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = logging.WebhookLogger(logging.EventType, "starting sendhooks engine")
if err != nil {
log.Fatalf("Failed to log sendhooks event: %v", err)
if conf.Broker == "redis" {
queueAdapter = redisadapter.NewRedisAdapter(conf)
}

client, err := createRedisClient()
err = queueAdapter.Connect()
if err != nil {
log.Fatalf("Failed to create Redis client: %v", err)
log.Fatalf("Failed to connect to Redis: %v", err)
}

// Create a channel to act as the queue
webhookQueue := make(chan redisClient.WebhookPayload, 100) // Buffer size 100
webhookQueue := make(chan adapter.WebhookPayload, 100)

go queue.ProcessWebhooks(ctx, webhookQueue, client, Config)

// Subscribe to the "hooks" Redis stream
err = redisClient.SubscribeToStream(ctx, client, webhookQueue, Config)
go queueAdapter.ProcessWebhooks(ctx, webhookQueue)

err = queueAdapter.SubscribeToQueue(ctx, webhookQueue)
if err != nil {
logging.WebhookLogger(logging.ErrorType, fmt.Errorf("error initializing connection: %s", err))
log.Fatalf("error initializing connection: %v", err)
Expand All @@ -88,44 +49,3 @@ func main() {

select {}
}

func createRedisClient() (*redis.Client, error) {
redisAddress := Config.RedisAddress
if redisAddress == "" {
redisAddress = "localhost:6379" // Default address
}

redisDB := Config.RedisDb
if redisDB == "" {
redisDB = "0" // Default database
}

redisDBInt, _ := strconv.Atoi(redisDB)

redisPassword := Config.RedisPassword

// SSL/TLS configuration
useSSL := strings.ToLower(Config.RedisSsl) == "true"
var tlsConfig *tls.Config

if useSSL {
caCertPath := Config.RedisCaCert
clientCertPath := Config.RedisClientCert
clientKeyPath := Config.RedisClientKey

var err error
tlsConfig, err = redistlsconfig.CreateTLSConfig(caCertPath, clientCertPath, clientKeyPath)
if err != nil {
return nil, err
}
}

client := redis.NewClient(&redis.Options{
Addr: redisAddress,
Password: redisPassword,
DB: redisDBInt,
TLSConfig: tlsConfig,
})

return client, nil
}
Loading

0 comments on commit 35e0999

Please sign in to comment.