diff --git a/cpp/src/arrow/c/abi.h b/cpp/src/arrow/c/abi.h index 64e078378bccb..631f515e23a1f 100644 --- a/cpp/src/arrow/c/abi.h +++ b/cpp/src/arrow/c/abi.h @@ -331,7 +331,7 @@ struct ArrowAsyncProducer { // defined callbacks, this is intended to be created by the consumer instead. // The consumer passes this handler to the producer, which in turn uses the // callbacks to inform the consumer of events in the stream. -struct ArrowAsyncDeviceStreamHandler { +struct ArrowAsyncDeviceStreamHandler { // Handler for receiving a schema. The passed in stream_schema must be // released or moved by the handler (producer is giving ownership of the schema to // the handler, but not ownership of the top level object itself). @@ -359,8 +359,7 @@ struct ArrowAsyncDeviceStreamHandler { // A producer that receives a non-zero return here should stop producing and eventually // call release instead. int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self, - struct ArrowSchema* stream_schema, - const char* addl_metadata); + struct ArrowSchema* stream_schema, const char* addl_metadata); // Handler for receiving data. This is called when data is available providing an // ArrowAsyncTask struct to signify it. The producer indicates the end of the stream diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 751e60d40311f..469718dd50ccb 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -2715,7 +2715,7 @@ struct AsyncProducer { ARROW_DISALLOW_COPY_AND_ASSIGN(PrivateTaskData); }; - Status operator()(const std::shared_ptr& record) { + Status operator()(const std::shared_ptr& record) { std::unique_lock lock(state_->mutex_); if (state_->pending_requests_ == 0) { state_->cv_.wait(lock, [this]() -> bool { @@ -2789,7 +2789,7 @@ struct AsyncProducer { return ret; } - struct ArrowAsyncDeviceStreamHandler* handler_; + struct ArrowAsyncDeviceStreamHandler* handler_; std::shared_ptr state_; }; @@ -2798,7 +2798,8 @@ struct AsyncProducer { Future CreateAsyncDeviceStreamHandler( struct ArrowAsyncDeviceStreamHandler* handler, internal::Executor* executor, uint64_t queue_size, const DeviceMemoryMapper mapper) { - auto iterator = std::make_shared(queue_size, std::move(mapper)); + auto iterator = + std::make_shared(queue_size, std::move(mapper)); return AsyncRecordBatchIterator::Make(*iterator, handler) .Then([executor](std::shared_ptr state) -> Result { @@ -2831,9 +2832,9 @@ Future<> ExportAsyncRecordBatchReader( int status = handler->on_next_task(handler, nullptr, nullptr); handler->release(handler); if (status != 0) { - return Status::UnknownError( - "Received error from handler::on_next_task ", status); - } + return Status::UnknownError("Received error from handler::on_next_task ", + status); + } return Future<>::MakeFinished(); }, [handler](const Status status) -> Future<> { diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index f0b11e9c94d30..1a4625510c42d 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -418,7 +418,7 @@ class AsyncErrorDetail : public StatusDetail { : code_(code), message_(std::move(message)), metadata_(std::move(metadata)) {} const char* type_id() const override { return "AsyncErrorDetail"; } std::string ToString() const override { return message_; } - + int code() const { return code_; } const std::string& ErrorMetadata() const { return metadata_; } private: @@ -455,8 +455,7 @@ class Executor; ARROW_EXPORT Future CreateAsyncDeviceStreamHandler( struct ArrowAsyncDeviceStreamHandler* handler, internal::Executor* executor, - uint64_t queue_size = 5, - const DeviceMemoryMapper mapper = DefaultDeviceMemoryMapper); + uint64_t queue_size = 5, const DeviceMemoryMapper mapper = DefaultDeviceMemoryMapper); /// \brief Export an AsyncGenerator of record batches using a provided handler /// diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index 08b1eda9d5ee8..c1b8ea7da1819 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -5363,8 +5363,7 @@ TEST_F(TestAsyncDeviceArrayStreamRoundTrip, Simple) { ASSERT_FALSE(fut_gen.is_finished()); ASSERT_OK_AND_ASSIGN(auto fut, internal::GetCpuThreadPool()->Submit([&]() { - return ExportAsyncRecordBatchReader(orig_schema, - MakeVectorGenerator(batches), + return ExportAsyncRecordBatchReader(orig_schema, MakeVectorGenerator(batches), device->device_type(), &handler); })); diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index e6bc55e3fa60e..a173da52848ed 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -298,14 +298,12 @@ struct ARROW_EXPORT RecordBatchWithMetadata { std::shared_ptr custom_metadata; }; - template <> struct IterationTraits { static RecordBatchWithMetadata End() { return {nullptr, nullptr}; } static bool IsEnd(const RecordBatchWithMetadata& val) { return val.batch == nullptr; } }; - /// \brief Abstract interface for reading stream of record batches class ARROW_EXPORT RecordBatchReader { public: