Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OWL-2005] Add logging for too late metric received by OWL #402

Open
wants to merge 19 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[OWL-2005] Fix typo, formatter
  • Loading branch information
Mike Lue committed Jan 5, 2018
commit e984dec9d351a26f448b60f36aebd0cd29278333
16 changes: 8 additions & 8 deletions common/model/metric.go
Original file line number Diff line number Diff line change
@@ -46,14 +46,14 @@ func (t *JsonMetaData) String() string {
}

type MetaData struct {
Metric string `json:"metric"`
Endpoint string `json:"endpoint"`
Timestamp int64 `json:"timestamp"`
Step int64 `json:"step"`
Value float64 `json:"value"`
CounterType string `json:"counterType"`
Tags map[string]string `json:"tags"`
SourceMetric *MetricValue `json:"-"`
Metric string `json:"metric"`
Endpoint string `json:"endpoint"`
Timestamp int64 `json:"timestamp"`
Step int64 `json:"step"`
Value float64 `json:"value"`
CounterType string `json:"counterType"`
Tags map[string]string `json:"tags"`
SourceMetric *MetricValue `json:"-"`
}

func (t *MetaData) String() string {
8 changes: 4 additions & 4 deletions common/net/listener.go
Original file line number Diff line number Diff line change
@@ -41,18 +41,18 @@ func MustInitTcpListener(address string) net.Listener {

// Constructs the controller for usage on "net.Listener"
func NewListenerController(listener net.Listener) *ListenerController {
return &ListenerController {
return &ListenerController{
Listener: listener,
working: false,
lock: &sync.Mutex{},
working: false,
lock: &sync.Mutex{},
}
}

type ListenerController struct {
net.Listener

working bool
lock *sync.Mutex
lock *sync.Mutex
}

// This method would keep accepting message of socket
4 changes: 2 additions & 2 deletions common/net/listener_test.go
Original file line number Diff line number Diff line change
@@ -2,8 +2,8 @@ package net

import (
"fmt"
"time"
"net"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -52,7 +52,7 @@ var _ = Describe("Controller for TCP listener", func() {

Eventually(
func() bool { return accepted },
2 * time.Second, 200 * time.Millisecond,
2*time.Second, 200*time.Millisecond,
).Should(BeTrue())
})
})
4 changes: 2 additions & 2 deletions modules/transfer/receiver/rpc/rpc_transfer_it_test.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ var _ = Describe("Sends metrics by RPC protocol", jsonRpcSkipper.PrependBeforeEa

err := client.Call(
"Transfer.Update",
[]*cmodel.MetricValue {
[]*cmodel.MetricValue{
{
Endpoint: "pc01.it.cepave.com", Metric: "m01", Step: 30, Type: "GAUGE", Tags: "",
Value: 11, Timestamp: time.Now().Unix() + 2,
@@ -44,7 +44,7 @@ var _ = Describe("Sends metrics by RPC protocol", jsonRpcSkipper.PrependBeforeEa
Expect(reply).To(PointTo(MatchFields(IgnoreExtras, Fields{
"Message": Equal("ok"),
"Invalid": Equal(0),
"Total": Equal(3),
"Total": Equal(3),
})))
},
)
37 changes: 19 additions & 18 deletions modules/transfer/receiver/rpc/rpc_transfer_stress_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package rpc

import (
"fmt"
"net/rpc"
"time"
"fmt"

rd "github.com/Pallinder/go-randomdata"

@@ -15,10 +15,10 @@ import (
)

const (
number_of_metrics = 0
number_of_threads = 16
number_of_agents = 1024
times_of_measure = 3
number_of_metrics = 0
number_of_threads = 16
number_of_agents = 1024
times_of_measure = 3
sec_interval_of_metrics = 30 // Seconds
)

@@ -42,7 +42,7 @@ var _ = Describe("Stressing test on receiving metrics", func() {
Measure(fmt.Sprintf("%d metrics over %d agents", number_of_metrics, number_of_agents), func(b Benchmarker) {
b.Time("runtime", func() {
for i := 0; i < number_of_agents; i++ {
availableThreads<-true
availableThreads <- true
go func(agentNumber int) {
defer GinkgoRecover()
defer func() {
@@ -62,7 +62,7 @@ var _ = Describe("Stressing test on receiving metrics", func() {
* Waiting for all of the go routines has completed
*/
for i := 0; i < number_of_threads; i++ {
availableThreads<-true
availableThreads <- true
}
// :~)
})
@@ -72,7 +72,7 @@ var _ = Describe("Stressing test on receiving metrics", func() {
func sendMetrics(numberOfMetrics int) (*cmodel.TransferResponse, error) {
var (
reply *cmodel.TransferResponse
err error
err error
)

ginkgoClient := &trpc.GinkgoJsonRpc{}
@@ -87,19 +87,20 @@ func sendMetrics(numberOfMetrics int) (*cmodel.TransferResponse, error) {
}

var (
sampleMetrics = []string {
sampleMetrics = []string{
"cpu.idle", "cpu.busy", "disk.out.peak", "disk.in.peak",
"net.out.bytes", "net.in.bytes", "net.drop.bytes",
"io.out.peak", "io.in.peak",
}

valueRanges = [][]int {
{ 1, 100 },
{ 500, 2500 },
{ 1, 50000 },
{ 10000, 500000 },
valueRanges = [][]int{
{1, 100},
{500, 2500},
{1, 50000},
{10000, 500000},
}
)

func buildMetrics(numberOfMetrics int) []*cmodel.MetricValue {
metrics := make([]*cmodel.MetricValue, numberOfMetrics)

@@ -108,14 +109,14 @@ func buildMetrics(numberOfMetrics int) []*cmodel.MetricValue {
valueRange := valueRanges[rd.Number(len(valueRanges))]
step := int64(rd.Number(1, 6) * 5)

startTime := time.Now().Add(time.Duration(-sec_interval_of_metrics * numberOfMetrics) * time.Second)
startTime := time.Now().Add(time.Duration(-sec_interval_of_metrics*numberOfMetrics) * time.Second)

for i := 0; i < numberOfMetrics; i++ {
metrics[i] = &cmodel.MetricValue {
metrics[i] = &cmodel.MetricValue{
Endpoint: endpoint, Metric: metric,
Step: step, Type: "GAUGE", Tags: "",
Timestamp: startTime.Add(time.Duration(sec_interval_of_metrics * i) * time.Second).Unix(),
Value: rd.Number(valueRange[0], valueRange[1]),
Timestamp: startTime.Add(time.Duration(sec_interval_of_metrics*i) * time.Second).Unix(),
Value: rd.Number(valueRange[0], valueRange[1]),
}
}

23 changes: 13 additions & 10 deletions modules/transfer/service/queue_relay.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ type RelayDelegatee interface {
type RelayStationFactory struct {
stationBase *RelayStation
}

func (f *RelayStationFactory) Build() *RelayStation {
return f.stationBase.clone()
}
@@ -53,7 +54,7 @@ func NewRelayFactoryByGlobalConfig(config *g.GlobalConfig) *RelayStationFactory
}

if len(genericTargets) > 0 {
stationBase.Otherwise = append(stationBase.Otherwise, &genericRelayPool{ relayTargets: &genericTargets })
stationBase.Otherwise = append(stationBase.Otherwise, &genericRelayPool{relayTargets: &genericTargets})
}
// :~)

@@ -62,13 +63,13 @@ func NewRelayFactoryByGlobalConfig(config *g.GlobalConfig) *RelayStationFactory
*/
if config.NqmRest.Enabled {
nqmRelayPool := buildRelayPoolForMetricMap()
nqmRelayPool.mapToTargets = map[string]func([]*cmodel.MetaData) {
"nqm-fping": sender.Push2NqmIcmpSendQueue,
nqmRelayPool.mapToTargets = map[string]func([]*cmodel.MetaData){
"nqm-fping": sender.Push2NqmIcmpSendQueue,
"nqm-tcpconn": sender.Push2NqmTcpconnSendQueue,
"nqm-tcpping": sender.Push2NqmTcpSendQueue,
}
nqmRelayPool.mapToMetrics = map[string][]*cmodel.MetaData {
"nqm-fping": make([]*cmodel.MetaData, 0),
nqmRelayPool.mapToMetrics = map[string][]*cmodel.MetaData{
"nqm-fping": make([]*cmodel.MetaData, 0),
"nqm-tcpconn": make([]*cmodel.MetaData, 0),
"nqm-tcpping": make([]*cmodel.MetaData, 0),
}
@@ -88,7 +89,7 @@ func NewRelayFactoryByGlobalConfig(config *g.GlobalConfig) *RelayStationFactory
}
// :~)

return &RelayStationFactory { stationBase }
return &RelayStationFactory{stationBase}
}

// This object handles the input metric and try to dispatch metric to
@@ -159,7 +160,7 @@ func cloneRelayDelegatees(source []RelayDelegatee) []RelayDelegatee {
type genericRelayPool struct {
// This slice would be re-use across "Clone()" method
relayTargets *[]func([]*cmodel.MetaData)
metrics []*cmodel.MetaData
metrics []*cmodel.MetaData
}

func (p *genericRelayPool) Accept(metric *cmodel.MetaData) bool {
@@ -183,7 +184,7 @@ type stringMapRelayPool struct {
// This map would be re-use across "Clone()" method
mapToTargets map[string]func([]*cmodel.MetaData)
mapToMetrics map[string][]*cmodel.MetaData
stringify func(*cmodel.MetaData) string
stringify func(*cmodel.MetaData) string
}

func (p *stringMapRelayPool) Accept(metric *cmodel.MetaData) bool {
@@ -232,7 +233,7 @@ func (p *stringMapRelayPool) Clone() RelayDelegatee {

// Builds "*stringMapRelayPool" on value of *MetaData.Metric
func buildRelayPoolForMetricMap() *stringMapRelayPool {
return &stringMapRelayPool {
return &stringMapRelayPool{
stringify: func(metric *cmodel.MetaData) string {
return metric.Metric
},
@@ -243,6 +244,7 @@ func buildRelayPoolForMetricMap() *stringMapRelayPool {
type stageRelayPool struct {
metrics []*cmodel.MetricValue
}

func (p *stageRelayPool) Accept(metric *cmodel.MetaData) bool {
p.metrics = append(p.metrics, metric.SourceMetric)
return true
@@ -262,6 +264,7 @@ type filteredRelayPool struct {
RelayDelegatee
filter func(*cmodel.MetaData) bool
}

func (p *filteredRelayPool) Accept(metric *cmodel.MetaData) bool {
if p.filter(metric) {
p.RelayDelegatee.Accept(metric)
@@ -282,7 +285,7 @@ func buildRelayPoolForEffectiveFilterOnEndpoint(targetDelegatee RelayDelegatee,

if len(filters) > 0 &&
!(len(filters) == 1 && filters[0] == "") {
finalDelegatee = &filteredRelayPool {
finalDelegatee = &filteredRelayPool{
RelayDelegatee: targetDelegatee,
filter: func(metric *cmodel.MetaData) bool {
for _, filter := range filters {
Loading