diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 693e065c02d0..6f0fe835824a 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -3802,7 +3802,7 @@ TEST_F(HashJoinTest, memory) { params.queryCtx = std::make_shared(driverExecutor_.get()); auto [taskCursor, rows] = readCursor(params, [](Task*) {}); EXPECT_GT(3'500, params.queryCtx->pool()->stats().numAllocs); - EXPECT_GT(7'500'000, params.queryCtx->pool()->stats().cumulativeBytes); + EXPECT_GT(18'000'000, params.queryCtx->pool()->stats().cumulativeBytes); } TEST_F(HashJoinTest, lazyVectors) { diff --git a/velox/expression/Expr.cpp b/velox/expression/Expr.cpp index 8163b43c7850..1eb0b2e5f3b7 100644 --- a/velox/expression/Expr.cpp +++ b/velox/expression/Expr.cpp @@ -1142,81 +1142,96 @@ void Expr::evalWithNulls( evalAll(rows, context, result); } +// Optimization that attempts to cache results for inputs that are dictionary +// encoded and use the same base vector between subsequent input batches. Since +// this hold onto a reference to the base vector and the cached results, it can +// be memory intensive. Therefore in order to reduce this consumption and ensure +// it is only employed for cases where it can be useful, it only starts caching +// result after it encounters the same base at least twice. void Expr::evalWithMemo( const SelectivityVector& rows, EvalCtx& context, VectorPtr& result) { VectorPtr base; distinctFields_[0]->evalSpecialForm(rows, context, base); - ++numCachableInput_; - if (baseDictionary_ == base) { - ++numCacheableRepeats_; - if (cachedDictionaryIndices_) { - LocalSelectivityVector cachedHolder(context, rows); - auto cached = cachedHolder.get(); - VELOX_DCHECK(cached != nullptr); - cached->intersect(*cachedDictionaryIndices_); - if (cached->hasSelections()) { - context.ensureWritable(rows, type(), result); - result->copy(dictionaryCache_.get(), *cached, nullptr); - } - } - LocalSelectivityVector uncachedHolder(context, rows); - auto uncached = uncachedHolder.get(); - VELOX_DCHECK(uncached != nullptr); - if (cachedDictionaryIndices_) { - uncached->deselect(*cachedDictionaryIndices_); - } - if (uncached->hasSelections()) { - // Fix finalSelection at "rows" if uncached rows is a strict subset to - // avoid losing values not in uncached rows that were copied earlier into - // "result" from the cached rows. - ScopedFinalSelectionSetter scopedFinalSelectionSetter( - context, &rows, uncached->countSelected() < rows.countSelected()); - - evalWithNulls(*uncached, context, result); - context.deselectErrors(*uncached); - context.exprSet()->addToMemo(this); - auto newCacheSize = uncached->end(); - - // dictionaryCache_ is valid only for cachedDictionaryIndices_. Hence, a - // safe call to BaseVector::ensureWritable must include all the rows not - // covered by cachedDictionaryIndices_. If BaseVector::ensureWritable is - // called only for a subset of rows not covered by - // cachedDictionaryIndices_, it will attempt to copy rows that are not - // valid leading to a crash. - LocalSelectivityVector allUncached(context, dictionaryCache_->size()); - allUncached.get()->setAll(); - allUncached.get()->deselect(*cachedDictionaryIndices_); - context.ensureWritable(*allUncached.get(), type(), dictionaryCache_); - - if (cachedDictionaryIndices_->size() < newCacheSize) { - cachedDictionaryIndices_->resize(newCacheSize, false); - } - cachedDictionaryIndices_->select(*uncached); + if (base.get() != baseOfDictionaryRawPtr_ || + baseOfDictionaryWeakPtr_.expired()) { + baseOfDictionaryRepeats_ = 0; + baseOfDictionaryWeakPtr_ = base; + baseOfDictionaryRawPtr_ = base.get(); + context.releaseVector(baseOfDictionary_); + context.releaseVector(dictionaryCache_); + evalWithNulls(rows, context, result); + return; + } + ++baseOfDictionaryRepeats_; - // Resize the dictionaryCache_ to accommodate all the necessary rows. - if (dictionaryCache_->size() < uncached->end()) { - dictionaryCache_->resize(uncached->end()); - } - dictionaryCache_->copy(result.get(), *uncached, nullptr); + if (baseOfDictionaryRepeats_ == 1) { + evalWithNulls(rows, context, result); + baseOfDictionary_ = base; + dictionaryCache_ = result; + if (!cachedDictionaryIndices_) { + cachedDictionaryIndices_ = + context.execCtx()->getSelectivityVector(rows.end()); } - context.releaseVector(base); + *cachedDictionaryIndices_ = rows; + context.deselectErrors(*cachedDictionaryIndices_); return; } - context.releaseVector(baseDictionary_); - baseDictionary_ = base; - evalWithNulls(rows, context, result); - context.releaseVector(dictionaryCache_); - dictionaryCache_ = result; - if (!cachedDictionaryIndices_) { - cachedDictionaryIndices_ = - context.execCtx()->getSelectivityVector(rows.end()); + if (cachedDictionaryIndices_) { + LocalSelectivityVector cachedHolder(context, rows); + auto cached = cachedHolder.get(); + VELOX_DCHECK(cached != nullptr); + cached->intersect(*cachedDictionaryIndices_); + if (cached->hasSelections()) { + context.ensureWritable(rows, type(), result); + result->copy(dictionaryCache_.get(), *cached, nullptr); + } + } + LocalSelectivityVector uncachedHolder(context, rows); + auto uncached = uncachedHolder.get(); + VELOX_DCHECK(uncached != nullptr); + if (cachedDictionaryIndices_) { + uncached->deselect(*cachedDictionaryIndices_); + } + if (uncached->hasSelections()) { + // Fix finalSelection at "rows" if uncached rows is a strict subset to + // avoid losing values not in uncached rows that were copied earlier into + // "result" from the cached rows. + ScopedFinalSelectionSetter scopedFinalSelectionSetter( + context, &rows, uncached->countSelected() < rows.countSelected()); + + evalWithNulls(*uncached, context, result); + context.deselectErrors(*uncached); + context.exprSet()->addToMemo(this); + auto newCacheSize = uncached->end(); + + // dictionaryCache_ is valid only for cachedDictionaryIndices_. Hence, a + // safe call to BaseVector::ensureWritable must include all the rows not + // covered by cachedDictionaryIndices_. If BaseVector::ensureWritable is + // called only for a subset of rows not covered by + // cachedDictionaryIndices_, it will attempt to copy rows that are not + // valid leading to a crash. + LocalSelectivityVector allUncached(context, dictionaryCache_->size()); + allUncached.get()->setAll(); + allUncached.get()->deselect(*cachedDictionaryIndices_); + context.ensureWritable(*allUncached.get(), type(), dictionaryCache_); + + if (cachedDictionaryIndices_->size() < newCacheSize) { + cachedDictionaryIndices_->resize(newCacheSize, false); + } + + cachedDictionaryIndices_->select(*uncached); + + // Resize the dictionaryCache_ to accommodate all the necessary rows. + if (dictionaryCache_->size() < uncached->end()) { + dictionaryCache_->resize(uncached->end()); + } + dictionaryCache_->copy(result.get(), *uncached, nullptr); } - *cachedDictionaryIndices_ = rows; - context.deselectErrors(*cachedDictionaryIndices_); + context.releaseVector(base); } void Expr::setAllNulls( diff --git a/velox/expression/Expr.h b/velox/expression/Expr.h index 9def4603bef2..27819d98bc20 100644 --- a/velox/expression/Expr.h +++ b/velox/expression/Expr.h @@ -251,7 +251,10 @@ class Expr { } void clearMemo() { - baseDictionary_ = nullptr; + baseOfDictionaryRepeats_ = 0; + baseOfDictionary_.reset(); + baseOfDictionaryWeakPtr_.reset(); + baseOfDictionaryRawPtr_ = nullptr; dictionaryCache_ = nullptr; cachedDictionaryIndices_ = nullptr; } @@ -597,22 +600,29 @@ class Expr { // evaluateSharedSubexpr() is called to the cached shared results. std::map, SharedResults> sharedSubexprResults_; - VectorPtr baseDictionary_; + // Pointers to the last base vector of cachable dictionary input. Used to + // check if the current input's base vector is the same as the last. If it's + // the same, then results can be cached. + std::weak_ptr baseOfDictionaryWeakPtr_; + BaseVector* baseOfDictionaryRawPtr_ = nullptr; + + // This is a strong reference to the base vector and is only set if + // `baseOfDictionaryRepeats_` > 1. This is to ensure that the vector held is + // not modified and re-used in-place. + VectorPtr baseOfDictionary_; + + // Number of times currently held cacheable vector is seen for a non-first + // time. Is reset everytime 'baseOfDictionaryRawPtr_' is different from the + // current input's base. + int baseOfDictionaryRepeats_ = 0; // Values computed for the base dictionary, 1:1 to the positions in - // 'baseDictionary_'. + // 'baseOfDictionaryRawPtr_'. VectorPtr dictionaryCache_; // The indices that are valid in 'dictionaryCache_'. std::unique_ptr cachedDictionaryIndices_; - // Count of executions where this is wrapped in a dictionary so that - // results could be cached. - int32_t numCachableInput_{0}; - - // Count of times the cacheable vector is seen for a non-first time. - int32_t numCacheableRepeats_{0}; - /// Runtime statistics. CPU time, wall time and number of processed rows. ExprStats stats_; diff --git a/velox/expression/tests/ExprTest.cpp b/velox/expression/tests/ExprTest.cpp index 4a9b1363e70d..baa10f71d0f7 100644 --- a/velox/expression/tests/ExprTest.cpp +++ b/velox/expression/tests/ExprTest.cpp @@ -159,14 +159,18 @@ class ExprTest : public testing::Test, public VectorTestBase { std::pair> evaluateWithStats(const std::string& expression, const RowVectorPtr& input) { auto exprSet = compileExpression(expression, asRowType(input->type())); + return evaluateWithStats(exprSet.get(), input); + } + std::pair> + evaluateWithStats(exec::ExprSet* exprSetPtr, const RowVectorPtr& input) { SelectivityVector rows(input->size()); std::vector results(1); - exec::EvalCtx context(execCtx_.get(), exprSet.get(), input.get()); - exprSet->eval(rows, context, results); + exec::EvalCtx context(execCtx_.get(), exprSetPtr, input.get()); + exprSetPtr->eval(rows, context, results); - return {results[0], exprSet->stats()}; + return {results[0], exprSetPtr->stats()}; } template < @@ -2062,7 +2066,10 @@ TEST_P(ParameterizedExprTest, rewriteInputs) { } } -TEST_P(ParameterizedExprTest, memo) { +TEST_F(ExprTest, memo) { + // Verify that dictionary memoization + // 1. correctly evaluates the unevaluated rows on subsequent runs + // 2. Only caches results if it encounters the same base twice auto base = makeArrayVector( 1'000, [](auto row) { return row % 5 + 1; }, @@ -2074,24 +2081,55 @@ TEST_P(ParameterizedExprTest, memo) { auto rowType = ROW({"c0"}, {base->type()}); auto exprSet = compileExpression("c0[1] = 1", rowType); - auto result = evaluate( + auto [result, stats] = evaluateWithStats( exprSet.get(), makeRowVector({wrapInDictionary(evenIndices, 100, base)})); auto expectedResult = makeFlatVector( 100, [](auto row) { return (8 + row * 2) % 3 == 1; }); assertEqualVectors(expectedResult, result); + VELOX_CHECK_EQ(stats["eq"].numProcessedRows, 100); + VELOX_CHECK(base.unique()); - result = evaluate( + // After this results would be cached + std::tie(result, stats) = evaluateWithStats( + exprSet.get(), makeRowVector({wrapInDictionary(evenIndices, 100, base)})); + assertEqualVectors(expectedResult, result); + VELOX_CHECK_EQ(stats["eq"].numProcessedRows, 200); + VELOX_CHECK(!base.unique()); + + // Unevaluated rows are processed + std::tie(result, stats) = evaluateWithStats( exprSet.get(), makeRowVector({wrapInDictionary(oddIndices, 100, base)})); expectedResult = makeFlatVector( 100, [](auto row) { return (9 + row * 2) % 3 == 1; }); assertEqualVectors(expectedResult, result); + VELOX_CHECK_EQ(stats["eq"].numProcessedRows, 300); + VELOX_CHECK(!base.unique()); auto everyFifth = makeIndices(100, [](auto row) { return row * 5; }); - result = evaluate( + std::tie(result, stats) = evaluateWithStats( exprSet.get(), makeRowVector({wrapInDictionary(everyFifth, 100, base)})); expectedResult = makeFlatVector(100, [](auto row) { return (row * 5) % 3 == 1; }); assertEqualVectors(expectedResult, result); + VELOX_CHECK_EQ( + stats["eq"].numProcessedRows, + 360, + "Fewer rows expected as memoization should have kicked in."); + VELOX_CHECK(!base.unique()); + + // Create a new base + base = makeArrayVector( + 1'000, + [](auto row) { return row % 5 + 1; }, + [](auto row, auto index) { return (row % 3) + index; }); + + std::tie(result, stats) = evaluateWithStats( + exprSet.get(), makeRowVector({wrapInDictionary(oddIndices, 100, base)})); + expectedResult = makeFlatVector( + 100, [](auto row) { return (9 + row * 2) % 3 == 1; }); + assertEqualVectors(expectedResult, result); + VELOX_CHECK_EQ(stats["eq"].numProcessedRows, 460); + VELOX_CHECK(base.unique()); } // This test triggers the situation when peelEncodings() produces an empty