Publish to reindex task counts topic
RMPall committed Mar 21, 2023
1 parent 29f1c82 commit 289986d
Showing 13 changed files with 346 additions and 70 deletions.
2 changes: 1 addition & 1 deletion cmd/producer/main.go
@@ -27,7 +27,7 @@ func main() {
os.Exit(1)
}

// Create Kafka Producer
// Create Kafka ContentUpdatedProducer
pConfig := &kafka.ProducerConfig{
BrokerAddrs: cfg.KafkaConfig.Brokers,
Topic: cfg.KafkaConfig.ContentUpdatedTopic,
54 changes: 30 additions & 24 deletions config/config.go
@@ -19,23 +19,26 @@ type Config struct {
ServiceAuthToken string `envconfig:"SERVICE_AUTH_TOKEN" json:"-"`
ZebedeeClientTimeout time.Duration `envconfig:"ZEBEDEE_CLIENT_TIMEOUT"`
EnablePublishContentUpdatedTopic bool `envconfig:"ENABLE_PUBLISH_CONTENT_UPDATED_TOPIC"`
EnableReindexTaskCounts bool `envconfig:"ENABLE_REINDEX_TASK_COUNTS"`
ZebedeeURL string `envconfig:"ZEBEDEE_URL"`
TaskNameValues string `envconfig:"TASK_NAME_VALUES"`
}

// KafkaConfig contains the config required to connect to Kafka
type KafkaConfig struct {
Brokers []string `envconfig:"KAFKA_ADDR"`
ContentUpdatedTopic string `envconfig:"KAFKA_CONTENT_UPDATED_TOPIC"`
ConsumerGroup string `envconfig:"KAFKA_CONSUMER_GROUP"`
NumWorkers int `envconfig:"KAFKA_NUM_WORKERS"`
OffsetOldest bool `envconfig:"KAFKA_OFFSET_OLDEST"`
ReindexRequestedTopic string `envconfig:"KAFKA_REINDEX_REQUESTED_TOPIC"`
SecProtocol string `envconfig:"KAFKA_SEC_PROTO"`
SecCACerts string `envconfig:"KAFKA_SEC_CA_CERTS"`
SecClientCert string `envconfig:"KAFKA_SEC_CLIENT_CERT"`
SecClientKey string `envconfig:"KAFKA_SEC_CLIENT_KEY" json:"-"`
SecSkipVerify bool `envconfig:"KAFKA_SEC_SKIP_VERIFY"`
Version string `envconfig:"KAFKA_VERSION"`
Brokers []string `envconfig:"KAFKA_ADDR"`
ContentUpdatedTopic string `envconfig:"KAFKA_CONTENT_UPDATED_TOPIC"`
ReindexTaskCountsTopic string `envconfig:"KAFKA_REINDEX_TASK_COUNTS_TOPIC"`
ConsumerGroup string `envconfig:"KAFKA_CONSUMER_GROUP"`
NumWorkers int `envconfig:"KAFKA_NUM_WORKERS"`
OffsetOldest bool `envconfig:"KAFKA_OFFSET_OLDEST"`
ReindexRequestedTopic string `envconfig:"KAFKA_REINDEX_REQUESTED_TOPIC"`
SecProtocol string `envconfig:"KAFKA_SEC_PROTO"`
SecCACerts string `envconfig:"KAFKA_SEC_CA_CERTS"`
SecClientCert string `envconfig:"KAFKA_SEC_CLIENT_CERT"`
SecClientKey string `envconfig:"KAFKA_SEC_CLIENT_KEY" json:"-"`
SecSkipVerify bool `envconfig:"KAFKA_SEC_SKIP_VERIFY"`
Version string `envconfig:"KAFKA_VERSION"`
}

var cfg *Config
@@ -51,26 +54,29 @@ func Get() (*Config, error) {
APIRouterURL: "http://localhost:23200/v1",
BindAddr: "localhost:28000",
EnablePublishContentUpdatedTopic: false,
EnableReindexTaskCounts: false,
GracefulShutdownTimeout: 5 * time.Second,
HealthCheckCriticalTimeout: 90 * time.Second,
HealthCheckInterval: 30 * time.Second,
KafkaConfig: KafkaConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
ContentUpdatedTopic: "content-updated",
ConsumerGroup: "dp-search-data-finder",
NumWorkers: 1,
OffsetOldest: true,
ReindexRequestedTopic: "reindex-requested",
SecProtocol: "",
SecCACerts: "",
SecClientCert: "",
SecClientKey: "",
SecSkipVerify: false,
Version: "1.0.2",
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
ContentUpdatedTopic: "content-updated",
ReindexTaskCountsTopic: "reindex-task-counts",
ConsumerGroup: "dp-search-data-finder",
NumWorkers: 1,
OffsetOldest: true,
ReindexRequestedTopic: "reindex-requested",
SecProtocol: "",
SecCACerts: "",
SecClientCert: "",
SecClientKey: "",
SecSkipVerify: false,
Version: "1.0.2",
},
ServiceAuthToken: "",
ZebedeeClientTimeout: 30 * time.Second,
ZebedeeURL: "http://localhost:8082",
TaskNameValues: "dataset-api,zebedee",
}

return cfg, envconfig.Process("", cfg)
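A quick way to exercise the new settings locally is to override their environment variables before loading the config. A minimal sketch, assuming the module path github.com/ONSdigital/dp-search-data-finder (inferred from the consumer group above):

// Sketch: enable task-count publishing and confirm the new defaults.
package main

import (
	"fmt"
	"os"

	"github.com/ONSdigital/dp-search-data-finder/config"
)

func main() {
	os.Setenv("ENABLE_REINDEX_TASK_COUNTS", "true")
	os.Setenv("KAFKA_REINDEX_TASK_COUNTS_TOPIC", "reindex-task-counts")

	cfg, err := config.Get()
	if err != nil {
		fmt.Println("failed to load config:", err)
		os.Exit(1)
	}

	// prints: true reindex-task-counts
	fmt.Println(cfg.EnableReindexTaskCounts, cfg.KafkaConfig.ReindexTaskCountsTopic)
}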
24 changes: 24 additions & 0 deletions event/producer.go
@@ -39,3 +39,27 @@ func (p ContentUpdatedProducer) ContentUpdate(ctx context.Context, cfg *config.C
}
return nil
}

// ReindexTaskCountsProducer produces kafka messages carrying the document counts for each reindex task.
type ReindexTaskCountsProducer struct {
Marshaller Marshaller
Producer dpkafka.IProducer
}

// TaskCounts produces a kafka message with the document count for a single reindex task.
func (p ReindexTaskCountsProducer) TaskCounts(ctx context.Context, cfg *config.Config, event models.ReindexTaskCounts) error {
if cfg.EnableReindexTaskCounts {
log.Info(ctx, "EnableReindexTaskCountsFlag Flag is enabled")
eventBytes, err := p.Marshaller.Marshal(event)
if err != nil {
log.Fatal(ctx, "failed to marshal event", err)
return err
}

p.Producer.Channels().Output <- eventBytes
log.Info(ctx, "event produced successfully", log.Data{"event": event, "package": "event.ContentUpdate"})
} else {
log.Info(ctx, "EnableReindexTaskCountsFlag Flag is disabled")
}
return nil
}
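A hedged usage sketch of the new producer, assuming ctx, cfg and a dp-kafka IProducer for the reindex-task-counts topic (kafkaProducer below) are already in scope, and that schema.ReindexTaskCounts satisfies the Marshaller interface as the tests below suggest:

// Illustrative only: publish a single task-count event.
taskCountsProducer := event.ReindexTaskCountsProducer{
	Marshaller: schema.ReindexTaskCounts, // assumed to implement Marshaller
	Producer:   kafkaProducer,            // producer created for cfg.KafkaConfig.ReindexTaskCountsTopic
}

err := taskCountsProducer.TaskCounts(ctx, cfg, models.ReindexTaskCounts{
	JobID:               "abc-123",
	Task:                "zebedee",
	Count:               "10",
	ExtractionCompleted: false,
})
if err != nil {
	log.Error(ctx, "failed to publish reindex task counts event", err)
}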
57 changes: 57 additions & 0 deletions event/producer_test.go
@@ -20,6 +20,8 @@ const (
someDataType = "test-datatype"
someCollectionID = "test-collectionID"
someJobID = "test-Jobid"
someTask = "test-task"
testCount = "10"
someTraceID = "w34234dgdge335g3333"
someSearchIndex = "test-searchindex"
)
@@ -35,6 +37,13 @@ var (
TraceID: someTraceID,
SearchIndex: someSearchIndex,
}

expectedReindexTaskCountsEvent = models.ReindexTaskCounts{
JobID: someJobID,
Task: someTask,
ExtractionCompleted: false,
Count: testCount,
}
)

func TestProducer_ContentUpdated(t *testing.T) {
@@ -84,3 +93,51 @@ func TestProducer_ContentUpdated(t *testing.T) {
})
})
}

func TestProducer_ReindexTaskCounts(t *testing.T) {
Convey("Given ReindexTaskCounts has been configured correctly", t, func() {
pChannels := &dpkafka.ProducerChannels{
Output: make(chan []byte, 1),
}

kafkaProducerMock := &kafkatest.IProducerMock{
ChannelsFunc: func() *dpkafka.ProducerChannels {
return pChannels
},
}

marshallerMock := &mock.MarshallerMock{
MarshalFunc: func(s interface{}) ([]byte, error) {
return schema.ReindexTaskCounts.Marshal(s)
},
}

// create the producer under test
reindexTaskCountsProducer := event.ReindexTaskCountsProducer{
Producer: kafkaProducerMock,
Marshaller: marshallerMock,
}

Convey("When ReindexTaskCounts is called on the event producer with EnablePublishReindexTaskCountsTopic enabled", func() {
err := reindexTaskCountsProducer.TaskCounts(ctx, &config.Config{EnableReindexTaskCounts: true}, expectedReindexTaskCountsEvent)
So(err, ShouldBeNil)

var avroBytes []byte
var testTimeout = time.Second * 5
select {
case avroBytes = <-pChannels.Output:
t.Log("avro byte sent to producer output")
case <-time.After(testTimeout):
t.Fatalf("failing test due to timing out after %v seconds", testTimeout)
t.FailNow()
}

Convey("Then the expected bytes are sent to producer.output", func() {
var actual models.ReindexTaskCounts
err = schema.ReindexTaskCounts.Unmarshal(avroBytes, &actual)
So(err, ShouldBeNil)
So(actual, ShouldResemble, expectedReindexTaskCountsEvent)
})
})
})
}
3 changes: 3 additions & 0 deletions features/steps/component.go
@@ -85,6 +85,9 @@ func NewSearchDataFinderComponent() (*Component, error) {
DoGetKafkaProducerFunc: func(ctx context.Context, config *config.Config) (kafkaConsumer kafka.IProducer, err error) {
return c.fakeKafkaProducer, nil
},
DoGetKafkaProducerForReindexTaskCountsFunc: func(ctx context.Context, config *config.Config) (kafkaConsumer kafka.IProducer, err error) {
return c.fakeKafkaProducer, nil
},
DoGetHealthCheckFunc: getHealthCheckOK,
DoGetHealthClientFunc: c.getHealthClientOK,
DoGetHTTPServerFunc: c.getHTTPServer,
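The new DoGetKafkaProducerForReindexTaskCounts hook implies the service now creates a second Kafka producer for the new topic. A hedged sketch of that producer config, mirroring the ContentUpdated one in cmd/producer/main.go above (the real wiring lives in the service initialisation, among the changed files not shown here):

// Sketch, not the commit's own service code: a producer config for the
// reindex-task-counts topic, built from the same Kafka settings.
reindexTaskCountsProducerConfig := &kafka.ProducerConfig{
	BrokerAddrs: cfg.KafkaConfig.Brokers,
	Topic:       cfg.KafkaConfig.ReindexTaskCountsTopic,
}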
86 changes: 71 additions & 15 deletions handler/handler.go
@@ -2,6 +2,8 @@ package handler

import (
"context"
"strconv"
"strings"
"sync"

"github.com/ONSdigital/dp-api-clients-go/v2/dataset"
@@ -18,6 +20,10 @@ var (
DefaultPaginationLimit = 500
)

const (
TotalTasksCount = 2
)

type DatasetEditionMetadata struct {
id string
editionID string
@@ -26,10 +32,16 @@ type DatasetEditionMetadata struct {

// ReindexRequestedHandler is the handler for reindex requested messages.
type ReindexRequestedHandler struct {
ZebedeeCli clients.ZebedeeClient
DatasetAPICli clients.DatasetAPIClient
Producer event.ContentUpdatedProducer
Config *config.Config
ZebedeeCli clients.ZebedeeClient
DatasetAPICli clients.DatasetAPIClient
ContentUpdatedProducer event.ContentUpdatedProducer
ReindexTaskCountsProducer event.ReindexTaskCountsProducer
Config *config.Config
}

type taskDetails struct {
Name string
count int
}

// Handle takes a single event.
@@ -41,22 +53,47 @@ func (h *ReindexRequestedHandler) Handle(ctx context.Context, reindexReqEvent *m
}
log.Info(ctx, "reindex requested event handler called", logData)

taskCounter := 1
tasks := make(chan taskDetails, TotalTasksCount) // buffered so the extractors never block if publishing fails early
var extractionCompleted bool
var wg sync.WaitGroup
wg.Add(2)
wg.Add(3)
go func() {
defer wg.Done()
h.getAndSendZebedeeDocsURL(ctx, h.Config, h.ZebedeeCli, reindexReqEvent, tasks)
}()
go func() {
defer wg.Done()
h.getAndSendZebedeeDocsURL(ctx, h.Config, h.ZebedeeCli, reindexReqEvent)
h.getAndSendDatasetURLs(ctx, h.Config, h.DatasetAPICli, reindexReqEvent, tasks)
}()
go func() {
defer wg.Done()
h.getAndSendDatasetURLs(ctx, h.Config, h.DatasetAPICli, reindexReqEvent)
for task := range tasks {
if taskCounter == TotalTasksCount {
extractionCompleted = true
}
err := h.ReindexTaskCountsProducer.TaskCounts(ctx, h.Config, models.ReindexTaskCounts{
JobID: reindexReqEvent.JobID,
Task: task.Name,
Count: strconv.Itoa(task.count),
ExtractionCompleted: extractionCompleted,
})
if err != nil {
log.Error(ctx, "failed to publish task counts to reindex task counts topic due to", err, log.Data{"request_params": nil})
return
}
taskCounter++
if taskCounter > TotalTasksCount {
break
}
}
}()
wg.Wait()

log.Info(ctx, "event successfully handled", logData)
}

func (h *ReindexRequestedHandler) getAndSendZebedeeDocsURL(ctx context.Context, cfg *config.Config, zebedeeCli clients.ZebedeeClient, reindexReqEvent *models.ReindexRequested) {
func (h *ReindexRequestedHandler) getAndSendZebedeeDocsURL(ctx context.Context, cfg *config.Config, zebedeeCli clients.ZebedeeClient, reindexReqEvent *models.ReindexRequested, task chan taskDetails) {
log.Info(ctx, "extract and publish zebedee docs url")
publishedIndex, err := zebedeeCli.GetPublishedIndex(ctx, nil)
if err != nil {
@@ -81,7 +118,7 @@ func (h *ReindexRequestedHandler) getAndSendZebedeeDocsURL(ctx context.Context,
// it takes more than 10 mins to retrieve all document urls from zebedee
// TODO: remove (i < 10) condition when this app has been completely implemented
for i := 0; (i < 10) && (i < totalZebedeeDocs); i++ {
err := h.Producer.ContentUpdate(ctx, cfg, models.ContentUpdated{
err := h.ContentUpdatedProducer.ContentUpdate(ctx, cfg, models.ContentUpdated{
URI: publishedIndex.Items[i].URI,
JobID: reindexReqEvent.JobID,
TraceID: reindexReqEvent.TraceID,
Expand All @@ -95,19 +132,29 @@ func (h *ReindexRequestedHandler) getAndSendZebedeeDocsURL(ctx context.Context,

log.Info(ctx, "first 10 Zebedee docs URLs retrieved", log.Data{"first URLs": urlList})
log.Info(ctx, "total number of zebedee docs retrieved", log.Data{"total_documents": totalZebedeeDocs})
taskNames := strings.Split(cfg.TaskNameValues, ",")
task <- taskDetails{
Name: extractTaskName(taskNames, "zebedee"),
count: totalZebedeeDocs,
}
}

func (h *ReindexRequestedHandler) getAndSendDatasetURLs(ctx context.Context, cfg *config.Config, datasetAPICli clients.DatasetAPIClient, reindexReqEvent *models.ReindexRequested) {
func (h *ReindexRequestedHandler) getAndSendDatasetURLs(ctx context.Context, cfg *config.Config, datasetAPICli clients.DatasetAPIClient, reindexReqEvent *models.ReindexRequested, task chan taskDetails) {
log.Info(ctx, "extract and send dataset urls")
var wgDataset sync.WaitGroup
wgDataset.Add(4)
datasetChan := h.extractDatasets(ctx, &wgDataset, datasetAPICli, cfg.ServiceAuthToken)
editionChan := h.retrieveDatasetEditions(ctx, &wgDataset, datasetAPICli, datasetChan, cfg.ServiceAuthToken)
datasetURLChan := h.getAndSendDatasetURLsFromLatestMetadata(ctx, &wgDataset, datasetAPICli, editionChan, cfg.ServiceAuthToken)
h.logExtractedDatasetURLs(ctx, &wgDataset, cfg, datasetURLChan, reindexReqEvent)
urlCount := h.logExtractedDatasetURLs(ctx, &wgDataset, cfg, datasetURLChan, reindexReqEvent)
// TODO - logExtractedDatasetURLs is temporary and should be replaced in the future
wgDataset.Wait() // wait for the other go-routines to complete which extracts the dataset urls
log.Info(ctx, "successfully extracted all datasets")
taskNames := strings.Split(cfg.TaskNameValues, ",")
task <- taskDetails{
Name: extractTaskName(taskNames, "dataset"),
count: urlCount,
}
}

func (h *ReindexRequestedHandler) extractDatasets(ctx context.Context, wgDataset *sync.WaitGroup, datasetAPIClient clients.DatasetAPIClient, serviceAuthToken string) chan dataset.Dataset {
@@ -221,13 +268,13 @@ func (h *ReindexRequestedHandler) getAndSendDatasetURLsFromLatestMetadata(ctx co
// TODO - logExtractedDatasetURLs is temporary.
// The dataset url should be sent to the content-updated topic here in the future.
// But for the time being, we are going to extract the urls and print them
func (h *ReindexRequestedHandler) logExtractedDatasetURLs(ctx context.Context, wgDataset *sync.WaitGroup, cfg *config.Config, datasetURLChan chan string, reindexReqEvent *models.ReindexRequested) {
urlList := make([]string, 0)
func (h *ReindexRequestedHandler) logExtractedDatasetURLs(ctx context.Context, wgDataset *sync.WaitGroup, cfg *config.Config, datasetURLChan chan string, reindexReqEvent *models.ReindexRequested) int {
var urlListCount int
go func() {
defer wgDataset.Done()
for datasetURL := range datasetURLChan {
log.Info(ctx, "log extracted dataset urls")
err := h.Producer.ContentUpdate(ctx, cfg, models.ContentUpdated{
err := h.ContentUpdatedProducer.ContentUpdate(ctx, cfg, models.ContentUpdated{
URI: datasetURL,
JobID: reindexReqEvent.JobID,
TraceID: reindexReqEvent.TraceID,
@@ -237,8 +284,17 @@ func (h *ReindexRequestedHandler) logExtractedDatasetURLs(ctx context.Context, w
log.Error(ctx, "failed to publish datasets to content update topic", err)
return
}
urlListCount++
}
}()
return urlListCount
}

log.Info(ctx, "dataset docs URLs retrieved", log.Data{"urls": urlList})
func extractTaskName(taskNames []string, matchingName string) string {
for _, taskName := range taskNames {
if strings.Contains(taskName, matchingName) {
return taskName
}
}
return ""
}
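With the default TaskNameValues of "dataset-api,zebedee", extractTaskName resolves the configured name for whichever extractor is reporting. A small illustration, not part of the commit:

// Each extractor passes a short match string; the first configured task
// name containing it is returned.
taskNames := strings.Split("dataset-api,zebedee", ",")

fmt.Println(extractTaskName(taskNames, "zebedee")) // "zebedee"
fmt.Println(extractTaskName(taskNames, "dataset")) // "dataset-api"
fmt.Println(extractTaskName(taskNames, "other"))   // "" (no configured task matches)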
8 changes: 8 additions & 0 deletions models/event.go
@@ -10,6 +10,14 @@ type ContentUpdated struct {
SearchIndex string `avro:"search_index"`
}

// ReindexTaskCounts provides an avro structure for a Reindex task counts event
type ReindexTaskCounts struct {
JobID string `avro:"job_id"`
Task string `avro:"task"`
ExtractionCompleted bool `avro:"extraction_completed"`
Count string `avro:"count"`
}

// ReindexRequested provides an avro structure for a Reindex Requested event
type ReindexRequested struct {
JobID string `avro:"job_id"`
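The tests above marshal and unmarshal with schema.ReindexTaskCounts; that schema lives in the schema package (one of the changed files not shown here), so the following is only a sketch of the Avro record implied by the struct's avro tags:

// Sketch only: the Avro definition the schema package is expected to hold
// for models.ReindexTaskCounts, with field names taken from the avro tags above.
var reindexTaskCounts = `{
  "type": "record",
  "name": "reindex-task-counts",
  "fields": [
    {"name": "job_id", "type": "string", "default": ""},
    {"name": "task", "type": "string", "default": ""},
    {"name": "extraction_completed", "type": "boolean", "default": false},
    {"name": "count", "type": "string", "default": ""}
  ]
}`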