fix: wait for stream to synchronize before freeing #63

Merged: 1 commit, Dec 9, 2024
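For context on the fix itself: cudaMemcpyAsync only enqueues a copy on a stream and may return before the transfer has finished, so a host staging buffer must not be freed until the stream has been synchronized. A minimal sketch of the corrected pattern (illustrative only; the helper name and buffer handling are assumptions, not this repo's code):

#include <cuda_runtime.h>
#include <cstdlib>

// Sketch: stage `count` bytes from device memory into a temporary host buffer.
// Freeing the buffer right after cudaMemcpyAsync would be a use-after-free,
// because the copy may still be in flight; synchronize the stream first.
cudaError_t stage_device_to_host(const void *src, size_t count, cudaStream_t stream)
{
    void *host_data = malloc(count);
    if (host_data == NULL)
        return cudaErrorMemoryAllocation;

    cudaError_t result = cudaMemcpyAsync(host_data, src, count,
                                         cudaMemcpyDeviceToHost, stream);

    // The point of this PR: wait for the stream before touching or freeing the buffer.
    if (result == cudaSuccess)
        result = cudaStreamSynchronize(stream);

    // ... hand host_data off (e.g. write it to a socket) here ...

    free(host_data);
    return result;
}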
Changes from all commits
codegen/manual_client.cpp: 83 changes (24 additions, 59 deletions)
@@ -210,76 +210,41 @@ cudaError_t cudaMemcpyAsync(void *dst, const void *src, size_t count, enum cudaM
     cudaError_t return_value;
 
     int request_id = rpc_start_request(0, RPC_cudaMemcpyAsync);
-    if (request_id < 0)
-    {
-        return cudaErrorDevicesUnavailable;
-    }
-
-    if (rpc_write(0, &kind, sizeof(enum cudaMemcpyKind)) < 0)
-    {
+    int stream_null_check = stream == 0 ? 1 : 0;
+    if (request_id < 0 ||
+        rpc_write(0, &kind, sizeof(enum cudaMemcpyKind)) < 0 ||
+        rpc_write(0, &stream_null_check, sizeof(int)) < 0 ||
+        (stream_null_check == 0 && rpc_write(0, &stream, sizeof(cudaStream_t)) < 0))
         return cudaErrorDevicesUnavailable;
-    }
 
-    // we need to swap device directions in this case
-    if (kind == cudaMemcpyDeviceToHost)
-    {
-        if (rpc_write(0, &src, sizeof(void *)) < 0)
-        {
-            return cudaErrorDevicesUnavailable;
-        }
-
-        if (rpc_write(0, &count, sizeof(size_t)) < 0)
-        {
-            return cudaErrorDevicesUnavailable;
-        }
-
-        if (rpc_write(0, &stream, sizeof(cudaStream_t)) < 0)
-        {
-            return cudaErrorDevicesUnavailable;
-        }
-
-        if (rpc_wait_for_response(0) < 0)
-        {
-            return cudaErrorDevicesUnavailable;
-        }
-
-        if (rpc_read(0, dst, count) < 0)
-        { // Read data into the destination buffer on the host
-            return cudaErrorDevicesUnavailable;
-        }
-    }
-    else
+    switch (kind)
     {
-        if (rpc_write(0, &dst, sizeof(void *)) < 0)
-        {
-            return cudaErrorDevicesUnavailable;
-        }
-
-        if (rpc_write(0, &count, sizeof(size_t)) < 0)
-        {
-            return cudaErrorDevicesUnavailable;
-        }
-
-        if (rpc_write(0, src, count) < 0)
-        {
+    case cudaMemcpyDeviceToHost:
+        if (rpc_write(0, &src, sizeof(void *)) < 0 ||
+            rpc_write(0, &count, sizeof(size_t)) < 0 ||
+            rpc_wait_for_response(0) < 0 ||
+            rpc_read(0, dst, count) < 0)
             return cudaErrorDevicesUnavailable;
-        }
-
-        if (rpc_write(0, &stream, sizeof(cudaStream_t)) < 0)
-        {
+        break;
+    case cudaMemcpyHostToDevice:
+        if (rpc_write(0, &dst, sizeof(void *)) < 0 ||
+            rpc_write(0, &count, sizeof(size_t)) < 0 ||
+            rpc_write(0, src, count) < 0 ||
+            rpc_wait_for_response(0) < 0)
            return cudaErrorDevicesUnavailable;
-        }
-
-        if (rpc_wait_for_response(0) < 0)
-        {
+        break;
+    case cudaMemcpyDeviceToDevice:
+        if (rpc_write(0, &dst, sizeof(void *)) < 0 ||
+            rpc_write(0, &src, sizeof(void *)) < 0 ||
+            rpc_write(0, &count, sizeof(size_t)) < 0 ||
+            rpc_wait_for_response(0) < 0)
            return cudaErrorDevicesUnavailable;
-        }
+        break;
     }
 
     if (rpc_end_response(0, &return_value) < 0)
     {
         return cudaErrorDevicesUnavailable;
     }
 
     return return_value;
 }
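On the client side, the request now serializes the memcpy kind, a flag saying whether the stream is the default (null) stream, and the stream handle only when it is not null; the pointers and byte count then follow per copy direction. A hypothetical caller that exercises both stream paths (a sketch using only the standard CUDA runtime API, not this repo's internals):

#include <cuda_runtime.h>
#include <cstddef>

// Illustrative caller: the first copy runs on the default (null) stream, so the
// client above sends stream_null_check = 1 and skips the handle; the second
// uses an explicit stream, so the flag is 0 and the handle is serialized too.
void copy_through_both_paths(void *d_buf, void *h_buf, size_t bytes)
{
    cudaStream_t stream;
    cudaStreamCreate(&stream);

    // Default stream: no stream handle needs to cross the wire.
    cudaMemcpyAsync(d_buf, h_buf, bytes, cudaMemcpyHostToDevice);

    // Explicit stream: the handle is sent so the server can enqueue on it.
    cudaMemcpyAsync(h_buf, d_buf, bytes, cudaMemcpyDeviceToHost, stream);

    cudaStreamSynchronize(stream);
    cudaStreamDestroy(stream);
}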
codegen/manual_server.cpp: 158 changes (67 additions, 91 deletions)
@@ -32,6 +32,7 @@ int handle_cudaMemcpy(void *conn)
     void *host_data;
     std::size_t count;
     enum cudaMemcpyKind kind;
+    int ret = -1;
 
     if (rpc_read(conn, &kind, sizeof(enum cudaMemcpyKind)) < 0)
         goto ERROR_0;
@@ -51,7 +52,7 @@
     if (request_id < 0)
         goto ERROR_1;
 
-    result = cudaMemcpy(host_data, src, count, cudaMemcpyDeviceToHost);
+    result = cudaMemcpy(host_data, src, count, kind);
 
     if (rpc_start_response(conn, request_id) < 0 ||
         rpc_write(conn, host_data, count) < 0)
@@ -98,124 +99,99 @@
     if (rpc_end_response(conn, &result) < 0)
         goto ERROR_1;
 
-    return 0;
+    ret = 0;
 ERROR_1:
     free((void *)host_data);
 ERROR_0:
-    return -1;
+    return ret;
 }
 
 int handle_cudaMemcpyAsync(void *conn)
 {
     int request_id;
     cudaError_t result;
     void *src;
     void *dst;
 
+    void *host_data;
     std::size_t count;
     enum cudaMemcpyKind kind;
-    if (rpc_read(conn, &kind, sizeof(enum cudaMemcpyKind)) < 0)
-    {
-        return -1;
-    }
+    int stream_null_check;
+    cudaStream_t stream = 0;
+    int ret = -1;
+
+    if (rpc_read(conn, &kind, sizeof(enum cudaMemcpyKind)) < 0 ||
+        rpc_read(conn, &stream_null_check, sizeof(int)) < 0 ||
+        (stream_null_check == 0 && rpc_read(conn, &stream, sizeof(cudaStream_t)) < 0))
+        goto ERROR_0;
 
-    if (kind == cudaMemcpyDeviceToHost)
+    switch (kind)
     {
-        if (rpc_read(conn, &dst, sizeof(void *)) < 0)
-            return -1;
+    case cudaMemcpyDeviceToHost:
+        if (rpc_read(conn, &src, sizeof(void *)) < 0 ||
+            rpc_read(conn, &count, sizeof(size_t)) < 0)
+            goto ERROR_0;
+
+        host_data = malloc(count);
+        if (host_data == NULL)
+            goto ERROR_0;
 
-        std::size_t count;
-        if (rpc_read(conn, &count, sizeof(size_t)) < 0)
-            return -1;
+        request_id = rpc_end_request(conn);
+        if (request_id < 0)
+            goto ERROR_1;
 
-        cudaStream_t stream;
-        if (rpc_read(conn, &stream, sizeof(cudaStream_t)) < 0)
-        {
-            return -1;
-        }
+        result = cudaMemcpyAsync(host_data, src, count, kind, stream);
 
-        void *host_data = malloc(count);
+        if (rpc_start_response(conn, request_id) < 0 ||
+            rpc_write(conn, host_data, count) < 0)
+            goto ERROR_1;
+        break;
+    case cudaMemcpyHostToDevice:
+        if (rpc_read(conn, &dst, sizeof(void *)) < 0 ||
+            rpc_read(conn, &count, sizeof(size_t)) < 0)
+            goto ERROR_0;
 
+        host_data = malloc(count);
         if (host_data == NULL)
-        {
-            std::cerr << "Failed to allocate host memory for device-to-host transfer." << std::endl;
-            return -1;
-        }
-        int request_id = rpc_end_request(conn);
+            goto ERROR_0;
+
+        if (rpc_read(conn, host_data, count) < 0)
+            goto ERROR_1;
 
+        request_id = rpc_end_request(conn);
         if (request_id < 0)
-        {
-            return -1;
-        }
+            goto ERROR_1;
 
-        result = cudaMemcpyAsync(host_data, dst, count, cudaMemcpyDeviceToHost, stream);
-        if (result != cudaSuccess)
-        {
-            free(host_data);
-            return -1;
-        }
+        result = cudaMemcpyAsync(dst, host_data, count, kind, stream);
 
         if (rpc_start_response(conn, request_id) < 0)
-        {
-            return -1;
-        }
-
-        if (rpc_write(conn, host_data, count) < 0)
-        {
-            free(host_data);
-            return -1;
-        }
-
-        // free temp memory after writing host data back
-        free(host_data);
-    }
-    else
-    {
-        if (rpc_read(conn, &dst, sizeof(void *)) < 0)
-            return -1;
-
-        std::size_t count;
-        if (rpc_read(conn, &count, sizeof(size_t)) < 0)
-            return -1;
-
-        void *src = malloc(count);
-        if (src == NULL)
-        {
-            return -1;
-        }
-
-        if (rpc_read(conn, src, count) < 0)
-        {
-            free(src);
-            return -1;
-        }
-
-        cudaStream_t stream;
-        if (rpc_read(conn, &stream, sizeof(cudaStream_t)) < 0)
-        {
-            free(src);
-            return -1;
-        }
-
-        int request_id = rpc_end_request(conn);
+            goto ERROR_1;
+        break;
+    case cudaMemcpyDeviceToDevice:
+        if (rpc_read(conn, &src, sizeof(void *)) < 0 ||
+            rpc_read(conn, &dst, sizeof(void *)) < 0 ||
+            rpc_read(conn, &count, sizeof(size_t)) < 0)
+            goto ERROR_0;
+
+        request_id = rpc_end_request(conn);
         if (request_id < 0)
-        {
-            free(src);
-            return -1;
-        }
+            goto ERROR_0;
 
         result = cudaMemcpyAsync(dst, src, count, kind, stream);
 
-        free(src);
-
         if (rpc_start_response(conn, request_id) < 0)
-        {
-            return -1;
-        }
+            goto ERROR_0;
+        break;
-    }
     }
 
-    if (rpc_end_response(conn, &result) < 0)
-        return -1;
-    std::cout << "end cudaMemcpyAsync" << std::endl;
+    if (rpc_end_response(conn, &result) < 0 ||
+        cudaStreamSynchronize(stream) != cudaSuccess)
+        goto ERROR_1;
 
-    return 0;
+    ret = 0;
+ERROR_1:
+    free((void *)host_data);
+ERROR_0:
+    return ret;
 }
 
 int handle_cudaLaunchKernel(void *conn)
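The rewritten handler funnels every exit through the ERROR_1/ERROR_0 labels, so the staging buffer is freed exactly once, and on the success path only after the response has been written and cudaStreamSynchronize has drained the stream. The general shape of that single-exit cleanup pattern, reduced to a sketch (hypothetical handler, not the repo's code):

#include <cstdio>
#include <cstdlib>

// Generic single-exit cleanup, as adopted by the handlers above: one
// allocation, one free, and `ret` only flips to success at the very end.
int handle_request_sketch()
{
    void *buffer = NULL;
    int ret = -1;

    buffer = malloc(1024);
    if (buffer == NULL)
        goto ERROR_0;

    // A step that may fail; on failure the buffer is still freed below.
    if (fread(buffer, 1, 1024, stdin) != 1024)
        goto ERROR_1;

    ret = 0;
ERROR_1:
    free(buffer);
ERROR_0:
    return ret;
}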
test/cublas_batched.cu: 19 changes (12 additions, 7 deletions)
@@ -58,7 +58,8 @@
 
 using data_type = double;
 
-int main(int argc, char *argv[]) {
+int main(int argc, char *argv[])
+{
     cublasHandle_t cublasH = NULL;
     cudaStream_t stream = NULL;
 
@@ -78,7 +79,7 @@ int main(int argc, char *argv[]) {
      * | 7.0 | 8.0 | 11.0 | 12.0 |
      */
 
-    const std::vector<std::vector<data_type>> A_array = {{1.0 ,3.0, 2.0, 4.0},
+    const std::vector<std::vector<data_type>> A_array = {{1.0, 3.0, 2.0, 4.0},
                                                          {5.0, 7.0, 6.0, 8.0}};
     const std::vector<std::vector<data_type>> B_array = {{5.0, 7.0, 6.0, 8.0},
                                                          {9.0, 11.0, 10.0, 12.0}};
@@ -121,7 +122,8 @@
     CUBLAS_CHECK(cublasSetStream(cublasH, stream));
 
     /* step 2: copy data to device */
-    for (int i = 0; i < batch_count; i++) {
+    for (int i = 0; i < batch_count; i++)
+    {
         CUDA_CHECK(
             cudaMalloc(reinterpret_cast<void **>(&d_A[i]), sizeof(data_type) * A_array[i].size()));
         CUDA_CHECK(
@@ -137,7 +139,8 @@
     CUDA_CHECK(
         cudaMalloc(reinterpret_cast<void **>(&d_C_array), sizeof(data_type *) * batch_count));
 
-    for (int i = 0; i < batch_count; i++) {
+    for (int i = 0; i < batch_count; i++)
+    {
         CUDA_CHECK(cudaMemcpyAsync(d_A[i], A_array[i].data(), sizeof(data_type) * A_array[i].size(),
                                    cudaMemcpyHostToDevice, stream));
         CUDA_CHECK(cudaMemcpyAsync(d_B[i], B_array[i].data(), sizeof(data_type) * B_array[i].size(),
@@ -156,8 +159,9 @@
                                  d_B_array, ldb, &beta, d_C_array, ldc, batch_count));
 
     /* step 4: copy data to host */
-    for (int i = 0; i < batch_count; i++) {
-        CUDA_CHECK(cudaMemcpy(C_array[i].data(), d_C[i], sizeof(data_type) * C_array[i].size(),
+    for (int i = 0; i < batch_count; i++)
+    {
+        CUDA_CHECK(cudaMemcpyAsync(C_array[i].data(), d_C[i], sizeof(data_type) * C_array[i].size(),
                                    cudaMemcpyDeviceToHost));
     }
 
@@ -180,7 +184,8 @@
     CUDA_CHECK(cudaFree(d_A_array));
     CUDA_CHECK(cudaFree(d_B_array));
     CUDA_CHECK(cudaFree(d_C_array));
-    for (int i = 0; i < batch_count; i++) {
+    for (int i = 0; i < batch_count; i++)
+    {
         CUDA_CHECK(cudaFree(d_A[i]));
         CUDA_CHECK(cudaFree(d_B[i]));
         CUDA_CHECK(cudaFree(d_C[i]));
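The test now stages the results back with cudaMemcpyAsync instead of the blocking cudaMemcpy. The rule this relies on is standard CUDA behavior rather than anything repo-specific: the host must not read the destination buffer until the stream carrying the copy has synchronized, as in this sketch (function and variable names are illustrative):

#include <cuda_runtime.h>
#include <vector>

// Sketch: asynchronous device-to-host copy followed by an explicit
// synchronize before the host reads the results.
void fetch_results(const double *d_C, std::vector<double> &h_C, cudaStream_t stream)
{
    cudaMemcpyAsync(h_C.data(), d_C, sizeof(double) * h_C.size(),
                    cudaMemcpyDeviceToHost, stream);

    // Without this, reading h_C below could race with the in-flight copy.
    cudaStreamSynchronize(stream);

    // h_C is now safe to inspect on the host.
}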