Skip to content

Commit

Permalink
Added a data holder in the processor, accessed by CID to retrieve data,…
Browse files Browse the repository at this point in the history
… still need to add the MNN processing and send the resulting hash to the swarm
  • Loading branch information
itsafuu committed Feb 28, 2024
1 parent 22ee006 commit 33e6707
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 84 deletions.
124 changes: 41 additions & 83 deletions example/processing_mnn/processing_mnn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ namespace
class ImageSplitter
{
public:
ImageSplitter() {

}
ImageSplitter(
const char* filename,
int width,
Expand Down Expand Up @@ -241,6 +244,10 @@ namespace
{
}

void setImageSplitter(const ImageSplitter& imagesplit) {
imagesplit_ = imagesplit;
}

void ProcessSubTask(
const SGProcessing::SubTask& subTask, SGProcessing::SubTaskResult& result,
uint32_t initialHashCode) override
Expand All @@ -256,35 +263,40 @@ namespace
throw std::runtime_error("Maximal number of processed subtasks exceeded");
}
}
std::cout << "Test CID: " << imagesplit_.GetPartCID(1).toPrettyString("") << std::endl;
std::cout << "Process subtask " << subTask.subtaskid() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(m_subTaskProcessingTime));
result.set_ipfs_results_data_id((boost::format("%s_%s") % "RESULT_IPFS" % subTask.subtaskid()).str());

bool isValidationSubTask = (subTask.subtaskid() == "subtask_validation");
size_t subTaskResultHash = initialHashCode;
for (int chunkIdx = 0; chunkIdx < subTask.chunkstoprocess_size(); ++chunkIdx)
{
const auto& chunk = subTask.chunkstoprocess(chunkIdx);

// Chunk result hash should be calculated
// Chunk data hash is calculated just as a stub
size_t chunkHash = 0;
if (isValidationSubTask)
{
chunkHash = ((size_t)chunkIdx < m_validationChunkHashes.size()) ?
m_validationChunkHashes[chunkIdx] : std::hash<std::string>{}(chunk.SerializeAsString());
}
else
{
chunkHash = ((size_t)chunkIdx < m_chunkResulHashes.size()) ?
m_chunkResulHashes[chunkIdx] : std::hash<std::string>{}(chunk.SerializeAsString());
}

result.add_chunk_hashes(chunkHash);
boost::hash_combine(subTaskResultHash, chunkHash);
}

result.set_result_hash(subTaskResultHash);
std::cout << "IPFS BLOCK: " << subTask.ipfsblock() << std::endl;

//auto hash = libp2p::multi::Multihash::create()
auto data = imagesplit_.GetPartByCid(sgns::CID::fromString(subTask.ipfsblock()).value());
//std::this_thread::sleep_for(std::chrono::milliseconds(m_subTaskProcessingTime));
//result.set_ipfs_results_data_id((boost::format("%s_%s") % "RESULT_IPFS" % subTask.subtaskid()).str());

//bool isValidationSubTask = (subTask.subtaskid() == "subtask_validation");
//size_t subTaskResultHash = initialHashCode;
//for (int chunkIdx = 0; chunkIdx < subTask.chunkstoprocess_size(); ++chunkIdx)
//{
// const auto& chunk = subTask.chunkstoprocess(chunkIdx);

// // Chunk result hash should be calculated
// // Chunk data hash is calculated just as a stub
// size_t chunkHash = 0;
// if (isValidationSubTask)
// {
// chunkHash = ((size_t)chunkIdx < m_validationChunkHashes.size()) ?
// m_validationChunkHashes[chunkIdx] : std::hash<std::string>{}(chunk.SerializeAsString());
// }
// else
// {
// chunkHash = ((size_t)chunkIdx < m_chunkResulHashes.size()) ?
// m_chunkResulHashes[chunkIdx] : std::hash<std::string>{}(chunk.SerializeAsString());
// }

// result.add_chunk_hashes(chunkHash);
// boost::hash_combine(subTaskResultHash, chunkHash);
//}

//result.set_result_hash(subTaskResultHash);
}

std::vector<size_t> m_chunkResulHashes;
Expand All @@ -297,61 +309,7 @@ namespace

std::mutex m_subTaskCountMutex;
size_t m_processingSubTaskCount;
ImageSplitter imagesplit_;
};


//class ProcessingTaskQueueImpl : public ProcessingTaskQueue
//{
//public:
// ProcessingTaskQueueImpl()
// {
// }

// void EnqueueTask(
// const SGProcessing::Task& task,
// const std::list<SGProcessing::SubTask>& subTasks)
// {
// m_tasks.push_back(task);
// m_subTasks.emplace(task.ipfs_block_id(), subTasks);
// }

// bool GetSubTasks(
// const std::string& taskId,
// std::list<SGProcessing::SubTask>& subTasks)
// {
// auto it = m_subTasks.find(taskId);
// if (it != m_subTasks.end())
// {
// subTasks = it->second;
// return true;
// }

// return false;
// }

// bool GrabTask(std::string& taskKey, SGProcessing::Task& task) override
// {
// if (m_tasks.empty())
// {
// return false;
// }


// task = std::move(m_tasks.back());
// m_tasks.pop_back();
// taskKey = (boost::format("TASK_%d") % m_tasks.size()).str();

// return true;
// };

// bool CompleteTask(const std::string& taskKey, const SGProcessing::TaskResult& task) override
// {
// return false;
// }

//private:
// std::list<SGProcessing::Task> m_tasks;
// std::map<std::string, std::list<SGProcessing::SubTask>> m_subTasks;
//};

}
3 changes: 2 additions & 1 deletion example/processing_mnn/processing_mnn_p.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ int main(int argc, char* argv[])
globalDB2,
1000000,
2);
processingCore2->setImageSplitter(imagesplit);

ProcessingServiceImpl processingService(
pubs2,
Expand All @@ -102,7 +103,7 @@ int main(int argc, char* argv[])
std::make_shared<SubTaskStateStorageImpl>(),
std::make_shared<SubTaskResultStorageImpl>(globalDB2),
processingCore2);

processingService.SetChannelListRequestTimeout(boost::posix_time::milliseconds(10000));

processingService.StartProcessing(processingGridChannel);
Expand Down

0 comments on commit 33e6707

Please sign in to comment.