Skip to content

Commit

Permalink
C++ Client: update examples, demos, and docs with style guide, Arrow …
Browse files Browse the repository at this point in the history
…deprecation (deephaven#4860)
  • Loading branch information
kosak authored Nov 29, 2023
1 parent 1745a66 commit b7f7e9b
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "deephaven/client/client.h"
#include "deephaven/client/flight.h"
#include "deephaven/client/utility/table_maker.h"
#include "deephaven/dhcore/utility/utility.h"

using deephaven::client::Client;
using deephaven::client::TableHandle;
Expand All @@ -13,6 +14,7 @@ using deephaven::client::utility::ConvertTicketToFlightDescriptor;
using deephaven::client::utility::OkOrThrow;
using deephaven::client::utility::TableMaker;
using deephaven::client::utility::ValueOrThrow;
using deephaven::dhcore::utility::GetWhat;

namespace {
void Doit(const TableHandleManager &manager);
Expand All @@ -23,7 +25,7 @@ int main(int argc, char *argv[]) {
const char *server = "localhost:10000";
if (argc > 1) {
if (argc != 2 || std::strcmp("-h", argv[1]) == 0) {
std::cerr << "Usage: " << argv[0] << " [host:port]" << std::endl;
std::cerr << "Usage: " << argv[0] << " [host:port]\n";
std::exit(1);
}
server = argv[1];
Expand All @@ -33,72 +35,72 @@ int main(int argc, char *argv[]) {
auto client = Client::Connect(server);
auto manager = client.GetManager();
Doit(manager);
} catch (const std::exception &e) {
std::cerr << "Caught exception: " << e.what() << '\n';
} catch (...) {
std::cerr << "Caught exception: " << GetWhat(std::current_exception()) << '\n';
}
}

namespace {
void Doit(const TableHandleManager &manager) {
// 1. Build schema
arrow::SchemaBuilder schemaBuilder;
arrow::SchemaBuilder schema_builder;

// 2. Add "Symbol" column (type: string) to schema
{
auto symbolMetadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(symbolMetadata->Set("deephaven:type", "java.lang.String")));
auto symbolField = std::make_shared<arrow::Field>("Symbol",
std::make_shared<arrow::StringType>(), true, std::move(symbolMetadata));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schemaBuilder.AddField(symbolField)));
auto symbol_metadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(symbol_metadata->Set("deephaven:type", "java.lang.String")));
auto symbol_field = std::make_shared<arrow::Field>("Symbol",
std::make_shared<arrow::StringType>(), true, std::move(symbol_metadata));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schema_builder.AddField(symbol_field)));
}

// 3. Add "Price" column (type: double) to schema
{
auto priceMetadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(priceMetadata->Set("deephaven:type", "double")));
auto priceField = std::make_shared<arrow::Field>("Price",
std::make_shared<arrow::DoubleType>(), true, std::move(priceMetadata));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schemaBuilder.AddField(priceField)));
auto price_metadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(price_metadata->Set("deephaven:type", "double")));
auto price_field = std::make_shared<arrow::Field>("Price",
std::make_shared<arrow::DoubleType>(), true, std::move(price_metadata));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schema_builder.AddField(price_field)));
}

// 4. Add "Volume" column (type: int32) to schema
{
auto volumeMetadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(volumeMetadata->Set("deephaven:type", "int")));
auto volumeField = std::make_shared<arrow::Field>("Volume",
std::make_shared<arrow::Int32Type>(), true, std::move(volumeMetadata));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schemaBuilder.AddField(volumeField)));
auto volume_metadata = std::make_shared<arrow::KeyValueMetadata>();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(volume_metadata->Set("deephaven:type", "int")));
auto volume_field = std::make_shared<arrow::Field>("Volume",
std::make_shared<arrow::Int32Type>(), true, std::move(volume_metadata));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(schema_builder.AddField(volume_field)));
}

// 4. Schema is done
auto schema = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(schemaBuilder.Finish()));
auto schema = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(schema_builder.Finish()));

// 5. Prepare symbol, price, and volume data cells
std::vector<std::string> symbols{"FB", "AAPL", "NFLX", "GOOG"};
std::vector<double> prices{101.1, 102.2, 103.3, 104.4};
std::vector<int32_t> volumes{1000, 2000, 3000, 4000};
auto numRows = symbols.size();
if (numRows != prices.size() || numRows != volumes.size()) {
throw DEEPHAVEN_LOCATION_EXPR(std::runtime_error(DEEPHAVEN_LOCATION_STR("sizes don't match")));
auto num_rows = symbols.size();
if (num_rows != prices.size() || num_rows != volumes.size()) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR("sizes don't match"));
}

// 6. Move data to Arrow column builders
arrow::StringBuilder symbolBuilder;
arrow::DoubleBuilder priceBuilder;
arrow::Int32Builder volumeBuilder;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(symbolBuilder.AppendValues(symbols)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(priceBuilder.AppendValues(prices)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(volumeBuilder.AppendValues(volumes)));
arrow::StringBuilder symbol_builder;
arrow::DoubleBuilder price_builder;
arrow::Int32Builder volume_builder;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(symbol_builder.AppendValues(symbols)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(price_builder.AppendValues(prices)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(volume_builder.AppendValues(volumes)));

auto symbolArray = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(symbolBuilder.Finish()));
auto priceArray = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(priceBuilder.Finish()));
auto volumeArray = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(volumeBuilder.Finish()));
auto symbol_array = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(symbol_builder.Finish()));
auto price_array = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(price_builder.Finish()));
auto volume_array = ValueOrThrow(DEEPHAVEN_LOCATION_EXPR(volume_builder.Finish()));

// 7. Get Arrow columns from builders
std::vector<std::shared_ptr<arrow::Array>> columns = {
std::move(symbolArray),
std::move(priceArray),
std::move(volumeArray)
std::move(symbol_array),
std::move(price_array),
std::move(volume_array)
};

// 8. Get a Deephaven "FlightWrapper" object to access Arrow Flight
Expand All @@ -116,24 +118,23 @@ void Doit(const TableHandleManager &manager) {
auto fd = deephaven::client::utility::ConvertTicketToFlightDescriptor(ticket);

// 12. Perform the doPut
std::unique_ptr<arrow::flight::FlightStreamWriter> fsw;
std::unique_ptr<arrow::flight::FlightMetadataReader> fmr;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(wrapper.FlightClient()->DoPut(options, fd, schema, &fsw, &fmr)));
auto res = wrapper.FlightClient()->DoPut(options, fd, schema);
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res));

// 13. Make a RecordBatch containing both the schema and the data
auto batch = arrow::RecordBatch::Make(schema, static_cast<std::int64_t>(numRows), std::move(columns));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->WriteRecordBatch(*batch)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->DoneWriting()));
auto batch = arrow::RecordBatch::Make(schema, static_cast<std::int64_t>(num_rows), std::move(columns));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->WriteRecordBatch(*batch)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->DoneWriting()));

// 14. Read back a metadata message (ignored), then close the Writer
std::shared_ptr<arrow::Buffer> buf;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fmr->ReadMetadata(&buf)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->Close()));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->reader->ReadMetadata(&buf)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->Close()));

// 15. Now that the table is ready, bind the ticket to a TableHandle.
auto table = manager.MakeTableHandleFromTicket(ticket);

// 16. Use Deephaven high level operations to fetch the table and print it
std::cout << "table is:\n" << table.Stream(true) << std::endl;
std::cout << "table is:\n" << table.Stream(true) << '\n';
}
} // namespace
10 changes: 5 additions & 5 deletions cpp-client/deephaven/examples/demos/chapter1.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,15 @@ void PrintTable(const TableHandle &table, bool null_aware) {
auto fsr = table.GetFlightStreamReader();

while (true) {
arrow::flight::FlightStreamChunk chunk;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsr->Next(&chunk)));
if (chunk.data == nullptr) {
auto chunk = fsr->Next();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(chunk));
if (chunk->data == nullptr) {
break;
}

auto int64_data = chunk.data->GetColumnByName("Int64Value");
auto int64_data = chunk->data->GetColumnByName("Int64Value");
CheckNotNull(int64_data.get(), DEEPHAVEN_LOCATION_STR("Int64Value column not found"));
auto double_data = chunk.data->GetColumnByName("DoubleValue");
auto double_data = chunk->data->GetColumnByName("DoubleValue");
CheckNotNull(double_data.get(), DEEPHAVEN_LOCATION_STR("DoubleValue column not found"));

auto int64_array = std::dynamic_pointer_cast<arrow::Int64Array>(int64_data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Consider the following program from ``cpp-examples/make_table``:
std::vector<double> prices{101.1, 102.2, 103.3, 104.4};
tm.addColumn("Symbol", symbols);
tm.addColumn("Price", prices);
auto table = tm.makeTable(manager, "myTable");
auto table = tm.MakeTable(manager, "myTable");

std::cout << "table is:\n" << table.stream(true) << std::endl;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp-client/deephaven/examples/doc/fluent.rst
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ are created as the result of relational operators on other expressions. For exam
TableMaker tm;
tm.addColumn("A", aValues);
tm.addColumn("S", sValues);
auto temp = tm.makeTable(manager);
auto temp = tm.MakeTable(manager);
auto a = temp.getNumCol("A");
auto result = temp.where(a > 15);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ Consider the following program from ``cpp-examples/read_table_with_arrow_flight`
auto manager = client.getManager();
try {
auto table = makeTable(manager);
dumpSymbolColumn(table);
auto table = MakeTable(manager);
DumpSymbolColumn(table);
} catch (const std::runtime_error &e) {
std::cerr << "Caught exception: " << e.what() << '\n';
}
Expand Down
36 changes: 17 additions & 19 deletions cpp-client/deephaven/examples/read_csv/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ arrow::Status Doit(const TableHandleManager &manager, const std::string &csvfn);
int main(int argc, char* argv[]) {
const char *server = "localhost:10000";
if (argc != 2 && argc != 3) {
std::cerr << "Usage: " << argv[0] << " [host:port] filename" << std::endl;
std::cerr << "Usage: " << argv[0] << " [host:port] filename\n";
std::exit(1);
}
int c = 1;
Expand All @@ -41,7 +41,7 @@ int main(int argc, char* argv[]) {
auto manager = client.GetManager();
auto st = Doit(manager, filename);
if (!st.ok()) {
std::cerr << "Failed with status " << st << std::endl;
std::cerr << "Failed with status " << st << '\n';
}
} catch (const std::exception &e) {
std::cerr << "Caught exception: " << e.what() << '\n';
Expand Down Expand Up @@ -74,31 +74,29 @@ arrow::Status Doit(const TableHandleManager &manager, const std::string &csvfn)
wrapper.AddHeaders(&options);

auto fd = ConvertTicketToFlightDescriptor(ticket);
std::unique_ptr<arrow::flight::FlightStreamWriter> fsw;
std::unique_ptr<arrow::flight::FlightMetadataReader> fmr;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(
wrapper.FlightClient()->DoPut(options, fd, arrow_table->schema(), &fsw, &fmr)));
auto res = wrapper.FlightClient()->DoPut(options, fd, arrow_table->schema());
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res));

const auto &srcColumns = arrow_table->columns();
const size_t ncols = srcColumns.size();
const size_t nchunks = srcColumns[0]->num_chunks();
std::vector<std::shared_ptr<arrow::Array>> destColumns(ncols);
for (size_t chunkIndex = 0; chunkIndex < nchunks; ++chunkIndex) {
for (size_t colIndex = 0; colIndex < ncols; ++colIndex) {
destColumns[colIndex] = srcColumns[colIndex]->chunk(chunkIndex);
const auto &src_columns = arrow_table->columns();
const size_t ncols = src_columns.size();
const size_t nchunks = src_columns[0]->num_chunks();
std::vector<std::shared_ptr<arrow::Array>> dest_columns(ncols);
for (size_t chunk_index = 0; chunk_index < nchunks; ++chunk_index) {
for (size_t col_index = 0; col_index < ncols; ++col_index) {
dest_columns[col_index] = src_columns[col_index]->chunk(chunk_index);
}
auto batch = arrow::RecordBatch::Make(arrow_table->schema(), destColumns[0]->length(), destColumns);
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->WriteRecordBatch(*batch)));
auto batch = arrow::RecordBatch::Make(arrow_table->schema(), dest_columns[0]->length(), dest_columns);
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->WriteRecordBatch(*batch)));
}

OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->DoneWriting()));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->DoneWriting()));

std::shared_ptr<arrow::Buffer> buf;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fmr->ReadMetadata(&buf)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->Close()));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->reader->ReadMetadata(&buf)));
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res->writer->Close()));

auto table_handle = manager.MakeTableHandleFromTicket(ticket);
std::cout << "table is:\n" << table_handle.Stream(true) << std::endl;
std::cout << "table is:\n" << table_handle.Stream(true) << '\n';
table_handle.BindToVariable("showme");
return arrow::Status::OK();
}
Expand Down
46 changes: 23 additions & 23 deletions cpp-client/deephaven/examples/read_table_with_arrow_flight/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ using deephaven::client::utility::OkOrThrow;
using deephaven::client::utility::TableMaker;

namespace {
TableHandle makeTable(const TableHandleManager &manager);
void dumpSymbolColumn(const TableHandle &tableHandle);
TableHandle MakeTable(const TableHandleManager &manager);
void DumpSymbolColumn(const TableHandle &table_handle);
} // namespace

int main(int argc, char *argv[]) {
const char *server = "localhost:10000";
if (argc > 1) {
if (argc != 2 || std::strcmp("-h", argv[1]) == 0) {
std::cerr << "Usage: " << argv[0] << " [host:port]" << std::endl;
std::cerr << "Usage: " << argv[0] << " [host:port]\n";
std::exit(1);
}
server = argv[1];
Expand All @@ -29,15 +29,15 @@ int main(int argc, char *argv[]) {
try {
auto client = Client::Connect(server);
auto manager = client.GetManager();
auto table = makeTable(manager);
dumpSymbolColumn(table);
auto table = MakeTable(manager);
DumpSymbolColumn(table);
} catch (const std::exception &e) {
std::cerr << "Caught exception: " << e.what() << '\n';
}
}

namespace {
TableHandle makeTable(const TableHandleManager &manager) {
TableHandle MakeTable(const TableHandleManager &manager) {
TableMaker tm;
std::vector<std::string> symbols{"FB", "AAPL", "NFLX", "GOOG"};
std::vector<double> prices{101.1, 102.2, 103.3, 104.4};
Expand All @@ -46,40 +46,40 @@ TableHandle makeTable(const TableHandleManager &manager) {
return tm.MakeTable(manager);
}

void dumpSymbolColumn(const TableHandle &tableHandle) {
auto fsr = tableHandle.GetFlightStreamReader();
void DumpSymbolColumn(const TableHandle &table_handle) {
auto fsr = table_handle.GetFlightStreamReader();
while (true) {
arrow::flight::FlightStreamChunk chunk;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsr->Next(&chunk)));
if (chunk.data == nullptr) {
auto res = fsr->Next();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(res));
if (res->data == nullptr) {
break;
}

auto symbolChunk = chunk.data->GetColumnByName("Symbol");
if (symbolChunk == nullptr) {
auto symbol_chunk = res->data->GetColumnByName("Symbol");
if (symbol_chunk == nullptr) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR("Symbol column not found"));
}
auto priceChunk = chunk.data->GetColumnByName("Price");
if (priceChunk == nullptr) {
auto price_chunk = res->data->GetColumnByName("Price");
if (price_chunk == nullptr) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR("Price column not found"));
}

auto symbolAsStringArray = std::dynamic_pointer_cast<arrow::StringArray>(symbolChunk);
auto priceAsDoubleArray = std::dynamic_pointer_cast<arrow::DoubleArray>(priceChunk);
if (symbolAsStringArray == nullptr) {
auto symbol_as_string_array = std::dynamic_pointer_cast<arrow::StringArray>(symbol_chunk);
auto price_as_double_array = std::dynamic_pointer_cast<arrow::DoubleArray>(price_chunk);
if (symbol_as_string_array == nullptr) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR("symbolChunk was not an arrow::StringArray"));
}
if (priceAsDoubleArray == nullptr) {
if (price_as_double_array == nullptr) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR("priceChunk was not an arrow::DoubleArray"));
}

if (symbolAsStringArray->length() != priceAsDoubleArray->length()) {
if (symbol_as_string_array->length() != price_as_double_array->length()) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR("Lengths differ"));
}

for (int64_t i = 0; i < symbolAsStringArray->length(); ++i) {
auto symbol = symbolAsStringArray->GetView(i);
auto price = priceAsDoubleArray->Value(i);
for (int64_t i = 0; i < symbol_as_string_array->length(); ++i) {
auto symbol = symbol_as_string_array->GetView(i);
auto price = price_as_double_array->Value(i);
std::cout << symbol << ' ' << price << '\n';
}
}
Expand Down

0 comments on commit b7f7e9b

Please sign in to comment.