From 33e6707a24b696f308b8b06cf129c586488860d2 Mon Sep 17 00:00:00 2001 From: itsafuu <89619301+itsafuu@users.noreply.github.com> Date: Wed, 28 Feb 2024 16:52:18 -0500 Subject: [PATCH] Added a data holder in the processor and accessed by CID to get data, just need to add MNN stuff to process and send hash to the swarm --- example/processing_mnn/processing_mnn.hpp | 124 +++++++------------- example/processing_mnn/processing_mnn_p.cpp | 3 +- 2 files changed, 43 insertions(+), 84 deletions(-) diff --git a/example/processing_mnn/processing_mnn.hpp b/example/processing_mnn/processing_mnn.hpp index b0cf0326..92bc8cea 100644 --- a/example/processing_mnn/processing_mnn.hpp +++ b/example/processing_mnn/processing_mnn.hpp @@ -28,6 +28,9 @@ namespace class ImageSplitter { public: + ImageSplitter() { + + } ImageSplitter( const char* filename, int width, @@ -241,6 +244,10 @@ namespace { } + void setImageSplitter(const ImageSplitter& imagesplit) { + imagesplit_ = imagesplit; + } + void ProcessSubTask( const SGProcessing::SubTask& subTask, SGProcessing::SubTaskResult& result, uint32_t initialHashCode) override @@ -256,35 +263,40 @@ namespace throw std::runtime_error("Maximal number of processed subtasks exceeded"); } } + std::cout << "Test CID: " << imagesplit_.GetPartCID(1).toPrettyString("") << std::endl; std::cout << "Process subtask " << subTask.subtaskid() << std::endl; - std::this_thread::sleep_for(std::chrono::milliseconds(m_subTaskProcessingTime)); - result.set_ipfs_results_data_id((boost::format("%s_%s") % "RESULT_IPFS" % subTask.subtaskid()).str()); - - bool isValidationSubTask = (subTask.subtaskid() == "subtask_validation"); - size_t subTaskResultHash = initialHashCode; - for (int chunkIdx = 0; chunkIdx < subTask.chunkstoprocess_size(); ++chunkIdx) - { - const auto& chunk = subTask.chunkstoprocess(chunkIdx); - - // Chunk result hash should be calculated - // Chunk data hash is calculated just as a stub - size_t chunkHash = 0; - if (isValidationSubTask) - { - chunkHash = ((size_t)chunkIdx < m_validationChunkHashes.size()) ? - m_validationChunkHashes[chunkIdx] : std::hash{}(chunk.SerializeAsString()); - } - else - { - chunkHash = ((size_t)chunkIdx < m_chunkResulHashes.size()) ? - m_chunkResulHashes[chunkIdx] : std::hash{}(chunk.SerializeAsString()); - } - - result.add_chunk_hashes(chunkHash); - boost::hash_combine(subTaskResultHash, chunkHash); - } - - result.set_result_hash(subTaskResultHash); + std::cout << "IPFS BLOCK: " << subTask.ipfsblock() << std::endl; + + //auto hash = libp2p::multi::Multihash::create() + auto data = imagesplit_.GetPartByCid(sgns::CID::fromString(subTask.ipfsblock()).value()); + //std::this_thread::sleep_for(std::chrono::milliseconds(m_subTaskProcessingTime)); + //result.set_ipfs_results_data_id((boost::format("%s_%s") % "RESULT_IPFS" % subTask.subtaskid()).str()); + + //bool isValidationSubTask = (subTask.subtaskid() == "subtask_validation"); + //size_t subTaskResultHash = initialHashCode; + //for (int chunkIdx = 0; chunkIdx < subTask.chunkstoprocess_size(); ++chunkIdx) + //{ + // const auto& chunk = subTask.chunkstoprocess(chunkIdx); + + // // Chunk result hash should be calculated + // // Chunk data hash is calculated just as a stub + // size_t chunkHash = 0; + // if (isValidationSubTask) + // { + // chunkHash = ((size_t)chunkIdx < m_validationChunkHashes.size()) ? + // m_validationChunkHashes[chunkIdx] : std::hash{}(chunk.SerializeAsString()); + // } + // else + // { + // chunkHash = ((size_t)chunkIdx < m_chunkResulHashes.size()) ? + // m_chunkResulHashes[chunkIdx] : std::hash{}(chunk.SerializeAsString()); + // } + + // result.add_chunk_hashes(chunkHash); + // boost::hash_combine(subTaskResultHash, chunkHash); + //} + + //result.set_result_hash(subTaskResultHash); } std::vector m_chunkResulHashes; @@ -297,61 +309,7 @@ namespace std::mutex m_subTaskCountMutex; size_t m_processingSubTaskCount; + ImageSplitter imagesplit_; }; - - //class ProcessingTaskQueueImpl : public ProcessingTaskQueue - //{ - //public: - // ProcessingTaskQueueImpl() - // { - // } - - // void EnqueueTask( - // const SGProcessing::Task& task, - // const std::list& subTasks) - // { - // m_tasks.push_back(task); - // m_subTasks.emplace(task.ipfs_block_id(), subTasks); - // } - - // bool GetSubTasks( - // const std::string& taskId, - // std::list& subTasks) - // { - // auto it = m_subTasks.find(taskId); - // if (it != m_subTasks.end()) - // { - // subTasks = it->second; - // return true; - // } - - // return false; - // } - - // bool GrabTask(std::string& taskKey, SGProcessing::Task& task) override - // { - // if (m_tasks.empty()) - // { - // return false; - // } - - - // task = std::move(m_tasks.back()); - // m_tasks.pop_back(); - // taskKey = (boost::format("TASK_%d") % m_tasks.size()).str(); - - // return true; - // }; - - // bool CompleteTask(const std::string& taskKey, const SGProcessing::TaskResult& task) override - // { - // return false; - // } - - //private: - // std::list m_tasks; - // std::map> m_subTasks; - //}; - } diff --git a/example/processing_mnn/processing_mnn_p.cpp b/example/processing_mnn/processing_mnn_p.cpp index 77af3ece..9e99e08b 100644 --- a/example/processing_mnn/processing_mnn_p.cpp +++ b/example/processing_mnn/processing_mnn_p.cpp @@ -94,6 +94,7 @@ int main(int argc, char* argv[]) globalDB2, 1000000, 2); + processingCore2->setImageSplitter(imagesplit); ProcessingServiceImpl processingService( pubs2, @@ -102,7 +103,7 @@ int main(int argc, char* argv[]) std::make_shared(), std::make_shared(globalDB2), processingCore2); - + processingService.SetChannelListRequestTimeout(boost::posix_time::milliseconds(10000)); processingService.StartProcessing(processingGridChannel);