
Improve parallelization of TfIdfVectorizer, Reduce memory consumption #18539

Merged
12 commits, merged Nov 28, 2023
154 changes: 69 additions & 85 deletions onnxruntime/core/providers/cpu/nn/tfidfvectorizer.cc
@@ -141,14 +141,11 @@
Impl(const Impl&) = delete;
Impl& operator=(const Impl&) = delete;

void IncrementCount(size_t ngram_id, size_t row_num,
std::vector<uint32_t>& frequencies) const {
inline size_t OutputIdToIncrement(size_t ngram_id) const {
assert(ngram_id != 0);
--ngram_id;
assert(ngram_id < ngram_indexes_.size());
size_t output_idx = row_num * output_size_ + SafeInt<size_t>(ngram_indexes_[ngram_id]);
assert(output_idx < frequencies.size());
++frequencies[output_idx];
return SafeInt<size_t>(ngram_indexes_[ngram_id]);
}
};
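Note on the hunk above: the old `IncrementCount` both mapped an ngram id to an output column and bumped a shared `frequencies` vector, which tied every hit to one large shared buffer. The new `OutputIdToIncrement` only does the mapping, so each worker can write weighted values into its own output row. A minimal standalone sketch of the new helper (names and types are illustrative, not the exact ORT declarations; `SafeInt` is replaced by a plain cast):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct ImplSketch {
  std::vector<int64_t> ngram_indexes_;  // ngram id (1-based) -> output column

  // Pure mapping with no side effects: the caller decides how to write
  // into the output row (raw count, IDF weight, or TF-IDF weight).
  size_t OutputIdToIncrement(size_t ngram_id) const {
    assert(ngram_id != 0);  // id 0 marks "not a complete ngram"
    --ngram_id;             // stored ids are 1-based
    assert(ngram_id < ngram_indexes_.size());
    return static_cast<size_t>(ngram_indexes_[ngram_id]);
  }
};
```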

@@ -252,77 +249,17 @@

TfIdfVectorizer::~TfIdfVectorizer() = default;

void TfIdfVectorizer::OutputResult(OpKernelContext* ctx, size_t B, const std::vector<uint32_t>& frequences) const {
const Impl& impl = *impl_;
std::vector<int64_t> output_dims;
if (B == 0) {
output_dims.push_back(impl.output_size_);
B = 1; // For use in the loops below
} else {
output_dims.push_back(B);
output_dims.push_back(impl.output_size_);
}

const auto row_size = impl.output_size_;

TensorShape output_shape(output_dims);
assert(frequences.size() == static_cast<size_t>(output_shape.Size()));

auto Y = ctx->Output(0, output_shape);
auto output_data = Y->MutableData<float>();
const auto& w = impl.weights_;
switch (impl.weighting_criteria_) {
case kTF: {
for (auto f : frequences) {
*output_data++ = static_cast<float>(f);
}
} break;
case kIDF: {
if (!w.empty()) {
const auto* freqs = frequences.data();
for (size_t batch = 0; batch < B; ++batch) {
for (size_t i = 0; i < row_size; ++i) {
*output_data++ = (*freqs++ > 0) ? w[i] : 0;
}
}
} else {
for (auto f : frequences) {
*output_data++ = (f > 0) ? 1.0f : 0;
}
}
} break;
case kTFIDF: {
if (!w.empty()) {
const auto* freqs = frequences.data();
for (size_t batch = 0; batch < B; ++batch) {
for (size_t i = 0; i < row_size; ++i) {
*output_data++ = *freqs++ * w[i];
}
}
} else {
for (auto f : frequences) {
*output_data++ = static_cast<float>(f);
}
}
} break;
case kNone: // fall-through
default:
assert(false);
}
}

void TfIdfVectorizer::ComputeImpl(OpKernelContext* ctx, ptrdiff_t row_num, size_t row_size,
std::vector<uint32_t>& frequencies) const {
auto X = ctx->Input<Tensor>(0);
const auto elem_size = X->DataType()->Size();

const void* const row_begin = AdvanceElementPtr(X->DataRaw(), row_num * row_size, elem_size);
void TfIdfVectorizer::ComputeImpl(const void* x_data_raw, size_t elem_size, ptrdiff_t row_num, size_t row_size,
bool is_input_string, gsl::span<float> output_data,
std::function<void(size_t, gsl::span<float>&)>& fn_weight) const {
const void* const row_begin = AdvanceElementPtr(x_data_raw, row_num * row_size, elem_size);
const void* const row_end = AdvanceElementPtr(row_begin, row_size, elem_size);

const auto& impl = *impl_;
const auto max_gram_length = impl.max_gram_length_;
const auto max_skip_distance = impl.max_skip_count_ + 1; // Convert to distance
auto start_ngram_size = impl.min_gram_length_;
size_t output_idx;

for (auto skip_distance = 1; skip_distance <= max_skip_distance; ++skip_distance) {
auto ngram_start = row_begin;
@@ -336,7 +273,7 @@
}

auto ngram_item = ngram_start;
if (X->IsDataTypeString()) {
if (is_input_string) {
const std::string* str_item = reinterpret_cast<const std::string*>(ngram_item);
const StrMap* str_map = &impl.str_map_;
for (auto ngram_size = 1;
@@ -349,7 +286,8 @@
break;
}
if (ngram_size >= start_ngram_size && hit->second->id_ != 0) {
impl.IncrementCount(hit->second->id_, row_num, frequencies);
output_idx = impl.OutputIdToIncrement(hit->second->id_);
fn_weight(output_idx, output_data);
}
str_map = &hit->second->leafs_;
}
@@ -360,13 +298,14 @@
ngram_size <= max_gram_length &&
ngram_item < ngram_row_end;
++ngram_size, ngram_item = AdvanceElementPtr(ngram_item, skip_distance, elem_size)) {
int64_t val = (X->IsDataType<int32_t>()) ? int64_t{*reinterpret_cast<const int32_t*>(ngram_item)} : *reinterpret_cast<const int64_t*>(ngram_item);
int64_t val = (elem_size == 4) ? int64_t{*reinterpret_cast<const int32_t*>(ngram_item)} : *reinterpret_cast<const int64_t*>(ngram_item);

[cpplint] Line 301: Lines should be <= 120 characters long [whitespace/line_length] [2]
auto hit = int_map->find(val);
if (hit == int_map->end()) {
break;
}
if (ngram_size >= start_ngram_size && hit->second->id_ != 0) {
impl.IncrementCount(hit->second->id_, row_num, frequencies);
output_idx = impl.OutputIdToIncrement(hit->second->id_);
fn_weight(output_idx, output_data);
}
int_map = &hit->second->leafs_;
}
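For orientation: in both the string and the integer branches, the per-hit path now reduces to two steps, map the trie node's id to an output column, then apply the weighting callback to the caller-provided row span. A rough sketch of that path, assuming `fn_weight` and the row span are set up as in `Compute` below (names are illustrative):

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>
#include <gsl/gsl>  // assuming Microsoft GSL; onnxruntime vendors its own gsl header

// Called for every matched ngram with id_ != 0; together these two steps
// replace the old IncrementCount plus the post-hoc OutputResult pass.
inline void ApplyHit(size_t ngram_id,  // 1-based id from the trie node
                     const std::vector<int64_t>& ngram_indexes,
                     const std::function<void(size_t, gsl::span<float>&)>& fn_weight,
                     gsl::span<float>& row_out) {
  const size_t col = static_cast<size_t>(ngram_indexes[ngram_id - 1]);
  fn_weight(col, row_out);  // e.g. row_out[col] += 1.0f in kTF mode
}
```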
@@ -412,31 +351,76 @@
}

assert((num_rows * C) == total_items);
// Frequency holder allocate [B..output_size_]
// and init all to zero
std::vector<uint32_t> frequencies;
frequencies.resize(num_rows * impl_->output_size_, 0);
const Impl& impl = *impl_;
TensorShapeVector output_dims;
if (B == 0) {
output_dims.push_back(impl.output_size_);
B = 1; // For use in the loops below
} else {
output_dims.push_back(B);
output_dims.push_back(impl.output_size_);
}
TensorShape output_shape(output_dims);

auto Y = ctx->Output(0, output_shape);
auto output_data = Y->MutableData<float>();
const bool is_input_string = X->IsDataTypeString();

if (total_items == 0 ||
(X->IsDataTypeString() && impl_->str_map_.empty()) ||
(is_input_string && impl_->str_map_.empty()) ||
((X->IsDataType<int32_t>() || X->IsDataType<int64_t>()) && impl_->int64_map_.empty())) {
// TfidfVectorizer may receive an empty input when it follows a Tokenizer
// (for example for a string containing only stopwords).
// TfidfVectorizer returns a zero tensor of shape
// {b_dim, output_size} where b_dim is the number of received observations
// and output_size is the maximum value in the ngram_indexes attribute plus 1.
OutputResult(ctx, B, frequencies);
memset(output_data, 0, static_cast<size_t>(output_shape.Size() * sizeof(float)));
return Status::OK();
}
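A consequence of writing weights directly instead of materializing a frequency tensor is that the output must be zeroed explicitly: via the `memset` above on this early-return path, and per row inside the parallel loop further down. The shape logic itself is carried over unchanged from the deleted `OutputResult`; a tiny illustrative helper (not part of the PR):

```cpp
#include <cstdint>
#include <vector>

// B == 0 encodes a 1-D input [C]; any other B means the input is [B, C].
std::vector<int64_t> OutputDimsSketch(int64_t B, int64_t output_size) {
  if (B == 0) {
    return {output_size};  // output shape: [output_size]
  }
  return {B, output_size};  // output shape: [B, output_size]
}
```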

std::function<void(ptrdiff_t)> fn = [this, ctx, C, &frequencies](ptrdiff_t row_num) {
ComputeImpl(ctx, row_num, C, frequencies);
};
auto x_data_raw = ctx->Input<Tensor>(0)->DataRaw();
const auto elem_size = X->DataType()->Size();
int32_t num_batches = std::min<int32_t>(concurrency::ThreadPool::DegreeOfParallelism(ctx->GetOperatorThreadPool()) * 2, num_rows);

[cpplint] Line 383: Lines should be <= 120 characters long [whitespace/line_length] [2]
[cpplint] Line 383: Add #include <algorithm> for min [build/include_what_you_use] [4]
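The batching heuristic above targets roughly two batches per available thread, capped by the row count so that small inputs are not over-split. The same computation as a sketch, with the `<algorithm>` include the lint asks for:

```cpp
#include <algorithm>  // std::min, per the cpplint note above
#include <cstdint>

int32_t NumBatchesSketch(int32_t degree_of_parallelism, int32_t num_rows) {
  // Roughly two batches per thread lets the pool balance uneven rows,
  // while never creating more batches than there are rows.
  return std::min<int32_t>(degree_of_parallelism * 2, num_rows);
}
```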

concurrency::ThreadPool::TryBatchParallelFor(ctx->GetOperatorThreadPool(), num_rows, std::move(fn), 0);
const auto& w = impl.weights_;
std::function<void(size_t, gsl::span<float>&)> fn_weight;

OutputResult(ctx, B, frequencies);
switch (impl.weighting_criteria_) {
case kTF:
fn_weight = [](size_t i, gsl::span<float>& out) { out[i] += 1.0f; };
break;
case kIDF:
if (!w.empty()) {
fn_weight = [&w](size_t i, gsl::span<float>& out) { out[i] = w[i]; };
} else {
fn_weight = [](size_t i, gsl::span<float>& out) { out[i] = 1.0f; };
}
break;
case kTFIDF:
if (!w.empty()) {
fn_weight = [&w](size_t i, gsl::span<float>& out) { out[i] += w[i]; };
} else {
fn_weight = [](size_t i, gsl::span<float>& out) { out[i] += 1.0f; };
}
break;
case kNone: // fall-through
default:
assert(false);
}
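Note the asymmetry between the modes: kIDF assigns (`=`), so repeated hits on the same ngram are idempotent, while kTF and kTFIDF accumulate (`+=`). A standalone sketch of the same dispatch, factored as a function; the enum and names are illustrative, and the weight vector must outlive the returned callback (as `impl.weights_` does in the kernel):

```cpp
#include <cstddef>
#include <functional>
#include <vector>
#include <gsl/gsl>  // assuming Microsoft GSL; onnxruntime vendors its own gsl header

enum WeightingCriteria { kTF, kIDF, kTFIDF };
using WeightFn = std::function<void(size_t, gsl::span<float>&)>;

// w mirrors the optional `weights` attribute; empty means unweighted.
// The lambdas capture w by reference, so w must outlive the returned callback.
WeightFn MakeWeightFnSketch(WeightingCriteria mode, const std::vector<float>& w) {
  switch (mode) {
    case kTF:
      return [](size_t i, gsl::span<float>& out) { out[i] += 1.0f; };
    case kIDF:
      if (!w.empty()) return [&w](size_t i, gsl::span<float>& out) { out[i] = w[i]; };
      return [](size_t i, gsl::span<float>& out) { out[i] = 1.0f; };
    case kTFIDF:
    default:
      if (!w.empty()) return [&w](size_t i, gsl::span<float>& out) { out[i] += w[i]; };
      return [](size_t i, gsl::span<float>& out) { out[i] += 1.0f; };
  }
}
```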

std::function<void(ptrdiff_t)> fn = [this, C, output_data, x_data_raw, elem_size,
is_input_string, num_batches, num_rows, &fn_weight](ptrdiff_t batch_num) {
// Frequency holder allocate [B..output_size_] and init all to zero.
auto work = concurrency::ThreadPool::PartitionWork(batch_num, num_batches, static_cast<size_t>(num_rows));
std::vector<uint32_t> frequencies(this->impl_->output_size_);
for (auto row_num = work.start; row_num < work.end; ++row_num) {
auto out = gsl::span<float>(output_data + row_num * this->impl_->output_size_, this->impl_->output_size_);
std::fill(out.begin(), out.end(), 0.0f);
ComputeImpl(x_data_raw, elem_size, row_num, C, is_input_string, out, fn_weight);
}
};

concurrency::ThreadPool::TrySimpleParallelFor(ctx->GetOperatorThreadPool(), num_batches, std::move(fn));

[cpplint] Line 423: Add #include <utility> for move [build/include_what_you_use] [4]
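Each batch owns a contiguous, disjoint range of rows, so no two workers ever touch the same output cells and no synchronization is required. This is also where the memory saving comes from: the old path allocated a single `num_rows * output_size_` frequencies vector up front, while the new path allocates at most a per-batch scratch of `output_size_` entries. A sketch of the assumed `PartitionWork` semantics (onnxruntime's actual helper may round differently):

```cpp
#include <algorithm>
#include <cstddef>

struct WorkRange {
  std::ptrdiff_t start;
  std::ptrdiff_t end;  // exclusive
};

// Splits [0, num_rows) into num_batches near-equal contiguous ranges.
WorkRange PartitionWorkSketch(std::ptrdiff_t batch, std::ptrdiff_t num_batches,
                              std::ptrdiff_t num_rows) {
  const std::ptrdiff_t base = num_rows / num_batches;
  const std::ptrdiff_t rem = num_rows % num_batches;
  const std::ptrdiff_t start = batch * base + std::min(batch, rem);
  const std::ptrdiff_t len = base + (batch < rem ? 1 : 0);
  return {start, start + len};
}
```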
return Status::OK();
}

7 changes: 2 additions & 5 deletions onnxruntime/core/providers/cpu/nn/tfidfvectorizer.h
@@ -19,11 +19,8 @@ class TfIdfVectorizer final : public OpKernel {
Status Compute(OpKernelContext* ctx) const override;

private:
void ComputeImpl(OpKernelContext* ctx, ptrdiff_t row_num, size_t row_size,
std::vector<uint32_t>& frequencies) const;

// Apply weighing criteria and output
void OutputResult(OpKernelContext* ctx, size_t b_dim, const std::vector<uint32_t>& frequences) const;
void ComputeImpl(const void* x_data_raw, size_t elem_size, ptrdiff_t row_num, size_t row_size, bool is_input_string,
gsl::span<float> output_data, std::function<void(size_t, gsl::span<float>&)>& fn_weight) const;

struct Impl;
std::unique_ptr<Impl> impl_;