Skip to content

Commit

Permalink
fix: wait for stream to synchronize before freeing
Browse files (browse the repository at this point in the history)
  • Loading branch information
kevmo314 committed Dec 9, 2024
1 parent a539495 commit de58508
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 157 deletions.
83 changes: 24 additions & 59 deletions codegen/manual_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,76 +210,41 @@ cudaError_t cudaMemcpyAsync(void *dst, const void *src, size_t count, enum cudaM
cudaError_t return_value;

int request_id = rpc_start_request(0, RPC_cudaMemcpyAsync);
if (request_id < 0)
{
return cudaErrorDevicesUnavailable;
}

if (rpc_write(0, &kind, sizeof(enum cudaMemcpyKind)) < 0)
{
int stream_null_check = stream == 0 ? 1 : 0;
if (request_id < 0 ||
rpc_write(0, &kind, sizeof(enum cudaMemcpyKind)) < 0 ||
rpc_write(0, &stream_null_check, sizeof(int)) < 0 ||
(stream_null_check == 0 && rpc_write(0, &stream, sizeof(cudaStream_t)) < 0))
return cudaErrorDevicesUnavailable;
}

// we need to swap device directions in this case
if (kind == cudaMemcpyDeviceToHost)
{
if (rpc_write(0, &src, sizeof(void *)) < 0)
{
return cudaErrorDevicesUnavailable;
}

if (rpc_write(0, &count, sizeof(size_t)) < 0)
{
return cudaErrorDevicesUnavailable;
}

if (rpc_write(0, &stream, sizeof(cudaStream_t)) < 0)
{
return cudaErrorDevicesUnavailable;
}

if (rpc_wait_for_response(0) < 0)
{
return cudaErrorDevicesUnavailable;
}

if (rpc_read(0, dst, count) < 0)
{ // Read data into the destination buffer on the host
return cudaErrorDevicesUnavailable;
}
}
else
switch (kind)
{
if (rpc_write(0, &dst, sizeof(void *)) < 0)
{
return cudaErrorDevicesUnavailable;
}

if (rpc_write(0, &count, sizeof(size_t)) < 0)
{
return cudaErrorDevicesUnavailable;
}

if (rpc_write(0, src, count) < 0)
{
case cudaMemcpyDeviceToHost:
if (rpc_write(0, &src, sizeof(void *)) < 0 ||
rpc_write(0, &count, sizeof(size_t)) < 0 ||
rpc_wait_for_response(0) < 0 ||
rpc_read(0, dst, count) < 0)
return cudaErrorDevicesUnavailable;
}

if (rpc_write(0, &stream, sizeof(cudaStream_t)) < 0)
{
break;
case cudaMemcpyHostToDevice:
if (rpc_write(0, &dst, sizeof(void *)) < 0 ||
rpc_write(0, &count, sizeof(size_t)) < 0 ||
rpc_write(0, src, count) < 0 ||
rpc_wait_for_response(0) < 0)
return cudaErrorDevicesUnavailable;
}

if (rpc_wait_for_response(0) < 0)
{
break;
case cudaMemcpyDeviceToDevice:
if (rpc_write(0, &dst, sizeof(void *)) < 0 ||
rpc_write(0, &src, sizeof(void *)) < 0 ||
rpc_write(0, &count, sizeof(size_t)) < 0 ||
rpc_wait_for_response(0) < 0)
return cudaErrorDevicesUnavailable;
}
break;
}

if (rpc_end_response(0, &return_value) < 0)
{
return cudaErrorDevicesUnavailable;
}

return return_value;
}
Expand Down
158 changes: 67 additions & 91 deletions codegen/manual_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ int handle_cudaMemcpy(void *conn)
void *host_data;
std::size_t count;
enum cudaMemcpyKind kind;
int ret = -1;

if (rpc_read(conn, &kind, sizeof(enum cudaMemcpyKind)) < 0)
goto ERROR_0;
Expand All @@ -51,7 +52,7 @@ int handle_cudaMemcpy(void *conn)
if (request_id < 0)
goto ERROR_1;

result = cudaMemcpy(host_data, src, count, cudaMemcpyDeviceToHost);
result = cudaMemcpy(host_data, src, count, kind);

if (rpc_start_response(conn, request_id) < 0 ||
rpc_write(conn, host_data, count) < 0)
Expand Down Expand Up @@ -98,124 +99,99 @@ int handle_cudaMemcpy(void *conn)
if (rpc_end_response(conn, &result) < 0)
goto ERROR_1;

return 0;
ret = 0;
ERROR_1:
free((void *)host_data);
ERROR_0:
return -1;
return ret;
}

int handle_cudaMemcpyAsync(void *conn)
{
int request_id;
cudaError_t result;
void *src;
void *dst;

void *host_data;
std::size_t count;
enum cudaMemcpyKind kind;
if (rpc_read(conn, &kind, sizeof(enum cudaMemcpyKind)) < 0)
{
return -1;
}
int stream_null_check;
cudaStream_t stream = 0;
int ret = -1;

if (rpc_read(conn, &kind, sizeof(enum cudaMemcpyKind)) < 0 ||
rpc_read(conn, &stream_null_check, sizeof(int)) < 0 ||
(stream_null_check == 0 && rpc_read(conn, &stream, sizeof(cudaStream_t)) < 0))
goto ERROR_0;

if (kind == cudaMemcpyDeviceToHost)
switch (kind)
{
if (rpc_read(conn, &dst, sizeof(void *)) < 0)
return -1;
case cudaMemcpyDeviceToHost:
if (rpc_read(conn, &src, sizeof(void *)) < 0 ||
rpc_read(conn, &count, sizeof(size_t)) < 0)
goto ERROR_0;

host_data = malloc(count);
if (host_data == NULL)
goto ERROR_0;

std::size_t count;
if (rpc_read(conn, &count, sizeof(size_t)) < 0)
return -1;
request_id = rpc_end_request(conn);
if (request_id < 0)
goto ERROR_1;

cudaStream_t stream;
if (rpc_read(conn, &stream, sizeof(cudaStream_t)) < 0)
{
return -1;
}
result = cudaMemcpyAsync(host_data, src, count, kind, stream);

void *host_data = malloc(count);
if (rpc_start_response(conn, request_id) < 0 ||
rpc_write(conn, host_data, count) < 0)
goto ERROR_1;
break;
case cudaMemcpyHostToDevice:
if (rpc_read(conn, &dst, sizeof(void *)) < 0 ||
rpc_read(conn, &count, sizeof(size_t)) < 0)
goto ERROR_0;

host_data = malloc(count);
if (host_data == NULL)
{
std::cerr << "Failed to allocate host memory for device-to-host transfer." << std::endl;
return -1;
}
int request_id = rpc_end_request(conn);
goto ERROR_0;

if (rpc_read(conn, host_data, count) < 0)
goto ERROR_1;

request_id = rpc_end_request(conn);
if (request_id < 0)
{
return -1;
}
goto ERROR_1;

result = cudaMemcpyAsync(host_data, dst, count, cudaMemcpyDeviceToHost, stream);
if (result != cudaSuccess)
{
free(host_data);
return -1;
}
result = cudaMemcpyAsync(dst, host_data, count, kind, stream);

if (rpc_start_response(conn, request_id) < 0)
{
return -1;
}

if (rpc_write(conn, host_data, count) < 0)
{
free(host_data);
return -1;
}

// free temp memory after writing host data back
free(host_data);
}
else
{
if (rpc_read(conn, &dst, sizeof(void *)) < 0)
return -1;

std::size_t count;
if (rpc_read(conn, &count, sizeof(size_t)) < 0)
return -1;

void *src = malloc(count);
if (src == NULL)
{
return -1;
}

if (rpc_read(conn, src, count) < 0)
{
free(src);
return -1;
}

cudaStream_t stream;
if (rpc_read(conn, &stream, sizeof(cudaStream_t)) < 0)
{
free(src);
return -1;
}

int request_id = rpc_end_request(conn);
goto ERROR_1;
break;
case cudaMemcpyDeviceToDevice:
if (rpc_read(conn, &src, sizeof(void *)) < 0 ||
rpc_read(conn, &dst, sizeof(void *)) < 0 ||
rpc_read(conn, &count, sizeof(size_t)) < 0)
goto ERROR_0;

request_id = rpc_end_request(conn);
if (request_id < 0)
{
free(src);
return -1;
}
goto ERROR_0;

result = cudaMemcpyAsync(dst, src, count, kind, stream);

free(src);

if (rpc_start_response(conn, request_id) < 0)
{
return -1;
}
goto ERROR_0;
break;
}

if (rpc_end_response(conn, &result) < 0)
return -1;

std::cout << "end cudaMemcpyAsync" << std::endl;
if (rpc_end_response(conn, &result) < 0 ||
cudaStreamSynchronize(stream) != cudaSuccess)
goto ERROR_1;

return 0;
ret = 0;
ERROR_1:
free((void *)host_data);
ERROR_0:
return ret;
}

int handle_cudaLaunchKernel(void *conn)
Expand Down
19 changes: 12 additions & 7 deletions test/cublas_batched.cu
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@

using data_type = double;

int main(int argc, char *argv[]) {
int main(int argc, char *argv[])
{
cublasHandle_t cublasH = NULL;
cudaStream_t stream = NULL;

Expand All @@ -78,7 +79,7 @@ int main(int argc, char *argv[]) {
* | 7.0 | 8.0 | 11.0 | 12.0 |
*/

const std::vector<std::vector<data_type>> A_array = {{1.0 ,3.0, 2.0, 4.0},
const std::vector<std::vector<data_type>> A_array = {{1.0, 3.0, 2.0, 4.0},
{5.0, 7.0, 6.0, 8.0}};
const std::vector<std::vector<data_type>> B_array = {{5.0, 7.0, 6.0, 8.0},
{9.0, 11.0, 10.0, 12.0}};
Expand Down Expand Up @@ -121,7 +122,8 @@ int main(int argc, char *argv[]) {
CUBLAS_CHECK(cublasSetStream(cublasH, stream));

/* step 2: copy data to device */
for (int i = 0; i < batch_count; i++) {
for (int i = 0; i < batch_count; i++)
{
CUDA_CHECK(
cudaMalloc(reinterpret_cast<void **>(&d_A[i]), sizeof(data_type) * A_array[i].size()));
CUDA_CHECK(
Expand All @@ -137,7 +139,8 @@ int main(int argc, char *argv[]) {
CUDA_CHECK(
cudaMalloc(reinterpret_cast<void **>(&d_C_array), sizeof(data_type *) * batch_count));

for (int i = 0; i < batch_count; i++) {
for (int i = 0; i < batch_count; i++)
{
CUDA_CHECK(cudaMemcpyAsync(d_A[i], A_array[i].data(), sizeof(data_type) * A_array[i].size(),
cudaMemcpyHostToDevice, stream));
CUDA_CHECK(cudaMemcpyAsync(d_B[i], B_array[i].data(), sizeof(data_type) * B_array[i].size(),
Expand All @@ -156,8 +159,9 @@ int main(int argc, char *argv[]) {
d_B_array, ldb, &beta, d_C_array, ldc, batch_count));

/* step 4: copy data to host */
for (int i = 0; i < batch_count; i++) {
CUDA_CHECK(cudaMemcpy(C_array[i].data(), d_C[i], sizeof(data_type) * C_array[i].size(),
for (int i = 0; i < batch_count; i++)
{
CUDA_CHECK(cudaMemcpyAsync(C_array[i].data(), d_C[i], sizeof(data_type) * C_array[i].size(),
cudaMemcpyDeviceToHost));
}

Expand All @@ -180,7 +184,8 @@ int main(int argc, char *argv[]) {
CUDA_CHECK(cudaFree(d_A_array));
CUDA_CHECK(cudaFree(d_B_array));
CUDA_CHECK(cudaFree(d_C_array));
for (int i = 0; i < batch_count; i++) {
for (int i = 0; i < batch_count; i++)
{
CUDA_CHECK(cudaFree(d_A[i]));
CUDA_CHECK(cudaFree(d_B[i]));
CUDA_CHECK(cudaFree(d_C[i]));
Expand Down

0 comments on commit de58508

Please sign in to comment.