diff --git a/duckdb b/duckdb
index 8b6804a..bb927be 160000
--- a/duckdb
+++ b/duckdb
@@ -1 +1 @@
-Subproject commit 8b6804a1730ab79517550384e414528e4d73a006
+Subproject commit bb927beb445c8c0f16f9164cf2eb5c3cf2239886
diff --git a/src/functions/deltatable_scan.cpp b/src/functions/deltatable_scan.cpp
index e1a8c75..71d39a4 100644
--- a/src/functions/deltatable_scan.cpp
+++ b/src/functions/deltatable_scan.cpp
@@ -295,18 +295,10 @@ bool DeltaMultiFileReader::Bind(MultiFileReaderOptions &options, MultiFileList &
         return_types.emplace_back(LogicalType::BIGINT);
         names.emplace_back("file_row_number");
     } else {
-        // TODO: this is a bogus ID?
+        // TODO: this is a bogus ID? Change for flag indicating it should be enabled?
         bind_data.file_row_number_idx = names.size();
     }
 
-    // This is where we want the extra column
-    // TODO: only enable this if we have deletion vectors for this scan?
-    bind_data.required_columns.push_back({
-        "file_row_number",
-        LogicalType::BIGINT,
-        file_row_number_enabled
-    });
-
     return true;
 };
 
@@ -323,9 +315,6 @@ void DeltaMultiFileReader::BindOptions(MultiFileReaderOptions &options, MultiFil
     auto demo_gen_col_opt = options.custom_options.find("delta_file_number");
     if (demo_gen_col_opt != options.custom_options.end()) {
         if (demo_gen_col_opt->second.GetValue()) {
-            auto parquet_columns_produced = names.size();
-            D_ASSERT(bind_data.custom_data.find("file_number_column_idx") == bind_data.custom_data.end());
-            bind_data.custom_data["file_number_column_idx"] = Value::UBIGINT(parquet_columns_produced);
             names.push_back("delta_file_number");
             return_types.push_back(LogicalType::UBIGINT);
         }
@@ -336,38 +325,27 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio
                                         const string &filename, const vector &local_names,
                                         const vector &global_types, const vector &global_names,
                                         const vector &global_column_ids, MultiFileReaderData &reader_data,
-                                        ClientContext &context) {
-    MultiFileReader::FinalizeBind(file_options, options, filename, local_names, global_types, global_names, global_column_ids, reader_data, context);
+                                        ClientContext &context, optional_ptr global_state) {
+    MultiFileReader::FinalizeBind(file_options, options, filename, local_names, global_types, global_names, global_column_ids, reader_data, context, global_state);
 
-    // Handle custom delta option set in MultiFileReaderOptions::custom_options with data passed through in MultiFileReaderBindData::custom_data
+    // Handle custom delta option set in MultiFileReaderOptions::custom_options
     auto file_number_opt = file_options.custom_options.find("delta_file_number");
     if (file_number_opt != file_options.custom_options.end()) {
         if (file_number_opt->second.GetValue()) {
-            auto maybe_file_number_column = options.custom_data.find("file_number_column_idx");
-            D_ASSERT(maybe_file_number_column != options.custom_data.end());
-            auto file_number_column_idx = maybe_file_number_column->second.GetValue();
-            D_ASSERT(file_number_column_idx != DConstants::INVALID_INDEX);
-
-            // TODO: we have the metadata for the file available here already, the reason we handle this in FinalizeChunk
-            // is purely for demonstration purposes
-
-            // What you can do here is 2 things:
-            // - either add the constant directly using: reader_data.constant_map.emplace_back(i, Value::UBIGINT(0));
-            // - add the constant as a nop, store the global
-            for (idx_t i = 0; i < global_column_ids.size(); i++) {
-                column_t col_id = global_column_ids[i];
-                if (col_id == file_number_column_idx) {
-                    reader_data.constant_map.emplace_back(i, col_id, Value::UBIGINT(0));
-                    break;
-                }
-            }
+            D_ASSERT(global_state);
+            auto &delta_global_state = global_state->Cast();
+            D_ASSERT(delta_global_state.delta_file_number_idx != DConstants::INVALID_INDEX);
+
+            // We add the constant column for the delta_file_number option
+            // NOTE: we add a placeholder here, to demonstrate how we can also populate extra columns in the FinalizeChunk
+            reader_data.constant_map.emplace_back(delta_global_state.delta_file_number_idx, Value::UBIGINT(0));
         }
     }
 
     // Get the metadata for this file
-    D_ASSERT(reader_data.file_metadata.file_list);
-    const auto &snapshot = dynamic_cast(*reader_data.file_metadata.file_list);
-    auto &file_metadata = snapshot.metadata[reader_data.file_metadata.file_list_idx];
+    D_ASSERT(global_state->file_list);
+    const auto &snapshot = dynamic_cast(*global_state->file_list);
+    auto &file_metadata = snapshot.metadata[reader_data.file_list_idx];
 
     if (!file_metadata.partition_map.empty()) {
         for (idx_t i = 0; i < global_column_ids.size(); i++) {
@@ -376,7 +354,7 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio
             if (col_partition_entry != file_metadata.partition_map.end()) {
                 // Todo: use https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
                 auto maybe_value = Value(col_partition_entry->second).DefaultCastAs(global_types[i]);
-                reader_data.constant_map.emplace_back(i, col_id, maybe_value);
+                reader_data.constant_map.emplace_back(i, maybe_value);
             }
         }
     }
@@ -414,66 +392,154 @@ static SelectionVector DuckSVFromDeltaSV(ffi::KernelBoolSlice *dv, Vector row_id
     return result;
 }
 
-static idx_t LocalColIdxToResultColIdx(idx_t local_idx, const MultiFileReaderData& reader_data) {
-    for (auto &constant_entry: reader_data.constant_map) {
-        if (constant_entry.local_column_id == local_idx) {
-            return constant_entry.result_column_id;
+// Stores the result-chunk index of a column required by the delta extension in its dedicated field; throws on unknown names
+void DeltaMultiFileReaderGlobalState::SetColumnIdx(const string &column, idx_t idx) {
+    if (column == "file_row_number") {
+        file_row_number_idx = idx;
+        return;
+    } else if (column == "delta_file_number") {
+        delta_file_number_idx = idx;
+        return;
+    }
+    throw InternalException("Unknown column '%s' found as required by the DeltaMultiFileReader", column);
+}
+
+unique_ptr DeltaMultiFileReader::InitializeGlobalState(duckdb::ClientContext &context,
+                                                       const duckdb::MultiFileReaderOptions &file_options,
+                                                       const duckdb::MultiFileReaderBindData &bind_data,
+                                                       const duckdb::MultiFileList &file_list,
+                                                       const vector &global_types,
+                                                       const vector &global_names,
+                                                       const vector &global_column_ids) {
+    vector extra_columns;
+    vector> mapped_columns;
+
+    // Create a map of the columns that are in the projection
+    case_insensitive_map_t selected_columns;
+    for (idx_t i = 0; i < global_column_ids.size(); i++) {
+        auto global_id = global_column_ids[i];
+        auto &global_name = global_names[global_id];
+        selected_columns.insert({global_name, i});
+    }
+
+    // The hardcoded (for now) columns to be mapped
+    case_insensitive_map_t columns_to_map = {
+        {"file_row_number", LogicalType::BIGINT},
+        {"delta_file_number", LogicalType::UBIGINT}
+    };
+
+    // Map every column to either a column in the projection, or add it to the extra columns if it doesn't exist
+    idx_t col_offset = 0;
+    for (const auto &required_column : columns_to_map) {
+        // First check if the column is in the projection
+        auto res = selected_columns.find(required_column.first);
+        if (res != selected_columns.end()) {
+            // The column is in the projection, no special handling is required; we simply store the index
+            mapped_columns.push_back({required_column.first, res->second});
+            continue;
         }
+
+        // The column is NOT in the projection: it needs to be added as an extra_column
+
+        // Calculate the index of the added column (extra columns are added after all other columns)
+        idx_t current_col_idx = global_column_ids.size() + col_offset++;
+
+        // Add column to the map, to ensure the MultiFileReader can find it when processing the Chunk
+        mapped_columns.push_back({required_column.first, current_col_idx});
+
+        // Ensure the result DataChunk has a vector of the correct type to store this column
+        extra_columns.push_back(required_column.second);
     }
-    return DConstants::INVALID_INDEX;
+
+    auto res = make_uniq(extra_columns, &file_list);
+
+    // Parse all the mapped columns into the DeltaMultiFileReaderGlobalState for easy use;
+    for (const auto& mapped_column : mapped_columns) {
+        res->SetColumnIdx(mapped_column.first, mapped_column.second);
+    }
+
+    return std::move(res);
+}
+
+void DeltaMultiFileReader::CreateNameMapping(const string &file_name, const vector &local_types,
+                                             const vector &local_names, const vector &global_types,
+                                             const vector &global_names, const vector &global_column_ids,
+                                             MultiFileReaderData &reader_data, const string &initial_file,
+                                             optional_ptr global_state) {
+    // First call the base implementation to do most mapping
+    MultiFileReader::CreateNameMapping(file_name, local_types, local_names, global_types, global_names, global_column_ids, reader_data, initial_file, global_state);
+
+    // Then we handle delta specific mapping
+    D_ASSERT(global_state);
+    auto &delta_global_state = global_state->Cast();
+
+    // Check if the file_row_number column is an "extra_column" which is not part of the projection
+    if (delta_global_state.file_row_number_idx >= global_column_ids.size()) {
+        D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX);
+
+        // Build the name map
+        case_insensitive_map_t name_map;
+        for (idx_t col_idx = 0; col_idx < local_names.size(); col_idx++) {
+            name_map[local_names[col_idx]] = col_idx;
+        }
+
+        // Lookup the required column in the local map
+        auto entry = name_map.find("file_row_number");
+        if (entry == name_map.end()) {
+            throw InternalException("Failed to find the file_row_number column");
+        }
+
+        // Register the column to be scanned from this file
+        reader_data.column_ids.push_back(entry->second);
+        reader_data.column_mapping.push_back(delta_global_state.file_row_number_idx);
+    }
+
+    // This may have changed: update it
+    reader_data.empty_columns = reader_data.column_ids.empty();
 }
 
 void DeltaMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFileReaderBindData &bind_data,
-                                         const MultiFileReaderData &reader_data, DataChunk &chunk) {
+                                         const MultiFileReaderData &reader_data, DataChunk &chunk, optional_ptr global_state) {
     // Base class finalization first
-    MultiFileReader::FinalizeChunk(context, bind_data, reader_data, chunk);
+    MultiFileReader::FinalizeChunk(context, bind_data, reader_data, chunk, global_state);
 
-    D_ASSERT(reader_data.file_metadata.file_list);
+    D_ASSERT(global_state);
+    auto &delta_global_state = global_state->Cast();
+    D_ASSERT(delta_global_state.file_list);
 
     // Get the metadata for this file
-    const auto &snapshot = dynamic_cast(*reader_data.file_metadata.file_list);
-    auto &metadata = snapshot.metadata[reader_data.file_metadata.file_list_idx];
+    const auto &snapshot = dynamic_cast(*global_state->file_list);
+    auto &metadata = snapshot.metadata[reader_data.file_list_idx];
 
     if (metadata.selection_vector.get() && chunk.size() != 0) {
-        idx_t select_count;
-        auto res = reader_data.required_column_map.find("file_row_number");
-        if (res == reader_data.required_column_map.end()) {
-            throw InternalException("Failed to find file_row_number column used to apply the deletion vector at");
-        }
+        D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX);
+        auto &file_row_number_column = chunk.data[delta_global_state.file_row_number_idx];
 
         // Construct the selection vector using the file_row_number column and the raw selection vector from delta
-        auto sv = DuckSVFromDeltaSV(metadata.selection_vector.get(), chunk.data[res->second], chunk.size(), select_count);
-
-        // Slice the result
+        idx_t select_count;
+        auto sv = DuckSVFromDeltaSV(metadata.selection_vector.get(), file_row_number_column, chunk.size(), select_count);
         chunk.Slice(sv, select_count);
     }
 
     // Note: this demo function shows how we can use DuckDB's Binder create expression-based generated columns
-    auto maybe_file_number_column_idx = bind_data.custom_data.find("file_number_column_idx");
-    if (maybe_file_number_column_idx != bind_data.custom_data.end()) {
-        auto file_number_column_idx = maybe_file_number_column_idx->second.GetValue();
-        idx_t file_number_column_result_idx = LocalColIdxToResultColIdx(file_number_column_idx, reader_data);
-
-        // If the idx is present it means that it occurs in the result and we need to modify its
-        if (file_number_column_result_idx != DConstants::INVALID_INDEX) {
-            //! Create Dummy expression (0 + file_number)
-            vector> child_expr;
-            child_expr.push_back(make_uniq(Value::UBIGINT(0)));
-            child_expr.push_back(make_uniq(Value::UBIGINT(metadata.file_number)));
-            unique_ptr expr = make_uniq("+", std::move(child_expr), nullptr, nullptr, false, true);
-
-            //! s dummy expression
-            auto binder = Binder::CreateBinder(context);
-            ExpressionBinder expr_binder(*binder, context);
-            auto bound_expr = expr_binder.Bind(expr, nullptr);
-
-            //! Execute dummy expression into result column
-            ExpressionExecutor expr_executor(context);
-            expr_executor.AddExpression(*bound_expr);
-
-            //! Execute the expression directly into the output Chunk
-            expr_executor.ExecuteExpression(chunk.data[file_number_column_result_idx]);
-        }
+    if (delta_global_state.delta_file_number_idx != DConstants::INVALID_INDEX) {
+        //! Create Dummy expression (0 + file_number)
+        vector> child_expr;
+        child_expr.push_back(make_uniq(Value::UBIGINT(0)));
+        child_expr.push_back(make_uniq(Value::UBIGINT(metadata.file_number)));
+        unique_ptr expr = make_uniq("+", std::move(child_expr), nullptr, nullptr, false, true);
+
+        //! Bind the dummy expression
+        auto binder = Binder::CreateBinder(context);
+        ExpressionBinder expr_binder(*binder, context);
+        auto bound_expr = expr_binder.Bind(expr, nullptr);
+
+        //! Execute dummy expression into result column
+        ExpressionExecutor expr_executor(context);
+        expr_executor.AddExpression(*bound_expr);
+
+        //! Execute the expression directly into the output Chunk
+        expr_executor.ExecuteExpression(chunk.data[delta_global_state.delta_file_number_idx]);
     }
 };
 
diff --git a/src/include/functions/deltatable_scan.hpp b/src/include/functions/deltatable_scan.hpp
index a6dae8c..bd085ab 100644
--- a/src/include/functions/deltatable_scan.hpp
+++ b/src/include/functions/deltatable_scan.hpp
@@ -74,6 +74,17 @@ struct DeltaTableSnapshot : public MultiFileList {
     ClientContext &context;
 };
 
+struct DeltaMultiFileReaderGlobalState : public MultiFileReaderGlobalState {
+    DeltaMultiFileReaderGlobalState(vector extra_columns_p, optional_ptr file_list_p) : MultiFileReaderGlobalState(extra_columns_p, file_list_p) {
+    }
+    //! The idx of the file number column in the result chunk
+    idx_t delta_file_number_idx = DConstants::INVALID_INDEX;
+    //! The idx of the file_row_number column in the result chunk
+    idx_t file_row_number_idx = DConstants::INVALID_INDEX;
+
+    void SetColumnIdx(const string &column, idx_t idx);
+};
+
 struct DeltaMultiFileReader : public MultiFileReader {
     static unique_ptr CreateInstance();
     //! Return a DeltaTableSnapshot
@@ -89,15 +100,26 @@ struct DeltaMultiFileReader : public MultiFileReader {
     void BindOptions(MultiFileReaderOptions &options, MultiFileList &files, vector &return_types, vector &names,
                      MultiFileReaderBindData& bind_data) override;
 
+    void CreateNameMapping(const string &file_name, const vector &local_types,
+                           const vector &local_names, const vector &global_types,
+                           const vector &global_names, const vector &global_column_ids,
+                           MultiFileReaderData &reader_data, const string &initial_file,
+                           optional_ptr global_state) override;
+
+    unique_ptr InitializeGlobalState(ClientContext &context, const MultiFileReaderOptions &file_options,
+                                     const MultiFileReaderBindData &bind_data, const MultiFileList &file_list,
+                                     const vector &global_types, const vector &global_names,
+                                     const vector &global_column_ids) override;
+
     void FinalizeBind(const MultiFileReaderOptions &file_options, const MultiFileReaderBindData &options,
                       const string &filename, const vector &local_names,
                       const vector &global_types, const vector &global_names,
                       const vector &global_column_ids, MultiFileReaderData &reader_data,
-                      ClientContext &context) override;
+                      ClientContext &context, optional_ptr global_state) override;
 
     //! Override the FinalizeChunk method
     void FinalizeChunk(ClientContext &context, const MultiFileReaderBindData &bind_data,
-                       const MultiFileReaderData &reader_data, DataChunk &chunk) override;
+                       const MultiFileReaderData &reader_data, DataChunk &chunk, optional_ptr global_state) override;
 
     //! Override the ParseOption call to parse delta_scan specific options
     bool ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options,
diff --git a/test/sql/deltatable_with_dv.test b/test/sql/deltatable_with_dv.test
index 163cbec..b6609a6 100644
--- a/test/sql/deltatable_with_dv.test
+++ b/test/sql/deltatable_with_dv.test
@@ -6,21 +6,6 @@ require parquet
 
 require deltatable
 
-# Enabling the file_row_number option, but projecting it out
-#query I
-#FROM delta_scan('./delta-kernel-rs/kernel/tests/data/table-with-dv-small/', file_row_number=1)
-#----
-#1
-#2
-#3
-#4
-#5
-#6
-#7
-#8
-#
-#mode skip
-
 # Simplest example
 query I
 FROM delta_scan('./delta-kernel-rs/kernel/tests/data/table-with-dv-small/')