
Add a parallelized bag reader class #1862

Closed
2 tasks
MaxFleur opened this issue Nov 22, 2024 · 7 comments
Labels
enhancement New feature or request more-information-needed Further information is required

Comments

@MaxFleur

Description

Currently, I am writing a tool that extracts sensor images stored in a ROS bag and writes them to disk. Because some image-writing operations are computationally expensive, I want to parallelize message reading and image writing, which requires parallel access to the bag file.
However, as far as I know, rosbag2_cpp::Reader currently only supports reading a bag file sequentially, so messages cannot be read independently.
Since data is usually recorded and stored sequentially by design, a sequential reader makes perfect sense in those cases. Still, it might be worth adding a parallelized reader class to speed up computations that rely on reading bag messages.

Related Issues

None so far.

Completion Criteria

  • Add a new class containing the parallel reader, e.g. parallel_reader.hpp
  • (Optional) Create an example of how to use a parallelized reader instance

Implementation Notes / Suggestions

One option would be an API similar to the sequential reader's:

#include <memory>
#include <thread>
#include <vector>

// rosbag2_cpp::parallel_reader is the class proposed in this issue; it does not exist yet
std::shared_ptr<rosbag2_cpp::parallel_reader> reader = std::make_shared<rosbag2_cpp::parallel_reader>();
reader->open("/path/to/bag_file");

// Function executed by each worker thread to access the bag file
const auto readParallel = [reader]() {
    // has_next() followed by read_next() is not atomic across threads, so the class
    // would have to handle another thread taking the last message in between
    while (reader->has_next()) {
        rosbag2_storage::SerializedBagMessageSharedPtr msg = reader->read_next();
        // do stuff with message
    }
};

// Create one thread per hardware thread and start parallel access
std::vector<std::thread> threadPool;
for (unsigned int i = 0; i < std::thread::hardware_concurrency(); ++i) {
    threadPool.emplace_back(readParallel);
}

for (auto& t : threadPool) {
    t.join();
}

Another option would be index-based access in a for loop, which could then be combined with OpenMP, for example:

std::shared_ptr<rosbag2_cpp::parallel_reader> reader = std::make_shared<rosbag2_cpp::parallel_reader>();
reader->open("/path/to/bag_file");

// Also needs additional functions to get a message count and to read at a certain index.
// The count is computed once because OpenMP expects a loop-invariant bound.
const std::size_t message_count = reader->get_message_count("/topic_name");
#pragma omp parallel for
for (std::size_t i = 0; i < message_count; ++i) {
    rosbag2_storage::SerializedBagMessageSharedPtr msg = reader->read_at("/topic_name", i);
    // do stuff with msg
}

Testing Notes / Suggestions

None

MaxFleur added the enhancement label Nov 22, 2024
@christophebedard
Member

christophebedard commented Nov 22, 2024

It's a bit trickier than just publishing each message after reading it from the reader. Also, I don't think publishers can be used concurrently with the current implementation (specifically, the map of publishers), so each thread would need exclusive access to its own set of topics/publishers.
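A minimal sketch of how that exclusive ownership could be arranged, assuming the full topic list is known before playback starts: topics are assigned round-robin to worker threads, and each thread would create and use publishers only for the topics in its own partition. `partition_topics` and `build_owner_map` are hypothetical helpers, not part of rosbag2; plain strings stand in for topic names.

```cpp
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical helper: assigns every topic to exactly one worker thread
// (round-robin), so no two threads ever share a topic or its publisher.
std::vector<std::vector<std::string>> partition_topics(
    const std::vector<std::string>& topics, std::size_t num_threads) {
  if (num_threads == 0) {
    return {};  // no workers, nothing to assign
  }
  std::vector<std::vector<std::string>> partitions(num_threads);
  for (std::size_t i = 0; i < topics.size(); ++i) {
    partitions[i % num_threads].push_back(topics[i]);
  }
  return partitions;
}

// Hypothetical helper: maps each topic name to the index of the thread that
// owns it, so the reading loop knows which thread to route a message to.
std::unordered_map<std::string, std::size_t> build_owner_map(
    const std::vector<std::vector<std::string>>& partitions) {
  std::unordered_map<std::string, std::size_t> owner;
  for (std::size_t t = 0; t < partitions.size(); ++t) {
    for (const auto& topic : partitions[t]) {
      owner[topic] = t;
    }
  }
  return owner;
}
```

Round-robin ignores message rates; if some topics are much busier than others, a weighted assignment would balance the threads better.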

That being said, feel free to try it out and see if it's possible. I'd recommend waiting until after #1848 is merged, though, since it will change the Player class a bit.

@MichaelOrlov
Contributor

@MaxFleur, could you please clarify what real use cases you have in mind for a parallel reader?
It is unclear to me how data read out of order could be useful.

Also, what problem are you trying to solve with parallel reading? Is it slow readout, or the inability to parallelize the processing workers because of the sequential readout?

  • If you are trying to solve the latter problem, I would recommend using a buffer or queue for the messages, similar to what we do in the Player class. You can have a single writer and multiple readers of the queue, each reader running in its own thread. This way, you can parallelize your data processing while keeping the sequential reader.
  • If the concern is slow readout (which, by the way, has never been reported as a problem before), the solution could be either the buffering queue again or extending the reader API to read out a batch of messages at once. However, I don't envision a real use case where a non-sequential reader would be needed.
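The first suggestion (one sequential reader feeding several processing threads through a queue) can be sketched with only the standard library. This is a minimal, hypothetical example, not rosbag2 or Player code: `BlockingQueue` and `run_pipeline` are illustrative names, the `produce` callback stands in for a loop over `rosbag2_cpp::Reader::has_next()` / `read_next()`, and `T` stands in for a message type such as `rosbag2_storage::SerializedBagMessageSharedPtr`.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Bounded blocking queue: one producer pushes, any number of consumers pop.
template <typename T>
class BlockingQueue {
 public:
  explicit BlockingQueue(std::size_t capacity) : capacity_(capacity) {}

  // Blocks while the queue is full, so a fast reader cannot buffer the whole bag.
  void push(T item) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
    queue_.push(std::move(item));
    not_empty_.notify_one();
  }

  // Blocks until an item is available; returns std::nullopt once the queue
  // has been closed and fully drained.
  std::optional<T> pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [this] { return closed_ || !queue_.empty(); });
    if (queue_.empty()) {
      return std::nullopt;
    }
    T item = std::move(queue_.front());
    queue_.pop();
    not_full_.notify_one();
    return item;
  }

  // Called by the producer after the last message has been pushed.
  void close() {
    std::lock_guard<std::mutex> lock(mutex_);
    closed_ = true;
    not_empty_.notify_all();
  }

 private:
  const std::size_t capacity_;
  std::queue<T> queue_;
  bool closed_ = false;
  std::mutex mutex_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
};

// Runs `produce` on the calling thread (the sequential reader) and `process`
// on `num_workers` threads. `process` runs concurrently, so it must be
// thread-safe. Both `num_workers` and `capacity` are assumed to be > 0.
template <typename T, typename Produce, typename Process>
void run_pipeline(Produce produce, Process process, std::size_t num_workers,
                  std::size_t capacity = 64) {
  BlockingQueue<T> queue(capacity);
  std::vector<std::thread> workers;
  for (std::size_t i = 0; i < num_workers; ++i) {
    workers.emplace_back([&queue, &process] {
      while (std::optional<T> item = queue.pop()) {
        process(*item);
      }
    });
  }
  while (std::optional<T> item = produce()) {
    queue.push(std::move(*item));
  }
  queue.close();
  for (auto& worker : workers) {
    worker.join();
  }
}
```

The bounded capacity is a design choice: it lets processing start immediately while keeping memory use flat, instead of loading every message before any work begins.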

MichaelOrlov added the more-information-needed label Nov 23, 2024
@MaxFleur
Author

@MichaelOrlov
The context is that I want to write images from a ROS bag file to a hard disk. However, image writing is computationally expensive. Currently, I'm using cv::imwrite, which only utilizes a single thread. Therefore, to speed up image writing and utilize multiple threads, I want to parallelize message reading so that I can write multiple images at the same time.
So basically, I need to access the bag file's messages in parallel. However, I've taken a look at the Player class, where the messages are stored in a queue. Maybe I could do something similar, e.g., store them in a std::deque instance and access this queue from different threads.
So if there is no real use case and workarounds exist, I guess we could close this issue. It might still be a good idea to point out somewhere in the documentation how to access messages other than sequentially, though.

MaxFleur closed this as not planned Nov 25, 2024
@christophebedard
Member

Ah! You could build something based on rosbag2_cpp::Reader directly yourself, or use other tools (not in this repo) that allow you to read messages from a bag file and do something with them (e.g., rosbags, or an mcap lib).

@MaxFleur
Author

@christophebedard
I found a first solution. My approach is to store the messages in a std::deque instance and write the images in parallel from it.
The writing process is much faster now, but storing the messages in the queue takes some time. I might have to look into optimizing that, but it's a start.
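A minimal sketch of the "load first, then write in parallel" approach described above, assuming all messages are already in a `std::deque` that is no longer modified and that each message can be written independently. Worker threads claim the next unprocessed index through a shared `std::atomic` counter, so every message is written exactly once without a lock. `write_all_parallel` and the `write` callback are hypothetical names; in the real tool, `write` would deserialize the image and call `cv::imwrite`.

```cpp
#include <atomic>
#include <cstddef>
#include <deque>
#include <thread>
#include <vector>

// `messages` was filled sequentially beforehand and is only read here, so
// concurrent read-only access from several threads is safe.
template <typename T, typename Write>
void write_all_parallel(const std::deque<T>& messages, Write write,
                        unsigned num_threads) {
  if (num_threads == 0) {
    num_threads = 1;  // always make progress
  }
  std::atomic<std::size_t> next{0};
  std::vector<std::thread> workers;
  for (unsigned t = 0; t < num_threads; ++t) {
    workers.emplace_back([&] {
      // fetch_add hands out each index to exactly one thread
      for (std::size_t i = next.fetch_add(1); i < messages.size();
           i = next.fetch_add(1)) {
        write(messages[i], i);  // e.g. decode and write to a per-index file
      }
    });
  }
  for (auto& worker : workers) {
    worker.join();
  }
}
```

Dynamic index claiming keeps all threads busy even when some images take much longer to encode than others, which a fixed split of the deque into equal chunks would not.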

@christophebedard
Member

You could start writing messages to disk before your std::deque is full (unless you're already doing that). A lock-free single-producer single-consumer queue might be useful for that. The Player class is currently using this implementation (although that is going to change with #1848): https://github.com/cameron314/readerwriterqueue.
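To illustrate overlapping loading and writing, here is a minimal, hypothetical lock-free single-producer/single-consumer ring buffer built on `std::atomic` acquire/release ordering. It is not the readerwriterqueue library or its API, only a sketch of the same technique: the reading thread pushes each message as soon as it is read, while a writer thread pops and writes it. `SpscQueue` and `consume_until_done` are illustrative names.

```cpp
#include <atomic>
#include <cstddef>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

// Minimal bounded lock-free single-producer/single-consumer ring buffer.
// Exactly one thread may call try_push() and exactly one thread try_pop().
template <typename T>
class SpscQueue {
 public:
  // capacity must be at least 1; one slot stays empty to tell "full" from "empty".
  explicit SpscQueue(std::size_t capacity) : buffer_(capacity + 1) {}

  // Producer side: returns false instead of blocking when the queue is full.
  bool try_push(T item) {
    const std::size_t head = head_.load(std::memory_order_relaxed);
    const std::size_t next = (head + 1) % buffer_.size();
    if (next == tail_.load(std::memory_order_acquire)) {
      return false;
    }
    buffer_[head] = std::move(item);
    head_.store(next, std::memory_order_release);  // publish the filled slot
    return true;
  }

  // Consumer side: returns std::nullopt when the queue is currently empty.
  std::optional<T> try_pop() {
    const std::size_t tail = tail_.load(std::memory_order_relaxed);
    if (tail == head_.load(std::memory_order_acquire)) {
      return std::nullopt;
    }
    T item = std::move(buffer_[tail]);
    tail_.store((tail + 1) % buffer_.size(), std::memory_order_release);  // free the slot
    return item;
  }

 private:
  std::vector<T> buffer_;
  std::atomic<std::size_t> head_{0};  // next slot to write, advanced by the producer
  std::atomic<std::size_t> tail_{0};  // next slot to read, advanced by the consumer
};

// Consumer loop: processes items as they arrive until the producer has set
// `done` (after its final push) and the queue has been drained.
template <typename T, typename Process>
void consume_until_done(SpscQueue<T>& queue, const std::atomic<bool>& done,
                        Process process) {
  for (;;) {
    if (std::optional<T> item = queue.try_pop()) {
      process(*item);
    } else if (done.load(std::memory_order_acquire)) {
      // Everything pushed before `done` was set is visible now; drain it.
      while (std::optional<T> rest = queue.try_pop()) {
        process(*rest);
      }
      return;
    } else {
      std::this_thread::yield();  // queue empty, producer still reading
    }
  }
}
```

With one reader thread and one writer thread this already overlaps bag I/O with image encoding. To keep several image writers busy, the reader could feed one such queue per writer in round-robin order, keeping every queue strictly single-producer/single-consumer.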

@MaxFleur
Author

Might be an idea. The main problem, however, is that loading all the necessary data takes quite some time, and I haven't found a good way to speed that up so far. :-\

Projects
None yet
Development

No branches or pull requests

3 participants