Skip to content

Commit

Permalink
Improve parallelization of TfIdfVectorizer, Reduce memory consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
xadupre committed Nov 21, 2023
1 parent 29a409a commit 6d6c82c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 51 deletions.
89 changes: 41 additions & 48 deletions onnxruntime/core/providers/cpu/nn/tfidfvectorizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,11 @@ struct TfIdfVectorizer::Impl {
Impl(const Impl&) = delete;
Impl& operator=(const Impl&) = delete;

void IncrementCount(size_t ngram_id, size_t row_num,
std::vector<uint32_t>& frequencies) const {
void IncrementCount(size_t ngram_id, std::vector<uint32_t>& frequencies) const {
assert(ngram_id != 0);
--ngram_id;
assert(ngram_id < ngram_indexes_.size());
size_t output_idx = row_num * output_size_ + SafeInt<size_t>(ngram_indexes_[ngram_id]);
size_t output_idx = SafeInt<size_t>(ngram_indexes_[ngram_id]);
assert(output_idx < frequencies.size());
++frequencies[output_idx];
}
Expand Down Expand Up @@ -252,24 +251,10 @@ TfIdfVectorizer::TfIdfVectorizer(const OpKernelInfo& info) : OpKernel(info), imp

TfIdfVectorizer::~TfIdfVectorizer() = default;

void TfIdfVectorizer::OutputResult(OpKernelContext* ctx, size_t B, const std::vector<uint32_t>& frequences) const {
void TfIdfVectorizer::OutputResult(const std::vector<uint32_t>& frequences, float* output_data) const {
const Impl& impl = *impl_;
std::vector<int64_t> output_dims;
if (B == 0) {
output_dims.push_back(impl.output_size_);
B = 1; // For use in the loops below
} else {
output_dims.push_back(B);
output_dims.push_back(impl.output_size_);
}

const auto row_size = impl.output_size_;

TensorShape output_shape(output_dims);
assert(frequences.size() == static_cast<size_t>(output_shape.Size()));

auto Y = ctx->Output(0, output_shape);
auto output_data = Y->MutableData<float>();
const auto& w = impl.weights_;
switch (impl.weighting_criteria_) {
case kTF: {
Expand All @@ -280,10 +265,8 @@ void TfIdfVectorizer::OutputResult(OpKernelContext* ctx, size_t B, const std::ve
case kIDF: {
if (!w.empty()) {
const auto* freqs = frequences.data();
for (size_t batch = 0; batch < B; ++batch) {
for (size_t i = 0; i < row_size; ++i) {
*output_data++ = (*freqs++ > 0) ? w[i] : 0;
}
for (size_t i = 0; i < row_size; ++i) {
*output_data++ = (*freqs++ > 0) ? w[i] : 0;
}
} else {
for (auto f : frequences) {
Expand All @@ -294,10 +277,8 @@ void TfIdfVectorizer::OutputResult(OpKernelContext* ctx, size_t B, const std::ve
case kTFIDF: {
if (!w.empty()) {
const auto* freqs = frequences.data();
for (size_t batch = 0; batch < B; ++batch) {
for (size_t i = 0; i < row_size; ++i) {
*output_data++ = *freqs++ * w[i];
}
for (size_t i = 0; i < row_size; ++i) {
*output_data++ = *freqs++ * w[i];
}
} else {
for (auto f : frequences) {
Expand All @@ -311,12 +292,9 @@ void TfIdfVectorizer::OutputResult(OpKernelContext* ctx, size_t B, const std::ve
}
}

void TfIdfVectorizer::ComputeImpl(OpKernelContext* ctx, ptrdiff_t row_num, size_t row_size,
std::vector<uint32_t>& frequencies) const {
auto X = ctx->Input<Tensor>(0);
const auto elem_size = X->DataType()->Size();

const void* const row_begin = AdvanceElementPtr(X->DataRaw(), row_num * row_size, elem_size);
void TfIdfVectorizer::ComputeImpl(const void* x_data_raw, size_t elem_size, ptrdiff_t row_num, size_t row_size,
std::vector<uint32_t>& frequencies, bool is_input_string) const {
const void* const row_begin = AdvanceElementPtr(x_data_raw, row_num * row_size, elem_size);
const void* const row_end = AdvanceElementPtr(row_begin, row_size, elem_size);

const auto& impl = *impl_;
Expand All @@ -336,7 +314,7 @@ void TfIdfVectorizer::ComputeImpl(OpKernelContext* ctx, ptrdiff_t row_num, size_
}

auto ngram_item = ngram_start;
if (X->IsDataTypeString()) {
if (is_input_string) {
const std::string* str_item = reinterpret_cast<const std::string*>(ngram_item);
const StrMap* str_map = &impl.str_map_;
for (auto ngram_size = 1;
Expand All @@ -349,7 +327,7 @@ void TfIdfVectorizer::ComputeImpl(OpKernelContext* ctx, ptrdiff_t row_num, size_
break;
}
if (ngram_size >= start_ngram_size && hit->second->id_ != 0) {
impl.IncrementCount(hit->second->id_, row_num, frequencies);
impl.IncrementCount(hit->second->id_, frequencies);
}
str_map = &hit->second->leafs_;
}
Expand All @@ -360,13 +338,13 @@ void TfIdfVectorizer::ComputeImpl(OpKernelContext* ctx, ptrdiff_t row_num, size_
ngram_size <= max_gram_length &&
ngram_item < ngram_row_end;
++ngram_size, ngram_item = AdvanceElementPtr(ngram_item, skip_distance, elem_size)) {
int64_t val = (X->IsDataType<int32_t>()) ? int64_t{*reinterpret_cast<const int32_t*>(ngram_item)} : *reinterpret_cast<const int64_t*>(ngram_item);
int64_t val = (elem_size == 4) ? int64_t{*reinterpret_cast<const int32_t*>(ngram_item)} : *reinterpret_cast<const int64_t*>(ngram_item);

Check warning on line 341 in onnxruntime/core/providers/cpu/nn/tfidfvectorizer.cc

View workflow job for this annotation

GitHub Actions / Lint C++

[cpplint] reported by reviewdog 🐶 Lines should be <= 120 characters long [whitespace/line_length] [2] Raw Output: onnxruntime/core/providers/cpu/nn/tfidfvectorizer.cc:341: Lines should be <= 120 characters long [whitespace/line_length] [2]
auto hit = int_map->find(val);
if (hit == int_map->end()) {
break;
}
if (ngram_size >= start_ngram_size && hit->second->id_ != 0) {
impl.IncrementCount(hit->second->id_, row_num, frequencies);
impl.IncrementCount(hit->second->id_, frequencies);
}
int_map = &hit->second->leafs_;
}
Expand Down Expand Up @@ -412,31 +390,46 @@ Status TfIdfVectorizer::Compute(OpKernelContext* ctx) const {
}

assert((num_rows * C) == total_items);
// Frequency holder allocate [B..output_size_]
// and init all to zero
std::vector<uint32_t> frequencies;
frequencies.resize(num_rows * impl_->output_size_, 0);
const Impl& impl = *impl_;
std::vector<int64_t> output_dims;
if (B == 0) {
output_dims.push_back(impl.output_size_);
B = 1; // For use in the loops below
} else {
output_dims.push_back(B);
output_dims.push_back(impl.output_size_);
}
TensorShape output_shape(output_dims);

auto Y = ctx->Output(0, output_shape);
auto output_data = Y->MutableData<float>();
bool is_input_string = X->IsDataTypeString();

if (total_items == 0 ||
(X->IsDataTypeString() && impl_->str_map_.empty()) ||
(is_input_string && impl_->str_map_.empty()) ||
((X->IsDataType<int32_t>() || X->IsDataType<int64_t>()) && impl_->int64_map_.empty())) {
// TfidfVectorizer may receive an empty input when it follows a Tokenizer
// (for example for a string containing only stopwords).
// TfidfVectorizer returns a zero tensor of shape
// {b_dim, output_size} where b_dim is the number of received observations
// and output_size is the maximum value in the ngram_indexes attribute plus 1.
OutputResult(ctx, B, frequencies);
memset(output_data, 0, impl.output_size_ * B * sizeof(float));
return Status::OK();
}

std::function<void(ptrdiff_t)> fn = [this, ctx, C, &frequencies](ptrdiff_t row_num) {
ComputeImpl(ctx, row_num, C, frequencies);
};

concurrency::ThreadPool::TryBatchParallelFor(ctx->GetOperatorThreadPool(), num_rows, std::move(fn), 0);
auto x_data_raw = ctx->Input<Tensor>(0)->DataRaw();
const auto elem_size = X->DataType()->Size();

OutputResult(ctx, B, frequencies);
std::function<void(ptrdiff_t)> fn = [this, ctx, C, output_data, x_data_raw, elem_size, is_input_string](ptrdiff_t row_num) {

Check warning on line 423 in onnxruntime/core/providers/cpu/nn/tfidfvectorizer.cc

View workflow job for this annotation

GitHub Actions / Lint C++

[cpplint] reported by reviewdog 🐶 Lines should be <= 120 characters long [whitespace/line_length] [2] Raw Output: onnxruntime/core/providers/cpu/nn/tfidfvectorizer.cc:423: Lines should be <= 120 characters long [whitespace/line_length] [2]
// Per-row frequency holder: allocate [output_size_] entries and init all to zero.
std::vector<uint32_t> frequencies;
frequencies.resize(this->impl_->output_size_, 0);
ComputeImpl(x_data_raw, elem_size, row_num, C, frequencies, is_input_string);
OutputResult(frequencies, output_data + row_num * this->impl_->output_size_);
};

// concurrency::ThreadPool::TryBatchParallelFor(ctx->GetOperatorThreadPool(), num_rows, std::move(fn), 0);
for (size_t i = 0; i < num_rows; ++i) fn(i);
return Status::OK();
}

Expand Down
6 changes: 3 additions & 3 deletions onnxruntime/core/providers/cpu/nn/tfidfvectorizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ class TfIdfVectorizer final : public OpKernel {
Status Compute(OpKernelContext* ctx) const override;

private:
void ComputeImpl(OpKernelContext* ctx, ptrdiff_t row_num, size_t row_size,
std::vector<uint32_t>& frequencies) const;
void ComputeImpl(const void* x_data_raw, size_t elem_size, ptrdiff_t row_num, size_t row_size,
std::vector<uint32_t>& frequencies, bool is_input_string) const;

// Apply weighting criteria and output
void OutputResult(OpKernelContext* ctx, size_t b_dim, const std::vector<uint32_t>& frequences) const;
void OutputResult(const std::vector<uint32_t>& frequences, float* output_data) const;

struct Impl;
std::unique_ptr<Impl> impl_;
Expand Down

0 comments on commit 6d6c82c

Please sign in to comment.