Skip to content

Commit

Permalink
Improve parallelization of TfIdfVectorizer, Reduce memory consumption (
Browse files Browse the repository at this point in the history
…#18539)

### Description

TfIdfVectorizer has two steps: first search for n-grams in the input,
second, weight the results. The second step was not parallelized. The PR
addresses that issue. Previously, two vectors, each the size of the output,
were allocated to compute the results. The first one, frequencies, was
used as an intermediate vector between the two steps. This vector is now
broken into multiple small vectors, one per thread. The memory
consumption is then reduced for batches with a number of rows > the
number of threads.

### Motivation and Context
Performance and memory consumption.

For one model, inference is 15% faster (4 cores, model size is
~6Mb, batch size is 100). Here is another benchmark on
a machine with 32 cores with different vocabulary sizes and batch
sizes. The tested TfIdfVectorizer only deals with unigrams and processes
sequences of 10 tokens (integers).


![image](https://github.com/microsoft/onnxruntime/assets/22452781/0bb9abe9-ed81-44da-b5c4-ad2a12f129bd)
  • Loading branch information
xadupre authored Nov 28, 2023
1 parent 3f42fba commit 94a6020
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 90 deletions.
154 changes: 69 additions & 85 deletions onnxruntime/core/providers/cpu/nn/tfidfvectorizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,11 @@ struct TfIdfVectorizer::Impl {
Impl(const Impl&) = delete;
Impl& operator=(const Impl&) = delete;

void IncrementCount(size_t ngram_id, size_t row_num,
std::vector<uint32_t>& frequencies) const {
// Maps a 1-based ngram id (0 is reserved for "no match") to the column index
// in the output row that should receive this ngram's weight/count.
inline size_t OutputIdToIncrement(size_t ngram_id) const {
  assert(ngram_id != 0);
  const size_t zero_based = ngram_id - 1;
  assert(zero_based < ngram_indexes_.size());
  return SafeInt<size_t>(ngram_indexes_[zero_based]);
}
};

Expand Down Expand Up @@ -252,77 +249,17 @@ TfIdfVectorizer::TfIdfVectorizer(const OpKernelInfo& info) : OpKernel(info), imp

TfIdfVectorizer::~TfIdfVectorizer() = default;

// Applies the configured weighting criteria (TF / IDF / TFIDF) to the per-row
// ngram counts in `frequences` and writes the resulting float tensor to the
// kernel's first output. B == 0 signals a 1-D input: the output is then a
// single row of shape {output_size_}; otherwise it is {B, output_size_}.
void TfIdfVectorizer::OutputResult(OpKernelContext* ctx, size_t B, const std::vector<uint32_t>& frequences) const {
  const Impl& impl = *impl_;
  const auto row_size = impl.output_size_;

  // Build the output shape; a 1-D input produces a 1-D output.
  std::vector<int64_t> output_dims;
  if (B == 0) {
    output_dims.push_back(row_size);
    B = 1;  // so the per-row loops below run exactly once
  } else {
    output_dims.push_back(B);
    output_dims.push_back(row_size);
  }

  TensorShape output_shape(output_dims);
  assert(frequences.size() == static_cast<size_t>(output_shape.Size()));

  auto* Y = ctx->Output(0, output_shape);
  auto* out = Y->MutableData<float>();
  const auto& w = impl.weights_;

  switch (impl.weighting_criteria_) {
    case kTF: {
      // Raw term frequencies, converted to float.
      for (auto f : frequences) {
        *out++ = static_cast<float>(f);
      }
      break;
    }
    case kIDF: {
      if (w.empty()) {
        // No weights supplied: binarize presence/absence.
        for (auto f : frequences) {
          *out++ = (f > 0) ? 1.0f : 0;
        }
      } else {
        // Emit the per-column weight wherever the ngram occurred.
        const auto* f = frequences.data();
        for (size_t row = 0; row < B; ++row) {
          for (size_t i = 0; i < row_size; ++i, ++f) {
            *out++ = (*f > 0) ? w[i] : 0;
          }
        }
      }
      break;
    }
    case kTFIDF: {
      if (w.empty()) {
        // Without weights TFIDF degenerates to the raw counts.
        for (auto f : frequences) {
          *out++ = static_cast<float>(f);
        }
      } else {
        // Scale each count by its column weight.
        const auto* f = frequences.data();
        for (size_t row = 0; row < B; ++row) {
          for (size_t i = 0; i < row_size; ++i, ++f) {
            *out++ = *f * w[i];
          }
        }
      }
      break;
    }
    case kNone:  // fall-through
    default:
      assert(false);
  }
}

void TfIdfVectorizer::ComputeImpl(OpKernelContext* ctx, ptrdiff_t row_num, size_t row_size,
std::vector<uint32_t>& frequencies) const {
auto X = ctx->Input<Tensor>(0);
const auto elem_size = X->DataType()->Size();

const void* const row_begin = AdvanceElementPtr(X->DataRaw(), row_num * row_size, elem_size);
void TfIdfVectorizer::ComputeImpl(const void* x_data_raw, size_t elem_size, ptrdiff_t row_num, size_t row_size,
bool is_input_string, gsl::span<float> output_data,
std::function<void(size_t, gsl::span<float>&)>& fn_weight) const {
const void* const row_begin = AdvanceElementPtr(x_data_raw, row_num * row_size, elem_size);
const void* const row_end = AdvanceElementPtr(row_begin, row_size, elem_size);

const auto& impl = *impl_;
const auto max_gram_length = impl.max_gram_length_;
const auto max_skip_distance = impl.max_skip_count_ + 1; // Convert to distance
auto start_ngram_size = impl.min_gram_length_;
size_t output_idx;

for (auto skip_distance = 1; skip_distance <= max_skip_distance; ++skip_distance) {
auto ngram_start = row_begin;
Expand All @@ -336,7 +273,7 @@ void TfIdfVectorizer::ComputeImpl(OpKernelContext* ctx, ptrdiff_t row_num, size_
}

auto ngram_item = ngram_start;
if (X->IsDataTypeString()) {
if (is_input_string) {
const std::string* str_item = reinterpret_cast<const std::string*>(ngram_item);
const StrMap* str_map = &impl.str_map_;
for (auto ngram_size = 1;
Expand All @@ -349,7 +286,8 @@ void TfIdfVectorizer::ComputeImpl(OpKernelContext* ctx, ptrdiff_t row_num, size_
break;
}
if (ngram_size >= start_ngram_size && hit->second->id_ != 0) {
impl.IncrementCount(hit->second->id_, row_num, frequencies);
output_idx = impl.OutputIdToIncrement(hit->second->id_);
fn_weight(output_idx, output_data);
}
str_map = &hit->second->leafs_;
}
Expand All @@ -360,13 +298,14 @@ void TfIdfVectorizer::ComputeImpl(OpKernelContext* ctx, ptrdiff_t row_num, size_
ngram_size <= max_gram_length &&
ngram_item < ngram_row_end;
++ngram_size, ngram_item = AdvanceElementPtr(ngram_item, skip_distance, elem_size)) {
int64_t val = (X->IsDataType<int32_t>()) ? int64_t{*reinterpret_cast<const int32_t*>(ngram_item)} : *reinterpret_cast<const int64_t*>(ngram_item);
int64_t val = (elem_size == 4) ? int64_t{*reinterpret_cast<const int32_t*>(ngram_item)} : *reinterpret_cast<const int64_t*>(ngram_item);
auto hit = int_map->find(val);
if (hit == int_map->end()) {
break;
}
if (ngram_size >= start_ngram_size && hit->second->id_ != 0) {
impl.IncrementCount(hit->second->id_, row_num, frequencies);
output_idx = impl.OutputIdToIncrement(hit->second->id_);
fn_weight(output_idx, output_data);
}
int_map = &hit->second->leafs_;
}
Expand Down Expand Up @@ -412,31 +351,76 @@ Status TfIdfVectorizer::Compute(OpKernelContext* ctx) const {
}

assert((num_rows * C) == total_items);
// Frequency holder allocate [B..output_size_]
// and init all to zero
std::vector<uint32_t> frequencies;
frequencies.resize(num_rows * impl_->output_size_, 0);
const Impl& impl = *impl_;
TensorShapeVector output_dims;
if (B == 0) {
output_dims.push_back(impl.output_size_);
B = 1; // For use in the loops below
} else {
output_dims.push_back(B);
output_dims.push_back(impl.output_size_);
}
TensorShape output_shape(output_dims);

auto Y = ctx->Output(0, output_shape);
auto output_data = Y->MutableData<float>();
const bool is_input_string = X->IsDataTypeString();

if (total_items == 0 ||
(X->IsDataTypeString() && impl_->str_map_.empty()) ||
(is_input_string && impl_->str_map_.empty()) ||
((X->IsDataType<int32_t>() || X->IsDataType<int64_t>()) && impl_->int64_map_.empty())) {
// TfidfVectorizer may receive an empty input when it follows a Tokenizer
// (for example for a string containing only stopwords).
// TfidfVectorizer returns a zero tensor of shape
// {b_dim, output_size} when b_dim is the number of received observations
// and output_size the is the maximum value in ngram_indexes attribute plus 1.
OutputResult(ctx, B, frequencies);
memset(output_data, 0, static_cast<size_t>(output_shape.Size() * sizeof(float)));
return Status::OK();
}

std::function<void(ptrdiff_t)> fn = [this, ctx, C, &frequencies](ptrdiff_t row_num) {
ComputeImpl(ctx, row_num, C, frequencies);
};
auto x_data_raw = ctx->Input<Tensor>(0)->DataRaw();
const auto elem_size = X->DataType()->Size();
int32_t num_batches = std::min<int32_t>(concurrency::ThreadPool::DegreeOfParallelism(ctx->GetOperatorThreadPool()) * 2, num_rows);

concurrency::ThreadPool::TryBatchParallelFor(ctx->GetOperatorThreadPool(), num_rows, std::move(fn), 0);
const auto& w = impl.weights_;
std::function<void(size_t, gsl::span<float>&)> fn_weight;

OutputResult(ctx, B, frequencies);
switch (impl.weighting_criteria_) {
case kTF:
fn_weight = [](size_t i, gsl::span<float>& out) { out[i] += 1.0f; };
break;
case kIDF:
if (!w.empty()) {
fn_weight = [&w](size_t i, gsl::span<float>& out) { out[i] = w[i]; };
} else {
fn_weight = [](size_t i, gsl::span<float>& out) { out[i] = 1.0f; };
}
break;
case kTFIDF:
if (!w.empty()) {
fn_weight = [&w](size_t i, gsl::span<float>& out) { out[i] += w[i]; };
} else {
fn_weight = [](size_t i, gsl::span<float>& out) { out[i] += 1.0f; };
}
break;
case kNone: // fall-through
default:
assert(false);
}

std::function<void(ptrdiff_t)> fn = [this, C, output_data, x_data_raw, elem_size,
is_input_string, num_batches, num_rows, &fn_weight](ptrdiff_t batch_num) {
// Frequency holder allocate [B..output_size_] and init all to zero.
auto work = concurrency::ThreadPool::PartitionWork(batch_num, num_batches, static_cast<size_t>(num_rows));
std::vector<uint32_t> frequencies(this->impl_->output_size_);
for (auto row_num = work.start; row_num < work.end; ++row_num) {
auto out = gsl::span<float>(output_data + row_num * this->impl_->output_size_, this->impl_->output_size_);
std::fill(out.begin(), out.end(), 0.0f);
ComputeImpl(x_data_raw, elem_size, row_num, C, is_input_string, out, fn_weight);
}
};

concurrency::ThreadPool::TrySimpleParallelFor(ctx->GetOperatorThreadPool(), num_batches, std::move(fn));
return Status::OK();
}

Expand Down
7 changes: 2 additions & 5 deletions onnxruntime/core/providers/cpu/nn/tfidfvectorizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ class TfIdfVectorizer final : public OpKernel {
Status Compute(OpKernelContext* ctx) const override;

private:
void ComputeImpl(OpKernelContext* ctx, ptrdiff_t row_num, size_t row_size,
std::vector<uint32_t>& frequencies) const;

// Apply weighing criteria and output
void OutputResult(OpKernelContext* ctx, size_t b_dim, const std::vector<uint32_t>& frequences) const;
void ComputeImpl(const void* x_data_raw, size_t elem_size, ptrdiff_t row_num, size_t row_size, bool is_input_string,
gsl::span<float> output_data, std::function<void(size_t, gsl::span<float>&)>& fn_weight) const;

struct Impl;
std::unique_ptr<Impl> impl_;
Expand Down

0 comments on commit 94a6020

Please sign in to comment.