From de58508dd48e329807cfe0dc12c54fc649c6632b Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Mon, 9 Dec 2024 22:50:09 +0000 Subject: [PATCH] fix: wait for stream to synchronize before freeing --- codegen/manual_client.cpp | 83 ++++++-------------- codegen/manual_server.cpp | 158 ++++++++++++++++---------------------- test/cublas_batched.cu | 19 +++-- 3 files changed, 103 insertions(+), 157 deletions(-) diff --git a/codegen/manual_client.cpp b/codegen/manual_client.cpp index 2e1bb59..02643cd 100755 --- a/codegen/manual_client.cpp +++ b/codegen/manual_client.cpp @@ -210,76 +210,41 @@ cudaError_t cudaMemcpyAsync(void *dst, const void *src, size_t count, enum cudaM cudaError_t return_value; int request_id = rpc_start_request(0, RPC_cudaMemcpyAsync); - if (request_id < 0) - { - return cudaErrorDevicesUnavailable; - } - - if (rpc_write(0, &kind, sizeof(enum cudaMemcpyKind)) < 0) - { + int stream_null_check = stream == 0 ? 1 : 0; + if (request_id < 0 || + rpc_write(0, &kind, sizeof(enum cudaMemcpyKind)) < 0 || + rpc_write(0, &stream_null_check, sizeof(int)) < 0 || + (stream_null_check == 0 && rpc_write(0, &stream, sizeof(cudaStream_t)) < 0)) return cudaErrorDevicesUnavailable; - } // we need to swap device directions in this case - if (kind == cudaMemcpyDeviceToHost) - { - if (rpc_write(0, &src, sizeof(void *)) < 0) - { - return cudaErrorDevicesUnavailable; - } - - if (rpc_write(0, &count, sizeof(size_t)) < 0) - { - return cudaErrorDevicesUnavailable; - } - - if (rpc_write(0, &stream, sizeof(cudaStream_t)) < 0) - { - return cudaErrorDevicesUnavailable; - } - - if (rpc_wait_for_response(0) < 0) - { - return cudaErrorDevicesUnavailable; - } - - if (rpc_read(0, dst, count) < 0) - { // Read data into the destination buffer on the host - return cudaErrorDevicesUnavailable; - } - } - else + switch (kind) { - if (rpc_write(0, &dst, sizeof(void *)) < 0) - { - return cudaErrorDevicesUnavailable; - } - - if (rpc_write(0, &count, sizeof(size_t)) < 0) - { - return cudaErrorDevicesUnavailable; - } - - if (rpc_write(0, src, count) < 0) - { + case cudaMemcpyDeviceToHost: + if (rpc_write(0, &src, sizeof(void *)) < 0 || + rpc_write(0, &count, sizeof(size_t)) < 0 || + rpc_wait_for_response(0) < 0 || + rpc_read(0, dst, count) < 0) return cudaErrorDevicesUnavailable; - } - - if (rpc_write(0, &stream, sizeof(cudaStream_t)) < 0) - { + break; + case cudaMemcpyHostToDevice: + if (rpc_write(0, &dst, sizeof(void *)) < 0 || + rpc_write(0, &count, sizeof(size_t)) < 0 || + rpc_write(0, src, count) < 0 || + rpc_wait_for_response(0) < 0) return cudaErrorDevicesUnavailable; - } - - if (rpc_wait_for_response(0) < 0) - { + break; + case cudaMemcpyDeviceToDevice: + if (rpc_write(0, &dst, sizeof(void *)) < 0 || + rpc_write(0, &src, sizeof(void *)) < 0 || + rpc_write(0, &count, sizeof(size_t)) < 0 || + rpc_wait_for_response(0) < 0) return cudaErrorDevicesUnavailable; - } + break; } if (rpc_end_response(0, &return_value) < 0) - { return cudaErrorDevicesUnavailable; - } return return_value; } diff --git a/codegen/manual_server.cpp b/codegen/manual_server.cpp index cd2b0fa..3054b57 100755 --- a/codegen/manual_server.cpp +++ b/codegen/manual_server.cpp @@ -32,6 +32,7 @@ int handle_cudaMemcpy(void *conn) void *host_data; std::size_t count; enum cudaMemcpyKind kind; + int ret = -1; if (rpc_read(conn, &kind, sizeof(enum cudaMemcpyKind)) < 0) goto ERROR_0; @@ -51,7 +52,7 @@ int handle_cudaMemcpy(void *conn) if (request_id < 0) goto ERROR_1; - result = cudaMemcpy(host_data, src, count, cudaMemcpyDeviceToHost); + result = cudaMemcpy(host_data, src, count, kind); if (rpc_start_response(conn, request_id) < 0 || rpc_write(conn, host_data, count) < 0) @@ -98,124 +99,99 @@ int handle_cudaMemcpy(void *conn) if (rpc_end_response(conn, &result) < 0) goto ERROR_1; - return 0; + ret = 0; ERROR_1: free((void *)host_data); ERROR_0: - return -1; + return ret; } int handle_cudaMemcpyAsync(void *conn) { + int request_id; cudaError_t result; + void *src; void *dst; - + void *host_data; + std::size_t count; enum cudaMemcpyKind kind; - if (rpc_read(conn, &kind, sizeof(enum cudaMemcpyKind)) < 0) - { - return -1; - } + int stream_null_check; + cudaStream_t stream = 0; + int ret = -1; + + if (rpc_read(conn, &kind, sizeof(enum cudaMemcpyKind)) < 0 || + rpc_read(conn, &stream_null_check, sizeof(int)) < 0 || + (stream_null_check == 0 && rpc_read(conn, &stream, sizeof(cudaStream_t)) < 0)) + goto ERROR_0; - if (kind == cudaMemcpyDeviceToHost) + switch (kind) { - if (rpc_read(conn, &dst, sizeof(void *)) < 0) - return -1; + case cudaMemcpyDeviceToHost: + if (rpc_read(conn, &src, sizeof(void *)) < 0 || + rpc_read(conn, &count, sizeof(size_t)) < 0) + goto ERROR_0; + + host_data = malloc(count); + if (host_data == NULL) + goto ERROR_0; - std::size_t count; - if (rpc_read(conn, &count, sizeof(size_t)) < 0) - return -1; + request_id = rpc_end_request(conn); + if (request_id < 0) + goto ERROR_1; - cudaStream_t stream; - if (rpc_read(conn, &stream, sizeof(cudaStream_t)) < 0) - { - return -1; - } + result = cudaMemcpyAsync(host_data, src, count, kind, stream); - void *host_data = malloc(count); + if (rpc_start_response(conn, request_id) < 0 || + rpc_write(conn, host_data, count) < 0) + goto ERROR_1; + break; + case cudaMemcpyHostToDevice: + if (rpc_read(conn, &dst, sizeof(void *)) < 0 || + rpc_read(conn, &count, sizeof(size_t)) < 0) + goto ERROR_0; + + host_data = malloc(count); if (host_data == NULL) - { - std::cerr << "Failed to allocate host memory for device-to-host transfer." << std::endl; - return -1; - } - - int request_id = rpc_end_request(conn); + goto ERROR_0; + + if (rpc_read(conn, host_data, count) < 0) + goto ERROR_1; + + request_id = rpc_end_request(conn); if (request_id < 0) - { - return -1; - } + goto ERROR_1; - result = cudaMemcpyAsync(host_data, dst, count, cudaMemcpyDeviceToHost, stream); - if (result != cudaSuccess) - { - free(host_data); - return -1; - } + result = cudaMemcpyAsync(dst, host_data, count, kind, stream); if (rpc_start_response(conn, request_id) < 0) - { - return -1; - } - - if (rpc_write(conn, host_data, count) < 0) - { - free(host_data); - return -1; - } - - // free temp memory after writing host data back - free(host_data); - } - else - { - if (rpc_read(conn, &dst, sizeof(void *)) < 0) - return -1; - - std::size_t count; - if (rpc_read(conn, &count, sizeof(size_t)) < 0) - return -1; - - void *src = malloc(count); - if (src == NULL) - { - return -1; - } - - if (rpc_read(conn, src, count) < 0) - { - free(src); - return -1; - } - - cudaStream_t stream; - if (rpc_read(conn, &stream, sizeof(cudaStream_t)) < 0) - { - free(src); - return -1; - } - - int request_id = rpc_end_request(conn); + goto ERROR_1; + break; + case cudaMemcpyDeviceToDevice: + if (rpc_read(conn, &src, sizeof(void *)) < 0 || + rpc_read(conn, &dst, sizeof(void *)) < 0 || + rpc_read(conn, &count, sizeof(size_t)) < 0) + goto ERROR_0; + + request_id = rpc_end_request(conn); if (request_id < 0) - { - free(src); - return -1; - } + goto ERROR_0; result = cudaMemcpyAsync(dst, src, count, kind, stream); - free(src); - if (rpc_start_response(conn, request_id) < 0) - { - return -1; - } + goto ERROR_0; + break; } - if (rpc_end_response(conn, &result) < 0) - return -1; - - std::cout << "end cudaMemcpyAsync" << std::endl; + if (rpc_end_response(conn, &result) < 0 || + cudaStreamSynchronize(stream) != cudaSuccess) + goto ERROR_1; - return 0; + ret = 0; +ERROR_1: + free((void *)host_data); +ERROR_0: + return ret; } int handle_cudaLaunchKernel(void *conn) diff --git a/test/cublas_batched.cu b/test/cublas_batched.cu index d5e3f92..1edd635 100644 --- a/test/cublas_batched.cu +++ b/test/cublas_batched.cu @@ -58,7 +58,8 @@ using data_type = double; -int main(int argc, char *argv[]) { +int main(int argc, char *argv[]) +{ cublasHandle_t cublasH = NULL; cudaStream_t stream = NULL; @@ -78,7 +79,7 @@ int main(int argc, char *argv[]) { * | 7.0 | 8.0 | 11.0 | 12.0 | */ - const std::vector> A_array = {{1.0 ,3.0, 2.0, 4.0}, + const std::vector> A_array = {{1.0, 3.0, 2.0, 4.0}, {5.0, 7.0, 6.0, 8.0}}; const std::vector> B_array = {{5.0, 7.0, 6.0, 8.0}, {9.0, 11.0, 10.0, 12.0}}; @@ -121,7 +122,8 @@ int main(int argc, char *argv[]) { CUBLAS_CHECK(cublasSetStream(cublasH, stream)); /* step 2: copy data to device */ - for (int i = 0; i < batch_count; i++) { + for (int i = 0; i < batch_count; i++) + { CUDA_CHECK( cudaMalloc(reinterpret_cast(&d_A[i]), sizeof(data_type) * A_array[i].size())); CUDA_CHECK( @@ -137,7 +139,8 @@ int main(int argc, char *argv[]) { CUDA_CHECK( cudaMalloc(reinterpret_cast(&d_C_array), sizeof(data_type *) * batch_count)); - for (int i = 0; i < batch_count; i++) { + for (int i = 0; i < batch_count; i++) + { CUDA_CHECK(cudaMemcpyAsync(d_A[i], A_array[i].data(), sizeof(data_type) * A_array[i].size(), cudaMemcpyHostToDevice, stream)); CUDA_CHECK(cudaMemcpyAsync(d_B[i], B_array[i].data(), sizeof(data_type) * B_array[i].size(), @@ -156,8 +159,9 @@ int main(int argc, char *argv[]) { d_B_array, ldb, &beta, d_C_array, ldc, batch_count)); /* step 4: copy data to host */ - for (int i = 0; i < batch_count; i++) { - CUDA_CHECK(cudaMemcpy(C_array[i].data(), d_C[i], sizeof(data_type) * C_array[i].size(), + for (int i = 0; i < batch_count; i++) + { + CUDA_CHECK(cudaMemcpyAsync(C_array[i].data(), d_C[i], sizeof(data_type) * C_array[i].size(), cudaMemcpyDeviceToHost)); } @@ -180,7 +184,8 @@ int main(int argc, char *argv[]) { CUDA_CHECK(cudaFree(d_A_array)); CUDA_CHECK(cudaFree(d_B_array)); CUDA_CHECK(cudaFree(d_C_array)); - for (int i = 0; i < batch_count; i++) { + for (int i = 0; i < batch_count; i++) + { CUDA_CHECK(cudaFree(d_A[i])); CUDA_CHECK(cudaFree(d_B[i])); CUDA_CHECK(cudaFree(d_C[i]));