From 21e88d53abf9279104a8f1a42cdf721bab2e0b07 Mon Sep 17 00:00:00 2001 From: jbeemster Date: Sun, 8 Nov 2020 14:23:07 +0100 Subject: [PATCH] Add gRPC Emitter with Stream Collector protocol (closes #57) --- tracker/emittergrpc/collector.pb.go | 1108 ++++++++++++++++++++++ tracker/emittergrpc/collector.proto | 137 +++ tracker/emittergrpc/collector_grpc.pb.go | 204 ++++ tracker/emittergrpc/emitter_grpc.go | 323 +++++++ 4 files changed, 1772 insertions(+) create mode 100644 tracker/emittergrpc/collector.pb.go create mode 100644 tracker/emittergrpc/collector.proto create mode 100644 tracker/emittergrpc/collector_grpc.pb.go create mode 100644 tracker/emittergrpc/emitter_grpc.go diff --git a/tracker/emittergrpc/collector.pb.go b/tracker/emittergrpc/collector.pb.go new file mode 100644 index 0000000..5ac67e5 --- /dev/null +++ b/tracker/emittergrpc/collector.pb.go @@ -0,0 +1,1108 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.13.0 +// source: tracker/emittergrpc/collector.proto + +package emittergrpc + +import ( + proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +type HealthCheckResponse_ServingStatus int32 + +const ( + HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0 + HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1 + HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2 +) + +// Enum value maps for HealthCheckResponse_ServingStatus. +var ( + HealthCheckResponse_ServingStatus_name = map[int32]string{ + 0: "UNKNOWN", + 1: "SERVING", + 2: "NOT_SERVING", + } + HealthCheckResponse_ServingStatus_value = map[string]int32{ + "UNKNOWN": 0, + "SERVING": 1, + "NOT_SERVING": 2, + } +) + +func (x HealthCheckResponse_ServingStatus) Enum() *HealthCheckResponse_ServingStatus { + p := new(HealthCheckResponse_ServingStatus) + *p = x + return p +} + +func (x HealthCheckResponse_ServingStatus) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (HealthCheckResponse_ServingStatus) Descriptor() protoreflect.EnumDescriptor { + return file_tracker_emittergrpc_collector_proto_enumTypes[0].Descriptor() +} + +func (HealthCheckResponse_ServingStatus) Type() protoreflect.EnumType { + return &file_tracker_emittergrpc_collector_proto_enumTypes[0] +} + +func (x HealthCheckResponse_ServingStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use HealthCheckResponse_ServingStatus.Descriptor instead. +func (HealthCheckResponse_ServingStatus) EnumDescriptor() ([]byte, []int) { + return file_tracker_emittergrpc_collector_proto_rawDescGZIP(), []int{2, 0} +} + +type HealthCheckRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` +} + +func (x *HealthCheckRequest) Reset() { + *x = HealthCheckRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_tracker_emittergrpc_collector_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HealthCheckRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HealthCheckRequest) ProtoMessage() {} + +func (x *HealthCheckRequest) ProtoReflect() protoreflect.Message { + mi := &file_tracker_emittergrpc_collector_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HealthCheckRequest.ProtoReflect.Descriptor instead. +func (*HealthCheckRequest) Descriptor() ([]byte, []int) { + return file_tracker_emittergrpc_collector_proto_rawDescGZIP(), []int{0} +} + +func (x *HealthCheckRequest) GetService() string { + if x != nil { + return x.Service + } + return "" +} + +type TrackPayloadRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + E string `protobuf:"bytes,1,opt,name=e,proto3" json:"e,omitempty"` + Eid string `protobuf:"bytes,2,opt,name=eid,proto3" json:"eid,omitempty"` + Dtm string `protobuf:"bytes,3,opt,name=dtm,proto3" json:"dtm,omitempty"` + Stm string `protobuf:"bytes,4,opt,name=stm,proto3" json:"stm,omitempty"` + Ttm string `protobuf:"bytes,5,opt,name=ttm,proto3" json:"ttm,omitempty"` + Tv string `protobuf:"bytes,6,opt,name=tv,proto3" json:"tv,omitempty"` + Aid string `protobuf:"bytes,7,opt,name=aid,proto3" json:"aid,omitempty"` + Tna string `protobuf:"bytes,8,opt,name=tna,proto3" json:"tna,omitempty"` + P string `protobuf:"bytes,9,opt,name=p,proto3" json:"p,omitempty"` + Cx string `protobuf:"bytes,10,opt,name=cx,proto3" json:"cx,omitempty"` + UePx string `protobuf:"bytes,11,opt,name=ue_px,json=uePx,proto3" json:"ue_px,omitempty"` + // subject class + Uid string `protobuf:"bytes,12,opt,name=uid,proto3" json:"uid,omitempty"` + Ip string `protobuf:"bytes,13,opt,name=ip,proto3" json:"ip,omitempty"` + Ua string `protobuf:"bytes,14,opt,name=ua,proto3" json:"ua,omitempty"` + Res string `protobuf:"bytes,15,opt,name=res,proto3" json:"res,omitempty"` + Vp string `protobuf:"bytes,16,opt,name=vp,proto3" json:"vp,omitempty"` + Cd string `protobuf:"bytes,17,opt,name=cd,proto3" json:"cd,omitempty"` + Tz string `protobuf:"bytes,18,opt,name=tz,proto3" json:"tz,omitempty"` + Lang string `protobuf:"bytes,19,opt,name=lang,proto3" json:"lang,omitempty"` + Duid string `protobuf:"bytes,20,opt,name=duid,proto3" json:"duid,omitempty"` + Tnuid string `protobuf:"bytes,21,opt,name=tnuid,proto3" json:"tnuid,omitempty"` + Co string `protobuf:"bytes,22,opt,name=co,proto3" json:"co,omitempty"` + UePr string `protobuf:"bytes,23,opt,name=ue_pr,json=uePr,proto3" json:"ue_pr,omitempty"` + // page view + Url string `protobuf:"bytes,24,opt,name=url,proto3" json:"url,omitempty"` + Page string `protobuf:"bytes,25,opt,name=page,proto3" json:"page,omitempty"` + Refr string `protobuf:"bytes,26,opt,name=refr,proto3" json:"refr,omitempty"` + // structured event + SeCa string `protobuf:"bytes,27,opt,name=se_ca,json=seCa,proto3" json:"se_ca,omitempty"` + SeAc string `protobuf:"bytes,28,opt,name=se_ac,json=seAc,proto3" json:"se_ac,omitempty"` + SeLa string `protobuf:"bytes,29,opt,name=se_la,json=seLa,proto3" json:"se_la,omitempty"` + SePr string `protobuf:"bytes,30,opt,name=se_pr,json=sePr,proto3" json:"se_pr,omitempty"` + SeVa string `protobuf:"bytes,31,opt,name=se_va,json=seVa,proto3" json:"se_va,omitempty"` + // ecomm transaction + TrId string `protobuf:"bytes,32,opt,name=tr_id,json=trId,proto3" json:"tr_id,omitempty"` + TrTt string `protobuf:"bytes,33,opt,name=tr_tt,json=trTt,proto3" json:"tr_tt,omitempty"` + TrAf string `protobuf:"bytes,34,opt,name=tr_af,json=trAf,proto3" json:"tr_af,omitempty"` + TrTx string `protobuf:"bytes,35,opt,name=tr_tx,json=trTx,proto3" json:"tr_tx,omitempty"` + TrSh string `protobuf:"bytes,36,opt,name=tr_sh,json=trSh,proto3" json:"tr_sh,omitempty"` + TrCi string `protobuf:"bytes,37,opt,name=tr_ci,json=trCi,proto3" json:"tr_ci,omitempty"` + TrSt string `protobuf:"bytes,38,opt,name=tr_st,json=trSt,proto3" json:"tr_st,omitempty"` + TrCo string `protobuf:"bytes,39,opt,name=tr_co,json=trCo,proto3" json:"tr_co,omitempty"` + TrCu string `protobuf:"bytes,40,opt,name=tr_cu,json=trCu,proto3" json:"tr_cu,omitempty"` + // transaction item + TiId string `protobuf:"bytes,41,opt,name=ti_id,json=tiId,proto3" json:"ti_id,omitempty"` + TiSk string `protobuf:"bytes,42,opt,name=ti_sk,json=tiSk,proto3" json:"ti_sk,omitempty"` + TiNm string `protobuf:"bytes,43,opt,name=ti_nm,json=tiNm,proto3" json:"ti_nm,omitempty"` + TiCa string `protobuf:"bytes,44,opt,name=ti_ca,json=tiCa,proto3" json:"ti_ca,omitempty"` + TiPr string `protobuf:"bytes,45,opt,name=ti_pr,json=tiPr,proto3" json:"ti_pr,omitempty"` + TiQu string `protobuf:"bytes,46,opt,name=ti_qu,json=tiQu,proto3" json:"ti_qu,omitempty"` + TiCu string `protobuf:"bytes,47,opt,name=ti_cu,json=tiCu,proto3" json:"ti_cu,omitempty"` + // domain session + Vid string `protobuf:"bytes,48,opt,name=vid,proto3" json:"vid,omitempty"` + Sid string `protobuf:"bytes,49,opt,name=sid,proto3" json:"sid,omitempty"` + // browser + Cookie string `protobuf:"bytes,50,opt,name=cookie,proto3" json:"cookie,omitempty"` + FPdf string `protobuf:"bytes,51,opt,name=f_pdf,json=fPdf,proto3" json:"f_pdf,omitempty"` + FQt string `protobuf:"bytes,52,opt,name=f_qt,json=fQt,proto3" json:"f_qt,omitempty"` + FRealp string `protobuf:"bytes,53,opt,name=f_realp,json=fRealp,proto3" json:"f_realp,omitempty"` + FWma string `protobuf:"bytes,54,opt,name=f_wma,json=fWma,proto3" json:"f_wma,omitempty"` + FDir string `protobuf:"bytes,55,opt,name=f_dir,json=fDir,proto3" json:"f_dir,omitempty"` + FFla string `protobuf:"bytes,56,opt,name=f_fla,json=fFla,proto3" json:"f_fla,omitempty"` + FJava string `protobuf:"bytes,57,opt,name=f_java,json=fJava,proto3" json:"f_java,omitempty"` + FGears string `protobuf:"bytes,58,opt,name=f_gears,json=fGears,proto3" json:"f_gears,omitempty"` + FAg string `protobuf:"bytes,59,opt,name=f_ag,json=fAg,proto3" json:"f_ag,omitempty"` + // document + Ds string `protobuf:"bytes,60,opt,name=ds,proto3" json:"ds,omitempty"` + Cs string `protobuf:"bytes,61,opt,name=cs,proto3" json:"cs,omitempty"` + // mac + Mac string `protobuf:"bytes,62,opt,name=mac,proto3" json:"mac,omitempty"` + // page ping + PpMix string `protobuf:"bytes,63,opt,name=pp_mix,json=ppMix,proto3" json:"pp_mix,omitempty"` + PpMax string `protobuf:"bytes,64,opt,name=pp_max,json=ppMax,proto3" json:"pp_max,omitempty"` + PpMiy string `protobuf:"bytes,65,opt,name=pp_miy,json=ppMiy,proto3" json:"pp_miy,omitempty"` + PpMay string `protobuf:"bytes,66,opt,name=pp_may,json=ppMay,proto3" json:"pp_may,omitempty"` + // advertising impressions + AdBa string `protobuf:"bytes,67,opt,name=ad_ba,json=adBa,proto3" json:"ad_ba,omitempty"` + AdCa string `protobuf:"bytes,68,opt,name=ad_ca,json=adCa,proto3" json:"ad_ca,omitempty"` + AdAd string `protobuf:"bytes,69,opt,name=ad_ad,json=adAd,proto3" json:"ad_ad,omitempty"` + AdUid string `protobuf:"bytes,70,opt,name=ad_uid,json=adUid,proto3" json:"ad_uid,omitempty"` + // social interactions + Sa string `protobuf:"bytes,71,opt,name=sa,proto3" json:"sa,omitempty"` + Sn string `protobuf:"bytes,72,opt,name=sn,proto3" json:"sn,omitempty"` + St string `protobuf:"bytes,73,opt,name=st,proto3" json:"st,omitempty"` + Sp string `protobuf:"bytes,74,opt,name=sp,proto3" json:"sp,omitempty"` +} + +func (x *TrackPayloadRequest) Reset() { + *x = TrackPayloadRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_tracker_emittergrpc_collector_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TrackPayloadRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TrackPayloadRequest) ProtoMessage() {} + +func (x *TrackPayloadRequest) ProtoReflect() protoreflect.Message { + mi := &file_tracker_emittergrpc_collector_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TrackPayloadRequest.ProtoReflect.Descriptor instead. +func (*TrackPayloadRequest) Descriptor() ([]byte, []int) { + return file_tracker_emittergrpc_collector_proto_rawDescGZIP(), []int{1} +} + +func (x *TrackPayloadRequest) GetE() string { + if x != nil { + return x.E + } + return "" +} + +func (x *TrackPayloadRequest) GetEid() string { + if x != nil { + return x.Eid + } + return "" +} + +func (x *TrackPayloadRequest) GetDtm() string { + if x != nil { + return x.Dtm + } + return "" +} + +func (x *TrackPayloadRequest) GetStm() string { + if x != nil { + return x.Stm + } + return "" +} + +func (x *TrackPayloadRequest) GetTtm() string { + if x != nil { + return x.Ttm + } + return "" +} + +func (x *TrackPayloadRequest) GetTv() string { + if x != nil { + return x.Tv + } + return "" +} + +func (x *TrackPayloadRequest) GetAid() string { + if x != nil { + return x.Aid + } + return "" +} + +func (x *TrackPayloadRequest) GetTna() string { + if x != nil { + return x.Tna + } + return "" +} + +func (x *TrackPayloadRequest) GetP() string { + if x != nil { + return x.P + } + return "" +} + +func (x *TrackPayloadRequest) GetCx() string { + if x != nil { + return x.Cx + } + return "" +} + +func (x *TrackPayloadRequest) GetUePx() string { + if x != nil { + return x.UePx + } + return "" +} + +func (x *TrackPayloadRequest) GetUid() string { + if x != nil { + return x.Uid + } + return "" +} + +func (x *TrackPayloadRequest) GetIp() string { + if x != nil { + return x.Ip + } + return "" +} + +func (x *TrackPayloadRequest) GetUa() string { + if x != nil { + return x.Ua + } + return "" +} + +func (x *TrackPayloadRequest) GetRes() string { + if x != nil { + return x.Res + } + return "" +} + +func (x *TrackPayloadRequest) GetVp() string { + if x != nil { + return x.Vp + } + return "" +} + +func (x *TrackPayloadRequest) GetCd() string { + if x != nil { + return x.Cd + } + return "" +} + +func (x *TrackPayloadRequest) GetTz() string { + if x != nil { + return x.Tz + } + return "" +} + +func (x *TrackPayloadRequest) GetLang() string { + if x != nil { + return x.Lang + } + return "" +} + +func (x *TrackPayloadRequest) GetDuid() string { + if x != nil { + return x.Duid + } + return "" +} + +func (x *TrackPayloadRequest) GetTnuid() string { + if x != nil { + return x.Tnuid + } + return "" +} + +func (x *TrackPayloadRequest) GetCo() string { + if x != nil { + return x.Co + } + return "" +} + +func (x *TrackPayloadRequest) GetUePr() string { + if x != nil { + return x.UePr + } + return "" +} + +func (x *TrackPayloadRequest) GetUrl() string { + if x != nil { + return x.Url + } + return "" +} + +func (x *TrackPayloadRequest) GetPage() string { + if x != nil { + return x.Page + } + return "" +} + +func (x *TrackPayloadRequest) GetRefr() string { + if x != nil { + return x.Refr + } + return "" +} + +func (x *TrackPayloadRequest) GetSeCa() string { + if x != nil { + return x.SeCa + } + return "" +} + +func (x *TrackPayloadRequest) GetSeAc() string { + if x != nil { + return x.SeAc + } + return "" +} + +func (x *TrackPayloadRequest) GetSeLa() string { + if x != nil { + return x.SeLa + } + return "" +} + +func (x *TrackPayloadRequest) GetSePr() string { + if x != nil { + return x.SePr + } + return "" +} + +func (x *TrackPayloadRequest) GetSeVa() string { + if x != nil { + return x.SeVa + } + return "" +} + +func (x *TrackPayloadRequest) GetTrId() string { + if x != nil { + return x.TrId + } + return "" +} + +func (x *TrackPayloadRequest) GetTrTt() string { + if x != nil { + return x.TrTt + } + return "" +} + +func (x *TrackPayloadRequest) GetTrAf() string { + if x != nil { + return x.TrAf + } + return "" +} + +func (x *TrackPayloadRequest) GetTrTx() string { + if x != nil { + return x.TrTx + } + return "" +} + +func (x *TrackPayloadRequest) GetTrSh() string { + if x != nil { + return x.TrSh + } + return "" +} + +func (x *TrackPayloadRequest) GetTrCi() string { + if x != nil { + return x.TrCi + } + return "" +} + +func (x *TrackPayloadRequest) GetTrSt() string { + if x != nil { + return x.TrSt + } + return "" +} + +func (x *TrackPayloadRequest) GetTrCo() string { + if x != nil { + return x.TrCo + } + return "" +} + +func (x *TrackPayloadRequest) GetTrCu() string { + if x != nil { + return x.TrCu + } + return "" +} + +func (x *TrackPayloadRequest) GetTiId() string { + if x != nil { + return x.TiId + } + return "" +} + +func (x *TrackPayloadRequest) GetTiSk() string { + if x != nil { + return x.TiSk + } + return "" +} + +func (x *TrackPayloadRequest) GetTiNm() string { + if x != nil { + return x.TiNm + } + return "" +} + +func (x *TrackPayloadRequest) GetTiCa() string { + if x != nil { + return x.TiCa + } + return "" +} + +func (x *TrackPayloadRequest) GetTiPr() string { + if x != nil { + return x.TiPr + } + return "" +} + +func (x *TrackPayloadRequest) GetTiQu() string { + if x != nil { + return x.TiQu + } + return "" +} + +func (x *TrackPayloadRequest) GetTiCu() string { + if x != nil { + return x.TiCu + } + return "" +} + +func (x *TrackPayloadRequest) GetVid() string { + if x != nil { + return x.Vid + } + return "" +} + +func (x *TrackPayloadRequest) GetSid() string { + if x != nil { + return x.Sid + } + return "" +} + +func (x *TrackPayloadRequest) GetCookie() string { + if x != nil { + return x.Cookie + } + return "" +} + +func (x *TrackPayloadRequest) GetFPdf() string { + if x != nil { + return x.FPdf + } + return "" +} + +func (x *TrackPayloadRequest) GetFQt() string { + if x != nil { + return x.FQt + } + return "" +} + +func (x *TrackPayloadRequest) GetFRealp() string { + if x != nil { + return x.FRealp + } + return "" +} + +func (x *TrackPayloadRequest) GetFWma() string { + if x != nil { + return x.FWma + } + return "" +} + +func (x *TrackPayloadRequest) GetFDir() string { + if x != nil { + return x.FDir + } + return "" +} + +func (x *TrackPayloadRequest) GetFFla() string { + if x != nil { + return x.FFla + } + return "" +} + +func (x *TrackPayloadRequest) GetFJava() string { + if x != nil { + return x.FJava + } + return "" +} + +func (x *TrackPayloadRequest) GetFGears() string { + if x != nil { + return x.FGears + } + return "" +} + +func (x *TrackPayloadRequest) GetFAg() string { + if x != nil { + return x.FAg + } + return "" +} + +func (x *TrackPayloadRequest) GetDs() string { + if x != nil { + return x.Ds + } + return "" +} + +func (x *TrackPayloadRequest) GetCs() string { + if x != nil { + return x.Cs + } + return "" +} + +func (x *TrackPayloadRequest) GetMac() string { + if x != nil { + return x.Mac + } + return "" +} + +func (x *TrackPayloadRequest) GetPpMix() string { + if x != nil { + return x.PpMix + } + return "" +} + +func (x *TrackPayloadRequest) GetPpMax() string { + if x != nil { + return x.PpMax + } + return "" +} + +func (x *TrackPayloadRequest) GetPpMiy() string { + if x != nil { + return x.PpMiy + } + return "" +} + +func (x *TrackPayloadRequest) GetPpMay() string { + if x != nil { + return x.PpMay + } + return "" +} + +func (x *TrackPayloadRequest) GetAdBa() string { + if x != nil { + return x.AdBa + } + return "" +} + +func (x *TrackPayloadRequest) GetAdCa() string { + if x != nil { + return x.AdCa + } + return "" +} + +func (x *TrackPayloadRequest) GetAdAd() string { + if x != nil { + return x.AdAd + } + return "" +} + +func (x *TrackPayloadRequest) GetAdUid() string { + if x != nil { + return x.AdUid + } + return "" +} + +func (x *TrackPayloadRequest) GetSa() string { + if x != nil { + return x.Sa + } + return "" +} + +func (x *TrackPayloadRequest) GetSn() string { + if x != nil { + return x.Sn + } + return "" +} + +func (x *TrackPayloadRequest) GetSt() string { + if x != nil { + return x.St + } + return "" +} + +func (x *TrackPayloadRequest) GetSp() string { + if x != nil { + return x.Sp + } + return "" +} + +type HealthCheckResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,proto3,enum=tracker.HealthCheckResponse_ServingStatus" json:"status,omitempty"` +} + +func (x *HealthCheckResponse) Reset() { + *x = HealthCheckResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_tracker_emittergrpc_collector_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HealthCheckResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HealthCheckResponse) ProtoMessage() {} + +func (x *HealthCheckResponse) ProtoReflect() protoreflect.Message { + mi := &file_tracker_emittergrpc_collector_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HealthCheckResponse.ProtoReflect.Descriptor instead. +func (*HealthCheckResponse) Descriptor() ([]byte, []int) { + return file_tracker_emittergrpc_collector_proto_rawDescGZIP(), []int{2} +} + +func (x *HealthCheckResponse) GetStatus() HealthCheckResponse_ServingStatus { + if x != nil { + return x.Status + } + return HealthCheckResponse_UNKNOWN +} + +type TrackPayloadResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` +} + +func (x *TrackPayloadResponse) Reset() { + *x = TrackPayloadResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_tracker_emittergrpc_collector_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TrackPayloadResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TrackPayloadResponse) ProtoMessage() {} + +func (x *TrackPayloadResponse) ProtoReflect() protoreflect.Message { + mi := &file_tracker_emittergrpc_collector_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TrackPayloadResponse.ProtoReflect.Descriptor instead. +func (*TrackPayloadResponse) Descriptor() ([]byte, []int) { + return file_tracker_emittergrpc_collector_proto_rawDescGZIP(), []int{3} +} + +func (x *TrackPayloadResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +var File_tracker_emittergrpc_collector_proto protoreflect.FileDescriptor + +var file_tracker_emittergrpc_collector_proto_rawDesc = []byte{ + 0x0a, 0x23, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x65, 0x72, 0x2f, 0x65, 0x6d, 0x69, 0x74, 0x74, 0x65, + 0x72, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x65, 0x72, 0x22, 0x2e, + 0x0a, 0x12, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x22, 0xbf, + 0x0b, 0x0a, 0x13, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0c, 0x0a, 0x01, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x01, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x65, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x65, 0x69, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x64, 0x74, 0x6d, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x64, 0x74, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x74, 0x6d, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x74, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x74, + 0x6d, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x74, 0x74, 0x6d, 0x12, 0x0e, 0x0a, 0x02, + 0x74, 0x76, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x74, 0x76, 0x12, 0x10, 0x0a, 0x03, + 0x61, 0x69, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x61, 0x69, 0x64, 0x12, 0x10, + 0x0a, 0x03, 0x74, 0x6e, 0x61, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x74, 0x6e, 0x61, + 0x12, 0x0c, 0x0a, 0x01, 0x70, 0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x01, 0x70, 0x12, 0x0e, + 0x0a, 0x02, 0x63, 0x78, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x63, 0x78, 0x12, 0x13, + 0x0a, 0x05, 0x75, 0x65, 0x5f, 0x70, 0x78, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, + 0x65, 0x50, 0x78, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x0d, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x69, 0x70, 0x12, 0x0e, 0x0a, 0x02, 0x75, 0x61, 0x18, 0x0e, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x75, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x72, 0x65, 0x73, 0x18, 0x0f, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x72, 0x65, 0x73, 0x12, 0x0e, 0x0a, 0x02, 0x76, 0x70, 0x18, 0x10, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x02, 0x76, 0x70, 0x12, 0x0e, 0x0a, 0x02, 0x63, 0x64, 0x18, 0x11, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x02, 0x63, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x74, 0x7a, 0x18, 0x12, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x02, 0x74, 0x7a, 0x12, 0x12, 0x0a, 0x04, 0x6c, 0x61, 0x6e, 0x67, 0x18, + 0x13, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6c, 0x61, 0x6e, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x64, + 0x75, 0x69, 0x64, 0x18, 0x14, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x75, 0x69, 0x64, 0x12, + 0x14, 0x0a, 0x05, 0x74, 0x6e, 0x75, 0x69, 0x64, 0x18, 0x15, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x74, 0x6e, 0x75, 0x69, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x63, 0x6f, 0x18, 0x16, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x63, 0x6f, 0x12, 0x13, 0x0a, 0x05, 0x75, 0x65, 0x5f, 0x70, 0x72, 0x18, 0x17, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, 0x65, 0x50, 0x72, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, + 0x6c, 0x18, 0x18, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x12, 0x12, 0x0a, 0x04, + 0x70, 0x61, 0x67, 0x65, 0x18, 0x19, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x67, 0x65, + 0x12, 0x12, 0x0a, 0x04, 0x72, 0x65, 0x66, 0x72, 0x18, 0x1a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x72, 0x65, 0x66, 0x72, 0x12, 0x13, 0x0a, 0x05, 0x73, 0x65, 0x5f, 0x63, 0x61, 0x18, 0x1b, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x65, 0x43, 0x61, 0x12, 0x13, 0x0a, 0x05, 0x73, 0x65, 0x5f, + 0x61, 0x63, 0x18, 0x1c, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x65, 0x41, 0x63, 0x12, 0x13, + 0x0a, 0x05, 0x73, 0x65, 0x5f, 0x6c, 0x61, 0x18, 0x1d, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, + 0x65, 0x4c, 0x61, 0x12, 0x13, 0x0a, 0x05, 0x73, 0x65, 0x5f, 0x70, 0x72, 0x18, 0x1e, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x04, 0x73, 0x65, 0x50, 0x72, 0x12, 0x13, 0x0a, 0x05, 0x73, 0x65, 0x5f, 0x76, + 0x61, 0x18, 0x1f, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x65, 0x56, 0x61, 0x12, 0x13, 0x0a, + 0x05, 0x74, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x20, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x72, + 0x49, 0x64, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x72, 0x5f, 0x74, 0x74, 0x18, 0x21, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x74, 0x72, 0x54, 0x74, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x72, 0x5f, 0x61, 0x66, + 0x18, 0x22, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x72, 0x41, 0x66, 0x12, 0x13, 0x0a, 0x05, + 0x74, 0x72, 0x5f, 0x74, 0x78, 0x18, 0x23, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x72, 0x54, + 0x78, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x72, 0x5f, 0x73, 0x68, 0x18, 0x24, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x04, 0x74, 0x72, 0x53, 0x68, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x72, 0x5f, 0x63, 0x69, 0x18, + 0x25, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x72, 0x43, 0x69, 0x12, 0x13, 0x0a, 0x05, 0x74, + 0x72, 0x5f, 0x73, 0x74, 0x18, 0x26, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x72, 0x53, 0x74, + 0x12, 0x13, 0x0a, 0x05, 0x74, 0x72, 0x5f, 0x63, 0x6f, 0x18, 0x27, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x74, 0x72, 0x43, 0x6f, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x72, 0x5f, 0x63, 0x75, 0x18, 0x28, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x72, 0x43, 0x75, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x69, + 0x5f, 0x69, 0x64, 0x18, 0x29, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x69, 0x49, 0x64, 0x12, + 0x13, 0x0a, 0x05, 0x74, 0x69, 0x5f, 0x73, 0x6b, 0x18, 0x2a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x74, 0x69, 0x53, 0x6b, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x69, 0x5f, 0x6e, 0x6d, 0x18, 0x2b, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x69, 0x4e, 0x6d, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x69, 0x5f, + 0x63, 0x61, 0x18, 0x2c, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x69, 0x43, 0x61, 0x12, 0x13, + 0x0a, 0x05, 0x74, 0x69, 0x5f, 0x70, 0x72, 0x18, 0x2d, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, + 0x69, 0x50, 0x72, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x69, 0x5f, 0x71, 0x75, 0x18, 0x2e, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x04, 0x74, 0x69, 0x51, 0x75, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x69, 0x5f, 0x63, + 0x75, 0x18, 0x2f, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x69, 0x43, 0x75, 0x12, 0x10, 0x0a, + 0x03, 0x76, 0x69, 0x64, 0x18, 0x30, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x76, 0x69, 0x64, 0x12, + 0x10, 0x0a, 0x03, 0x73, 0x69, 0x64, 0x18, 0x31, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x69, + 0x64, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x18, 0x32, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x06, 0x63, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x12, 0x13, 0x0a, 0x05, 0x66, 0x5f, 0x70, + 0x64, 0x66, 0x18, 0x33, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x66, 0x50, 0x64, 0x66, 0x12, 0x11, + 0x0a, 0x04, 0x66, 0x5f, 0x71, 0x74, 0x18, 0x34, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x66, 0x51, + 0x74, 0x12, 0x17, 0x0a, 0x07, 0x66, 0x5f, 0x72, 0x65, 0x61, 0x6c, 0x70, 0x18, 0x35, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x06, 0x66, 0x52, 0x65, 0x61, 0x6c, 0x70, 0x12, 0x13, 0x0a, 0x05, 0x66, 0x5f, + 0x77, 0x6d, 0x61, 0x18, 0x36, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x66, 0x57, 0x6d, 0x61, 0x12, + 0x13, 0x0a, 0x05, 0x66, 0x5f, 0x64, 0x69, 0x72, 0x18, 0x37, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x66, 0x44, 0x69, 0x72, 0x12, 0x13, 0x0a, 0x05, 0x66, 0x5f, 0x66, 0x6c, 0x61, 0x18, 0x38, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x66, 0x46, 0x6c, 0x61, 0x12, 0x15, 0x0a, 0x06, 0x66, 0x5f, 0x6a, + 0x61, 0x76, 0x61, 0x18, 0x39, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x66, 0x4a, 0x61, 0x76, 0x61, + 0x12, 0x17, 0x0a, 0x07, 0x66, 0x5f, 0x67, 0x65, 0x61, 0x72, 0x73, 0x18, 0x3a, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x06, 0x66, 0x47, 0x65, 0x61, 0x72, 0x73, 0x12, 0x11, 0x0a, 0x04, 0x66, 0x5f, 0x61, + 0x67, 0x18, 0x3b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x66, 0x41, 0x67, 0x12, 0x0e, 0x0a, 0x02, + 0x64, 0x73, 0x18, 0x3c, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x64, 0x73, 0x12, 0x0e, 0x0a, 0x02, + 0x63, 0x73, 0x18, 0x3d, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x63, 0x73, 0x12, 0x10, 0x0a, 0x03, + 0x6d, 0x61, 0x63, 0x18, 0x3e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x61, 0x63, 0x12, 0x15, + 0x0a, 0x06, 0x70, 0x70, 0x5f, 0x6d, 0x69, 0x78, 0x18, 0x3f, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x70, 0x70, 0x4d, 0x69, 0x78, 0x12, 0x15, 0x0a, 0x06, 0x70, 0x70, 0x5f, 0x6d, 0x61, 0x78, 0x18, + 0x40, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x70, 0x70, 0x4d, 0x61, 0x78, 0x12, 0x15, 0x0a, 0x06, + 0x70, 0x70, 0x5f, 0x6d, 0x69, 0x79, 0x18, 0x41, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x70, 0x70, + 0x4d, 0x69, 0x79, 0x12, 0x15, 0x0a, 0x06, 0x70, 0x70, 0x5f, 0x6d, 0x61, 0x79, 0x18, 0x42, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x70, 0x70, 0x4d, 0x61, 0x79, 0x12, 0x13, 0x0a, 0x05, 0x61, 0x64, + 0x5f, 0x62, 0x61, 0x18, 0x43, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x61, 0x64, 0x42, 0x61, 0x12, + 0x13, 0x0a, 0x05, 0x61, 0x64, 0x5f, 0x63, 0x61, 0x18, 0x44, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x61, 0x64, 0x43, 0x61, 0x12, 0x13, 0x0a, 0x05, 0x61, 0x64, 0x5f, 0x61, 0x64, 0x18, 0x45, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x61, 0x64, 0x41, 0x64, 0x12, 0x15, 0x0a, 0x06, 0x61, 0x64, 0x5f, + 0x75, 0x69, 0x64, 0x18, 0x46, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x61, 0x64, 0x55, 0x69, 0x64, + 0x12, 0x0e, 0x0a, 0x02, 0x73, 0x61, 0x18, 0x47, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x73, 0x61, + 0x12, 0x0e, 0x0a, 0x02, 0x73, 0x6e, 0x18, 0x48, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x73, 0x6e, + 0x12, 0x0e, 0x0a, 0x02, 0x73, 0x74, 0x18, 0x49, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x73, 0x74, + 0x12, 0x0e, 0x0a, 0x02, 0x73, 0x70, 0x18, 0x4a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x73, 0x70, + 0x22, 0x95, 0x01, 0x0a, 0x13, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x42, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2a, 0x2e, 0x74, 0x72, 0x61, 0x63, 0x6b, + 0x65, 0x72, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x3a, 0x0a, 0x0d, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, + 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x45, + 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x4e, 0x4f, 0x54, 0x5f, 0x53, + 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x02, 0x22, 0x30, 0x0a, 0x14, 0x54, 0x72, 0x61, 0x63, + 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x32, 0x84, 0x02, 0x0a, 0x10, 0x43, + 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, + 0x4a, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x1b, + 0x2e, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x65, 0x72, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, + 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x74, 0x72, + 0x61, 0x63, 0x6b, 0x65, 0x72, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, + 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4d, 0x0a, 0x0c, 0x54, + 0x72, 0x61, 0x63, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1c, 0x2e, 0x74, 0x72, + 0x61, 0x63, 0x6b, 0x65, 0x72, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, + 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x74, 0x72, 0x61, 0x63, + 0x6b, 0x65, 0x72, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x55, 0x0a, 0x12, 0x53, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x12, 0x1c, 0x2e, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x65, 0x72, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, + 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, + 0x2e, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x65, 0x72, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x50, 0x61, + 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, + 0x01, 0x42, 0x41, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x73, 0x6e, 0x6f, 0x77, 0x70, 0x6c, 0x6f, 0x77, 0x2f, 0x73, 0x6e, 0x6f, 0x77, 0x70, 0x6c, 0x6f, + 0x77, 0x2d, 0x67, 0x6f, 0x6c, 0x61, 0x6e, 0x67, 0x2d, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x65, 0x72, + 0x2f, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x65, 0x72, 0x2f, 0x65, 0x6d, 0x69, 0x74, 0x74, 0x65, 0x72, + 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_tracker_emittergrpc_collector_proto_rawDescOnce sync.Once + file_tracker_emittergrpc_collector_proto_rawDescData = file_tracker_emittergrpc_collector_proto_rawDesc +) + +func file_tracker_emittergrpc_collector_proto_rawDescGZIP() []byte { + file_tracker_emittergrpc_collector_proto_rawDescOnce.Do(func() { + file_tracker_emittergrpc_collector_proto_rawDescData = protoimpl.X.CompressGZIP(file_tracker_emittergrpc_collector_proto_rawDescData) + }) + return file_tracker_emittergrpc_collector_proto_rawDescData +} + +var file_tracker_emittergrpc_collector_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_tracker_emittergrpc_collector_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_tracker_emittergrpc_collector_proto_goTypes = []interface{}{ + (HealthCheckResponse_ServingStatus)(0), // 0: tracker.HealthCheckResponse.ServingStatus + (*HealthCheckRequest)(nil), // 1: tracker.HealthCheckRequest + (*TrackPayloadRequest)(nil), // 2: tracker.TrackPayloadRequest + (*HealthCheckResponse)(nil), // 3: tracker.HealthCheckResponse + (*TrackPayloadResponse)(nil), // 4: tracker.TrackPayloadResponse +} +var file_tracker_emittergrpc_collector_proto_depIdxs = []int32{ + 0, // 0: tracker.HealthCheckResponse.status:type_name -> tracker.HealthCheckResponse.ServingStatus + 1, // 1: tracker.CollectorService.HealthCheck:input_type -> tracker.HealthCheckRequest + 2, // 2: tracker.CollectorService.TrackPayload:input_type -> tracker.TrackPayloadRequest + 2, // 3: tracker.CollectorService.StreamTrackPayload:input_type -> tracker.TrackPayloadRequest + 3, // 4: tracker.CollectorService.HealthCheck:output_type -> tracker.HealthCheckResponse + 4, // 5: tracker.CollectorService.TrackPayload:output_type -> tracker.TrackPayloadResponse + 4, // 6: tracker.CollectorService.StreamTrackPayload:output_type -> tracker.TrackPayloadResponse + 4, // [4:7] is the sub-list for method output_type + 1, // [1:4] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_tracker_emittergrpc_collector_proto_init() } +func file_tracker_emittergrpc_collector_proto_init() { + if File_tracker_emittergrpc_collector_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_tracker_emittergrpc_collector_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HealthCheckRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tracker_emittergrpc_collector_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TrackPayloadRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tracker_emittergrpc_collector_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HealthCheckResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tracker_emittergrpc_collector_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TrackPayloadResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_tracker_emittergrpc_collector_proto_rawDesc, + NumEnums: 1, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_tracker_emittergrpc_collector_proto_goTypes, + DependencyIndexes: file_tracker_emittergrpc_collector_proto_depIdxs, + EnumInfos: file_tracker_emittergrpc_collector_proto_enumTypes, + MessageInfos: file_tracker_emittergrpc_collector_proto_msgTypes, + }.Build() + File_tracker_emittergrpc_collector_proto = out.File + file_tracker_emittergrpc_collector_proto_rawDesc = nil + file_tracker_emittergrpc_collector_proto_goTypes = nil + file_tracker_emittergrpc_collector_proto_depIdxs = nil +} diff --git a/tracker/emittergrpc/collector.proto b/tracker/emittergrpc/collector.proto new file mode 100644 index 0000000..ff42da9 --- /dev/null +++ b/tracker/emittergrpc/collector.proto @@ -0,0 +1,137 @@ +syntax = "proto3"; + +option go_package = "github.com/snowplow/snowplow-golang-tracker/tracker/emittergrpc"; + +package tracker; + +// Service + +service CollectorService { + rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse) {} + rpc TrackPayload(TrackPayloadRequest) returns (TrackPayloadResponse) {} + rpc StreamTrackPayload(stream TrackPayloadRequest) returns (TrackPayloadResponse) {} +} + +// Requests + +message HealthCheckRequest { + string service = 1; +} + +message TrackPayloadRequest { + string e = 1; + string eid = 2; + string dtm = 3; + string stm = 4; + string ttm = 5; + string tv = 6; + string aid = 7; + string tna = 8; + string p = 9; + + string cx = 10; + string ue_px = 11; + + // subject class + string uid = 12; + string ip = 13; + string ua = 14; + string res = 15; + string vp = 16; + string cd = 17; + string tz = 18; + string lang = 19; + string duid = 20; + string tnuid = 21; + + string co = 22; + string ue_pr = 23; + + // page view + string url = 24; + string page = 25; + string refr = 26; + + // structured event + string se_ca = 27; + string se_ac = 28; + string se_la = 29; + string se_pr = 30; + string se_va = 31; + + // ecomm transaction + string tr_id = 32; + string tr_tt = 33; + string tr_af = 34; + string tr_tx = 35; + string tr_sh = 36; + string tr_ci = 37; + string tr_st = 38; + string tr_co = 39; + string tr_cu = 40; + + // transaction item + string ti_id = 41; + string ti_sk = 42; + string ti_nm = 43; + string ti_ca = 44; + string ti_pr = 45; + string ti_qu = 46; + string ti_cu = 47; + + // domain session + string vid = 48; + string sid = 49; + + // browser + string cookie = 50; + string f_pdf = 51; + string f_qt = 52; + string f_realp = 53; + string f_wma = 54; + string f_dir = 55; + string f_fla = 56; + string f_java = 57; + string f_gears = 58; + string f_ag = 59; + + // document + string ds = 60; + string cs = 61; + + // mac + string mac = 62; + + // page ping + string pp_mix = 63; + string pp_max = 64; + string pp_miy = 65; + string pp_may = 66; + + // advertising impressions + string ad_ba = 67; + string ad_ca = 68; + string ad_ad = 69; + string ad_uid = 70; + + // social interactions + string sa = 71; + string sn = 72; + string st = 73; + string sp = 74; +} + +// Responses + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + } + ServingStatus status = 1; +} + +message TrackPayloadResponse { + bool success = 1; +} diff --git a/tracker/emittergrpc/collector_grpc.pb.go b/tracker/emittergrpc/collector_grpc.pb.go new file mode 100644 index 0000000..c6e22f2 --- /dev/null +++ b/tracker/emittergrpc/collector_grpc.pb.go @@ -0,0 +1,204 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package emittergrpc + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion7 + +// CollectorServiceClient is the client API for CollectorService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type CollectorServiceClient interface { + HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) + TrackPayload(ctx context.Context, in *TrackPayloadRequest, opts ...grpc.CallOption) (*TrackPayloadResponse, error) + StreamTrackPayload(ctx context.Context, opts ...grpc.CallOption) (CollectorService_StreamTrackPayloadClient, error) +} + +type collectorServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewCollectorServiceClient(cc grpc.ClientConnInterface) CollectorServiceClient { + return &collectorServiceClient{cc} +} + +func (c *collectorServiceClient) HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) { + out := new(HealthCheckResponse) + err := c.cc.Invoke(ctx, "/tracker.CollectorService/HealthCheck", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *collectorServiceClient) TrackPayload(ctx context.Context, in *TrackPayloadRequest, opts ...grpc.CallOption) (*TrackPayloadResponse, error) { + out := new(TrackPayloadResponse) + err := c.cc.Invoke(ctx, "/tracker.CollectorService/TrackPayload", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *collectorServiceClient) StreamTrackPayload(ctx context.Context, opts ...grpc.CallOption) (CollectorService_StreamTrackPayloadClient, error) { + stream, err := c.cc.NewStream(ctx, &_CollectorService_serviceDesc.Streams[0], "/tracker.CollectorService/StreamTrackPayload", opts...) + if err != nil { + return nil, err + } + x := &collectorServiceStreamTrackPayloadClient{stream} + return x, nil +} + +type CollectorService_StreamTrackPayloadClient interface { + Send(*TrackPayloadRequest) error + CloseAndRecv() (*TrackPayloadResponse, error) + grpc.ClientStream +} + +type collectorServiceStreamTrackPayloadClient struct { + grpc.ClientStream +} + +func (x *collectorServiceStreamTrackPayloadClient) Send(m *TrackPayloadRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *collectorServiceStreamTrackPayloadClient) CloseAndRecv() (*TrackPayloadResponse, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(TrackPayloadResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// CollectorServiceServer is the server API for CollectorService service. +// All implementations must embed UnimplementedCollectorServiceServer +// for forward compatibility +type CollectorServiceServer interface { + HealthCheck(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) + TrackPayload(context.Context, *TrackPayloadRequest) (*TrackPayloadResponse, error) + StreamTrackPayload(CollectorService_StreamTrackPayloadServer) error + mustEmbedUnimplementedCollectorServiceServer() +} + +// UnimplementedCollectorServiceServer must be embedded to have forward compatible implementations. +type UnimplementedCollectorServiceServer struct { +} + +func (UnimplementedCollectorServiceServer) HealthCheck(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method HealthCheck not implemented") +} +func (UnimplementedCollectorServiceServer) TrackPayload(context.Context, *TrackPayloadRequest) (*TrackPayloadResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method TrackPayload not implemented") +} +func (UnimplementedCollectorServiceServer) StreamTrackPayload(CollectorService_StreamTrackPayloadServer) error { + return status.Errorf(codes.Unimplemented, "method StreamTrackPayload not implemented") +} +func (UnimplementedCollectorServiceServer) mustEmbedUnimplementedCollectorServiceServer() {} + +// UnsafeCollectorServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to CollectorServiceServer will +// result in compilation errors. +type UnsafeCollectorServiceServer interface { + mustEmbedUnimplementedCollectorServiceServer() +} + +func RegisterCollectorServiceServer(s grpc.ServiceRegistrar, srv CollectorServiceServer) { + s.RegisterService(&_CollectorService_serviceDesc, srv) +} + +func _CollectorService_HealthCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HealthCheckRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CollectorServiceServer).HealthCheck(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/tracker.CollectorService/HealthCheck", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CollectorServiceServer).HealthCheck(ctx, req.(*HealthCheckRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _CollectorService_TrackPayload_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TrackPayloadRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CollectorServiceServer).TrackPayload(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/tracker.CollectorService/TrackPayload", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CollectorServiceServer).TrackPayload(ctx, req.(*TrackPayloadRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _CollectorService_StreamTrackPayload_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(CollectorServiceServer).StreamTrackPayload(&collectorServiceStreamTrackPayloadServer{stream}) +} + +type CollectorService_StreamTrackPayloadServer interface { + SendAndClose(*TrackPayloadResponse) error + Recv() (*TrackPayloadRequest, error) + grpc.ServerStream +} + +type collectorServiceStreamTrackPayloadServer struct { + grpc.ServerStream +} + +func (x *collectorServiceStreamTrackPayloadServer) SendAndClose(m *TrackPayloadResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *collectorServiceStreamTrackPayloadServer) Recv() (*TrackPayloadRequest, error) { + m := new(TrackPayloadRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _CollectorService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "tracker.CollectorService", + HandlerType: (*CollectorServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "HealthCheck", + Handler: _CollectorService_HealthCheck_Handler, + }, + { + MethodName: "TrackPayload", + Handler: _CollectorService_TrackPayload_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "StreamTrackPayload", + Handler: _CollectorService_StreamTrackPayload_Handler, + ClientStreams: true, + }, + }, + Metadata: "tracker/emittergrpc/collector.proto", +} diff --git a/tracker/emittergrpc/emitter_grpc.go b/tracker/emittergrpc/emitter_grpc.go new file mode 100644 index 0000000..fe3bcc3 --- /dev/null +++ b/tracker/emittergrpc/emitter_grpc.go @@ -0,0 +1,323 @@ +// +// Copyright (c) 2016-2020 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, +// and you may not use this file except in compliance with the Apache License Version 2.0. +// You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the Apache License Version 2.0 is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. +// + +package emittergrpc + +import ( + "context" + "encoding/json" + "fmt" + "log" + "strings" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + gt "github.com/snowplow/snowplow-golang-tracker/v2/tracker" +) + +// EmitterGRPC contains the attributes of the GRPC Snowplow Emitter +type EmitterGRPC struct { + CollectorURI string + SendLimit int + StreamLimit int + DbName string + Storage gt.Storage + SendChannel chan bool + Callback func(successCount []gt.CallbackResult, failureCount []gt.CallbackResult) + GrpcConn *grpc.ClientConn + GrpcClient CollectorServiceClient + TLSFilePath string +} + +// InitEmitter creates a new GRPC Emitter object which handles +// storing and sending Snowplow Events. +func InitEmitter(options ...func(*EmitterGRPC)) *EmitterGRPC { + e := &EmitterGRPC{} + + // Set Defaults + e.SendLimit = gt.DEFAULT_SEND_LIMIT + e.StreamLimit = 500 + e.DbName = gt.DEFAULT_DB_NAME + e.TLSFilePath = "" + + // Option parameters + for _, op := range options { + op(e) + } + + // Check collector URI is not empty + if e.CollectorURI == "" { + panic("FATAL: CollectorURI cannot be empty.") + } + + // Setup default event storage + if e.Storage == nil { + e.Storage = *gt.InitStorageSQLite3(e.DbName) + } + + return e +} + +// --- Require + +// RequireCollectorURI sets the Emitters collector URI. +func RequireCollectorURI(collectorURI string) func(e *EmitterGRPC) { + return func(e *EmitterGRPC) { e.CollectorURI = collectorURI } +} + +// --- Option + +// OptionSendLimit sets the send limit for the emitter. +func OptionSendLimit(sendLimit int) func(e *EmitterGRPC) { + return func(e *EmitterGRPC) { e.SendLimit = sendLimit } +} + +// OptionStreamLimit sets the amount of events to be sent per stream. +func OptionStreamLimit(streamLimit int) func(e *EmitterGRPC) { + return func(e *EmitterGRPC) { e.StreamLimit = streamLimit } +} + +// OptionTLSFilePath configures the GRPC clients PEM keys to use in authenticating +// a secure connection with the server. +// +// Note: If this value is empty the client defaults to insecure +func OptionTLSFilePath(tlsFilePath string) func(e *EmitterGRPC) { + return func(e *EmitterGRPC) { e.TLSFilePath = tlsFilePath } +} + +// OptionDbName overrides the default name of the storage database. +func OptionDbName(dbName string) func(e *EmitterGRPC) { + return func(e *EmitterGRPC) { e.DbName = dbName } +} + +// OptionStorage sets a custom event Storage target which implements the Storage interface +// +// Note: If this option is used OptionDbName will be ignored +func OptionStorage(storage gt.Storage) func(e *EmitterGRPC) { + return func(e *EmitterGRPC) { e.Storage = storage } +} + +// OptionCallback sets a custom callback for the emitter loop. +func OptionCallback(callback func(successCount []gt.CallbackResult, failureCount []gt.CallbackResult)) func(e *EmitterGRPC) { + return func(e *EmitterGRPC) { e.Callback = callback } +} + +// --- GRPC Handlers + +// dial establishes the connection with the GRPC server. +func dial(collectorURI string, tlsFilePath string) (*grpc.ClientConn, error) { + if tlsFilePath == "" { + return grpc.Dial(collectorURI, grpc.WithInsecure(), grpc.WithBlock()) + } + + creds, err := credentials.NewClientTLSFromFile(tlsFilePath, "") + if err != nil { + return nil, err + } + return grpc.Dial(collectorURI, grpc.WithTransportCredentials(creds), grpc.WithBlock()) +} + +// --- Event Handlers + +// Add will push an event to the database and will then initiate a sending loop. +func (e *EmitterGRPC) Add(payload gt.Payload) { + e.Storage.AddEventRow(payload) + e.start() +} + +// Flush will attempt to start the send loop regardless of an event coming in. +func (e *EmitterGRPC) Flush() { + e.start() +} + +// Stop waits for the send channel to have a value and then resets it to nil. +func (e *EmitterGRPC) Stop() { + <-e.SendChannel + e.SendChannel = nil + e.GrpcConn.Close() +} + +// start will begin the sending loop. +func (e *EmitterGRPC) start() { + if e.SendChannel == nil || !e.IsSending() { + conn, err := dial(e.CollectorURI, e.TLSFilePath) + if err != nil { + log.Println(fmt.Sprintf("ERROR: could not establish connection with GRPC server: %v", err)) + return + } + e.GrpcConn = conn + e.GrpcClient = NewCollectorServiceClient(e.GrpcConn) + + e.SendChannel = make(chan bool, 1) + go func() { + var done bool + defer func() { + e.SendChannel <- done + }() + + for { + eventRows := e.Storage.GetEventRowsWithinRange(e.SendLimit) + + // If there are no events in the database exit + if len(eventRows) == 0 { + break + } + results := e.doSend(eventRows) + + // Process results + ids := []int{} + successes := []gt.CallbackResult{} + failures := []gt.CallbackResult{} + + for _, res := range results { + + count := len(res.Ids) + status := res.Status + + if status >= 200 && status < 400 { + ids = append(ids, res.Ids...) + successes = append(successes, gt.CallbackResult{count, status}) + } else { + failures = append(failures, gt.CallbackResult{count, status}) + } + } + + if e.Callback != nil { + e.Callback(successes, failures) + } + + // If all the events failed to be sent exit + if len(successes) == 0 && len(failures) > 0 { + break + } + + e.Storage.DeleteEventRows(ids) + } + done = true + }() + } +} + +// doSend will send all of the eventsRows it is given. +func (e *EmitterGRPC) doSend(eventRows []gt.EventRow) []gt.SendResult { + futures := []<-chan gt.SendResult{} + + ids := []int{} + payloads := []gt.Payload{} + + for _, val := range eventRows { + if len(payloads) > e.StreamLimit { + futures = append(futures, e.sendGrpcRequest(e.GrpcClient, ids, payloads)) + ids = []int{val.Id} + payloads = []gt.Payload{val.Event} + } else { + ids = append(ids, val.Id) + payloads = append(payloads, val.Event) + } + } + if len(payloads) > 0 { + futures = append(futures, e.sendGrpcRequest(e.GrpcClient, ids, payloads)) + } + + // Wait for all Futures to complete + results := []gt.SendResult{} + for _, future := range futures { + results = append(results, <-future) + } + + return results +} + +// SendPostRequest sends an array of Payloads together to the collector endpoint via POST. +func (e *EmitterGRPC) sendGrpcRequest(client CollectorServiceClient, ids []int, body []gt.Payload) <-chan gt.SendResult { + c := make(chan gt.SendResult, 1) + go func() { + var result gt.SendResult + defer func() { + c <- result + }() + + status := -1 + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + stream, err := client.StreamTrackPayload(ctx) + if err != nil { + log.Println(fmt.Sprintf("%v.StreamTrackPayload(_) = _, %v", client, err)) + result = gt.SendResult{Ids: ids, Status: status} + return + } + + for _, val := range body { + val.Add(gt.SENT_TIMESTAMP, gt.NewString(gt.GetTimestampString())) + tpReq, _ := payloadToTrackPayloadRequest(val) + err := stream.Send(tpReq) + if err != nil { + log.Println(fmt.Sprintf("%v.Send(%v) = %v", stream, tpReq, err)) + result = gt.SendResult{Ids: ids, Status: status} + return + } + } + + resp, err := stream.CloseAndRecv() + if err != nil { + log.Println(fmt.Sprintf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)) + result = gt.SendResult{Ids: ids, Status: status} + return + } + + if resp.GetSuccess() { + result = gt.SendResult{Ids: ids, Status: 200} + } else { + result = gt.SendResult{Ids: ids, Status: 503} + } + }() + return c +} + +// --- Helpers + +// IsSending checks whether the send channel has finished. +func (e EmitterGRPC) IsSending() bool { + return len(e.SendChannel) == 0 +} + +// payloadToTrackEventRequest converts a Payload into a gRPC TrackPayloadRequest +// +// TODO: Remove double serialization due to field names being renamed +// https://developers.google.com/protocol-buffers/docs/reference/go-generated#fields +func payloadToTrackPayloadRequest(payload gt.Payload) (*TrackPayloadRequest, error) { + ter := &TrackPayloadRequest{} + d := json.NewDecoder(strings.NewReader(payload.String())) + d.UseNumber() + err := d.Decode(&ter) + if err != nil { + return nil, err + } + return ter, nil +} + +// --- Getters & Setters + +// GetSendChannel returns the send channel +func (e EmitterGRPC) GetSendChannel() chan bool { + return e.SendChannel +} + +// GetStorage returns the send channel +func (e EmitterGRPC) GetStorage() gt.Storage { + return e.Storage +}