Skip to content

Commit

Permalink
Rename reader config to file scan info
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Jan 2, 2025
1 parent 42cd8a0 commit 62b47c6
Show file tree
Hide file tree
Showing 33 changed files with 163 additions and 162 deletions.
2 changes: 1 addition & 1 deletion extension/delta/src/function/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ static std::unique_ptr<TableFuncBindData> bindFunc(main::ClientContext* context,
KU_ASSERT(returnTypes.size() == returnColumnNames.size());
auto columns = input->binder->createVariables(returnColumnNames, returnTypes);
return std::make_unique<DeltaScanBindData>(std::move(query), connector,
duckdb_extension::DuckDBResultConverter{returnTypes}, columns, ReaderConfig{}, context);
duckdb_extension::DuckDBResultConverter{returnTypes}, columns, FileScanInfo{}, context);
}

struct DeltaScanSharedState final : BaseScanSharedState {
Expand Down
4 changes: 2 additions & 2 deletions extension/delta/src/include/function/delta_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ struct DeltaScanBindData final : function::ScanBindData {
DeltaScanBindData(std::string query,
std::shared_ptr<duckdb_extension::DuckDBConnector> connector,
duckdb_extension::DuckDBResultConverter converter, binder::expression_vector columns,
common::ReaderConfig config, main::ClientContext* ctx)
: ScanBindData{std::move(columns), std::move(config), ctx}, query{std::move(query)},
common::FileScanInfo fileScanInfo, main::ClientContext* ctx)
: ScanBindData{std::move(columns), std::move(fileScanInfo), ctx}, query{std::move(query)},
connector{std::move(connector)}, converter{std::move(converter)} {}

std::unique_ptr<TableFuncBindData> copy() const override {
Expand Down
4 changes: 2 additions & 2 deletions extension/iceberg/src/function/iceberg_bindfunc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ static std::string generateQueryOptions(const TableFuncBindInput* input,
};
if (functionName == "ICEBERG_SCAN") {
auto scanInput = input->extraInput->constPtrCast<ExtraScanTableFuncBindInput>();
appendOptions(scanInput->config.options);
appendOptions(scanInput->fileScanInfo.options);
} else {
appendOptions(input->optionalParams);
}
Expand Down Expand Up @@ -72,7 +72,7 @@ std::unique_ptr<TableFuncBindData> bindFuncHelper(main::ClientContext* context,
KU_ASSERT(returnTypes.size() == returnColumnNames.size());
auto columns = input->binder->createVariables(returnColumnNames, returnTypes);
return std::make_unique<delta_extension::DeltaScanBindData>(std::move(query), connector,
duckdb_extension::DuckDBResultConverter{returnTypes}, columns, ReaderConfig{}, context);
duckdb_extension::DuckDBResultConverter{returnTypes}, columns, FileScanInfo{}, context);
}
} // namespace iceberg_extension
} // namespace kuzu
16 changes: 8 additions & 8 deletions extension/json/src/functions/table_functions/json_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,9 +686,9 @@ struct JsonScanBindData : public ScanBindData {
JsonScanFormat format;

JsonScanBindData(binder::expression_vector columns, column_id_t numWarningDataColumns,
ReaderConfig config, main::ClientContext* ctx, case_insensitive_map_t<idx_t> colNameToIdx,
FileScanInfo fileScanInfo, main::ClientContext* ctx, case_insensitive_map_t<idx_t> colNameToIdx,
JsonScanFormat format)
: ScanBindData(columns, std::move(config), ctx, numWarningDataColumns, 0),
: ScanBindData(columns, std::move(fileScanInfo), ctx, numWarningDataColumns, 0),
colNameToIdx{std::move(colNameToIdx)}, format{format} {}

uint64_t getFieldIdx(const std::string& fieldName) const;
Expand Down Expand Up @@ -772,7 +772,7 @@ static std::unique_ptr<TableFuncBindData> bindFunc(main::ClientContext* context,
auto scanInput = ku_dynamic_cast<ExtraScanTableFuncBindInput*>(input->extraInput.get());
std::vector<LogicalType> columnTypes;
std::vector<std::string> columnNames;
JsonScanConfig scanConfig(scanInput->config.options);
JsonScanConfig scanConfig(scanInput->fileScanInfo.options);
case_insensitive_map_t<idx_t> colNameToIdx;
if (!scanInput->expectedColumnNames.empty() || !scanConfig.autoDetect) {
if (scanInput->expectedColumnNames.empty()) {
Expand All @@ -789,15 +789,15 @@ static std::unique_ptr<TableFuncBindData> bindFunc(main::ClientContext* context,

if (scanConfig.format == JsonScanFormat::AUTO_DETECT) {
JSONScanSharedState sharedState(*context,
scanInput->config.getFilePath(JsonExtension::JSON_SCAN_FILE_IDX), scanConfig.format,
scanInput->fileScanInfo.getFilePath(JsonExtension::JSON_SCAN_FILE_IDX), scanConfig.format,
0);
JSONScanLocalState localState(*context->getMemoryManager(), sharedState, context);
localState.readNext();
scanConfig.format = sharedState.jsonReader->getFormat();
}
} else {
scanConfig.format =
autoDetect(context, scanInput->config.getFilePath(JsonExtension::JSON_SCAN_FILE_IDX),
autoDetect(context, scanInput->fileScanInfo.getFilePath(JsonExtension::JSON_SCAN_FILE_IDX),
scanConfig, columnTypes, columnNames, colNameToIdx);
}
scanInput->tableFunction->canParallelFunc = [scanConfig]() {
Expand All @@ -806,7 +806,7 @@ static std::unique_ptr<TableFuncBindData> bindFunc(main::ClientContext* context,

auto columns = input->binder->createVariables(columnNames, columnTypes);

const bool ignoreErrors = scanInput->config.getOption(CopyConstants::IGNORE_ERRORS_OPTION_NAME,
const bool ignoreErrors = scanInput->fileScanInfo.getOption(CopyConstants::IGNORE_ERRORS_OPTION_NAME,
CopyConstants::DEFAULT_IGNORE_ERRORS);

std::vector<std::string> warningColumnNames;
Expand All @@ -825,7 +825,7 @@ static std::unique_ptr<TableFuncBindData> bindFunc(main::ClientContext* context,
columns.push_back(column);
}
return std::make_unique<JsonScanBindData>(columns, numWarningDataColumns,
scanInput->config.copy(), context, std::move(colNameToIdx), scanConfig.format);
scanInput->fileScanInfo.copy(), context, std::move(colNameToIdx), scanConfig.format);
}

static decltype(auto) getWarningDataVectors(const DataChunk& chunk, column_id_t numWarningColumns) {
Expand Down Expand Up @@ -871,7 +871,7 @@ static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output)
static std::unique_ptr<TableFuncSharedState> initSharedState(const TableFunctionInitInput& input) {
auto jsonBindData = input.bindData->constPtrCast<JsonScanBindData>();
return std::make_unique<JSONScanSharedState>(*jsonBindData->context,
jsonBindData->config.filePaths[0], jsonBindData->format, 0);
jsonBindData->fileScanInfo.filePaths[0], jsonBindData->format, 0);
}

static std::unique_ptr<TableFuncLocalState> initLocalState(const TableFunctionInitInput& input,
Expand Down
2 changes: 1 addition & 1 deletion scripts/headers.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ src/include/common/case_insensitive_map.h
src/include/common/cast.h
src/include/common/constants.h
src/include/common/copier_config/csv_reader_config.h
src/include/common/copier_config/reader_config.h
src/include/common/copier_config/file_scan_info.h
src/include/common/copy_constructors.h
src/include/common/data_chunk/data_chunk.h
src/include/common/data_chunk/data_chunk_state.h
Expand Down
16 changes: 8 additions & 8 deletions src/binder/bind/bind_file_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ std::unique_ptr<BoundBaseScanSource> Binder::bindFileScanSource(const BaseScanSo
auto filePaths = bindFilePaths(fileSource->filePaths);
auto parsingOptions = bindParsingOptions(options);
FileTypeInfo fileTypeInfo;
if (parsingOptions.contains(ReaderConfig::FILE_FORMAT_OPTION_NAME)) {
auto fileFormat = parsingOptions.at(ReaderConfig::FILE_FORMAT_OPTION_NAME).toString();
if (parsingOptions.contains(FileScanInfo::FILE_FORMAT_OPTION_NAME)) {
auto fileFormat = parsingOptions.at(FileScanInfo::FILE_FORMAT_OPTION_NAME).toString();
fileTypeInfo = FileTypeInfo{FileTypeUtils::fromString(fileFormat), fileFormat};
parsingOptions.erase(ReaderConfig::FILE_FORMAT_OPTION_NAME);
parsingOptions.erase(FileScanInfo::FILE_FORMAT_OPTION_NAME);
} else {
fileTypeInfo = bindFileTypeInfo(filePaths);
}
Expand All @@ -122,14 +122,14 @@ std::unique_ptr<BoundBaseScanSource> Binder::bindFileScanSource(const BaseScanSo
}
}
// Bind file configuration
auto config = std::make_unique<ReaderConfig>(std::move(fileTypeInfo), filePaths);
config->options = std::move(parsingOptions);
auto func = getScanFunction(config->fileTypeInfo, *config);
auto fileScanInfo = std::make_unique<FileScanInfo>(std::move(fileTypeInfo), filePaths);
fileScanInfo->options = std::move(parsingOptions);
auto func = getScanFunction(fileScanInfo->fileTypeInfo, *fileScanInfo);
// Bind table function
auto bindInput = TableFuncBindInput();
bindInput.addLiteralParam(Value::createValue(filePaths[0]));
auto extraInput = std::make_unique<ExtraScanTableFuncBindInput>();
extraInput->config = config->copy();
extraInput->fileScanInfo = fileScanInfo->copy();
extraInput->expectedColumnNames = columnNames;
extraInput->expectedColumnTypes = LogicalType::copy(columnTypes);
extraInput->tableFunction = &func;
Expand Down Expand Up @@ -186,7 +186,7 @@ std::unique_ptr<BoundBaseScanSource> Binder::bindObjectScanSource(const BaseScan
if (replacementData != nullptr) { // Replace as python object
func = replacementData->func;
auto replaceExtraInput = std::make_unique<ExtraScanTableFuncBindInput>();
replaceExtraInput->config.options = bindParsingOptions(options);
replaceExtraInput->fileScanInfo.options = bindParsingOptions(options);
replacementData->bindInput.extraInput = std::move(replaceExtraInput);
replacementData->bindInput.binder = this;
bindData = func.bindFunc(clientContext, &replacementData->bindInput);
Expand Down
4 changes: 2 additions & 2 deletions src/binder/bind/copy/bind_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(const Statement& statem
if (boundSource->type == ScanSourceType::FILE) {
auto& source = boundSource->constCast<BoundTableScanSource>();
auto bindData = source.info.bindData->constPtrCast<ScanBindData>();
if (copyStatement.byColumn() && bindData->config.fileTypeInfo.fileType != FileType::NPY) {
if (copyStatement.byColumn() && bindData->fileScanInfo.fileTypeInfo.fileType != FileType::NPY) {
throw BinderException(stringFormat("Copy by column with {} file type is not supported.",
bindData->config.fileTypeInfo.fileTypeStr));
bindData->fileScanInfo.fileTypeInfo.fileTypeStr));
}
}
expression_vector columns;
Expand Down
4 changes: 2 additions & 2 deletions src/binder/binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ void Binder::restoreScope(BinderScope prevScope) {
scope = std::move(prevScope);
}

function::TableFunction Binder::getScanFunction(FileTypeInfo typeInfo, const ReaderConfig& config) {
function::TableFunction Binder::getScanFunction(FileTypeInfo typeInfo, const FileScanInfo& fileScanInfo) {
function::Function* func = nullptr;
std::vector<LogicalType> inputTypes;
inputTypes.push_back(LogicalType::STRING());
Expand All @@ -242,7 +242,7 @@ function::TableFunction Binder::getScanFunction(FileTypeInfo typeInfo, const Rea
NpyScanFunction::name, inputTypes, functions);
} break;
case FileType::CSV: {
auto csvConfig = CSVReaderConfig::construct(config.options);
auto csvConfig = CSVReaderConfig::construct(fileScanInfo.options);
func = function::BuiltInFunctionsUtils::matchFunction(clientContext->getTx(),
csvConfig.parallel ? ParallelCSVScan::name : SerialCSVScan::name, inputTypes,
functions);
Expand Down
3 changes: 1 addition & 2 deletions src/common/copier_config/reader_config.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "common/copier_config/reader_config.h"

#include "common/assert.h"
#include "common/copier_config/file_scan_info.h"
#include "common/string_utils.h"

namespace kuzu {
Expand Down
2 changes: 1 addition & 1 deletion src/function/table/bind_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ bool TableFuncBindData::getIgnoreErrorsOption() const {
}

bool ScanBindData::getIgnoreErrorsOption() const {
return config.getOption(common::CopyConstants::IGNORE_ERRORS_OPTION_NAME,
return fileScanInfo.getOption(common::CopyConstants::IGNORE_ERRORS_OPTION_NAME,
common::CopyConstants::DEFAULT_IGNORE_ERRORS);
}

Expand Down
2 changes: 1 addition & 1 deletion src/function/table/scan_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace function {

std::pair<uint64_t, uint64_t> ScanSharedState::getNext() {
std::lock_guard<std::mutex> guard{lock};
if (fileIdx >= readerConfig.getNumFiles()) {
if (fileIdx >= fileScanInfo.getNumFiles()) {
return {UINT64_MAX, UINT64_MAX};
}
return {fileIdx, blockIdx++};
Expand Down
4 changes: 2 additions & 2 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include "binder/query/bound_regular_query.h"
#include "binder/query/query_graph.h"
#include "catalog/catalog_entry/table_catalog_entry.h"
#include "common/copier_config/reader_config.h"
#include "common/copier_config/file_scan_info.h"
#include "common/enums/table_type.h"
#include "parser/ddl/parsed_property_definition.h"
#include "parser/query/graph_pattern/pattern_element.h"
Expand Down Expand Up @@ -302,7 +302,7 @@ class Binder {
void restoreScope(BinderScope prevScope);

function::TableFunction getScanFunction(common::FileTypeInfo typeInfo,
const common::ReaderConfig& config);
const common::FileScanInfo& fileScanInfo);

ExpressionBinder* getExpressionBinder() { return &expressionBinder; }

Expand Down
6 changes: 3 additions & 3 deletions src/include/binder/bound_export_database.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include "binder/binder.h"
#include "binder/bound_statement.h"
#include "binder/query/bound_regular_query.h"
#include "common/copier_config/reader_config.h"
#include "common/copier_config/file_scan_info.h"

namespace kuzu {
namespace binder {
Expand Down Expand Up @@ -34,12 +34,12 @@ class BoundExportDatabase final : public BoundStatement {
common::case_insensitive_map_t<common::Value> getExportOptions() const {
return boundFileInfo.options;
}
const common::ReaderConfig* getBoundFileInfo() const { return &boundFileInfo; }
const common::FileScanInfo* getBoundFileInfo() const { return &boundFileInfo; }
const std::vector<ExportedTableData>* getExportData() const { return &exportData; }

private:
std::vector<ExportedTableData> exportData;
common::ReaderConfig boundFileInfo;
common::FileScanInfo boundFileInfo;
};

} // namespace binder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ struct FileTypeUtils {
static FileType fromString(std::string fileType);
};

struct ReaderConfig {
struct FileScanInfo {
static constexpr const char* FILE_FORMAT_OPTION_NAME = "FILE_FORMAT";

FileTypeInfo fileTypeInfo;
std::vector<std::string> filePaths;
case_insensitive_map_t<Value> options;

ReaderConfig() : fileTypeInfo{FileType::UNKNOWN, ""} {}
ReaderConfig(FileTypeInfo fileTypeInfo, std::vector<std::string> filePaths)
FileScanInfo() : fileTypeInfo{FileType::UNKNOWN, ""} {}
FileScanInfo(FileTypeInfo fileTypeInfo, std::vector<std::string> filePaths)
: fileTypeInfo{std::move(fileTypeInfo)}, filePaths{std::move(filePaths)} {}
EXPLICIT_COPY_DEFAULT_MOVE(ReaderConfig);
EXPLICIT_COPY_DEFAULT_MOVE(FileScanInfo);

uint32_t getNumFiles() const { return filePaths.size(); }
std::string getFilePath(idx_t fileIdx) const {
Expand All @@ -57,7 +57,7 @@ struct ReaderConfig {
}

private:
ReaderConfig(const ReaderConfig& other)
FileScanInfo(const FileScanInfo& other)
: fileTypeInfo{other.fileTypeInfo}, filePaths{other.filePaths}, options{other.options} {}
};

Expand Down
14 changes: 7 additions & 7 deletions src/include/function/table/bind_data.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include "common/copier_config/reader_config.h"
#include "common/copier_config/file_scan_info.h"
#include "common/types/types.h"
#include "main/client_context.h"
#include "storage/predicate/column_predicate.h"
Expand Down Expand Up @@ -63,19 +63,19 @@ struct KUZU_API TableFuncBindData {
};

struct KUZU_API ScanBindData : public TableFuncBindData {
common::ReaderConfig config;
common::FileScanInfo fileScanInfo;
main::ClientContext* context;

ScanBindData(binder::expression_vector columns, common::ReaderConfig config,
ScanBindData(binder::expression_vector columns, common::FileScanInfo fileScanInfo,
main::ClientContext* context)
: TableFuncBindData{std::move(columns)}, config{std::move(config)}, context{context} {}
ScanBindData(binder::expression_vector columns, common::ReaderConfig config,
: TableFuncBindData{std::move(columns)}, fileScanInfo{std::move(fileScanInfo)}, context{context} {}
ScanBindData(binder::expression_vector columns, common::FileScanInfo fileScanInfo,
main::ClientContext* context, common::column_id_t numWarningDataColumns,
common::row_idx_t estCardinality)
: TableFuncBindData{std::move(columns), numWarningDataColumns, estCardinality},
config{std::move(config)}, context{context} {}
fileScanInfo{std::move(fileScanInfo)}, context{context} {}
ScanBindData(const ScanBindData& other)
: TableFuncBindData{other}, config{other.config.copy()}, context{other.context} {}
: TableFuncBindData{other}, fileScanInfo{other.fileScanInfo.copy()}, context{other.context} {}

bool getIgnoreErrorsOption() const override;

Expand Down
4 changes: 2 additions & 2 deletions src/include/function/table/bind_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#include "binder/expression/expression.h"
#include "common/case_insensitive_map.h"
#include "common/copier_config/reader_config.h"
#include "common/copier_config/file_scan_info.h"
#include "common/types/value/value.h"

namespace kuzu {
Expand Down Expand Up @@ -52,7 +52,7 @@ struct KUZU_API TableFuncBindInput {
};

struct KUZU_API ExtraScanTableFuncBindInput : ExtraTableFuncBindInput {
common::ReaderConfig config;
common::FileScanInfo fileScanInfo;
std::vector<std::string> expectedColumnNames;
std::vector<common::LogicalType> expectedColumnTypes;
function::TableFunction* tableFunction = nullptr;
Expand Down
12 changes: 6 additions & 6 deletions src/include/function/table/scan_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <mutex>

#include "common/copier_config/reader_config.h"
#include "common/copier_config/file_scan_info.h"
#include "function/table_functions.h"

namespace kuzu {
Expand All @@ -27,12 +27,12 @@ struct BaseScanSharedStateWithNumRows : public BaseScanSharedState {
};

struct ScanSharedState : public BaseScanSharedStateWithNumRows {
const common::ReaderConfig readerConfig;
const common::FileScanInfo fileScanInfo;
uint64_t fileIdx;
uint64_t blockIdx;

ScanSharedState(common::ReaderConfig readerConfig, uint64_t numRows)
: BaseScanSharedStateWithNumRows{numRows}, readerConfig{std::move(readerConfig)},
ScanSharedState(common::FileScanInfo fileScanInfo, uint64_t numRows)
: BaseScanSharedStateWithNumRows{numRows}, fileScanInfo{std::move(fileScanInfo)},
fileIdx{0}, blockIdx{0} {}

std::pair<uint64_t, uint64_t> getNext();
Expand All @@ -43,9 +43,9 @@ struct ScanFileSharedState : public ScanSharedState {
uint64_t totalSize; // TODO(Mattias): I think we should unify the design on how we calculate the
// progress bar for scanning. Can we simply rely on a numRowsScaned stored
// in the TableFuncSharedState to determine the progress.
ScanFileSharedState(common::ReaderConfig readerConfig, uint64_t numRows,
ScanFileSharedState(common::FileScanInfo fileScanInfo, uint64_t numRows,
main::ClientContext* context)
: ScanSharedState{std::move(readerConfig), numRows}, context{context}, totalSize{0} {}
: ScanSharedState{std::move(fileScanInfo), numRows}, context{context}, totalSize{0} {}
};

} // namespace function
Expand Down
Loading

0 comments on commit 62b47c6

Please sign in to comment.