Skip to content

Commit

Permalink
Fix issues with telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Aug 12, 2022
1 parent b5dad9d commit ae608a2
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 14 deletions.
6 changes: 4 additions & 2 deletions cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func RunCli(supportedSourceConfigPairs []sourceconfig.ConfigPair) {
}
o.Start()

telemetry.InitTelemetryWithCollector(cfg)
stopTelemetry := telemetry.InitTelemetryWithCollector(cfg)

// Handle SIGTERM
sig := make(chan os.Signal)
Expand All @@ -126,7 +126,8 @@ func RunCli(supportedSourceConfigPairs []sourceconfig.ConfigPair) {
select {
case <-stop:
log.Debug("source.Stop() finished successfully!")


stopTelemetry()
err := common.DeleteTemporaryDir()
if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
Expand All @@ -137,6 +138,7 @@ func RunCli(supportedSourceConfigPairs []sourceconfig.ConfigPair) {
t.Close()
ft.Close()
o.Stop()
stopTelemetry()

err := common.DeleteTemporaryDir()
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type configurationData struct {
LogLevel string `hcl:"log_level,optional" env:"LOG_LEVEL"`
GoogleServiceAccountB64 string `hcl:"google_application_credentials_b64,optional" env:"GOOGLE_APPLICATION_CREDENTIALS_B64"`
UserProvidedID string `hcl:"user_provided_id,optional" env:"USER_PROVIDED_ID"`
EnableTelemetry bool `hcl:"enable_telemetry,optional" env:"ENABLE_TELEMETRY"`
DisableTelemetry bool `hcl:"disable_telemetry,optional" env:"DISABLE_TELEMETRY"`
}

// component is a type to abstract over configuration blocks.
Expand Down Expand Up @@ -98,9 +98,9 @@ func defaultConfigData() *configurationData {
TimeoutSec: 1,
BufferSec: 15,
},
Transformations: nil,
LogLevel: "info",
EnableTelemetry: true,
Transformations: nil,
LogLevel: "info",
DisableTelemetry: false,
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/telemetry/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package telemetry

import (
"time"

"github.com/snowplow-devops/stream-replicator/cmd"
)

var (
Expand All @@ -11,5 +13,5 @@ var (
url = "telemetry-g.snowplowanalytics.com"
port = "443"
applicationName = "stream-replicator"
applicationVersion = "1.0.0"
applicationVersion = cmd.AppVersion
)
28 changes: 21 additions & 7 deletions pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// config holds the configuration for telemetry
type config struct {
enable bool
disable bool
interval time.Duration
method string
url string
Expand All @@ -27,7 +27,7 @@ type config struct {

func newTelemetryWithConfig(cfg *conf.Config) *config {
return &config{
enable: cfg.Data.EnableTelemetry,
disable: cfg.Data.DisableTelemetry,
interval: interval,
method: method,
protocol: protocol,
Expand All @@ -40,7 +40,7 @@ func newTelemetryWithConfig(cfg *conf.Config) *config {
}
}

func initTelemetry(telemetry *config) {
func initTelemetry(telemetry *config) func() {
storage := gt.InitStorageMemory()
emitter := gt.InitEmitter(
gt.RequireCollectorUri(fmt.Sprintf(`%s:%s`, telemetry.url, telemetry.port)),
Expand Down Expand Up @@ -76,13 +76,24 @@ func initTelemetry(telemetry *config) {

ticker := time.NewTicker(telemetry.interval)

stop := make(chan struct{})

go func() {
makeAndTrackHeartbeat(telemetry, tracker)
for {
<-ticker.C
makeAndTrackHeartbeat(telemetry, tracker)
select {
case <-ticker.C:
makeAndTrackHeartbeat(telemetry, tracker)
case <-stop:
return
}

}
}()

return func() {
close(stop)
}
}

func makeAndTrackHeartbeat(telemetry *config, tracker *gt.Tracker) {
Expand All @@ -99,9 +110,12 @@ func makeAndTrackHeartbeat(telemetry *config, tracker *gt.Tracker) {
}

// InitTelemetryWithCollector initialises telemetry
func InitTelemetryWithCollector(cfg *conf.Config) {
func InitTelemetryWithCollector(cfg *conf.Config) func() {
telemetry := newTelemetryWithConfig(cfg)
initTelemetry(telemetry)
if telemetry.disable {
return func() {}
}
return initTelemetry(telemetry)
}

func makeHeartbeatEvent(service config) *gt.SelfDescribingJson {
Expand Down

0 comments on commit ae608a2

Please sign in to comment.