Skip to content

Commit

Permalink
Add tracing for data streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
ehpor committed May 22, 2024
1 parent 901b173 commit 9e0512d
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 2 deletions.
16 changes: 16 additions & 0 deletions catkit_core/DataStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//#include "Log.h"
#include "Timing.h"
#include "Util.h"
#include "Tracing.h"

#include <algorithm>
#include <iostream>
Expand Down Expand Up @@ -145,6 +146,8 @@ std::shared_ptr<DataStream> DataStream::Open(const std::string &stream_id)

DataFrame DataStream::RequestNewFrame()
{
auto start = GetTimeStamp();

// If the frame buffer is full: make oldest frame unavailable.
if ((m_Header->m_LastId - m_Header->m_FirstId) == m_Header->m_NumFramesInBuffer)
m_Header->m_FirstId++;
Expand All @@ -160,11 +163,16 @@ DataFrame DataStream::RequestNewFrame()

frame.Set(m_Header->m_DataType, m_Header->m_NumDimensions, m_Header->m_Dimensions, m_Buffer + offset, false);

auto end = GetTimeStamp();
tracing_proxy.TraceInterval("DataStream::RequestNewFrame", GetStreamName(), start, end - start);

return frame;
}

void DataStream::SubmitFrame(size_t id)
{
auto start = GetTimeStamp();

// Save timing information to frame metadata.
DataFrameMetadata *meta = m_Header->m_FrameMetadata + (id % m_Header->m_NumFramesInBuffer);
meta->m_TimeStamp = GetTimeStamp();
Expand Down Expand Up @@ -203,16 +211,24 @@ void DataStream::SubmitFrame(size_t id)
m_Header->m_FrameRateCounter =
m_Header->m_FrameRateCounter * std::exp(-FRAMERATE_DECAY * time_delta)
+ FRAMERATE_DECAY;

auto end = GetTimeStamp();
tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), start, end - start);
}

void DataStream::SubmitData(const void *data)
{
auto start = GetTimeStamp();

DataFrame frame = RequestNewFrame();

char *source = (char *) data;
std::copy(source, source + frame.GetSizeInBytes(), frame.m_Data);

SubmitFrame(frame.m_Id);

auto end = GetTimeStamp();
tracing_proxy.TraceInterval("DataStream::SubmitData", GetStreamName(), start, end - start);
}

std::vector<size_t> DataStream::GetDimensions()
Expand Down
3 changes: 3 additions & 0 deletions catkit_core/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "Finally.h"
#include "Timing.h"
#include "TestbedProxy.h"
#include "Tracing.h"
#include "proto/service.pb.h"

#include <chrono>
Expand Down Expand Up @@ -31,6 +32,8 @@ Service::Service(string service_type, string service_id, int service_port, int t

m_Heartbeat = DataStream::Create("heartbeat", service_id, DataType::DT_UINT64, {1}, 20);

tracing_proxy.Connect(service_id, "127.0.0.1", testbed_port + 3);

string state_stream_id = m_Testbed->RegisterService(
service_id,
service_type,
Expand Down
5 changes: 3 additions & 2 deletions tests/tracing_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def main():
time.sleep(0.1)

with writer.open(f'trace_{get_timestamp()}.json'):
for i in range(100):
'''for i in range(100):
with trace_interval('a'):
for j in range(random.randint(1, 4)):
with trace_interval('b'):
Expand All @@ -22,7 +22,8 @@ def main():
if random.randint(0, 1):
trace_instant('blank')
trace_counter('adsf', 'iteration', i)
trace_counter('adsf', 'iteration', i)'''
time.sleep(30)

time.sleep(0.1)

Expand Down

0 comments on commit 9e0512d

Please sign in to comment.