Skip to content

Commit

Permalink
working dvs now
Browse files Browse the repository at this point in the history
  • Loading branch information
samansmink committed May 8, 2024
1 parent 6b83593 commit 6aafcce
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 98 deletions.
226 changes: 146 additions & 80 deletions src/functions/deltatable_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,18 +295,10 @@ bool DeltaMultiFileReader::Bind(MultiFileReaderOptions &options, MultiFileList &
return_types.emplace_back(LogicalType::BIGINT);
names.emplace_back("file_row_number");
} else {
// TODO: this is a bogus ID?
// TODO: this is a bogus ID? Change for flag indicating it should be enabled?
bind_data.file_row_number_idx = names.size();
}

// This is where we want the extra column
// TODO: only enable this if we have deletion vectors for this scan?
bind_data.required_columns.push_back({
"file_row_number",
LogicalType::BIGINT,
file_row_number_enabled
});

return true;
};

Expand All @@ -323,9 +315,6 @@ void DeltaMultiFileReader::BindOptions(MultiFileReaderOptions &options, MultiFil
auto demo_gen_col_opt = options.custom_options.find("delta_file_number");
if (demo_gen_col_opt != options.custom_options.end()) {
if (demo_gen_col_opt->second.GetValue<bool>()) {
auto parquet_columns_produced = names.size();
D_ASSERT(bind_data.custom_data.find("file_number_column_idx") == bind_data.custom_data.end());
bind_data.custom_data["file_number_column_idx"] = Value::UBIGINT(parquet_columns_produced);
names.push_back("delta_file_number");
return_types.push_back(LogicalType::UBIGINT);
}
Expand All @@ -336,38 +325,27 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio
const string &filename, const vector<string> &local_names,
const vector<LogicalType> &global_types, const vector<string> &global_names,
const vector<column_t> &global_column_ids, MultiFileReaderData &reader_data,
ClientContext &context) {
MultiFileReader::FinalizeBind(file_options, options, filename, local_names, global_types, global_names, global_column_ids, reader_data, context);
ClientContext &context, optional_ptr<MultiFileReaderGlobalState> global_state) {
MultiFileReader::FinalizeBind(file_options, options, filename, local_names, global_types, global_names, global_column_ids, reader_data, context, global_state);

// Handle custom delta option set in MultiFileReaderOptions::custom_options with data passed through in MultiFileReaderBindData::custom_data
// Handle custom delta option set in MultiFileReaderOptions::custom_options
auto file_number_opt = file_options.custom_options.find("delta_file_number");
if (file_number_opt != file_options.custom_options.end()) {
if (file_number_opt->second.GetValue<bool>()) {
auto maybe_file_number_column = options.custom_data.find("file_number_column_idx");
D_ASSERT(maybe_file_number_column != options.custom_data.end());
auto file_number_column_idx = maybe_file_number_column->second.GetValue<int64_t>();
D_ASSERT(file_number_column_idx != DConstants::INVALID_INDEX);

// TODO: we have the metadata for the file available here already, the reason we handle this in FinalizeChunk
// is purely for demonstration purposes

// What you can do here is 2 things:
// - either add the constant directly using: reader_data.constant_map.emplace_back(i, Value::UBIGINT(0));
// - add the constant as a nop, store the global
for (idx_t i = 0; i < global_column_ids.size(); i++) {
column_t col_id = global_column_ids[i];
if (col_id == file_number_column_idx) {
reader_data.constant_map.emplace_back(i, col_id, Value::UBIGINT(0));
break;
}
}
D_ASSERT(global_state);
auto &delta_global_state = global_state->Cast<DeltaMultiFileReaderGlobalState>();
D_ASSERT(delta_global_state.delta_file_number_idx != DConstants::INVALID_INDEX);

// We add the constant column for the delta_file_number option
// NOTE: we add a placeholder here, to demonstrate how we can also populate extra columns in the FinalizeChunk
reader_data.constant_map.emplace_back(delta_global_state.delta_file_number_idx, Value::UBIGINT(0));
}
}

// Get the metadata for this file
D_ASSERT(reader_data.file_metadata.file_list);
const auto &snapshot = dynamic_cast<const DeltaTableSnapshot&>(*reader_data.file_metadata.file_list);
auto &file_metadata = snapshot.metadata[reader_data.file_metadata.file_list_idx];
D_ASSERT(global_state->file_list);
const auto &snapshot = dynamic_cast<const DeltaTableSnapshot&>(*global_state->file_list);
auto &file_metadata = snapshot.metadata[reader_data.file_list_idx];

if (!file_metadata.partition_map.empty()) {
for (idx_t i = 0; i < global_column_ids.size(); i++) {
Expand All @@ -376,7 +354,7 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio
if (col_partition_entry != file_metadata.partition_map.end()) {
// Todo: use https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
auto maybe_value = Value(col_partition_entry->second).DefaultCastAs(global_types[i]);
reader_data.constant_map.emplace_back(i, col_id, maybe_value);
reader_data.constant_map.emplace_back(i, maybe_value);
}
}
}
Expand Down Expand Up @@ -414,66 +392,154 @@ static SelectionVector DuckSVFromDeltaSV(ffi::KernelBoolSlice *dv, Vector row_id
return result;
}

static idx_t LocalColIdxToResultColIdx(idx_t local_idx, const MultiFileReaderData& reader_data) {
for (auto &constant_entry: reader_data.constant_map) {
if (constant_entry.local_column_id == local_idx) {
return constant_entry.result_column_id;
//! Stores the index of one of the columns used by the delta extension into the matching member of this state.
//! \param column Name of the column; only "file_row_number" and "delta_file_number" are recognised (exact match)
//! \param idx    Index to record for that column
//! Throws an InternalException naming the column when it is not one of the recognised names.
void DeltaMultiFileReaderGlobalState::SetColumnIdx(const string &column, idx_t idx) {
	if (column == "file_row_number") {
		file_row_number_idx = idx;
		return;
	}
	if (column == "delta_file_number") {
		delta_file_number_idx = idx;
		return;
	}
	// The message contains a '%s' placeholder: the column name has to be supplied as its argument,
	// otherwise the error never tells which column was the unexpected one
	throw InternalException("Unknown column '%s' found as required by the DeltaMultiFileReader", column);
}

//! Creates the DeltaMultiFileReaderGlobalState for a scan.
//! For every column the delta reader needs ("file_row_number" and "delta_file_number") this determines where the
//! column lives in the result chunk: at its position in the projection when it is projected, otherwise in an
//! extra column appended after all projected columns. The resulting indices are stored in the returned state.
//! \param file_list         File list that is handed to the global state
//! \param global_names      Names of the global columns, indexed by global column id
//! \param global_column_ids Global column ids of the projected columns, in projection order
//! context, file_options, bind_data and global_types are not used by this implementation.
unique_ptr<MultiFileReaderGlobalState> DeltaMultiFileReader::InitializeGlobalState(duckdb::ClientContext &context,
                                                                                   const duckdb::MultiFileReaderOptions &file_options,
                                                                                   const duckdb::MultiFileReaderBindData &bind_data,
                                                                                   const duckdb::MultiFileList &file_list,
                                                                                   const vector<duckdb::LogicalType> &global_types,
                                                                                   const vector<std::string> &global_names,
                                                                                   const vector<duckdb::column_t> &global_column_ids) {
	vector<LogicalType> extra_columns;
	vector<pair<string, idx_t>> mapped_columns;

	// Create a map of the columns that are in the projection (name -> position in the projection)
	case_insensitive_map_t<idx_t> selected_columns;
	for (idx_t i = 0; i < global_column_ids.size(); i++) {
		auto global_id = global_column_ids[i];
		// A projected id without an entry in global_names cannot be one of the named columns we are
		// looking for; skip it instead of indexing out of range.
		// NOTE(review): presumably these are virtual column ids such as the row id - confirm
		if (global_id >= global_names.size()) {
			continue;
		}
		auto &global_name = global_names[global_id];
		// insert() keeps the first occurrence when a name is projected more than once
		selected_columns.insert({global_name, i});
	}

	// The hardcoded (for now) columns to be mapped
	case_insensitive_map_t<LogicalType> columns_to_map = {
	    {"file_row_number", LogicalType::BIGINT},
	    {"delta_file_number", LogicalType::UBIGINT}
	};

	// Map every column to either a column in the projection, or add it to the extra columns if it doesn't exist
	idx_t col_offset = 0;
	for (const auto &required_column : columns_to_map) {
		// First check if the column is in the projection
		auto res = selected_columns.find(required_column.first);
		if (res != selected_columns.end()) {
			// The column is in the projection, no special handling is required; we simply store the index
			mapped_columns.push_back({required_column.first, res->second});
			continue;
		}

		// The column is NOT in the projection: it needs to be added as an extra_column

		// Calculate the index of the added column (extra columns are added after all other columns)
		idx_t current_col_idx = global_column_ids.size() + col_offset++;

		// Add column to the map, to ensure the MultiFileReader can find it when processing the Chunk
		mapped_columns.push_back({required_column.first, current_col_idx});

		// Ensure the result DataChunk has a vector of the correct type to store this column
		extra_columns.push_back(required_column.second);
	}

	auto res = make_uniq<DeltaMultiFileReaderGlobalState>(extra_columns, &file_list);

	// Parse all the mapped columns into the DeltaMultiFileReaderGlobalState for easy use
	for (const auto &mapped_column : mapped_columns) {
		res->SetColumnIdx(mapped_column.first, mapped_column.second);
	}

	// The local is a pointer to the derived state while the return type holds the base type
	return std::move(res);
}

void DeltaMultiFileReader::CreateNameMapping(const string &file_name, const vector<LogicalType> &local_types,
const vector<string> &local_names, const vector<LogicalType> &global_types,
const vector<string> &global_names, const vector<column_t> &global_column_ids,
MultiFileReaderData &reader_data, const string &initial_file,
optional_ptr<MultiFileReaderGlobalState> global_state) {
// First call the base implementation to do most mapping
MultiFileReader::CreateNameMapping(file_name, local_types, local_names, global_types, global_names, global_column_ids, reader_data, initial_file, global_state);

// Then we handle delta specific mapping
D_ASSERT(global_state);
auto &delta_global_state = global_state->Cast<DeltaMultiFileReaderGlobalState>();

// Check if the file_row_number column is an "extra_column" which is not part of the projection
if (delta_global_state.file_row_number_idx >= global_column_ids.size()) {
D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX);

// Build the name map
case_insensitive_map_t<idx_t> name_map;
for (idx_t col_idx = 0; col_idx < local_names.size(); col_idx++) {
name_map[local_names[col_idx]] = col_idx;
}

// Lookup the required column in the local map
auto entry = name_map.find("file_row_number");
if (entry == name_map.end()) {
throw InternalException("Failed to find the file_row_number column");
}

// Register the column to be scanned from this file
reader_data.column_ids.push_back(entry->second);
reader_data.column_mapping.push_back(delta_global_state.file_row_number_idx);
}

// This may have changed: update it
reader_data.empty_columns = reader_data.column_ids.empty();
}

void DeltaMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFileReaderBindData &bind_data,
const MultiFileReaderData &reader_data, DataChunk &chunk) {
const MultiFileReaderData &reader_data, DataChunk &chunk, optional_ptr<MultiFileReaderGlobalState> global_state) {
// Base class finalization first
MultiFileReader::FinalizeChunk(context, bind_data, reader_data, chunk);
MultiFileReader::FinalizeChunk(context, bind_data, reader_data, chunk, global_state);

D_ASSERT(reader_data.file_metadata.file_list);
D_ASSERT(global_state);
auto &delta_global_state = global_state->Cast<DeltaMultiFileReaderGlobalState>();
D_ASSERT(delta_global_state.file_list);

// Get the metadata for this file
const auto &snapshot = dynamic_cast<const DeltaTableSnapshot&>(*reader_data.file_metadata.file_list);
auto &metadata = snapshot.metadata[reader_data.file_metadata.file_list_idx];
const auto &snapshot = dynamic_cast<const DeltaTableSnapshot&>(*global_state->file_list);
auto &metadata = snapshot.metadata[reader_data.file_list_idx];

if (metadata.selection_vector.get() && chunk.size() != 0) {
idx_t select_count;
auto res = reader_data.required_column_map.find("file_row_number");
if (res == reader_data.required_column_map.end()) {
throw InternalException("Failed to find file_row_number column used to apply the deletion vector at");
}
D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX);
auto &file_row_number_column = chunk.data[delta_global_state.file_row_number_idx];

// Construct the selection vector using the file_row_number column and the raw selection vector from delta
auto sv = DuckSVFromDeltaSV(metadata.selection_vector.get(), chunk.data[res->second], chunk.size(), select_count);

// Slice the result
idx_t select_count;
auto sv = DuckSVFromDeltaSV(metadata.selection_vector.get(), file_row_number_column, chunk.size(), select_count);
chunk.Slice(sv, select_count);
}

// Note: this demo function shows how we can use DuckDB's Binder to create expression-based generated columns
auto maybe_file_number_column_idx = bind_data.custom_data.find("file_number_column_idx");
if (maybe_file_number_column_idx != bind_data.custom_data.end()) {
auto file_number_column_idx = maybe_file_number_column_idx->second.GetValue<int64_t>();
idx_t file_number_column_result_idx = LocalColIdxToResultColIdx(file_number_column_idx, reader_data);

// If the idx is present it means that it occurs in the result and we need to modify its value
if (file_number_column_result_idx != DConstants::INVALID_INDEX) {
//! Create Dummy expression (0 + file_number)
vector<unique_ptr<ParsedExpression>> child_expr;
child_expr.push_back(make_uniq<ConstantExpression>(Value::UBIGINT(0)));
child_expr.push_back(make_uniq<ConstantExpression>(Value::UBIGINT(metadata.file_number)));
unique_ptr<ParsedExpression> expr = make_uniq<FunctionExpression>("+", std::move(child_expr), nullptr, nullptr, false, true);

//! Bind the dummy expression
auto binder = Binder::CreateBinder(context);
ExpressionBinder expr_binder(*binder, context);
auto bound_expr = expr_binder.Bind(expr, nullptr);

//! Execute dummy expression into result column
ExpressionExecutor expr_executor(context);
expr_executor.AddExpression(*bound_expr);

//! Execute the expression directly into the output Chunk
expr_executor.ExecuteExpression(chunk.data[file_number_column_result_idx]);
}
if (delta_global_state.delta_file_number_idx != DConstants::INVALID_INDEX) {
//! Create Dummy expression (0 + file_number)
vector<unique_ptr<ParsedExpression>> child_expr;
child_expr.push_back(make_uniq<ConstantExpression>(Value::UBIGINT(0)));
child_expr.push_back(make_uniq<ConstantExpression>(Value::UBIGINT(metadata.file_number)));
unique_ptr<ParsedExpression> expr = make_uniq<FunctionExpression>("+", std::move(child_expr), nullptr, nullptr, false, true);

//! Bind the dummy expression
auto binder = Binder::CreateBinder(context);
ExpressionBinder expr_binder(*binder, context);
auto bound_expr = expr_binder.Bind(expr, nullptr);

//! Execute dummy expression into result column
ExpressionExecutor expr_executor(context);
expr_executor.AddExpression(*bound_expr);

//! Execute the expression directly into the output Chunk
expr_executor.ExecuteExpression(chunk.data[delta_global_state.delta_file_number_idx]);
}
};

Expand Down
26 changes: 24 additions & 2 deletions src/include/functions/deltatable_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ struct DeltaTableSnapshot : public MultiFileList {
ClientContext &context;
};

//! Global state of the DeltaMultiFileReader: extends the generic MultiFileReaderGlobalState with the result-chunk
//! indices of the columns the delta reader works with.
struct DeltaMultiFileReaderGlobalState : public MultiFileReaderGlobalState {
	//! Both arguments are passed on unchanged to the MultiFileReaderGlobalState constructor
	DeltaMultiFileReaderGlobalState(vector<LogicalType> extra_columns_p,
	                                optional_ptr<const MultiFileList> file_list_p)
	    : MultiFileReaderGlobalState(extra_columns_p, file_list_p) {
	}

	//! Record `idx` as the index of the delta column named `column`
	void SetColumnIdx(const string &column, idx_t idx);

	//! Index of the delta_file_number column in the result chunk (INVALID_INDEX until set)
	idx_t delta_file_number_idx = DConstants::INVALID_INDEX;
	//! Index of the file_row_number column in the result chunk (INVALID_INDEX until set)
	idx_t file_row_number_idx = DConstants::INVALID_INDEX;
};

struct DeltaMultiFileReader : public MultiFileReader {
static unique_ptr<MultiFileReader> CreateInstance();
//! Return a DeltaTableSnapshot
Expand All @@ -89,15 +100,26 @@ struct DeltaMultiFileReader : public MultiFileReader {
void BindOptions(MultiFileReaderOptions &options, MultiFileList &files,
vector<LogicalType> &return_types, vector<string> &names, MultiFileReaderBindData& bind_data) override;

void CreateNameMapping(const string &file_name, const vector<LogicalType> &local_types,
const vector<string> &local_names, const vector<LogicalType> &global_types,
const vector<string> &global_names, const vector<column_t> &global_column_ids,
MultiFileReaderData &reader_data, const string &initial_file,
optional_ptr<MultiFileReaderGlobalState> global_state);

unique_ptr<MultiFileReaderGlobalState> InitializeGlobalState(ClientContext &context, const MultiFileReaderOptions &file_options,
const MultiFileReaderBindData &bind_data, const MultiFileList &file_list,
const vector<LogicalType> &global_types, const vector<string> &global_names,
const vector<column_t> &global_column_ids) override;

void FinalizeBind(const MultiFileReaderOptions &file_options, const MultiFileReaderBindData &options,
const string &filename, const vector<string> &local_names,
const vector<LogicalType> &global_types, const vector<string> &global_names,
const vector<column_t> &global_column_ids, MultiFileReaderData &reader_data,
ClientContext &context) override;
ClientContext &context, optional_ptr<MultiFileReaderGlobalState> global_state) override;

//! Override the FinalizeChunk method
void FinalizeChunk(ClientContext &context, const MultiFileReaderBindData &bind_data,
const MultiFileReaderData &reader_data, DataChunk &chunk) override;
const MultiFileReaderData &reader_data, DataChunk &chunk, optional_ptr<MultiFileReaderGlobalState> global_state) override;

//! Override the ParseOption call to parse delta_scan specific options
bool ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options,
Expand Down
15 changes: 0 additions & 15 deletions test/sql/deltatable_with_dv.test
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,6 @@ require parquet

require deltatable

# Enabling the file_row_number option, but projecting it out
#query I
#FROM delta_scan('./delta-kernel-rs/kernel/tests/data/table-with-dv-small/', file_row_number=1)
#----
#1
#2
#3
#4
#5
#6
#7
#8
#
#mode skip

# Simplest example
query I
FROM delta_scan('./delta-kernel-rs/kernel/tests/data/table-with-dv-small/')
Expand Down

0 comments on commit 6aafcce

Please sign in to comment.