diff --git a/event/producer_test.go b/event/producer_test.go
index 5eafed9..9a27175 100644
--- a/event/producer_test.go
+++ b/event/producer_test.go
@@ -21,7 +21,7 @@ const (
 	someCollectionID = "test-collectionID"
 	someJobID        = "test-Jobid"
 	someTask         = "test-task"
-	testCount        = "10"
+	testCount        = 10
 	someTraceID      = "w34234dgdge335g3333"
 	someSearchIndex  = "test-searchindex"
 )
@@ -129,7 +129,6 @@ func TestProducer_ReindexTaskCounts(t *testing.T) {
 			t.Log("avro byte sent to producer output")
 		case <-time.After(testTimeout):
 			t.Fatalf("failing test due to timing out after %v seconds", testTimeout)
-			t.FailNow()
 		}
 
 		Convey("Then the expected bytes are sent to producer.output", func() {
diff --git a/features/healthcheck.feature b/features/healthcheck.feature
index f68e092..f2e8346 100644
--- a/features/healthcheck.feature
+++ b/features/healthcheck.feature
@@ -28,6 +28,18 @@ Feature: Healthcheck endpoint should inform the health of service
                     "status":"OK",
                     "status_code":200,
                     "message":"OK"
+                },
+                {
+                    "name":"Kafka content updated producer",
+                    "status":"OK",
+                    "status_code":200,
+                    "message":"OK"
+                },
+                {
+                    "name":"Kafka reindex task counts producer",
+                    "status":"OK",
+                    "status_code":200,
+                    "message":"OK"
                 }
             ]
         }
diff --git a/handler/handler.go b/handler/handler.go
index b51e1cf..5282b04 100644
--- a/handler/handler.go
+++ b/handler/handler.go
@@ -2,7 +2,6 @@ package handler
 
 import (
 	"context"
-	"strconv"
 	"strings"
 	"sync"
 
@@ -75,7 +74,7 @@ func (h *ReindexRequestedHandler) Handle(ctx context.Context, reindexReqEvent *m
 			err := h.ReindexTaskCountsProducer.TaskCounts(ctx, h.Config, models.ReindexTaskCounts{
 				JobID:               reindexReqEvent.JobID,
 				Task:                task.Name,
-				Count:               strconv.Itoa(task.count),
+				Count:               int32(task.count),
 				ExtractionCompleted: extractionCompleted,
 			})
 			if err != nil {
diff --git a/models/event.go b/models/event.go
index 442bee2..9749790 100644
--- a/models/event.go
+++ b/models/event.go
@@ -15,7 +15,7 @@ type ReindexTaskCounts struct {
 	JobID               string `avro:"job_id"`
 	Task                string `avro:"task"`
 	ExtractionCompleted bool   `avro:"extraction_completed"`
-	Count               string `avro:"count"`
+	Count               int32  `avro:"count"`
 }
 
 // ReindexRequested provides an avro structure for a Reindex Requested event
diff --git a/schema/schema.go b/schema/schema.go
index fab71e6..ae1a1e8 100644
--- a/schema/schema.go
+++ b/schema/schema.go
@@ -29,7 +29,7 @@ var reindexTaskCounts = `{
     {"name": "job_id", "type": "string", "default": ""},
     {"name": "task", "type": "string", "default": ""},
     {"name": "extraction_completed", "type": "boolean", "default": false},
-    {"name": "count", "type": "string", "default":"0"}
+    {"name": "count", "type": "int"}
   ]
 }`
 
diff --git a/service/service.go b/service/service.go
index ad3f8b9..9ffb0c8 100644
--- a/service/service.go
+++ b/service/service.go
@@ -95,7 +95,7 @@ func Run(ctx context.Context, cfg *config.Config, serviceList *ExternalServiceLi
 		return nil, err
 	}
 
-	if err := registerCheckers(ctx, hc, consumer, routerHealthClient); err != nil {
+	if err := registerCheckers(ctx, hc, consumer, producer, producerForReindexTaskCounts, routerHealthClient); err != nil {
 		return nil, errors.Wrap(err, "unable to register checkers")
 	}
 
@@ -183,7 +183,7 @@ func (svc *Service) Close(ctx context.Context) error {
 	return nil
 }
 
-func registerCheckers(ctx context.Context, hc HealthChecker, consumer kafka.IConsumerGroup, routerHealthClient *health.Client) error {
+func registerCheckers(ctx context.Context, hc HealthChecker, consumer kafka.IConsumerGroup, contentUpdatedProducer, reindexTaskCounts kafka.IProducer, routerHealthClient *health.Client) error {
 	hasErrors := false
 
 	if err := hc.AddCheck("API router", routerHealthClient.Checker); err != nil {
@@ -194,6 +194,14 @@ func registerCheckers(ctx context.Context, hc HealthChecker, consumer kafka.ICon
 		hasErrors = true
 		log.Error(ctx, "error adding check for kafka", err)
 	}
+	if err := hc.AddCheck("Kafka content updated producer", contentUpdatedProducer.Checker); err != nil {
+		hasErrors = true
+		log.Error(ctx, "error adding check for kafka", err)
+	}
+	if err := hc.AddCheck("Kafka reindex task counts producer", reindexTaskCounts.Checker); err != nil {
+		hasErrors = true
+		log.Error(ctx, "error adding check for kafka", err)
+	}
 	if hasErrors {
 		return errors.New("Error(s) registering checkers for healthcheck")
 	}
diff --git a/service/service_test.go b/service/service_test.go
index 0987e79..f9236c0 100644
--- a/service/service_test.go
+++ b/service/service_test.go
@@ -181,9 +181,11 @@ func TestRun(t *testing.T) {
 			})
 
 			Convey("The checkers are registered and the healthcheck and http server started", func() {
-				So(len(hcMock.AddCheckCalls()), ShouldEqual, 2)
+				So(len(hcMock.AddCheckCalls()), ShouldEqual, 4)
 				So(hcMock.AddCheckCalls()[0].Name, ShouldResemble, "API router")
 				So(hcMock.AddCheckCalls()[1].Name, ShouldResemble, "Kafka consumer")
+				So(hcMock.AddCheckCalls()[2].Name, ShouldResemble, "Kafka content updated producer")
+				So(hcMock.AddCheckCalls()[3].Name, ShouldResemble, "Kafka reindex task counts producer")
 				So(len(initMock.DoGetHTTPServerCalls()), ShouldEqual, 1)
 				So(initMock.DoGetHTTPServerCalls()[0].BindAddr, ShouldEqual, "localhost:28000")
 				So(len(hcMock.StartCalls()), ShouldEqual, 1)
@@ -222,9 +224,11 @@ func TestRun(t *testing.T) {
 				So(svcList.KafkaConsumer, ShouldBeTrue)
 				So(svcList.ZebedeeCli, ShouldBeTrue)
 				So(svcList.DatasetAPICli, ShouldBeTrue)
-				So(len(hcMockAddFail.AddCheckCalls()), ShouldEqual, 2)
+				So(len(hcMockAddFail.AddCheckCalls()), ShouldEqual, 4)
 				So(hcMockAddFail.AddCheckCalls()[0].Name, ShouldResemble, "API router")
 				So(hcMockAddFail.AddCheckCalls()[1].Name, ShouldResemble, "Kafka consumer")
+				So(hcMockAddFail.AddCheckCalls()[2].Name, ShouldResemble, "Kafka content updated producer")
+				So(hcMockAddFail.AddCheckCalls()[3].Name, ShouldResemble, "Kafka reindex task counts producer")
 			})
 		})
 	})