Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/planop parameter serialization #341

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/bin/units_access/ops_select.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,15 @@ TEST_F(SelectTests, simple_expression) {
TEST_F(SelectTests, should_throw_without_predicates) {
Json::Value v(Json::objectValue);
v["type"] = "SimpleTableScan";
ASSERT_THROW(SimpleTableScan::parse(v), std::runtime_error);

std::stringstream ss;
ss << v.toStyledString();

cereal::JSONInputArchive archive(ss);

typename SimpleTableScan::Parameters params;

ASSERT_THROW(params.serialize(archive), std::runtime_error);
}


Expand Down
38 changes: 16 additions & 22 deletions src/lib/access/SimpleTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,26 @@

#include "helper/checked_cast.h"

#include <cereal/archives/json.hpp>
#include <cereal/types/string.hpp>
#include <sstream>

namespace hyrise {
namespace access {

namespace {
auto _ = QueryParser::registerPlanOperation<SimpleTableScan>("SimpleTableScan");
auto _ = QueryParser::registerSerializablePlanOperation<SimpleTableScan>("SimpleTableScan");
}

SimpleTableScan::SimpleTableScan() : _comparator(nullptr) {}
SimpleTableScan::SimpleTableScan(const Parameters& parameters) : _comparator(nullptr) {
setPredicate(buildExpression(parameters.predicates));
if (parameters.materializing)
setProducesPositions(!*parameters.materializing);
if (parameters.ofDelta)
_parameters.ofDelta = *parameters.ofDelta;
else
_parameters.ofDelta = false;
}

SimpleTableScan::~SimpleTableScan() {
if (_comparator)
Expand All @@ -29,7 +41,7 @@ void SimpleTableScan::executePositional() {
storage::pos_list_t* pos_list = new pos_list_t();


size_t row = _ofDelta ? checked_pointer_cast<const storage::Store>(tbl)->deltaOffset() : 0;
size_t row = *_parameters.ofDelta ? checked_pointer_cast<const storage::Store>(tbl)->deltaOffset() : 0;
for (size_t input_size = tbl->size(); row < input_size; ++row) {
if ((*_comparator)(row)) {
pos_list->push_back(row);
Expand All @@ -43,7 +55,7 @@ void SimpleTableScan::executeMaterialized() {
auto result_table = tbl->copy_structure_modifiable();
size_t target_row = 0;

size_t row = _ofDelta ? checked_pointer_cast<const storage::Store>(tbl)->deltaOffset() : 0;
size_t row = *_parameters.ofDelta ? checked_pointer_cast<const storage::Store>(tbl)->deltaOffset() : 0;
for (size_t input_size = tbl->size(); row < input_size; ++row) {
if ((*_comparator)(row)) {
// TODO materializing result set will make the allocation the boundary
Expand All @@ -62,24 +74,6 @@ void SimpleTableScan::executePlanOperation() {
}
}

std::shared_ptr<PlanOperation> SimpleTableScan::parse(const Json::Value& data) {
std::shared_ptr<SimpleTableScan> pop = std::make_shared<SimpleTableScan>();

if (data.isMember("materializing"))
pop->setProducesPositions(!data["materializing"].asBool());

if (!data.isMember("predicates")) {
throw std::runtime_error("There is no reason for a Selection without predicates");
}
pop->setPredicate(buildExpression(data["predicates"]));

if (data.isMember("ofDelta")) {
pop->_ofDelta = data["ofDelta"].asBool();
}

return pop;
}

const std::string SimpleTableScan::vname() { return "SimpleTableScan"; }

void SimpleTableScan::setPredicate(SimpleExpression* c) { _comparator = c; }
Expand Down
18 changes: 15 additions & 3 deletions src/lib/access/SimpleTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,38 @@

#include "access/system/ParallelizablePlanOperation.h"
#include "access/expressions/pred_SimpleExpression.h"
#include "helper/serialization.h"

namespace hyrise {
namespace access {

class SimpleTableScan : public ParallelizablePlanOperation {

public:
struct Parameters {
std::string type;
Json::Value predicates;
std::optional<bool> materializing, ofDelta;

SERIALIZE(type, predicates, materializing, ofDelta)
};

public:
SimpleTableScan();
SimpleTableScan(const Parameters& parameters = Parameters());
virtual ~SimpleTableScan();

void setupPlanOperation();
void executePlanOperation();
void executePositional();
void executeMaterialized();
static std::shared_ptr<PlanOperation> parse(const Json::Value& data);

const std::string vname();
void setPredicate(SimpleExpression* c);

private:
Parameters _parameters;

SimpleExpression* _comparator;
bool _ofDelta = false;
};
}
}
Expand Down
100 changes: 43 additions & 57 deletions src/lib/access/storage/TableLoad.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,103 +16,89 @@ namespace hyrise {
namespace access {

namespace {
auto _ = QueryParser::registerPlanOperation<TableLoad>("TableLoad");
auto _ = QueryParser::registerSerializablePlanOperation<TableLoad>("TableLoad");
log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("access.plan.PlanOperation"));
}

TableLoad::TableLoad() : _hasDelimiter(false), _binary(false), _unsafe(false), _raw(false), _nonvolatile(false) {}

TableLoad::TableLoad(const Parameters& parameters) : _parameters(parameters) {
if (!_parameters.path)
_parameters.path = std::string("");
if (!_parameters.unsafe)
_parameters.unsafe = false;
if (!_parameters.raw)
_parameters.raw = false;
}

TableLoad::~TableLoad() {}

void TableLoad::executePlanOperation() {
auto sm = io::StorageManager::getInstance();
if (!sm->exists(_table_name)) {
if (!sm->exists(_parameters.table)) {

// load from absolute path?

// Load Raw Table
if (_raw) {
if (*_parameters.raw) {
io::Loader::params p;
p.setHeader(io::CSVHeader(_file_name));
p.setInput(io::RawTableLoader(_file_name));
sm->loadTable(_table_name, p, _path);
p.setHeader(io::CSVHeader(_parameters.filename));
p.setInput(io::RawTableLoader(_parameters.filename));
sm->loadTable(_parameters.table, p, *_parameters.path);

} else if (!_header_string.empty()) {
} else if (_parameters.header_string) {
// Load based on header string
auto p = io::Loader::shortcuts::loadWithStringHeaderParams(_file_name, _header_string);
sm->loadTable(_table_name, p, _path);
auto p = io::Loader::shortcuts::loadWithStringHeaderParams(_parameters.filename, *_parameters.header_string);
sm->loadTable(_parameters.table, p, *_parameters.path);

} else if (_header_file_name.empty()) {
} else if (!_parameters.header) {
// Load only with single file
sm->loadTableFile(_table_name, _file_name, _path);
sm->loadTableFile(_parameters.table, _parameters.filename, *_parameters.path);

} else if ((!_table_name.empty()) && (!_file_name.empty()) && (!_header_file_name.empty())) {
} else if ((!_parameters.table.empty()) && (!_parameters.filename.empty()) && (_parameters.header)) {

// Load with dedicated header file
io::Loader::params p;
p.setCompressed(false);
p.setHeader(io::CSVHeader(_header_file_name));
auto params = io::CSVInput::params().setUnsafe(_unsafe);
if (_hasDelimiter)
params.setCSVParams(io::csv::params().setDelimiter(_delimiter.at(0)));
p.setInput(io::CSVInput(_file_name, params));
sm->loadTable(_table_name, p, _path);
p.setHeader(io::CSVHeader(*_parameters.header));
auto params = io::CSVInput::params().setUnsafe(*_parameters.unsafe);
if (_parameters.delimiter)
params.setCSVParams(io::csv::params().setDelimiter((*_parameters.delimiter).at(0)));
p.setInput(io::CSVInput(_parameters.filename, params));
sm->loadTable(_parameters.table, p, *_parameters.path);
}
auto table = sm->getTable(_table_name);
table->setName(_table_name);
auto table = sm->getTable(_parameters.table);
table->setName(_parameters.table);

// We don't load unless the necessary prerequisites are met,
// let StorageManager error if table does not exist
} else {
sm->getTable(_table_name);
sm->getTable(_parameters.table);
}
auto _table = sm->getTable(_table_name);
auto _table = sm->getTable(_parameters.table);
LOG4CXX_DEBUG(logger, "Loaded Table Size" << _table->size());
addResult(_table);
}

std::shared_ptr<PlanOperation> TableLoad::parse(const Json::Value& data) {
std::shared_ptr<TableLoad> s = std::make_shared<TableLoad>();
s->setTableName(data["table"].asString());
s->setFileName(data["filename"].asString());
s->setHeaderFileName(data["header"].asString());
s->setHeaderString(data["header_string"].asString());
s->setUnsafe(data["unsafe"].asBool());
s->setRaw(data["raw"].asBool());
if (data.isMember("delimiter")) {
s->setDelimiter(data["delimiter"].asString());
}
if (data.isMember("path")) {
s->setPath(data["path"].asString());
} else {
s->setPath("");
}
return s;
}

const std::string TableLoad::vname() { return "TableLoad"; }

void TableLoad::setTableName(const std::string& tablename) { _table_name = tablename; }

void TableLoad::setFileName(const std::string& filename) { _file_name = filename; }

void TableLoad::setPath(const std::string& path) { _path = path; }
void TableLoad::setTableName(const std::string& tablename) { _parameters.table = tablename; }

void TableLoad::setHeaderFileName(const std::string& filename) { _header_file_name = filename; }
void TableLoad::setFileName(const std::string& filename) { _parameters.filename = filename; }

void TableLoad::setHeaderString(const std::string& header) { _header_string = header; }
void TableLoad::setHeaderFileName(const std::string& filename) {
_parameters.header = std::optional<std::string>(filename);
}

void TableLoad::setBinary(const bool binary) { _binary = binary; }
void TableLoad::setHeaderString(const std::string& header) {
_parameters.header_string = std::optional<std::string>(header);
}

void TableLoad::setUnsafe(const bool unsafe) { _unsafe = unsafe; }
void TableLoad::setUnsafe(const bool unsafe) { _parameters.unsafe = std::optional<bool>(unsafe); }

void TableLoad::setRaw(const bool raw) { _raw = raw; }
void TableLoad::setRaw(const bool raw) { _parameters.raw = std::optional<bool>(raw); }

void TableLoad::setDelimiter(const std::string& d) {
_delimiter = d;
_hasDelimiter = true;
}
void TableLoad::setDelimiter(const std::string& d) { _parameters.delimiter = std::optional<std::string>(d); }

void TableLoad::setNonvolatile(const bool nonvolatile) { _nonvolatile = nonvolatile; }
void TableLoad::setPath(const std::string& path) { _parameters.path = std::optional<std::string>(path); }
}
}
28 changes: 13 additions & 15 deletions src/lib/access/storage/TableLoad.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#define SRC_LIB_ACCESS_TABLELOAD_H_

#include "access/system/PlanOperation.h"
#include "helper/serialization.h"

namespace hyrise {
namespace access {
Expand All @@ -15,35 +16,32 @@ class TableLoad : public PlanOperation {
friend class LoadTests_simple_unloadall_op_Test;

public:
TableLoad();
struct Parameters {
std::string type, table, filename;
std::optional<std::string> header, header_string, delimiter, path;
std::optional<bool> unsafe, raw;

SERIALIZE(type, table, filename, header, header_string, delimiter, path, unsafe, raw)
};

public:
TableLoad(const Parameters& parameters = Parameters());
virtual ~TableLoad();

void executePlanOperation();
static std::shared_ptr<PlanOperation> parse(const Json::Value& data);

const std::string vname();
void setTableName(const std::string& tablename);
void setFileName(const std::string& filename);
void setPath(const std::string& path);
void setHeaderFileName(const std::string& filename);
void setHeaderString(const std::string& header);
void setBinary(const bool binary);
void setUnsafe(const bool unsafe);
void setRaw(const bool raw);
void setDelimiter(const std::string& d);
void setNonvolatile(const bool nonvolatile);

private:
std::string _table_name;
std::string _header_file_name;
std::string _file_name;
std::string _path;
std::string _header_string;
std::string _delimiter;
bool _hasDelimiter;
bool _binary;
bool _unsafe;
bool _raw;
bool _nonvolatile;
Parameters _parameters;
};
}
}
Expand Down
26 changes: 26 additions & 0 deletions src/lib/access/system/QueryParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
#include <string>
#include <stdexcept>
#include <mutex>
#include <sstream>

#include <json.h>

#include "helper/cereal/HyriseCerealJsonArchive.h"
#include "access/system/BasicParser.h"

const std::string autojsonReferenceTableId = "-1";
Expand Down Expand Up @@ -38,6 +40,7 @@ struct AbstractQueryParserFactory {

struct parse_construct {};
struct default_construct {};
struct cereal_construct {};

template <typename T, typename parse_construction>
struct QueryParserFactory;
Expand All @@ -56,6 +59,23 @@ struct QueryParserFactory<T, default_construct> : public AbstractQueryParserFact
}
};

template <typename T>
struct QueryParserFactory<T, cereal_construct> : public AbstractQueryParserFactory {

virtual std::shared_ptr<PlanOperation> parse(const Json::Value& data) {
std::stringstream ss;
Json::FastWriter writer;
ss << writer.write(data);

cereal::JSONInputArchive archive(ss);

typename T::Parameters params;
params.serialize(archive);

return std::make_shared<T>(params);
}
};

/*
* The Query Parser parses a given Json Value to create a plan operation
*
Expand Down Expand Up @@ -108,6 +128,12 @@ class QueryParser {
return true;
}

template <typename T>
static bool registerSerializablePlanOperation(const std::string& name) {
QueryParser::instance()._factory[name] = new QueryParserFactory<T, cereal_construct>();
return true;
}

std::shared_ptr<PlanOperation> parse(std::string name, const Json::Value& d);

static QueryParser& instance();
Expand Down
Loading