Skip to content

Commit

Permalink
more consistent rate math
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolasparada committed Dec 3, 2021
1 parent ad2ecde commit 886d9b9
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 33 deletions.
45 changes: 19 additions & 26 deletions cmd/calyptia/top_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (m *AgentModel) Update(msg tea.Msg) (*AgentModel, tea.Cmd) {
for _, pluginName := range pluginNames(measurement.Plugins) {
item.plugin = pluginName
plugin := measurement.Plugins[pluginName]
item.values = collectAgentMetricValues(plugin.Metrics, m.MetricsInterval)
item.values = collectAgentMetricValues(plugin.Metrics)
}
items = append(items, item)
}
Expand Down Expand Up @@ -294,67 +294,60 @@ func (v agentMetricValues) Empty() bool {
return v.size == "" && v.events == "" && v.retries == "" && v.retriedEvents == "" && v.retriesFailed == "" && v.droppedEvents == ""
}

func collectAgentMetricValues(metrics map[string][]cloud.MetricFields, interval time.Duration) agentMetricValues {
func collectAgentMetricValues(metrics map[string][]cloud.MetricFields) agentMetricValues {
var out agentMetricValues

for _, metricName := range metricNames(metrics) {
points := metrics[metricName]

d := len(points)
if d < 2 {
// We only need 2 points to calculate the rate, but the last one is not reliable
// As it's time value is not in the specified interval, and lots of times it is nil.
if d < 3 {
continue
}

var val *float64
for i := d - 1; i > 0; i-- {
curr := points[i].Value
prev := points[i-1].Value
curr := points[d-2]
prev := points[d-3]

if curr == nil || prev == nil {
continue
}

if *curr < *prev {
continue
}

secs := interval.Seconds()
v := (*curr / secs) - (*prev / secs)
val = &v
break
if curr.Value == nil || prev.Value == nil {
continue
}

if val == nil {
if *curr.Value < *prev.Value {
continue
}

secs := curr.Time.Sub(prev.Time).Seconds()
val := (*curr.Value / secs) - (*prev.Value / secs)

if strings.Contains(metricName, "dropped_records") {
out.droppedEvents = fmtFloat64(*val) + "ev/s"
out.droppedEvents = fmtFloat64(val) + "ev/s"
continue
}

if strings.Contains(metricName, "retried_records") {
out.retriedEvents = fmtFloat64(*val) + "ev/s"
out.retriedEvents = fmtFloat64(val) + "ev/s"
continue
}

if strings.Contains(metricName, "retries_failed") {
out.retriesFailed = fmtFloat64(*val) + "ev/s"
out.retriesFailed = fmtFloat64(val) + "ev/s"
continue
}

if strings.Contains(metricName, "retries") {
out.retries = fmtFloat64(*val) + "ev/s"
out.retries = fmtFloat64(val) + "ev/s"
continue
}

if strings.Contains(metricName, "byte") || strings.Contains(metricName, "size") {
out.size = strings.ToLower(bytefmt.ByteSize(uint64(math.Round(*val)))) + "/s"
out.size = strings.ToLower(bytefmt.ByteSize(uint64(math.Round(val)))) + "/s"
continue
}

if strings.Contains(metricName, "record") {
out.events = fmtFloat64(*val) + "ev/s"
out.events = fmtFloat64(val) + "ev/s"
continue
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/calyptia/top_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func newCmdTopPipeline(config *config) *cobra.Command {
}

plugin := measurement.Plugins[pluginName]
values := fmtLatestMetrics(plugin.Metrics, interval)
values := fmtLatestMetrics(plugin.Metrics)
var value string
if len(values) == 0 {
value = "No data"
Expand Down
12 changes: 6 additions & 6 deletions cmd/calyptia/top_project.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (m *ProjectModel) Update(msg tea.Msg) (*ProjectModel, tea.Cmd) {
for i, a := range m.agents {
item := agentListItem{agent: a}
if metrics, ok := m.agentsMetrics[a.ID]; ok {
item.values = makeAgentMeasurementValues(metrics, m.MetricsInterval)
item.values = makeAgentMeasurementValues(metrics)
}
items[i] = item
}
Expand Down Expand Up @@ -339,7 +339,7 @@ func (m *ProjectModel) View() string {
}

plugin := measurement.Plugins[pluginName]
values := fmtLatestMetrics(plugin.Metrics, m.MetricsInterval)
values := fmtLatestMetrics(plugin.Metrics)
var value string
if len(values) == 0 {
value = "No data"
Expand Down Expand Up @@ -431,11 +431,11 @@ func (m *ProjectModel) viewAgentListHeader() string {
return strings.Join(cells, " ")
}

func makeAgentMeasurementValues(metrics cloud.AgentMetrics, interval time.Duration) agentMeasurementValues {
func makeAgentMeasurementValues(metrics cloud.AgentMetrics) agentMeasurementValues {
var out agentMeasurementValues
for _, measurementName := range agentMeasurementNames(metrics.Measurements) {
measurement := metrics.Measurements[measurementName]
values := fmtLatestMetrics(measurement.Totals, interval)
values := fmtLatestMetrics(measurement.Totals)
if len(values) != 0 {
value := strings.Join(values, ", ")
switch cloud.MeasurementType(measurementName) {
Expand Down Expand Up @@ -598,8 +598,8 @@ func fmtFloat64(f float64) string {
return s
}

func fmtLatestMetrics(metrics map[string][]cloud.MetricFields, interval time.Duration) []string {
got := collectAgentMetricValues(metrics, interval)
func fmtLatestMetrics(metrics map[string][]cloud.MetricFields) []string {
got := collectAgentMetricValues(metrics)
var values []string
if got.size != "" {
values = append(values, fmt.Sprintf("size: %s", got.size))
Expand Down

0 comments on commit 886d9b9

Please sign in to comment.