Skip to content

Commit

Permalink
Clean up code and reorganize test files
Browse files Browse the repository at this point in the history
  • Loading branch information
samansmink committed May 14, 2024
1 parent d332377 commit b185940
Show file tree
Hide file tree
Showing 19 changed files with 161 additions and 220 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/_extension_distribution.yml
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ jobs:
- name: Test extension
if: ${{ matrix.duckdb_arch != 'linux_arm64'}}
env:
DAT_AVAILABLE: 1
run: |
make test
Expand Down Expand Up @@ -264,6 +266,8 @@ jobs:
- name: Test Extension
if: ${{ matrix.osx_build_arch == 'arm64'}}
shell: bash
env:
DAT_AVAILABLE: 1
run: |
make test
Expand Down
22 changes: 11 additions & 11 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ static void visit_callback(ffi::NullableCvoid engine_context, const struct ffi::

// First we append the file to our resolved files
context->resolved_files.push_back(DeltaSnapshot::ToDuckDBPath(path_string));
context->metadata.push_back({});
context->metadata.emplace_back(make_uniq<DeltaFileMetaData>());

D_ASSERT(context->resolved_files.size() == context->metadata.size());

// Initialize the file metadata
context->metadata.back().delta_snapshot_version = context->version;
context->metadata.back().file_number = context->resolved_files.size() - 1;
context->metadata.back()->delta_snapshot_version = context->version;
context->metadata.back()->file_number = context->resolved_files.size() - 1;

// Fetch the deletion vector
auto selection_vector_res = ffi::selection_vector_from_dv(dv_info, context->table_client, context->global_state);
auto selection_vector = unpack_result_or_throw(selection_vector_res, "selection_vector_from_dv for path " + context->GetPath());
if (selection_vector.ptr) {
context->metadata.back().selection_vector = selection_vector;
context->metadata.back()->selection_vector = selection_vector;
}

// Lookup all columns for potential hits in the constant map
Expand All @@ -65,7 +65,7 @@ static void visit_callback(ffi::NullableCvoid engine_context, const struct ffi::
delete partition_val;
}
}
context->metadata.back().partition_map = std::move(constant_map);
context->metadata.back()->partition_map = std::move(constant_map);
}

static void visit_data(void *engine_context, struct ffi::EngineDataHandle *engine_data, const struct ffi::KernelBoolSlice selection_vec) {
Expand Down Expand Up @@ -347,11 +347,11 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio
const auto &snapshot = dynamic_cast<const DeltaSnapshot&>(*global_state->file_list);
auto &file_metadata = snapshot.metadata[reader_data.file_list_idx.GetIndex()];

if (!file_metadata.partition_map.empty()) {
if (!file_metadata->partition_map.empty()) {
for (idx_t i = 0; i < global_column_ids.size(); i++) {
column_t col_id = global_column_ids[i];
auto col_partition_entry = file_metadata.partition_map.find(global_names[col_id]);
if (col_partition_entry != file_metadata.partition_map.end()) {
auto col_partition_entry = file_metadata->partition_map.find(global_names[col_id]);
if (col_partition_entry != file_metadata->partition_map.end()) {
// Todo: use https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
auto maybe_value = Value(col_partition_entry->second).DefaultCastAs(global_types[i]);
reader_data.constant_map.emplace_back(i, maybe_value);
Expand Down Expand Up @@ -513,13 +513,13 @@ void DeltaMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFile
const auto &snapshot = dynamic_cast<const DeltaSnapshot&>(*global_state->file_list);
auto &metadata = snapshot.metadata[reader_data.file_list_idx.GetIndex()];

if (metadata.selection_vector.ptr && chunk.size() != 0) {
if (metadata->selection_vector.ptr && chunk.size() != 0) {
D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX);
auto &file_row_number_column = chunk.data[delta_global_state.file_row_number_idx];

// Construct the selection vector using the file_row_number column and the raw selection vector from delta
idx_t select_count;
auto sv = DuckSVFromDeltaSV(metadata.selection_vector, file_row_number_column, chunk.size(), select_count);
auto sv = DuckSVFromDeltaSV(metadata->selection_vector, file_row_number_column, chunk.size(), select_count);
chunk.Slice(sv, select_count);
}

Expand All @@ -528,7 +528,7 @@ void DeltaMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFile
//! Create Dummy expression (0 + file_number)
vector<unique_ptr<ParsedExpression>> child_expr;
child_expr.push_back(make_uniq<ConstantExpression>(Value::UBIGINT(0)));
child_expr.push_back(make_uniq<ConstantExpression>(Value::UBIGINT(metadata.file_number)));
child_expr.push_back(make_uniq<ConstantExpression>(Value::UBIGINT(metadata->file_number)));
unique_ptr<ParsedExpression> expr = make_uniq<FunctionExpression>("+", std::move(child_expr), nullptr, nullptr, false, true);

//! s dummy expression
Expand Down
8 changes: 7 additions & 1 deletion src/include/delta_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ class SchemaVisitor {
visitor.data = &state;
visitor.make_field_list = (uintptr_t (*)(void*, uintptr_t)) &MakeFieldList;
visitor.visit_struct = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, uintptr_t)) &VisitStruct;
visitor.visit_decimal = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, uint8_t , int8_t)) &VisitDecimal;
visitor.visit_string = VisitSimpleType<LogicalType::VARCHAR>();
visitor.visit_long = VisitSimpleType<LogicalType::BIGINT>();
visitor.visit_integer = VisitSimpleType<LogicalType::SMALLINT>();
visitor.visit_integer = VisitSimpleType<LogicalType::INTEGER>();
visitor.visit_short = VisitSimpleType<LogicalType::SMALLINT>();
visitor.visit_byte = VisitSimpleType<LogicalType::TINYINT>();
visitor.visit_float = VisitSimpleType<LogicalType::FLOAT>();
visitor.visit_double = VisitSimpleType<LogicalType::DOUBLE>();
Expand Down Expand Up @@ -51,6 +53,10 @@ class SchemaVisitor {
state->AppendToList(sibling_list_id, name, TypeId);
}

//! Schema visitor callback for decimal fields (installed as visitor.visit_decimal).
//! Appends a field called `name` with type DECIMAL(precision, scale) to the field list
//! identified by sibling_list_id.
static void VisitDecimal(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, uint8_t precision, int8_t scale) {
state->AppendToList(sibling_list_id, name, LogicalType::DECIMAL(precision, scale));
}

//! Schema visitor callback installed as visitor.make_field_list.
//! Forwards capacity_hint to state->MakeFieldListImpl and returns its result unchanged.
static uintptr_t MakeFieldList(SchemaVisitor* state, uintptr_t capacity_hint) {
return state->MakeFieldListImpl(capacity_hint);
}
Expand Down
12 changes: 9 additions & 3 deletions src/include/functions/delta_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@
namespace duckdb {

struct DeltaFileMetaData {
DeltaFileMetaData() {};

// Non-copyable: the destructor drops selection_vector, so a copy would drop the same slice twice
DeltaFileMetaData (const DeltaFileMetaData&) = delete;
DeltaFileMetaData& operator= (const DeltaFileMetaData&) = delete;

~DeltaFileMetaData() {
if (selection_vector.ptr) {
ffi::drop_bool_slice(selection_vector);
}
}

idx_t delta_snapshot_version;
idx_t file_number;
idx_t delta_snapshot_version = DConstants::INVALID_INDEX;
idx_t file_number = DConstants::INVALID_INDEX;
ffi::KernelBoolSlice selection_vector = {nullptr, 0};
case_insensitive_map_t<string> partition_map;
};
Expand Down Expand Up @@ -66,7 +72,7 @@ struct DeltaSnapshot : public MultiFileList {
vector<string> names;

//! Metadata map for files
vector<DeltaFileMetaData> metadata;
vector<unique_ptr<DeltaFileMetaData>> metadata;

//! Current file list resolution state
bool initialized = false;
Expand Down
31 changes: 14 additions & 17 deletions test/sql/dat/basic_append.test
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,15 @@ require-env DAT_AVAILABLE
# - part-00000-c156ac8b-f738-4479-803d-750072dd4c51-c000.snappy.parquet
# - contains letters d,e

mode skip

# TODO Missing types: double
# Query the whole table
query II
SELECT letter, number
query III
SELECT *
FROM delta_scan('./delta-kernel-rs/acceptance/tests/dat/out/reader_tests/generated/basic_append/delta')
----
d 4
e 5
a 1
b 2
c 3
d 4 4.4
e 5 5.5
a 1 1.1
b 2 2.2
c 3 3.3

query I
SELECT letter
Expand All @@ -48,7 +44,8 @@ FROM delta_scan('./delta-kernel-rs/acceptance/tests/dat/out/reader_tests/generat
2
3

# TODO add a_float column test
# TODO: Figure out what's wrong here
mode skip

# Now we add a filter that filters out one of the files
query II
Expand All @@ -61,18 +58,18 @@ a 1
mode unskip

# Now we add a filter that filters out the other file
query II
SELECT letter, number
query III
SELECT a_float, letter, number,
FROM delta_scan('./delta-kernel-rs/acceptance/tests/dat/out/reader_tests/generated/basic_append/delta')
WHERE number > 4
----
e 5
5.5 e 5

mode skip

# Now we add a filter that filters out all rows
query II
SELECT letter, number
query III
SELECT a_float, number, letter
FROM delta_scan('./delta-kernel-rs/acceptance/tests/dat/out/reader_tests/generated/basic_append/delta')
WHERE number > 6
----
26 changes: 26 additions & 0 deletions test/sql/dat/custom_parameters.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# name: test/sql/dat/custom_parameters.test
# description: DAT test suite: use the basic append test to test the delta_file_number param
# group: [delta]

require parquet

require delta

require-env DAT_AVAILABLE

# Note: this table has 2 parquet files:
# - part-00000-ef42f28f-e8e8-4d54-b51f-c3af96c72a44-c000.snappy.parquet
# - contains letters a,b,c
# - part-00000-c156ac8b-f738-4479-803d-750072dd4c51-c000.snappy.parquet
# - contains letters d,e

# Test with appends and several custom options
query IIIII
SELECT parse_filename(filename), file_row_number, letter, delta_file_number, number
FROM delta_scan('./delta-kernel-rs/acceptance/tests/dat/out/reader_tests/generated/basic_append/delta', delta_file_number=1, file_row_number=1, filename=1)
----
part-00000-c156ac8b-f738-4479-803d-750072dd4c51-c000.snappy.parquet 0 d 0 4
part-00000-c156ac8b-f738-4479-803d-750072dd4c51-c000.snappy.parquet 1 e 0 5
part-00000-ef42f28f-e8e8-4d54-b51f-c3af96c72a44-c000.snappy.parquet 0 a 1 1
part-00000-ef42f28f-e8e8-4d54-b51f-c3af96c72a44-c000.snappy.parquet 1 b 1 2
part-00000-ef42f28f-e8e8-4d54-b51f-c3af96c72a44-c000.snappy.parquet 2 c 1 3
17 changes: 8 additions & 9 deletions test/sql/dat/primitive_types.test
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ require delta

require-env DAT_AVAILABLE

# TODO add all the other types
query III
SELECT "utf8", "int64", "int32"
FROM delta_scan('delta-kernel-rs/acceptance/tests/dat/out/reader_tests/generated/all_primitive_types/delta')
query IIIIIIIIIIII
SELECT *
FROM delta_scan('./delta-kernel-rs/acceptance/tests/dat/out/reader_tests/generated/all_primitive_types/delta')
----
0 0 0
1 1 1
2 2 2
3 3 3
4 4 4
0 0 0 0 0 0.0 0.0 true (empty) 10.000 1970-01-01 1970-01-01 00:00:00
1 1 1 1 1 1.0 1.0 false \x00 11.000 1970-01-02 1970-01-01 01:00:00
2 2 2 2 2 2.0 2.0 true \x00\x00 12.000 1970-01-03 1970-01-01 02:00:00
3 3 3 3 3 3.0 3.0 false \x00\x00\x00 13.000 1970-01-04 1970-01-01 03:00:00
4 4 4 4 4 4.0 4.0 true \x00\x00\x00\x00 14.000 1970-01-05 1970-01-01 04:00:00
38 changes: 0 additions & 38 deletions test/sql/dat/test_custom_delta_scan_param.test

This file was deleted.

15 changes: 15 additions & 0 deletions test/sql/delta_kernel_rs/basic_partitioned.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# name: test/sql/delta_kernel_rs/basic_partitioned.test
# description: simple table with partitions
# group: [delta_kernel_rs]

require parquet

require delta

# FIXME: this fails due to some weird error
mode skip

statement error
SELECT * FROM delta_scan('./delta-kernel-rs/kernel/tests/data/basic_partitioned')
----
Failed to read file "/Users/sam/Development/delta-kernel-testing/delta-kernel-rs/kernel/tests/data/basic_partitioned/letter=__HIVE_DEFAULT_PARTITION__
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# name: test/sql/delta_with_dv.test
# name: test/sql/delta_kernel_rs/simple_with_dv.test
# description: test delta extension with deletion vector
# group: [delta]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# name: test/sql/delta_scan_parquet_params.test
# description: test that named params to the parquet_scan still work
# group: [delta]
# name: test/sql/delta_kernel_rs/simple_without_dv.test
# description: simple table
# group: [delta_kernel_rs]

require parquet

Expand Down
Loading

0 comments on commit b185940

Please sign in to comment.