Skip to content

Commit

Permalink
Optimize dictionary memoization in favor of memory (facebookincubator…
Browse files Browse the repository at this point in the history
…#7226)

Summary:
Pull Request resolved: facebookincubator#7226

We recently noticed that this optimization in expression evaluation
eagerly holds onto the base of the dictionary vector and the cached
results in anticipation that the next input vector might have the
same base. This additional memory can be retained unnecessarily in
cases where the input cannot take advantage of it. Therefore, with
this change we ensure that we only cache results if we encounter
the same base at least twice. Additionally, a strong reference to
the base is only held once results are cached. This is to ensure
that the vector held is not modified and re-used in-place.

Reviewed By: kagamiori

Differential Revision: D50622335

fbshipit-source-id: 684b94c6945585027806fe317c15fb1439dd7c9b
  • Loading branch information
Bikramjeet Vig authored and facebook-github-bot committed Nov 7, 2023
1 parent 26a0d2a commit 5ff9cee
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 80 deletions.
2 changes: 1 addition & 1 deletion velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3802,7 +3802,7 @@ TEST_F(HashJoinTest, memory) {
params.queryCtx = std::make_shared<core::QueryCtx>(driverExecutor_.get());
auto [taskCursor, rows] = readCursor(params, [](Task*) {});
EXPECT_GT(3'500, params.queryCtx->pool()->stats().numAllocs);
EXPECT_GT(7'500'000, params.queryCtx->pool()->stats().cumulativeBytes);
EXPECT_GT(18'000'000, params.queryCtx->pool()->stats().cumulativeBytes);
}

TEST_F(HashJoinTest, lazyVectors) {
Expand Down
139 changes: 77 additions & 62 deletions velox/expression/Expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1142,81 +1142,96 @@ void Expr::evalWithNulls(
evalAll(rows, context, result);
}

// Optimization that attempts to cache results for inputs that are dictionary
// encoded and use the same base vector between subsequent input batches. Since
// this holds onto a reference to the base vector and the cached results, it can
// be memory intensive. Therefore, in order to reduce this consumption and
// ensure it is only employed for cases where it can be useful, it only starts
// caching results after it encounters the same base at least twice.
void Expr::evalWithMemo(
const SelectivityVector& rows,
EvalCtx& context,
VectorPtr& result) {
VectorPtr base;
distinctFields_[0]->evalSpecialForm(rows, context, base);
++numCachableInput_;
if (baseDictionary_ == base) {
++numCacheableRepeats_;
if (cachedDictionaryIndices_) {
LocalSelectivityVector cachedHolder(context, rows);
auto cached = cachedHolder.get();
VELOX_DCHECK(cached != nullptr);
cached->intersect(*cachedDictionaryIndices_);
if (cached->hasSelections()) {
context.ensureWritable(rows, type(), result);
result->copy(dictionaryCache_.get(), *cached, nullptr);
}
}
LocalSelectivityVector uncachedHolder(context, rows);
auto uncached = uncachedHolder.get();
VELOX_DCHECK(uncached != nullptr);
if (cachedDictionaryIndices_) {
uncached->deselect(*cachedDictionaryIndices_);
}
if (uncached->hasSelections()) {
// Fix finalSelection at "rows" if uncached rows is a strict subset to
// avoid losing values not in uncached rows that were copied earlier into
// "result" from the cached rows.
ScopedFinalSelectionSetter scopedFinalSelectionSetter(
context, &rows, uncached->countSelected() < rows.countSelected());

evalWithNulls(*uncached, context, result);
context.deselectErrors(*uncached);
context.exprSet()->addToMemo(this);
auto newCacheSize = uncached->end();

// dictionaryCache_ is valid only for cachedDictionaryIndices_. Hence, a
// safe call to BaseVector::ensureWritable must include all the rows not
// covered by cachedDictionaryIndices_. If BaseVector::ensureWritable is
// called only for a subset of rows not covered by
// cachedDictionaryIndices_, it will attempt to copy rows that are not
// valid leading to a crash.
LocalSelectivityVector allUncached(context, dictionaryCache_->size());
allUncached.get()->setAll();
allUncached.get()->deselect(*cachedDictionaryIndices_);
context.ensureWritable(*allUncached.get(), type(), dictionaryCache_);

if (cachedDictionaryIndices_->size() < newCacheSize) {
cachedDictionaryIndices_->resize(newCacheSize, false);
}

cachedDictionaryIndices_->select(*uncached);
if (base.get() != baseOfDictionaryRawPtr_ ||
baseOfDictionaryWeakPtr_.expired()) {
baseOfDictionaryRepeats_ = 0;
baseOfDictionaryWeakPtr_ = base;
baseOfDictionaryRawPtr_ = base.get();
context.releaseVector(baseOfDictionary_);
context.releaseVector(dictionaryCache_);
evalWithNulls(rows, context, result);
return;
}
++baseOfDictionaryRepeats_;

// Resize the dictionaryCache_ to accommodate all the necessary rows.
if (dictionaryCache_->size() < uncached->end()) {
dictionaryCache_->resize(uncached->end());
}
dictionaryCache_->copy(result.get(), *uncached, nullptr);
if (baseOfDictionaryRepeats_ == 1) {
evalWithNulls(rows, context, result);
baseOfDictionary_ = base;
dictionaryCache_ = result;
if (!cachedDictionaryIndices_) {
cachedDictionaryIndices_ =
context.execCtx()->getSelectivityVector(rows.end());
}
context.releaseVector(base);
*cachedDictionaryIndices_ = rows;
context.deselectErrors(*cachedDictionaryIndices_);
return;
}
context.releaseVector(baseDictionary_);
baseDictionary_ = base;
evalWithNulls(rows, context, result);

context.releaseVector(dictionaryCache_);
dictionaryCache_ = result;
if (!cachedDictionaryIndices_) {
cachedDictionaryIndices_ =
context.execCtx()->getSelectivityVector(rows.end());
if (cachedDictionaryIndices_) {
LocalSelectivityVector cachedHolder(context, rows);
auto cached = cachedHolder.get();
VELOX_DCHECK(cached != nullptr);
cached->intersect(*cachedDictionaryIndices_);
if (cached->hasSelections()) {
context.ensureWritable(rows, type(), result);
result->copy(dictionaryCache_.get(), *cached, nullptr);
}
}
LocalSelectivityVector uncachedHolder(context, rows);
auto uncached = uncachedHolder.get();
VELOX_DCHECK(uncached != nullptr);
if (cachedDictionaryIndices_) {
uncached->deselect(*cachedDictionaryIndices_);
}
if (uncached->hasSelections()) {
// Fix finalSelection at "rows" if uncached rows is a strict subset to
// avoid losing values not in uncached rows that were copied earlier into
// "result" from the cached rows.
ScopedFinalSelectionSetter scopedFinalSelectionSetter(
context, &rows, uncached->countSelected() < rows.countSelected());

evalWithNulls(*uncached, context, result);
context.deselectErrors(*uncached);
context.exprSet()->addToMemo(this);
auto newCacheSize = uncached->end();

// dictionaryCache_ is valid only for cachedDictionaryIndices_. Hence, a
// safe call to BaseVector::ensureWritable must include all the rows not
// covered by cachedDictionaryIndices_. If BaseVector::ensureWritable is
// called only for a subset of rows not covered by
// cachedDictionaryIndices_, it will attempt to copy rows that are not
// valid leading to a crash.
LocalSelectivityVector allUncached(context, dictionaryCache_->size());
allUncached.get()->setAll();
allUncached.get()->deselect(*cachedDictionaryIndices_);
context.ensureWritable(*allUncached.get(), type(), dictionaryCache_);

if (cachedDictionaryIndices_->size() < newCacheSize) {
cachedDictionaryIndices_->resize(newCacheSize, false);
}

cachedDictionaryIndices_->select(*uncached);

// Resize the dictionaryCache_ to accommodate all the necessary rows.
if (dictionaryCache_->size() < uncached->end()) {
dictionaryCache_->resize(uncached->end());
}
dictionaryCache_->copy(result.get(), *uncached, nullptr);
}
*cachedDictionaryIndices_ = rows;
context.deselectErrors(*cachedDictionaryIndices_);
context.releaseVector(base);
}

void Expr::setAllNulls(
Expand Down
30 changes: 20 additions & 10 deletions velox/expression/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,10 @@ class Expr {
}

void clearMemo() {
baseDictionary_ = nullptr;
baseOfDictionaryRepeats_ = 0;
baseOfDictionary_.reset();
baseOfDictionaryWeakPtr_.reset();
baseOfDictionaryRawPtr_ = nullptr;
dictionaryCache_ = nullptr;
cachedDictionaryIndices_ = nullptr;
}
Expand Down Expand Up @@ -597,22 +600,29 @@ class Expr {
// evaluateSharedSubexpr() is called to the cached shared results.
std::map<std::vector<const BaseVector*>, SharedResults> sharedSubexprResults_;

VectorPtr baseDictionary_;
// Pointers to the last base vector of cacheable dictionary input. Used to
// check if the current input's base vector is the same as the last. If it's
// the same, then results can be cached.
std::weak_ptr<BaseVector> baseOfDictionaryWeakPtr_;
BaseVector* baseOfDictionaryRawPtr_ = nullptr;

// This is a strong reference to the base vector and is only set once
// `baseOfDictionaryRepeats_` >= 1, i.e. the same base has been seen at least
// twice. This is to ensure that the vector held is not modified and re-used
// in-place.
VectorPtr baseOfDictionary_;

// Number of times the currently held cacheable vector is seen for a non-first
// time. Is reset every time 'baseOfDictionaryRawPtr_' is different from the
// current input's base.
int baseOfDictionaryRepeats_ = 0;

// Values computed for the base dictionary, 1:1 to the positions in
// 'baseDictionary_'.
// 'baseOfDictionaryRawPtr_'.
VectorPtr dictionaryCache_;

// The indices that are valid in 'dictionaryCache_'.
std::unique_ptr<SelectivityVector> cachedDictionaryIndices_;

// Count of executions where this is wrapped in a dictionary so that
// results could be cached.
int32_t numCachableInput_{0};

// Count of times the cacheable vector is seen for a non-first time.
int32_t numCacheableRepeats_{0};

/// Runtime statistics. CPU time, wall time and number of processed rows.
ExprStats stats_;

Expand Down
52 changes: 45 additions & 7 deletions velox/expression/tests/ExprTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,18 @@ class ExprTest : public testing::Test, public VectorTestBase {
std::pair<VectorPtr, std::unordered_map<std::string, exec::ExprStats>>
evaluateWithStats(const std::string& expression, const RowVectorPtr& input) {
auto exprSet = compileExpression(expression, asRowType(input->type()));
return evaluateWithStats(exprSet.get(), input);
}

std::pair<VectorPtr, std::unordered_map<std::string, exec::ExprStats>>
evaluateWithStats(exec::ExprSet* exprSetPtr, const RowVectorPtr& input) {
SelectivityVector rows(input->size());
std::vector<VectorPtr> results(1);

exec::EvalCtx context(execCtx_.get(), exprSet.get(), input.get());
exprSet->eval(rows, context, results);
exec::EvalCtx context(execCtx_.get(), exprSetPtr, input.get());
exprSetPtr->eval(rows, context, results);

return {results[0], exprSet->stats()};
return {results[0], exprSetPtr->stats()};
}

template <
Expand Down Expand Up @@ -2062,7 +2066,10 @@ TEST_P(ParameterizedExprTest, rewriteInputs) {
}
}

TEST_P(ParameterizedExprTest, memo) {
TEST_F(ExprTest, memo) {
// Verify that dictionary memoization:
// 1. correctly evaluates the unevaluated rows on subsequent runs;
// 2. only caches results if it encounters the same base twice.
auto base = makeArrayVector<int64_t>(
1'000,
[](auto row) { return row % 5 + 1; },
Expand All @@ -2074,24 +2081,55 @@ TEST_P(ParameterizedExprTest, memo) {
auto rowType = ROW({"c0"}, {base->type()});
auto exprSet = compileExpression("c0[1] = 1", rowType);

auto result = evaluate(
auto [result, stats] = evaluateWithStats(
exprSet.get(), makeRowVector({wrapInDictionary(evenIndices, 100, base)}));
auto expectedResult = makeFlatVector<bool>(
100, [](auto row) { return (8 + row * 2) % 3 == 1; });
assertEqualVectors(expectedResult, result);
VELOX_CHECK_EQ(stats["eq"].numProcessedRows, 100);
VELOX_CHECK(base.unique());

result = evaluate(
// After this, results will be cached.
std::tie(result, stats) = evaluateWithStats(
exprSet.get(), makeRowVector({wrapInDictionary(evenIndices, 100, base)}));
assertEqualVectors(expectedResult, result);
VELOX_CHECK_EQ(stats["eq"].numProcessedRows, 200);
VELOX_CHECK(!base.unique());

// Unevaluated rows are processed
std::tie(result, stats) = evaluateWithStats(
exprSet.get(), makeRowVector({wrapInDictionary(oddIndices, 100, base)}));
expectedResult = makeFlatVector<bool>(
100, [](auto row) { return (9 + row * 2) % 3 == 1; });
assertEqualVectors(expectedResult, result);
VELOX_CHECK_EQ(stats["eq"].numProcessedRows, 300);
VELOX_CHECK(!base.unique());

auto everyFifth = makeIndices(100, [](auto row) { return row * 5; });
result = evaluate(
std::tie(result, stats) = evaluateWithStats(
exprSet.get(), makeRowVector({wrapInDictionary(everyFifth, 100, base)}));
expectedResult =
makeFlatVector<bool>(100, [](auto row) { return (row * 5) % 3 == 1; });
assertEqualVectors(expectedResult, result);
VELOX_CHECK_EQ(
stats["eq"].numProcessedRows,
360,
"Fewer rows expected as memoization should have kicked in.");
VELOX_CHECK(!base.unique());

// Create a new base
base = makeArrayVector<int64_t>(
1'000,
[](auto row) { return row % 5 + 1; },
[](auto row, auto index) { return (row % 3) + index; });

std::tie(result, stats) = evaluateWithStats(
exprSet.get(), makeRowVector({wrapInDictionary(oddIndices, 100, base)}));
expectedResult = makeFlatVector<bool>(
100, [](auto row) { return (9 + row * 2) % 3 == 1; });
assertEqualVectors(expectedResult, result);
VELOX_CHECK_EQ(stats["eq"].numProcessedRows, 460);
VELOX_CHECK(base.unique());
}

// This test triggers the situation when peelEncodings() produces an empty
Expand Down

0 comments on commit 5ff9cee

Please sign in to comment.