
Commit

Merge branch 'release/0.3.0'
justinpjose committed Sep 28, 2022
2 parents 2506edd + 49c27d7, commit 2364f5e
Showing 37 changed files with 1,708 additions and 520 deletions.
ci/build.yml (2 changes: 1 addition & 1 deletion)

@@ -6,7 +6,7 @@ image_resource:
   type: docker-image
   source:
     repository: golang
-    tag: 1.18.4
+    tag: 1.19.1

 inputs:
   - name: dp-search-reindex-tracker
ci/component.yml (2 changes: 1 addition & 1 deletion)

@@ -6,7 +6,7 @@ image_resource:
   type: docker-image
   source:
     repository: golang
-    tag: 1.18.4
+    tag: 1.19.1

 inputs:
   - name: dp-search-reindex-tracker
ci/lint.yml (2 changes: 1 addition & 1 deletion)

@@ -6,7 +6,7 @@ image_resource:
   type: docker-image
   source:
     repository: golang
-    tag: 1.18.4
+    tag: 1.19.1

 inputs:
   - name: dp-search-reindex-tracker
ci/unit.yml (2 changes: 1 addition & 1 deletion)

@@ -6,7 +6,7 @@ image_resource:
   type: docker-image
   source:
     repository: golang
-    tag: 1.18.4
+    tag: 1.19.1

 inputs:
   - name: dp-search-reindex-tracker
cmd/producer/main.go (28 changes: 16 additions & 12 deletions)

@@ -10,7 +10,6 @@ import (
 	kafka "github.com/ONSdigital/dp-kafka/v3"
 	"github.com/ONSdigital/dp-search-reindex-tracker/config"
 	"github.com/ONSdigital/dp-search-reindex-tracker/event"
-	"github.com/ONSdigital/dp-search-reindex-tracker/schema"
 	"github.com/ONSdigital/log.go/v2/log"
 )
@@ -28,7 +27,10 @@ func main() {
 	}

 	pConfig := &kafka.ProducerConfig{
-		KafkaVersion: &cfg.KafkaConfig.Version,
+		KafkaVersion:      &cfg.KafkaConfig.Version,
+		BrokerAddrs:       cfg.KafkaConfig.Brokers,
+		Topic:             cfg.KafkaConfig.ReindexRequestedTopic,
+		MinBrokersHealthy: &cfg.KafkaConfig.NumWorkers,
 	}
 	if cfg.KafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag {
 		pConfig.SecurityConfig = kafka.GetSecurityConfig(
@@ -51,11 +53,11 @@
 	scanner := bufio.NewScanner(os.Stdin)
 	for {
 		e := scanEvent(scanner)
-		log.Info(ctx, "sending hello-called event", log.Data{"helloCalledEvent": e})
+		log.Info(ctx, "sending reindex-requested event", log.Data{"reindexRequestedEvent": e})

-		bytes, err := schema.HelloCalledEvent.Marshal(e)
+		bytes, err := event.ReindexRequestedSchema.Marshal(e)
 		if err != nil {
-			log.Fatal(ctx, "hello-called event error", err)
+			log.Fatal(ctx, "reindex-requested event error", err)
 			os.Exit(1)
 		}
@@ -65,16 +67,18 @@
 	}
 }

-// scanEvent creates a HelloCalled event according to the user input
-func scanEvent(scanner *bufio.Scanner) *event.HelloCalled {
-	fmt.Println("--- [Send Kafka HelloCalled] ---")
+// scanEvent creates a ReindexRequested event according to the user input
+func scanEvent(scanner *bufio.Scanner) *event.ReindexRequestedModel {
+	fmt.Println("--- [Send Kafka ReindexRequested] ---")

-	fmt.Println("Please type the recipient name")
+	fmt.Println("Please type the job id")
 	fmt.Printf("$ ")
 	scanner.Scan()
-	name := scanner.Text()
+	jobID := scanner.Text()

-	return &event.HelloCalled{
-		RecipientName: name,
+	return &event.ReindexRequestedModel{
+		JobID:       jobID,
+		SearchIndex: "test",
+		TraceID:     "trace1234",
 	}
 }
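
For reference, a minimal sketch of the event model the updated producer appears to populate. The field set is inferred from the struct literal in scanEvent above; the avro tag names are assumptions, and the real definition lives in this repository's event package.

package event

// ReindexRequestedModel describes a reindex-requested event (sketch).
// Only the Go field names appear in the diff; the tags are assumed.
type ReindexRequestedModel struct {
	JobID       string `avro:"job_id"`
	SearchIndex string `avro:"search_index"`
	TraceID     string `avro:"trace_id"`
}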
config/config.go (6 changes: 4 additions & 2 deletions)

@@ -10,11 +10,12 @@ const KafkaTLSProtocolFlag = "TLS"

 // Config represents service configuration for dp-search-reindex-tracker
 type Config struct {
+	APIRouterURL               string        `envconfig:"API_ROUTER_URL"`
 	BindAddr                   string        `envconfig:"BIND_ADDR"`
 	GracefulShutdownTimeout    time.Duration `envconfig:"GRACEFUL_SHUTDOWN_TIMEOUT"`
 	HealthCheckInterval        time.Duration `envconfig:"HEALTHCHECK_INTERVAL"`
 	HealthCheckCriticalTimeout time.Duration `envconfig:"HEALTHCHECK_CRITICAL_TIMEOUT"`
-	OutputFilePath             string        `envconfig:"OUTPUT_FILE_PATH"`
+	ServiceAuthToken           string        `envconfig:"SERVICE_AUTH_TOKEN" json:"-"`
 	KafkaConfig                KafkaConfig
 }
@@ -47,11 +48,12 @@ func Get() (*Config, error) {
 	}

 	cfg = &Config{
+		APIRouterURL:               "http://localhost:23200/v1",
 		BindAddr:                   "localhost:28500",
 		GracefulShutdownTimeout:    5 * time.Second,
 		HealthCheckInterval:        30 * time.Second,
 		HealthCheckCriticalTimeout: 90 * time.Second,
-		OutputFilePath:             "/tmp/helloworld.txt",
+		ServiceAuthToken:           "",
 		KafkaConfig: KafkaConfig{
 			Brokers:    []string{"localhost:9092"},
 			NumWorkers: 1,
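
The envconfig struct tags mean each new field can be overridden from the environment, and the json:"-" tag on ServiceAuthToken keeps the token out of any JSON-rendered output. A minimal sketch of how such a config is typically resolved, assuming the kelseyhightower/envconfig package that these tags suggest:

package config

import "github.com/kelseyhightower/envconfig"

// getConfig is a sketch: defaults are set in the struct literal, then
// envconfig overwrites any field whose tagged environment variable is
// set (e.g. API_ROUTER_URL, SERVICE_AUTH_TOKEN).
func getConfig() (*Config, error) {
	cfg := &Config{
		APIRouterURL:     "http://localhost:23200/v1",
		ServiceAuthToken: "",
	}
	if err := envconfig.Process("", cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}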
config/config_test.go (2 changes: 2 additions & 0 deletions)

@@ -20,6 +20,7 @@ func TestConfig(t *testing.T) {
 		})

 		Convey("Then the values should be set to the expected defaults", func() {
+			So(cfg.APIRouterURL, ShouldEqual, "http://localhost:23200/v1")
 			So(cfg.BindAddr, ShouldEqual, "localhost:28500")
 			So(cfg.GracefulShutdownTimeout, ShouldEqual, 5*time.Second)
 			So(cfg.HealthCheckInterval, ShouldEqual, 30*time.Second)
@@ -35,6 +36,7 @@
 			So(cfg.KafkaConfig.SecProtocol, ShouldEqual, "")
 			So(cfg.KafkaConfig.SecSkipVerify, ShouldBeFalse)
 			So(cfg.KafkaConfig.Version, ShouldEqual, "1.0.2")
+			So(cfg.ServiceAuthToken, ShouldEqual, "")
 		})

 		Convey("Then a second call to config should return the same config", func() {
event/consumer.go (78 changes: 21 additions & 57 deletions)

@@ -2,79 +2,43 @@ package event

 import (
 	"context"
+	"errors"

 	kafka "github.com/ONSdigital/dp-kafka/v3"
 	"github.com/ONSdigital/dp-search-reindex-tracker/config"
-	"github.com/ONSdigital/dp-search-reindex-tracker/schema"
 	"github.com/ONSdigital/log.go/v2/log"
 )

-//go:generate moq -out mock/handler.go -pkg mock . Handler
-
-// TODO: remove or replace hello called logic with app specific
-// Handler represents a handler for processing a single event.
-type Handler interface {
-	Handle(ctx context.Context, cfg *config.Config, helloCalled *HelloCalled) error
-}
-
-// Consume converts messages to event instances, and pass the event to the provided handler.
-func Consume(ctx context.Context, messageConsumer kafka.IConsumerGroup, handler Handler, cfg *config.Config) {
-
-	// consume loop, to be executed by each worker
-	var consume = func(workerID int) {
-		for {
-			select {
-			case message, ok := <-messageConsumer.Channels().Upstream:
-				if !ok {
-					log.Info(ctx, "closing event consumer loop because upstream channel is closed", log.Data{"worker_id": workerID})
-					return
-				}
-				messageCtx := context.Background()
-				processMessage(messageCtx, message, handler, cfg)
-				message.Release()
-			case <-messageConsumer.Channels().Closer:
-				log.Info(ctx, "closing event consumer loop because closer channel is closed", log.Data{"worker_id": workerID})
-				return
-			}
-		}
+// ProcessMessage unmarshals the provided kafka message into an event and calls the handler.
+// Handling the commit of the message is done by the dp-kafka library which will commit the message on success or failure.
+func ProcessMessage[M KafkaAvroModel](ctx context.Context, cfg *config.Config, topicEvent *KafkaConsumerEvent[M], message kafka.Message) error {
+	if topicEvent == nil {
+		err := errors.New("provided topicEvent is nil")
+		log.Error(ctx, "failed to validate topicEvent", err)
+		return err
 	}

-	// workers to consume messages in parallel
-	for w := 1; w <= cfg.KafkaConfig.NumWorkers; w++ {
-		go consume(w)
-	}
-}
-
-// processMessage unmarshals the provided kafka message into an event and calls the handler.
-// After the message is handled, it is committed.
-func processMessage(ctx context.Context, message kafka.Message, handler Handler, cfg *config.Config) {
-
-	// unmarshal - commit on failure (consuming the message again would result in the same error)
-	event, err := unmarshal(message)
+	// unmarshal message
+	event, err := unmarshal[M](topicEvent.Schema, message)
 	if err != nil {
-		log.Error(ctx, "failed to unmarshal event", err)
-		message.Commit()
-		return
+		logData := log.Data{
+			"schema":  topicEvent.Schema,
+			"message": message,
+		}
+		log.Error(ctx, "failed to unmarshal event", err, logData)
+		return err
 	}

 	log.Info(ctx, "event received", log.Data{"event": event})

-	// handle - commit on failure (implement error handling to not commit if message needs to be consumed again)
-	err = handler.Handle(ctx, cfg, event)
+	// handle event
+	err = topicEvent.Handler.Handle(ctx, cfg, event)
 	if err != nil {
 		log.Error(ctx, "failed to handle event", err)
-		message.Commit()
-		return
+		return err
 	}

-	log.Info(ctx, "event processed - committing message", log.Data{"event": event})
-	message.Commit()
-	log.Info(ctx, "message committed", log.Data{"event": event})
-}
+	log.Info(ctx, "message processed")

-// unmarshal converts a event instance to []byte.
-func unmarshal(message kafka.Message) (*HelloCalled, error) {
-	var event HelloCalled
-	err := schema.HelloCalledEvent.Unmarshal(message.GetData(), &event)
-	return &event, err
+	return nil
 }
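
The new ProcessMessage is generic over the event model, so one consumer path can serve every tracked topic. Its supporting types are defined elsewhere in the package; the sketch below shows the shapes the signature implies, assuming dp-kafka/v3's avro.Schema is the schema type. Everything not named in the diff (the constraint's member list, the Handler and KafkaConsumerEvent layouts, the unmarshal body) is inferred, not confirmed.

package event

import (
	"context"

	kafka "github.com/ONSdigital/dp-kafka/v3"
	"github.com/ONSdigital/dp-kafka/v3/avro"
	"github.com/ONSdigital/dp-search-reindex-tracker/config"
)

// KafkaAvroModel constrains the event models ProcessMessage can decode;
// the member list here is illustrative.
type KafkaAvroModel interface {
	ReindexRequestedModel // | further event models as topics are added
}

// Handler processes one decoded event of type M.
type Handler[M KafkaAvroModel] interface {
	Handle(ctx context.Context, cfg *config.Config, event *M) error
}

// KafkaConsumerEvent ties a topic's Avro schema to the handler for it.
type KafkaConsumerEvent[M KafkaAvroModel] struct {
	Schema  *avro.Schema
	Handler Handler[M]
}

// unmarshal decodes the raw message payload into a new M using the
// given schema, mirroring the call made by ProcessMessage above.
func unmarshal[M KafkaAvroModel](s *avro.Schema, message kafka.Message) (*M, error) {
	var event M
	err := s.Unmarshal(message.GetData(), &event)
	return &event, err
}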