Skip to content

Commit

Permalink
add error tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroshade committed Oct 23, 2024
1 parent a5de6c6 commit 6f179aa
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 4 deletions.
29 changes: 25 additions & 4 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2662,11 +2662,26 @@ class AsyncRecordBatchIterator {
static void on_error(ArrowAsyncDeviceStreamHandler* self, int code, const char* message,
const char* metadata) {
auto* private_data = reinterpret_cast<PrivateData*>(self->private_data);
std::unique_lock<std::mutex> lock(private_data->state_->mutex_);
private_data->state_->error_ = Status::FromDetailAndArgs(
std::string message_str, metadata_str;
if (message != nullptr) {
message_str = message;
}
if (metadata != nullptr) {
metadata_str = metadata;
}

Status error = Status::FromDetailAndArgs(
StatusCode::UnknownError,
std::make_shared<AsyncErrorDetail>(code, message, metadata),
std::string(message));
std::make_shared<AsyncErrorDetail>(code, message_str, std::move(metadata_str)),
std::move(message_str));

if (!private_data->fut_iterator_.is_finished()) {
private_data->fut_iterator_.MarkFinished(error);
return;
}

std::unique_lock<std::mutex> lock(private_data->state_->mutex_);
private_data->state_->error_ = error;
lock.unlock();
private_data->state_->cv_.notify_one();
}
Expand Down Expand Up @@ -2809,6 +2824,12 @@ Future<> ExportAsyncRecordBatchReader(
std::shared_ptr<Schema> schema,
AsyncGenerator<std::shared_ptr<RecordBatch>> generator,
DeviceAllocationType device_type, struct ArrowAsyncDeviceStreamHandler* handler) {
if (!schema) {
handler->on_error(handler, EINVAL, "Schema is null", nullptr);
handler->release(handler);
return Future<>::MakeFinished(Status::Invalid("Schema is null"));
}

struct ArrowSchema c_schema;
SchemaExportGuard guard(&c_schema);

Expand Down
38 changes: 38 additions & 0 deletions cpp/src/arrow/c/bridge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5380,4 +5380,42 @@ TEST_F(TestAsyncDeviceArrayStreamRoundTrip, Simple) {
AssertBatchesEqual(*results[1].batch, *batches[1]);
}

TEST_F(TestAsyncDeviceArrayStreamRoundTrip, NullSchema) {
struct ArrowAsyncDeviceStreamHandler handler;
auto fut_gen = CreateAsyncDeviceStreamHandler(&handler, internal::GetCpuThreadPool(), 1,
TestDeviceArrayRoundtrip::DeviceMapper);
ASSERT_FALSE(fut_gen.is_finished());

auto fut = ExportAsyncRecordBatchReader(nullptr, nullptr, DeviceAllocationType::kCPU,
&handler);
ASSERT_FINISHES_AND_RAISES(Invalid, fut);
ASSERT_FINISHES_AND_RAISES(UnknownError, fut_gen);
}

TEST_F(TestAsyncDeviceArrayStreamRoundTrip, PropagateError) {
std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
auto orig_schema = arrow::schema({field("ints", int32())});


struct ArrowAsyncDeviceStreamHandler handler;
auto fut_gen = CreateAsyncDeviceStreamHandler(&handler, internal::GetCpuThreadPool(), 1,
TestDeviceArrayRoundtrip::DeviceMapper);
ASSERT_FALSE(fut_gen.is_finished());

ASSERT_OK_AND_ASSIGN(auto fut, internal::GetCpuThreadPool()->Submit([&]() {
return ExportAsyncRecordBatchReader(
orig_schema,
MakeFailingGenerator<std::shared_ptr<RecordBatch>>(
Status::UnknownError("expected failure")),
device->device_type(), &handler);
}));

ASSERT_FINISHES_OK_AND_ASSIGN(auto generator, fut_gen);
ASSERT_NO_FATAL_FAILURE(AssertSchemaEqual(*orig_schema, *generator.schema));

auto collect_fut = CollectAsyncGenerator(generator.generator);
ASSERT_FINISHES_AND_RAISES(UnknownError, collect_fut);
ASSERT_FINISHES_AND_RAISES(UnknownError, fut);
}

} // namespace arrow

0 comments on commit 6f179aa

Please sign in to comment.