Skip to content

Commit

Permalink
suport query_cache in shared_data
Browse files Browse the repository at this point in the history
Signed-off-by: smartlxh <[email protected]>
  • Loading branch information
smartlxh committed Jun 20, 2024
1 parent 64cb37e commit f836b91
Show file tree
Hide file tree
Showing 21 changed files with 201 additions and 35 deletions.
2 changes: 2 additions & 0 deletions be/src/connector/connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class DataSource {
void set_runtime_filters(RuntimeFilterProbeCollector* runtime_filters) { _runtime_filters = runtime_filters; }
void set_read_limit(const uint64_t limit) { _read_limit = limit; }
void set_split_context(pipeline::ScanSplitContext* split_context) { _split_context = split_context; }
void set_rowsets(const std::vector<BaseRowsetSharedPtr>* rowsets) { _rowsets = rowsets; }
Status parse_runtime_filters(RuntimeState* state);
void update_has_any_predicate();
// Called frequently, don't do heavy work
Expand All @@ -100,6 +101,7 @@ class DataSource {
RuntimeProfile* _runtime_profile = nullptr;
TupleDescriptor* _tuple_desc = nullptr;
pipeline::ScanSplitContext* _split_context = nullptr;
const std::vector<BaseRowsetSharedPtr>* _rowsets = nullptr;

virtual Status _init_chunk_if_needed(ChunkPtr* chunk, size_t n) {
*chunk = ChunkHelper::new_chunk(*_tuple_desc, n);
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ Status FragmentExecutor::_prepare_exec_plan(ExecEnv* exec_env, const UnifiedExec
cache_param.force_populate = tcache_param.force_populate;
cache_param.entry_max_bytes = tcache_param.entry_max_bytes;
cache_param.entry_max_rows = tcache_param.entry_max_rows;
if (tcache_param.__isset.is_lake) {
cache_param.is_lake = tcache_param.is_lake;
}
cache_param.is_lake = tcache_param.is_lake;
for (auto& [slot, remapped_slot] : tcache_param.slot_remapping) {
cache_param.slot_remapping[slot] = remapped_slot;
cache_param.reverse_slot_remapping[remapped_slot] = slot;
Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/pipeline/scan/connector_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,11 @@ ConnectorChunkSource::ConnectorChunkSource(ScanOperator* op, RuntimeProfile* run
}
_data_source = scan_node->data_source_provider()->create_data_source(*scan_range);
_data_source->set_driver_sequence(op->get_driver_sequence());

if (!morsel->rowsets().empty()) {
_data_source->set_rowsets(&(morsel->rowsets()));
}

_data_source->set_split_context(split_context);

_data_source->set_morsel(scan_morsel);
Expand Down
121 changes: 94 additions & 27 deletions be/src/exec/query_cache/cache_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
#include "column/vectorized_fwd.h"
#include "common/compiler_util.h"
#include "exec/pipeline/pipeline_driver.h"
#include "storage/rowset/rowset.h"
#include "storage/lake/tablet.h"
#include "storage/lake/tablet_manager.h"
#include "storage/rowset/base_rowset.h"
#include "storage/storage_engine.h"
#include "storage/tablet_manager.h"
#include "util/time.h"

namespace starrocks::query_cache {
enum PerLaneBufferState {
PLBS_INIT,
Expand All @@ -40,8 +43,8 @@ struct PerLaneBuffer {
LaneOwnerType lane_owner{-1};
int lane;
PerLaneBufferState state;
TabletSharedPtr tablet;
std::vector<RowsetSharedPtr> rowsets;
BaseTabletSharedPtr tablet;
std::vector<BaseRowsetSharedPtr> rowsets;
RowsetsAcqRelPtr rowsets_acq_rel;
int64_t required_version;
int64_t cached_version;
Expand Down Expand Up @@ -233,32 +236,63 @@ void CacheOperator::_handle_stale_cache_value_for_non_pk(int64_t tablet_id, Cach
PerLaneBufferPtr& buffer, int64_t version) {
// Try to reuse partial cache result when cached version is less than required version, delta versions
// should be captured at first.
auto status = StorageEngine::instance()->tablet_manager()->capture_tablet_and_rowsets(
tablet_id, cache_value.version + 1, version);
std::shared_ptr<BaseTablet> base_tablet;
std::vector<BaseRowsetSharedPtr> base_rowsets;
RowsetsAcqRelPtr rowsets_acq_rel = nullptr;

if (!_cache_param.is_lake) {
auto status = StorageEngine::instance()->tablet_manager()->capture_tablet_and_rowsets(
tablet_id, cache_value.version + 1, version);
if (!status.ok()) {
buffer->state = PLBS_MISS;
buffer->cached_version = 0;
return;
}

// Cache MISS if delta versions are not captured, because aggressive cumulative compactions.
if (!status.ok()) {
buffer->state = PLBS_MISS;
buffer->cached_version = 0;
return;
// Delta versions are captured, several situations should be taken into consideration.
auto& [tablet, rowsets, acq_rel] = status.value();
base_tablet = std::static_pointer_cast<BaseTablet>(tablet);
for (auto rowset_ptr : rowsets) {
base_rowsets.emplace_back(std::static_pointer_cast<BaseRowset>(rowset_ptr));
}
rowsets_acq_rel = std::move(acq_rel);

} else {
auto status = ExecEnv::GetInstance()->lake_tablet_manager()->capture_tablet_and_rowsets(
tablet_id, cache_value.version, version);
// Cache MISS if delta versions are not captured, because aggressive cumulative compactions.
if (!status.ok()) {
buffer->state = PLBS_MISS;
buffer->cached_version = 0;
return;
}

auto& [tablet, rowsets] = status.value();
base_tablet = std::static_pointer_cast<BaseTablet>(tablet);
base_rowsets = std::move(rowsets);
}

// Delta versions are captured, several situations should be taken into consideration.
auto& [tablet, rowsets, rowsets_acq_rel] = status.value();
auto all_rs_empty = true;
auto min_version = std::numeric_limits<int64_t>::max();
auto max_version = std::numeric_limits<int64_t>::min();
for (const auto& rs : rowsets) {
for (const auto& rs : base_rowsets) {
all_rs_empty &= !rs->has_data_files();
min_version = std::min(min_version, rs->start_version());
max_version = std::max(max_version, rs->end_version());
if (!_cache_param.is_lake) {
min_version = std::min(min_version, rs->start_version());
max_version = std::max(max_version, rs->end_version());
} else {
min_version = cache_value.version + 1;
max_version = version;
}
}
Version delta_versions(min_version, max_version);
buffer->tablet = tablet;
auto has_delete_predicates = tablet->has_delete_predicates(delta_versions);
buffer->tablet = base_tablet;
auto has_delete_predicates = base_tablet->has_delete_predicates(delta_versions);
// case 1: there exist delete predicates in delta versions, or data model can not support multiversion cache and
// the tablet has non-empty delta rowsets; then cache result is not reuse, so cache miss.
if (has_delete_predicates || (!_cache_param.can_use_multiversion && !all_rs_empty)) {
if (!has_delete_predicates.ok() || has_delete_predicates.value() ||
(!_cache_param.can_use_multiversion && !all_rs_empty)) {
buffer->state = PLBS_MISS;
buffer->cached_version = 0;
return;
Expand All @@ -278,7 +312,7 @@ void CacheOperator::_handle_stale_cache_value_for_non_pk(int64_t tablet_id, Cach
// case 3: otherwise, the cache result is partial result of per-tablet computation, so delta versions must
// be scanned and merged with cache result to generate total result.
buffer->state = PLBS_HIT_PARTIAL;
buffer->rowsets = std::move(rowsets);
buffer->rowsets = std::move(base_rowsets);
buffer->rowsets_acq_rel = std::move(rowsets_acq_rel);
buffer->num_rows = 0;
buffer->num_bytes = 0;
Expand All @@ -295,17 +329,50 @@ void CacheOperator::_handle_stale_cache_value_for_pk(int64_t tablet_id, starrock
DCHECK(_cache_param.keys_type == TKeysType::PRIMARY_KEYS);
// At the present, PRIMARY_KEYS can not support merge-on-read, so we can not merge stale cache values and delta
// rowsets. Capturing delta rowsets is meaningless and unsupported, thus we capture all rowsets of the PK tablet.
auto status = StorageEngine::instance()->tablet_manager()->capture_tablet_and_rowsets(tablet_id, 0, version);
if (!status.ok()) {
buffer->state = PLBS_MISS;
buffer->cached_version = 0;
return;
std::shared_ptr<BaseTablet> base_tablet;
std::vector<BaseRowsetSharedPtr> base_rowsets;
RowsetsAcqRelPtr rowsets_acq_rel = nullptr;

if (!_cache_param.is_lake) {
auto status = StorageEngine::instance()->tablet_manager()->capture_tablet_and_rowsets(tablet_id, 0, version);
if (!status.ok()) {
buffer->state = PLBS_MISS;
buffer->cached_version = 0;
return;
}

// Delta versions are captured, several situations should be taken into consideration.
auto& [tablet, rowsets, acq_rel] = status.value();
base_tablet = std::static_pointer_cast<BaseTablet>(tablet);
for (auto rowset_ptr : rowsets) {
base_rowsets.emplace_back(std::static_pointer_cast<BaseRowset>(rowset_ptr));
}
rowsets_acq_rel = std::move(acq_rel);

} else {
auto status = ExecEnv::GetInstance()->lake_tablet_manager()->capture_tablet_and_rowsets(tablet_id, 0, version);
// Cache MISS if delta versions are not captured, because aggressive cumulative compactions.
if (!status.ok()) {
buffer->state = PLBS_MISS;
buffer->cached_version = 0;
return;
}

auto& [tablet, rowsets] = status.value();
base_tablet = std::static_pointer_cast<BaseTablet>(tablet);
base_rowsets = std::move(rowsets);
}
auto& [tablet, rowsets, rowsets_acq_rel] = status.value();

const auto snapshot_version = cache_value.version;
bool can_pickup_delta_rowsets = false;
bool exists_non_empty_delta_rowsets = false;
for (auto& rs : rowsets) {

if (_cache_param.is_lake && !base_rowsets.empty()) {
buffer->state = PLBS_MISS;
buffer->cached_version = 0;
return;
}
for (auto& rs : base_rowsets) {
can_pickup_delta_rowsets |= rs->start_version() == snapshot_version + 1;
exists_non_empty_delta_rowsets |= rs->start_version() > snapshot_version && rs->has_data_files();
}
Expand Down Expand Up @@ -428,10 +495,10 @@ int64_t CacheOperator::cached_version(int64_t tablet_id) {
}
}

std::tuple<int64_t, vector<RowsetSharedPtr>> CacheOperator::delta_version_and_rowsets(int64_t tablet_id) {
std::tuple<int64_t, vector<BaseRowsetSharedPtr>> CacheOperator::delta_version_and_rowsets(int64_t tablet_id) {
auto lane_it = _owner_to_lanes.find(tablet_id);
if (lane_it == _owner_to_lanes.end()) {
return make_tuple(0, vector<RowsetSharedPtr>{});
return make_tuple(0, vector<BaseRowsetSharedPtr>{});
} else {
auto& buffer = _per_lane_buffers[lane_it->second];
return make_tuple(buffer->cached_version + 1, buffer->rowsets);
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/query_cache/cache_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class CacheOperator final : public pipeline::Operator {
Status reset_lane(RuntimeState* state, LaneOwnerType lane_owner);
void populate_cache(int64_t tablet_id);
int64_t cached_version(int64_t tablet_id);
std::tuple<int64_t, std::vector<RowsetSharedPtr>> delta_version_and_rowsets(int64_t tablet_id);
std::tuple<int64_t, std::vector<BaseRowsetSharedPtr>> delta_version_and_rowsets(int64_t tablet_id);
Status push_chunk(RuntimeState* state, const ChunkPtr& chunk) override;
StatusOr<ChunkPtr> pull_chunk(RuntimeState* state) override;
bool has_output() const override;
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/query_cache/cache_param.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ struct CacheParam {
bool can_use_multiversion;
TKeysType::type keys_type;
std::unordered_set<int32_t> cached_plan_node_ids;
bool is_lake = false;
};
} // namespace starrocks::query_cache
3 changes: 3 additions & 0 deletions be/src/storage/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <memory>

#include "storage/olap_define.h"
#include "storage/rowset/base_rowset.h"
#include "storage/tablet_meta.h"
#include "storage/utils.h"

Expand Down Expand Up @@ -117,6 +118,8 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {

virtual size_t num_rows() const = 0;

virtual StatusOr<bool> has_delete_predicates(const Version& version) = 0;

protected:
virtual void on_shutdown() {}

Expand Down
6 changes: 6 additions & 0 deletions be/src/storage/lake/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ class Rowset : public BaseRowset {

[[nodiscard]] int64_t version() const { return metadata().version(); }

bool has_data_files() const override { return num_segments() > 0 || num_dels() > 0; }

// no practical significance, just compatible interface
int64_t start_version() const override { return 0; }
int64_t end_version() const override { return 0; }

private:
TabletManager* _tablet_mgr;
int64_t _tablet_id;
Expand Down
10 changes: 10 additions & 0 deletions be/src/storage/lake/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ class Tablet : public BaseTablet {

StatusOr<bool> has_delete_predicates(int64_t version);

StatusOr<bool> has_delete_predicates(const Version& version) override {
for (int64_t from_version = version.first; from_version < version.second; from_version++) {
auto status = has_delete_predicates(from_version);
if ((status.ok() && status.value() == true) || !status.ok()) {
return status;
}
};
return false;
}

UpdateManager* update_mgr() const { return _mgr->update_mgr(); }

TabletManager* tablet_mgr() const { return _mgr; }
Expand Down
31 changes: 31 additions & 0 deletions be/src/storage/lake/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -679,4 +679,35 @@ StatusOr<SegmentPtr> TabletManager::load_segment(const FileInfo& segment_info, i
std::move(tablet_schema));
}

StatusOr<TabletAndRowsets> TabletManager::capture_tablet_and_rowsets(int64_t tablet_id, int64_t from_version,
int64_t to_version) {
auto tablet_ptr = std::make_shared<Tablet>(this, tablet_id);

std::vector<std::shared_ptr<BaseRowset>> rowsets;

std::vector<RowsetPtr> from_rowsets;
std::vector<RowsetPtr> to_rowsets;
if (from_version != 0) {
auto tablet_meta_from_version = get_tablet_metadata(tablet_id, from_version).value();
from_rowsets = Rowset::get_rowsets(this, tablet_meta_from_version);
}

auto tablet_meta_to_version = get_tablet_metadata(tablet_id, to_version).value();
to_rowsets = Rowset::get_rowsets(this, tablet_meta_to_version);

std::unordered_map<int64_t, std::shared_ptr<Rowset>> distinct_rowset;
for (const auto& rowset : from_rowsets) {
distinct_rowset[rowset->id()] = std::move(rowset);
}

for (const auto& rowset : to_rowsets) {
if (distinct_rowset.find(rowset->id()) != distinct_rowset.end()) {
auto base_rowset_ptr = std::static_pointer_cast<BaseRowset>(rowset);
rowsets.emplace_back(base_rowset_ptr);
}
}

return std::make_tuple(std::move(tablet_ptr), std::move(rowsets));
}

} // namespace starrocks::lake
3 changes: 3 additions & 0 deletions be/src/storage/lake/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "storage/lake/types_fwd.h"
#include "storage/options.h"
#include "util/bthreads/single_flight.h"
#include "storage/rowset/base_rowset.h"

namespace starrocks {
struct FileInfo;
Expand All @@ -43,6 +44,7 @@ class MetadataIterator;
class UpdateManager;
using TabletMetadataIter = MetadataIterator<TabletMetadataPtr>;
using TxnLogIter = MetadataIterator<TxnLogPtr>;
using TabletAndRowsets = std::tuple<std::shared_ptr<Tablet>, std::vector<BaseRowsetSharedPtr>>;

class CompactionScheduler;
class Metacache;
Expand Down Expand Up @@ -190,6 +192,7 @@ class TabletManager {
StatusOr<TabletSchemaPtr> get_tablet_schema(int64_t tablet_id, int64_t* version_hint = nullptr);

Status create_schema_file(int64_t tablet_id, const TabletSchemaPB& schema_pb);
StatusOr<TabletAndRowsets> capture_tablet_and_rowsets(int64_t tablet_id, int64_t from_version, int64_t to_version);

private:
static std::string global_schema_cache_key(int64_t index_id);
Expand Down
10 changes: 9 additions & 1 deletion be/src/storage/lake/versioned_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,15 @@ StatusOr<std::unique_ptr<TabletWriter>> VersionedTablet::new_writer(WriterType t
}
}

StatusOr<std::unique_ptr<TabletReader>> VersionedTablet::new_reader(Schema schema) {
StatusOr<std::unique_ptr<TabletReader>> VersionedTablet::new_reader(Schema schema,
std::vector<BaseRowsetSharedPtr>* base_rowsets) {
if (base_rowsets != nullptr) {
std::vector<std::shared_ptr<Rowset>> rowsets;
for (auto& rowset : *base_rowsets) {
rowsets.emplace_back(std::dynamic_pointer_cast<Rowset>(rowset));
}
return std::make_unique<TabletReader>(_tablet_mgr, _metadata, std::move(schema), std::move(rowsets));
}
return std::make_unique<TabletReader>(_tablet_mgr, _metadata, std::move(schema));
}

Expand Down
4 changes: 3 additions & 1 deletion be/src/storage/lake/versioned_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <vector>

#include "common/statusor.h"
#include "storage/rowset/base_rowset.h"

namespace starrocks {
class TabletSchema;
Expand Down Expand Up @@ -71,7 +72,8 @@ class VersionedTablet {
uint32_t max_rows_per_segment = 0,
ThreadPool* flush_pool = nullptr, bool is_compaction = false);

StatusOr<std::unique_ptr<TabletReader>> new_reader(Schema schema);
StatusOr<std::unique_ptr<TabletReader>> new_reader(Schema schema,
std::vector<BaseRowsetSharedPtr>* base_rowsets = nullptr);

StatusOr<std::unique_ptr<TabletReader>> new_reader(Schema schema, bool could_split, bool could_split_physically);

Expand Down
Loading

0 comments on commit f836b91

Please sign in to comment.