From 76871ec096e9021e1e99c2ce7d17e220c0d0b5c9 Mon Sep 17 00:00:00 2001 From: DenverM80 Date: Tue, 18 Jul 2017 11:37:05 -0600 Subject: [PATCH 1/9] WIP: Add file for an optional put_directory test --- test/CMakeLists.txt | 1 + test/put_directory.cpp | 91 ++++++++++++++++++++++++++++++++++++++++++ test/test.cpp | 6 ++- 3 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 test/put_directory.cpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7d303973..33cedd9c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -77,6 +77,7 @@ add_executable(ds3_c_tests search_tests.cpp service_tests.cpp connection_tests.cpp + put_directory.cpp test.cpp) add_test(regression_tests ds3_c_tests) diff --git a/test/put_directory.cpp b/test/put_directory.cpp new file mode 100644 index 00000000..3e71874f --- /dev/null +++ b/test/put_directory.cpp @@ -0,0 +1,91 @@ +/* + * ****************************************************************************** + * Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * **************************************************************************** + */ + +#include +#include +#include +#include +#include +#include +#include +#include "ds3.h" +#include "ds3_net.h" +#include "test.h" + +struct check_ds3_test_directory_given { + return getenv(DS3_TEST_DIRECTORY) != NULL; +}; + +BOOST_AUTO_TEST_CASE( put_directory, + * boost::unit_test::precondition(check_ds3_test_directory_given)) { + printf("-----Testing PUT all objects in a directory-------\n"); + + const char* dir_path = getenv("DS3_TEST_DIRECTORY"); + BOOST_CHECK(dir_path != NULL); + + const char* bucket_name = "test_bulk_put_directory"; + + ds3_request* request = NULL; + ds3_master_object_list_response* bulk_response = NULL; + + ds3_client* client = get_client(); + int client_thread=1; + ds3_client_register_logging(client, DS3_DEBUG, test_log, (void*)&client_thread); // Use DEBUG level logging + + ds3_error* error = create_bucket_with_data_policy(client, bucket_name, ids.data_policy_id->value); + + char** objects_list; + uint64_t num_objs = 0; + GDir* dir_info = g_dir_open(dir_path, 0, NULL); + for (char* current_obj = (char*)g_dir_read_name(dir_info); current_obj != NULL; current_obj = (char*)g_dir_read_name(dir_info)) { + objects_list[num_objs++] = current_obj; + } + + ds3_bulk_object_list_response* bulk_object_list = ds3_convert_object_list_from_strings((const char**)objects_list, num_objs); + + request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, bulk_object_list); + ds3_master_object_list_response* mol; + error = ds3_put_bulk_job_spectra_s3_request(client, request, &mol); + ds3_request_free(request); + ds3_bulk_object_list_response_free(bulk_object_list); + handle_error(error); + + ds3_master_object_list_response* chunks_list = ensure_available_chunks(client, mol->job_id); + + // Use helper functions from test.cpp + GPtrArray* put_dir_args = new_put_chunks_threads_args(client, NULL, bucket_name, mol, chunks_list, 1, True); // Last param indicates verbose logging in the spawned thread + + // capture test start time + struct timespec start_time_t, end_time_t; + double elapsed_t; + clock_gettime(CLOCK_MONOTONIC, &start_time_t); + + GThread* put_dir_xfer_thread = g_thread_new("put_dir_xfer_thread", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 0)); + + // Block and cleanup GThread(s) + g_thread_join(put_dir_xfer_thread); + + // find elapsed CPU and real time + clock_gettime(CLOCK_MONOTONIC, &end_time_t); + elapsed_t = timespec_to_seconds(&end_time_t) - timespec_to_seconds(&start_time_t); + ds3_log_message(client1->log, DS3_INFO, " Elapsed time[%f]", elapsed_t); + + ds3_master_object_list_response_free(chunks_list); + ds3_master_object_list_response_free(mol); + put_chunks_threads_args_free(put_dir_args); + clear_bucket(client, bucket_name); + free_client(client); +} + diff --git a/test/test.cpp b/test/test.cpp index 1e27dd64..72c34460 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -525,7 +525,11 @@ void put_chunks_from_file(void* args) { if (_args->verbose) { ds3_log_message(_args->client->log, DS3_INFO, " GlibThread[%d] BEGIN xfer File[%s] Chunk[%lu]", _args->thread_num, object->name->value, _args->chunks_list->num_objects); } - file = fopen(_args->src_object_name, "r"); + if (_args->src_object_name) { + file = fopen(_args->src_object_name, "r"); + } else { + file = fopen(object->name->value, "r"); + } if (object->offset != 0) { fseek(file, object->offset, SEEK_SET); } From 5cafcefe6116d7fc8bcfa2a601344c6670874fc0 Mon Sep 17 00:00:00 2001 From: DenverM80 Date: Tue, 18 Jul 2017 16:54:46 -0600 Subject: [PATCH 2/9] WIP: disable all tests except put_directory; verified no memory leaks; need to tweak for multiple threads --- test/CMakeLists.txt | 26 +++++++++++++------------- test/put_directory.cpp | 28 ++++++++++++++++------------ test/test.cpp | 15 ++++++++++++++- test/test.h | 2 ++ 4 files changed, 45 insertions(+), 26 deletions(-) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 33cedd9c..68e1245f 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -64,19 +64,19 @@ else(WIN32) # POSIX endif(WIN32) add_executable(ds3_c_tests - bucket_tests.cpp - bulk_get.cpp - bulk_put.cpp - checksum.cpp - deletes_test.cpp - get_physical_placement.cpp - job_tests.cpp - metadata_tests.cpp - multimap_tests.cpp - negative_tests.cpp - search_tests.cpp - service_tests.cpp - connection_tests.cpp + #bucket_tests.cpp + #bulk_get.cpp + #bulk_put.cpp + #checksum.cpp + #deletes_test.cpp + #get_physical_placement.cpp + #job_tests.cpp + #metadata_tests.cpp + #multimap_tests.cpp + #negative_tests.cpp + #search_tests.cpp + #service_tests.cpp + #connection_tests.cpp put_directory.cpp test.cpp) diff --git a/test/put_directory.cpp b/test/put_directory.cpp index 3e71874f..04c876ee 100644 --- a/test/put_directory.cpp +++ b/test/put_directory.cpp @@ -22,23 +22,24 @@ #include #include "ds3.h" #include "ds3_net.h" +#include "ds3_utils.h" #include "test.h" -struct check_ds3_test_directory_given { - return getenv(DS3_TEST_DIRECTORY) != NULL; +namespace utf = boost::unit_test; + +bool check_ds3_test_directory_given() { + return getenv("DS3_TEST_DIRECTORY") != NULL; }; -BOOST_AUTO_TEST_CASE( put_directory, - * boost::unit_test::precondition(check_ds3_test_directory_given)) { +BOOST_AUTO_TEST_CASE( put_directory) { + //* boost::unit_test::precondition(check_ds3_test_directory_given)) { printf("-----Testing PUT all objects in a directory-------\n"); const char* dir_path = getenv("DS3_TEST_DIRECTORY"); BOOST_CHECK(dir_path != NULL); const char* bucket_name = "test_bulk_put_directory"; - - ds3_request* request = NULL; - ds3_master_object_list_response* bulk_response = NULL; + printf(" Putting all files in [%s] to bucket [%s]\n", dir_path, bucket_name); ds3_client* client = get_client(); int client_thread=1; @@ -46,26 +47,28 @@ BOOST_AUTO_TEST_CASE( put_directory, ds3_error* error = create_bucket_with_data_policy(client, bucket_name, ids.data_policy_id->value); - char** objects_list; + char* objects_list[100]; uint64_t num_objs = 0; GDir* dir_info = g_dir_open(dir_path, 0, NULL); for (char* current_obj = (char*)g_dir_read_name(dir_info); current_obj != NULL; current_obj = (char*)g_dir_read_name(dir_info)) { objects_list[num_objs++] = current_obj; + printf(" obj[%" PRIu64 "][%s]\n", num_objs, objects_list[num_objs-1]); } - ds3_bulk_object_list_response* bulk_object_list = ds3_convert_object_list_from_strings((const char**)objects_list, num_objs); + ds3_bulk_object_list_response* bulk_object_list = ds3_convert_file_list_with_basepath((const char**)objects_list, num_objs, dir_path); - request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, bulk_object_list); + ds3_request* request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, bulk_object_list); ds3_master_object_list_response* mol; error = ds3_put_bulk_job_spectra_s3_request(client, request, &mol); ds3_request_free(request); ds3_bulk_object_list_response_free(bulk_object_list); handle_error(error); + // Allocate cache ds3_master_object_list_response* chunks_list = ensure_available_chunks(client, mol->job_id); // Use helper functions from test.cpp - GPtrArray* put_dir_args = new_put_chunks_threads_args(client, NULL, bucket_name, mol, chunks_list, 1, True); // Last param indicates verbose logging in the spawned thread + GPtrArray* put_dir_args = new_put_chunks_threads_args(client, NULL, dir_path, bucket_name, mol, chunks_list, 1, True); // Last param indicates verbose logging in the spawned thread // capture test start time struct timespec start_time_t, end_time_t; @@ -80,8 +83,9 @@ BOOST_AUTO_TEST_CASE( put_directory, // find elapsed CPU and real time clock_gettime(CLOCK_MONOTONIC, &end_time_t); elapsed_t = timespec_to_seconds(&end_time_t) - timespec_to_seconds(&start_time_t); - ds3_log_message(client1->log, DS3_INFO, " Elapsed time[%f]", elapsed_t); + ds3_log_message(client->log, DS3_INFO, " Elapsed time[%f]", elapsed_t); + g_dir_close(dir_info); ds3_master_object_list_response_free(chunks_list); ds3_master_object_list_response_free(mol); put_chunks_threads_args_free(put_dir_args); diff --git a/test/test.cpp b/test/test.cpp index 72c34460..0ec9d895 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -469,13 +469,22 @@ ds3_bulk_object_list_response* create_bulk_object_list_from_prefix_with_size(con return obj_list; } +/* + * Provide *EITHER* src_obj_name *OR* src_dir, not both + */ GPtrArray* new_put_chunks_threads_args(ds3_client* client, const char* src_obj_name, + const char* src_dir, const char* dest_bucket_name, const ds3_master_object_list_response* bulk_response, ds3_master_object_list_response* available_chunks, const uint8_t num_threads, const ds3_bool verbose) { + if (src_obj_name && src_dir) { + printf("Error: provide new_put_chunks_threads_with_args() with either src_object_name or src_dir, not both\n"); + return NULL; + } + GPtrArray* put_chunks_args_array = g_ptr_array_new(); for (uint8_t thread_index = 0; thread_index < num_threads; thread_index++) { @@ -483,6 +492,7 @@ GPtrArray* new_put_chunks_threads_args(ds3_client* client, put_objects_args->client = client; put_objects_args->job_id = bulk_response->job_id->value; put_objects_args->src_object_name = (char*)src_obj_name; + put_objects_args->src_dir = (char*)src_dir; put_objects_args->bucket_name = (char*)dest_bucket_name; put_objects_args->chunks_list = available_chunks; put_objects_args->thread_num = thread_index; @@ -528,7 +538,10 @@ void put_chunks_from_file(void* args) { if (_args->src_object_name) { file = fopen(_args->src_object_name, "r"); } else { - file = fopen(object->name->value, "r"); + char* file_with_path = g_strconcat(_args->src_dir, object->name->value, (char*)NULL); + printf(" opening file[%s]\n", file_with_path); + file = fopen(file_with_path, "r"); + g_free(file_with_path); } if (object->offset != 0) { fseek(file, object->offset, SEEK_SET); diff --git a/test/test.h b/test/test.h index 580d5640..12930664 100644 --- a/test/test.h +++ b/test/test.h @@ -93,6 +93,7 @@ typedef struct { ds3_client* client; char* job_id; char* src_object_name; + char* src_dir; char* bucket_name; ds3_master_object_list_response* chunks_list; ds3_bool verbose; @@ -107,6 +108,7 @@ void test_log(const char* message, void* user_data); */ GPtrArray* new_put_chunks_threads_args(ds3_client* client, const char* src_obj_name, + const char* src_dir, const char* dest_bucket_name, const ds3_master_object_list_response* bulk_response, ds3_master_object_list_response* available_chunks, From cdc2992d16073693ade62a86f597f2cc7ce5e4b0 Mon Sep 17 00:00:00 2001 From: DenverM80 Date: Tue, 18 Jul 2017 17:09:05 -0600 Subject: [PATCH 3/9] add put_directory test with 4 threads --- test/put_directory.cpp | 85 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 77 insertions(+), 8 deletions(-) diff --git a/test/put_directory.cpp b/test/put_directory.cpp index 04c876ee..b76d9d7a 100644 --- a/test/put_directory.cpp +++ b/test/put_directory.cpp @@ -25,18 +25,15 @@ #include "ds3_utils.h" #include "test.h" -namespace utf = boost::unit_test; - -bool check_ds3_test_directory_given() { - return getenv("DS3_TEST_DIRECTORY") != NULL; -}; - +/* BOOST_AUTO_TEST_CASE( put_directory) { - //* boost::unit_test::precondition(check_ds3_test_directory_given)) { printf("-----Testing PUT all objects in a directory-------\n"); const char* dir_path = getenv("DS3_TEST_DIRECTORY"); - BOOST_CHECK(dir_path != NULL); + if (dir_path == NULL) { + printf("ENV[DS3_TEST_DIRECTORY] unset - Skipping put_directory test.\n"); + return; + } const char* bucket_name = "test_bulk_put_directory"; printf(" Putting all files in [%s] to bucket [%s]\n", dir_path, bucket_name); @@ -92,4 +89,76 @@ BOOST_AUTO_TEST_CASE( put_directory) { clear_bucket(client, bucket_name); free_client(client); } +*/ + +BOOST_AUTO_TEST_CASE( put_directory_4_threads) { + printf("-----Testing PUT all objects in a directory with 4 threads-------\n"); + + const char* dir_path = getenv("DS3_TEST_DIRECTORY"); + if (dir_path == NULL) { + printf("ENV[DS3_TEST_DIRECTORY] unset - Skipping put_directory test.\n"); + return; + } + + const char* bucket_name = "test_bulk_put_directory"; + printf(" Putting all files in [%s] to bucket [%s]\n", dir_path, bucket_name); + + ds3_client* client = get_client(); + int client_thread=1; + ds3_client_register_logging(client, DS3_DEBUG, test_log, (void*)&client_thread); // Use DEBUG level logging + + ds3_error* error = create_bucket_with_data_policy(client, bucket_name, ids.data_policy_id->value); + + char* objects_list[100]; + uint64_t num_objs = 0; + GDir* dir_info = g_dir_open(dir_path, 0, NULL); + for (char* current_obj = (char*)g_dir_read_name(dir_info); current_obj != NULL; current_obj = (char*)g_dir_read_name(dir_info)) { + objects_list[num_objs++] = current_obj; + printf(" obj[%" PRIu64 "][%s]\n", num_objs, objects_list[num_objs-1]); + } + + ds3_bulk_object_list_response* bulk_object_list = ds3_convert_file_list_with_basepath((const char**)objects_list, num_objs, dir_path); + + ds3_request* request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, bulk_object_list); + ds3_master_object_list_response* mol; + error = ds3_put_bulk_job_spectra_s3_request(client, request, &mol); + ds3_request_free(request); + ds3_bulk_object_list_response_free(bulk_object_list); + handle_error(error); + + // Allocate cache + ds3_master_object_list_response* chunks_list = ensure_available_chunks(client, mol->job_id); + + // Use helper functions from test.cpp + const uint8_t num_threads = 4; + GPtrArray* put_dir_args = new_put_chunks_threads_args(client, NULL, dir_path, bucket_name, mol, chunks_list, num_threads, True); // Last param indicates verbose logging in the spawned thread + + + // capture test start time + struct timespec start_time_t, end_time_t; + double elapsed_t; + clock_gettime(CLOCK_MONOTONIC, &start_time_t); + GThread* put_dir_xfer_thread_0 = g_thread_new("put_dir_xfer_thread_1", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 0)); + GThread* put_dir_xfer_thread_1 = g_thread_new("put_dir_xfer_thread_2", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 1)); + GThread* put_dir_xfer_thread_2 = g_thread_new("put_dir_xfer_thread_3", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 2)); + GThread* put_dir_xfer_thread_3 = g_thread_new("put_dir_xfer_thread_4", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 3)); + + // Block and cleanup GThread(s) + g_thread_join(put_dir_xfer_thread_0); + g_thread_join(put_dir_xfer_thread_1); + g_thread_join(put_dir_xfer_thread_2); + g_thread_join(put_dir_xfer_thread_3); + + // find elapsed CPU and real time + clock_gettime(CLOCK_MONOTONIC, &end_time_t); + elapsed_t = timespec_to_seconds(&end_time_t) - timespec_to_seconds(&start_time_t); + ds3_log_message(client->log, DS3_INFO, " Elapsed time[%f]", elapsed_t); + + g_dir_close(dir_info); + ds3_master_object_list_response_free(chunks_list); + ds3_master_object_list_response_free(mol); + put_chunks_threads_args_free(put_dir_args); + clear_bucket(client, bucket_name); + free_client(client); +} From 46b661bdae82fb897011ab2d4859211957acd4b1 Mon Sep 17 00:00:00 2001 From: DenverM80 Date: Wed, 19 Jul 2017 17:11:11 -0600 Subject: [PATCH 4/9] Add debugging prints, WIP log timestamp --- src/ds3_connection.c | 8 ++++++++ test/put_directory.cpp | 10 +++++----- test/test.cpp | 15 +++++++++++++-- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/ds3_connection.c b/src/ds3_connection.c index a90fb3b4..38d3fb68 100644 --- a/src/ds3_connection.c +++ b/src/ds3_connection.c @@ -28,6 +28,7 @@ ds3_connection_pool* ds3_connection_pool_init(void) { } ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size) { + printf("ds3_connection_pool_init_with_size(%u)\n", pool_size); ds3_connection_pool* pool = g_new0(ds3_connection_pool, 1); pool->connections = g_new0(ds3_connection*, pool_size); pool->num_connections = pool_size; @@ -38,6 +39,7 @@ ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size) { } void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locked) { + printf("ds3_connection_pool_clear(%s)\n", (already_locked ? "locked" : "not locked")); int index; if (pool == NULL) { @@ -61,14 +63,17 @@ void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locke } static int _pool_inc(int index, uint16_t num_connections) { + printf("_pool_inc(%d, %u) :[%d]\n", index, num_connections, (index+1) % num_connections); return (index+1) % num_connections; } static int _pool_full(ds3_connection_pool* pool) { + printf("_pool_full(): head[%d] tail[%d] : [%d]\n", pool->head, pool->tail, (_pool_inc(pool->head, pool->num_connections) == pool->tail) ); return (_pool_inc(pool->head, pool->num_connections) == pool->tail); } ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { + printf("ds3_connection_acquire() BEGIN: head[%d] tail[%d]\n", pool->head, pool->tail); ds3_connection* connection = NULL; g_mutex_lock(&pool->mutex); @@ -87,10 +92,12 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { g_mutex_unlock(&pool->mutex); + printf("ds3_connection_acquire() END: head[%d] tail[%d]\n", pool->head, pool->tail); return connection; } void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connection) { + printf("ds3_connection_release() BEGIN: head[%d] tail[%d]\n", pool->head, pool->tail); g_mutex_lock(&pool->mutex); curl_easy_reset(connection); @@ -98,6 +105,7 @@ void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connectio g_mutex_unlock(&pool->mutex); g_cond_signal(&pool->available_connections); + printf("ds3_connection_release() END: head[%d] tail[%d]\n", pool->head, pool->tail); } void ds3_connection_pool_inc_ref(ds3_connection_pool* pool) { diff --git a/test/put_directory.cpp b/test/put_directory.cpp index b76d9d7a..fd23098f 100644 --- a/test/put_directory.cpp +++ b/test/put_directory.cpp @@ -103,7 +103,7 @@ BOOST_AUTO_TEST_CASE( put_directory_4_threads) { const char* bucket_name = "test_bulk_put_directory"; printf(" Putting all files in [%s] to bucket [%s]\n", dir_path, bucket_name); - ds3_client* client = get_client(); + ds3_client* client = get_client_at_loglvl(DS3_DEBUG); int client_thread=1; ds3_client_register_logging(client, DS3_DEBUG, test_log, (void*)&client_thread); // Use DEBUG level logging @@ -139,10 +139,10 @@ BOOST_AUTO_TEST_CASE( put_directory_4_threads) { double elapsed_t; clock_gettime(CLOCK_MONOTONIC, &start_time_t); - GThread* put_dir_xfer_thread_0 = g_thread_new("put_dir_xfer_thread_1", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 0)); - GThread* put_dir_xfer_thread_1 = g_thread_new("put_dir_xfer_thread_2", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 1)); - GThread* put_dir_xfer_thread_2 = g_thread_new("put_dir_xfer_thread_3", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 2)); - GThread* put_dir_xfer_thread_3 = g_thread_new("put_dir_xfer_thread_4", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 3)); + GThread* put_dir_xfer_thread_0 = g_thread_new("put_dir_xfer_thread_0", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 0)); + GThread* put_dir_xfer_thread_1 = g_thread_new("put_dir_xfer_thread_1", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 1)); + GThread* put_dir_xfer_thread_2 = g_thread_new("put_dir_xfer_thread_2", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 2)); + GThread* put_dir_xfer_thread_3 = g_thread_new("put_dir_xfer_thread_3", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 3)); // Block and cleanup GThread(s) g_thread_join(put_dir_xfer_thread_0); diff --git a/test/test.cpp b/test/test.cpp index 0ec9d895..f2ff686d 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -45,12 +45,20 @@ struct BoostTestFixture { BOOST_GLOBAL_FIXTURE( BoostTestFixture ); +void log_timestamp(char* string_buff, long buff_size) +{ + + g_snprintf(string_buff, buff_size, "%s", ); +} + void test_log(const char* message, void* user_data) { + char timebuffer[32]; + log_timestamp(timebuffer, 32); if (user_data) { int client_num = *((int*)user_data); - fprintf(stderr, "ClientNum[%d], Log Message: %s\n", client_num, message); + fprintf(stderr, "%s Client[%d] %s\n", client_num, message); } else { - fprintf(stderr, "Log Message: %s\n", message); + fprintf(stderr, "%s %s\n", message); } } @@ -541,6 +549,9 @@ void put_chunks_from_file(void* args) { char* file_with_path = g_strconcat(_args->src_dir, object->name->value, (char*)NULL); printf(" opening file[%s]\n", file_with_path); file = fopen(file_with_path, "r"); + if (file == NULL) { + printf(" ***Unable to open file[%s]!!!\n", file_with_path); + } g_free(file_with_path); } if (object->offset != 0) { From 2c86955bbb9a5f9863f2084235f523bd02d47025 Mon Sep 17 00:00:00 2001 From: DenverM80 Date: Mon, 24 Jul 2017 10:53:45 -0600 Subject: [PATCH 5/9] WIP; ds3_connection_pool queue --- src/ds3_connection.c | 60 +++++++++++++++++++++++++++++++------------- src/ds3_connection.h | 3 ++- test/test.cpp | 21 +++++++++++++--- 3 files changed, 61 insertions(+), 23 deletions(-) diff --git a/src/ds3_connection.c b/src/ds3_connection.c index 38d3fb68..3b1c44df 100644 --- a/src/ds3_connection.c +++ b/src/ds3_connection.c @@ -27,11 +27,26 @@ ds3_connection_pool* ds3_connection_pool_init(void) { return ds3_connection_pool_init_with_size(CONNECTION_POOL_SIZE); } +void _ds3_queue_init(ds3_connection_pool* pool) { + int index; + pool->queue = g_new0(int, pool->size); + for (index = 0; index < pool->size; index++) { + pool->queue[index] = index; // init to default + } +} + +void _ds3_queue_free(ds3_connection_pool* pool) { + g_free(pool->queue); +} + ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size) { printf("ds3_connection_pool_init_with_size(%u)\n", pool_size); ds3_connection_pool* pool = g_new0(ds3_connection_pool, 1); pool->connections = g_new0(ds3_connection*, pool_size); - pool->num_connections = pool_size; + pool->size = pool_size; + + _ds3_queue_init(pool); + g_mutex_init(&pool->mutex); g_cond_init(&pool->available_connections); pool->ref_count = 1; @@ -39,7 +54,6 @@ ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size) { } void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locked) { - printf("ds3_connection_pool_clear(%s)\n", (already_locked ? "locked" : "not locked")); int index; if (pool == NULL) { @@ -49,63 +63,73 @@ void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locke if (already_locked == False) { g_mutex_lock(&pool->mutex); } + printf("ds3_connection_pool_clear(%s)\n", (already_locked ? "locked" : "not locked")); - for (index = 0; index < pool->num_connections; index++) { + for (index = 0; index < pool->size; index++) { if (pool->connections[index] != NULL) { curl_easy_cleanup(pool->connections[index]); } } + _ds3_queue_free(pool); + g_free(pool->connections); g_mutex_unlock(&pool->mutex); g_mutex_clear(&pool->mutex); // an attempt to clear a locked mutex is undefined g_cond_clear(&pool->available_connections); } -static int _pool_inc(int index, uint16_t num_connections) { - printf("_pool_inc(%d, %u) :[%d]\n", index, num_connections, (index+1) % num_connections); - return (index+1) % num_connections; +static int _queue_inc(int index, uint16_t size) { + printf("_pool_inc(%d, %u) :[%d]\n", index, size, (index+1) % size); + return (index+1) % size; } -static int _pool_full(ds3_connection_pool* pool) { - printf("_pool_full(): head[%d] tail[%d] : [%d]\n", pool->head, pool->tail, (_pool_inc(pool->head, pool->num_connections) == pool->tail) ); - return (_pool_inc(pool->head, pool->num_connections) == pool->tail); +static int _queue_full(ds3_connection_pool* pool) { + return (_pool_inc(pool->head, pool->size) == pool->tail); } ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { - printf("ds3_connection_acquire() BEGIN: head[%d] tail[%d]\n", pool->head, pool->tail); ds3_connection* connection = NULL; + int next_connection_index; g_mutex_lock(&pool->mutex); + printf("ds3_connection_acquire() BEGIN: head[%d] tail[%d]\n", pool->head, pool->tail); while (_pool_full(pool)) { g_cond_wait(&pool->available_connections, &pool->mutex); } - if (pool->connections[pool->head] == NULL) { + next_connection_index = pool->queue[pool->head]; + if (pool->connections[next_connection_index] == NULL) { + connection = g_new0(ds3_connection, 1); connection = curl_easy_init(); - pool->connections[pool->head] = connection; + pool->connections[next_connection_index] = connection; } else { - connection = pool->connections[pool->head]; + connection = pool->connections[next_connection_index]; } - pool->head = _pool_inc(pool->head, pool->num_connections); + pool->head = _pool_inc(pool->head, pool->size); + printf("ds3_connection_acquire() END: head[%d] tail[%d]\n", pool->head, pool->tail); g_mutex_unlock(&pool->mutex); - printf("ds3_connection_acquire() END: head[%d] tail[%d]\n", pool->head, pool->tail); return connection; } void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connection) { - printf("ds3_connection_release() BEGIN: head[%d] tail[%d]\n", pool->head, pool->tail); + int tail_connection_index; + g_mutex_lock(&pool->mutex); + printf("ds3_connection_release() BEGIN: head[%d] tail[%d]\n", pool->head, pool->tail); curl_easy_reset(connection); - pool->tail = _pool_inc(pool->tail, pool->num_connections); + tail_connection_index = pool->queue[pool->tail]; + + pool->connections[tail_connection_index] = connection; + pool->tail = _pool_inc(pool->tail, pool->size); + printf("ds3_connection_release() END: head[%d] tail[%d]\n", pool->head, pool->tail); g_mutex_unlock(&pool->mutex); g_cond_signal(&pool->available_connections); - printf("ds3_connection_release() END: head[%d] tail[%d]\n", pool->head, pool->tail); } void ds3_connection_pool_inc_ref(ds3_connection_pool* pool) { diff --git a/src/ds3_connection.h b/src/ds3_connection.h index 5374bddd..0a437eac 100644 --- a/src/ds3_connection.h +++ b/src/ds3_connection.h @@ -37,7 +37,8 @@ typedef CURL ds3_connection; //-- Opaque struct struct _ds3_connection_pool{ ds3_connection** connections; - uint16_t num_connections; + int* queue; + uint16_t size; int head; int tail; ds3_mutex mutex; diff --git a/test/test.cpp b/test/test.cpp index f2ff686d..8a4509cf 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -47,8 +47,21 @@ BOOST_GLOBAL_FIXTURE( BoostTestFixture ); void log_timestamp(char* string_buff, long buff_size) { - - g_snprintf(string_buff, buff_size, "%s", ); + time_t ltime; + struct tm result; + struct timeval tv; + char usec_buff[8]; + + gettimeofday(&tv, NULL); + millisec = lrint(tv.tv_usec/1000.0); // Round to nearest millisec + + ltime = time(NULL); + localtime_r(<ime, &result); + + strftime(string_buff, buff_size, "%Y:%m:%dT%H:%M:%S", tm); + strcat(string_buff, "."); + sprintf(usec_buff,"%d", (int)tmnow.tv_usec); + strcat(string_buff, usec_buff); } void test_log(const char* message, void* user_data) { @@ -56,9 +69,9 @@ void test_log(const char* message, void* user_data) { log_timestamp(timebuffer, 32); if (user_data) { int client_num = *((int*)user_data); - fprintf(stderr, "%s Client[%d] %s\n", client_num, message); + fprintf(stderr, "%s Client[%d] %s\n", timebuffer, client_num, message); } else { - fprintf(stderr, "%s %s\n", message); + fprintf(stderr, "%s %s\n", timebuffer, message); } } From 34ea83437d7cd452a4fc3bcf527149473dd32509 Mon Sep 17 00:00:00 2001 From: DenverM80 Date: Mon, 24 Jul 2017 11:06:42 -0600 Subject: [PATCH 6/9] save index of head into pool->queue[tail] at time of acquire --- src/ds3_connection.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/ds3_connection.c b/src/ds3_connection.c index 3b1c44df..2d69a947 100644 --- a/src/ds3_connection.c +++ b/src/ds3_connection.c @@ -99,6 +99,7 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { } next_connection_index = pool->queue[pool->head]; + pool->queue[pool->head] = -1; // invalid while in use if (pool->connections[next_connection_index] == NULL) { connection = g_new0(ds3_connection, 1); connection = curl_easy_init(); @@ -107,6 +108,7 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { } else { connection = pool->connections[next_connection_index]; } + pool->queue[pool->tail] = pool->head; pool->head = _pool_inc(pool->head, pool->size); printf("ds3_connection_acquire() END: head[%d] tail[%d]\n", pool->head, pool->tail); @@ -116,15 +118,11 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { } void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connection) { - int tail_connection_index; - g_mutex_lock(&pool->mutex); printf("ds3_connection_release() BEGIN: head[%d] tail[%d]\n", pool->head, pool->tail); curl_easy_reset(connection); - tail_connection_index = pool->queue[pool->tail]; - pool->connections[tail_connection_index] = connection; pool->tail = _pool_inc(pool->tail, pool->size); printf("ds3_connection_release() END: head[%d] tail[%d]\n", pool->head, pool->tail); From d67fb0cb2a4e2b99d78c633db1695774190a61bb Mon Sep 17 00:00:00 2001 From: Ryan Moore Date: Mon, 24 Jul 2017 14:28:25 -0600 Subject: [PATCH 7/9] Adding the connection pooling implementation and some header file cleanup --- .gitignore | 5 ++- src/ds3.c | 1 + src/ds3_connection.c | 82 ++++++++++++++++++++++---------------------- src/ds3_connection.h | 15 ++------ src/ds3_net.c | 2 -- src/ds3_net.h | 1 - 6 files changed, 49 insertions(+), 57 deletions(-) diff --git a/.gitignore b/.gitignore index c5623f58..4944f8c2 100644 --- a/.gitignore +++ b/.gitignore @@ -43,4 +43,7 @@ Debug/ *.ilk *.pdb ds3.dll -/win32/output/bin \ No newline at end of file +/win32/output/bin +.idea/ +cmake-build-debug/ +*.dylib diff --git a/src/ds3.c b/src/ds3.c index 5eafced3..83c80b13 100644 --- a/src/ds3.c +++ b/src/ds3.c @@ -25,6 +25,7 @@ #include "ds3.h" #include "ds3_net.h" +#include "ds3_connection.h" #include "ds3_request.h" #include "ds3_string_multimap_impl.h" #include "ds3_utils.h" diff --git a/src/ds3_connection.c b/src/ds3_connection.c index 2d69a947..26279d00 100644 --- a/src/ds3_connection.c +++ b/src/ds3_connection.c @@ -20,35 +20,36 @@ #include #include #include -#include "ds3_net.h" - +#include "ds3_connection.h" + +//-- Opaque struct +struct _ds3_connection_pool{ + ds3_connection** connections; + uint16_t num_connections; // the number of connections created + ds3_connection** connection_queue; + uint16_t max_connections; // max number of possible connections, which the connections and queue arrays will be initialized to + int queue_head; + int queue_tail; + ds3_mutex mutex; + ds3_condition available_connection_notifier; + uint16_t ref_count; +}; ds3_connection_pool* ds3_connection_pool_init(void) { - return ds3_connection_pool_init_with_size(CONNECTION_POOL_SIZE); -} - -void _ds3_queue_init(ds3_connection_pool* pool) { - int index; - pool->queue = g_new0(int, pool->size); - for (index = 0; index < pool->size; index++) { - pool->queue[index] = index; // init to default - } -} - -void _ds3_queue_free(ds3_connection_pool* pool) { - g_free(pool->queue); + return ds3_connection_pool_init_with_size(DEFAULT_CONNECTION_POOL_SIZE); } ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size) { printf("ds3_connection_pool_init_with_size(%u)\n", pool_size); ds3_connection_pool* pool = g_new0(ds3_connection_pool, 1); + pool->connections = g_new0(ds3_connection*, pool_size); - pool->size = pool_size; + pool->connection_queue = g_new0(ds3_connection*, pool_size); - _ds3_queue_init(pool); + pool->max_connections = pool_size; g_mutex_init(&pool->mutex); - g_cond_init(&pool->available_connections); + g_cond_init(&pool->available_connection_notifier); pool->ref_count = 1; return pool; } @@ -65,18 +66,17 @@ void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locke } printf("ds3_connection_pool_clear(%s)\n", (already_locked ? "locked" : "not locked")); - for (index = 0; index < pool->size; index++) { + for (index = 0; index < pool->num_connections; index++) { if (pool->connections[index] != NULL) { curl_easy_cleanup(pool->connections[index]); } } - _ds3_queue_free(pool); - g_free(pool->connections); + g_free(pool->connection_queue); g_mutex_unlock(&pool->mutex); g_mutex_clear(&pool->mutex); // an attempt to clear a locked mutex is undefined - g_cond_clear(&pool->available_connections); + g_cond_clear(&pool->available_connection_notifier); } static int _queue_inc(int index, uint16_t size) { @@ -84,34 +84,32 @@ static int _queue_inc(int index, uint16_t size) { return (index+1) % size; } -static int _queue_full(ds3_connection_pool* pool) { - return (_pool_inc(pool->head, pool->size) == pool->tail); +static int _queue_is_empty(ds3_connection_pool* pool) { + int queue_head = pool->queue_head; + return pool->queue_tail == queue_head && pool->connection_queue[queue_head] == NULL; } ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { ds3_connection* connection = NULL; - int next_connection_index; g_mutex_lock(&pool->mutex); - printf("ds3_connection_acquire() BEGIN: head[%d] tail[%d]\n", pool->head, pool->tail); - while (_pool_full(pool)) { - g_cond_wait(&pool->available_connections, &pool->mutex); + printf("ds3_connection_acquire() BEGIN: head[%d] tail[%d]\n", pool->queue_head, pool->queue_tail); + while (_queue_is_empty(pool) && pool->num_connections >= pool->max_connections) { + g_cond_wait(&pool->available_connection_notifier, &pool->mutex); } - next_connection_index = pool->queue[pool->head]; - pool->queue[pool->head] = -1; // invalid while in use - if (pool->connections[next_connection_index] == NULL) { - connection = g_new0(ds3_connection, 1); + if (_queue_is_empty(pool)) { connection = curl_easy_init(); - pool->connections[next_connection_index] = connection; + pool->connections[pool->num_connections] = connection; + pool->num_connections++; } else { - connection = pool->connections[next_connection_index]; + connection = pool->connection_queue[pool->queue_tail]; + pool->connection_queue[pool->queue_tail] = NULL; + pool->queue_tail = _queue_inc(pool->queue_tail, pool->max_connections); } - pool->queue[pool->tail] = pool->head; - pool->head = _pool_inc(pool->head, pool->size); - printf("ds3_connection_acquire() END: head[%d] tail[%d]\n", pool->head, pool->tail); + printf("ds3_connection_acquire() END: head[%d] tail[%d]\n", pool->queue_head, pool->queue_tail); g_mutex_unlock(&pool->mutex); return connection; @@ -119,15 +117,17 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connection) { g_mutex_lock(&pool->mutex); - printf("ds3_connection_release() BEGIN: head[%d] tail[%d]\n", pool->head, pool->tail); + printf("ds3_connection_release() BEGIN: head[%d] tail[%d]\n", pool->queue_head, pool->queue_tail); curl_easy_reset(connection); - pool->tail = _pool_inc(pool->tail, pool->size); + pool->connection_queue[pool->queue_head] = connection; + + pool->queue_head = _queue_inc(pool->queue_head, pool->max_connections); - printf("ds3_connection_release() END: head[%d] tail[%d]\n", pool->head, pool->tail); + printf("ds3_connection_release() END: head[%d] tail[%d]\n", pool->queue_head, pool->queue_tail); + g_cond_signal(&pool->available_connection_notifier); g_mutex_unlock(&pool->mutex); - g_cond_signal(&pool->available_connections); } void ds3_connection_pool_inc_ref(ds3_connection_pool* pool) { diff --git a/src/ds3_connection.h b/src/ds3_connection.h index 0a437eac..154d96fa 100644 --- a/src/ds3_connection.h +++ b/src/ds3_connection.h @@ -26,25 +26,16 @@ extern "C" { #include #include +#include "ds3.h" -#define CONNECTION_POOL_SIZE 10 +#define DEFAULT_CONNECTION_POOL_SIZE 10 typedef GMutex ds3_mutex; typedef GCond ds3_condition; typedef CURL ds3_connection; -//-- Opaque struct -struct _ds3_connection_pool{ - ds3_connection** connections; - int* queue; - uint16_t size; - int head; - int tail; - ds3_mutex mutex; - ds3_condition available_connections; - uint16_t ref_count; -}; +typedef struct _ds3_connection_pool ds3_connection_pool; ds3_connection_pool* ds3_connection_pool_init(void); ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size); diff --git a/src/ds3_net.c b/src/ds3_net.c index 8d64b263..e06df88d 100644 --- a/src/ds3_net.c +++ b/src/ds3_net.c @@ -20,10 +20,8 @@ #include #include "ds3_request.h" -#include "ds3.h" #include "ds3_net.h" #include "ds3_utils.h" -#include "ds3_string_multimap.h" #include "ds3_string_multimap_impl.h" #include "ds3_connection.h" diff --git a/src/ds3_net.h b/src/ds3_net.h index 5e9ad115..1bca07e4 100644 --- a/src/ds3_net.h +++ b/src/ds3_net.h @@ -26,7 +26,6 @@ extern "C" { #include "ds3.h" #include "ds3_string_multimap.h" -#include "ds3_connection.h" char* escape_url(const char* url); char* escape_url_extended(const char* url, const char** delimiters, uint32_t num_delimiters); From 7b8094f81bc3cc52c35e63d4ecd76f01736d37a2 Mon Sep 17 00:00:00 2001 From: DenverM80 Date: Mon, 24 Jul 2017 16:25:48 -0600 Subject: [PATCH 8/9] Remove debug prints and other code cleanup --- src/ds3_connection.c | 7 ----- test/put_directory.cpp | 68 +----------------------------------------- test/test.cpp | 9 +++--- 3 files changed, 6 insertions(+), 78 deletions(-) diff --git a/src/ds3_connection.c b/src/ds3_connection.c index 26279d00..3b76b326 100644 --- a/src/ds3_connection.c +++ b/src/ds3_connection.c @@ -40,7 +40,6 @@ ds3_connection_pool* ds3_connection_pool_init(void) { } ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size) { - printf("ds3_connection_pool_init_with_size(%u)\n", pool_size); ds3_connection_pool* pool = g_new0(ds3_connection_pool, 1); pool->connections = g_new0(ds3_connection*, pool_size); @@ -64,7 +63,6 @@ void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locke if (already_locked == False) { g_mutex_lock(&pool->mutex); } - printf("ds3_connection_pool_clear(%s)\n", (already_locked ? "locked" : "not locked")); for (index = 0; index < pool->num_connections; index++) { if (pool->connections[index] != NULL) { @@ -80,7 +78,6 @@ void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locke } static int _queue_inc(int index, uint16_t size) { - printf("_pool_inc(%d, %u) :[%d]\n", index, size, (index+1) % size); return (index+1) % size; } @@ -93,7 +90,6 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { ds3_connection* connection = NULL; g_mutex_lock(&pool->mutex); - printf("ds3_connection_acquire() BEGIN: head[%d] tail[%d]\n", pool->queue_head, pool->queue_tail); while (_queue_is_empty(pool) && pool->num_connections >= pool->max_connections) { g_cond_wait(&pool->available_connection_notifier, &pool->mutex); } @@ -109,7 +105,6 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { pool->queue_tail = _queue_inc(pool->queue_tail, pool->max_connections); } - printf("ds3_connection_acquire() END: head[%d] tail[%d]\n", pool->queue_head, pool->queue_tail); g_mutex_unlock(&pool->mutex); return connection; @@ -117,7 +112,6 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connection) { g_mutex_lock(&pool->mutex); - printf("ds3_connection_release() BEGIN: head[%d] tail[%d]\n", pool->queue_head, pool->queue_tail); curl_easy_reset(connection); @@ -125,7 +119,6 @@ void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connectio pool->queue_head = _queue_inc(pool->queue_head, pool->max_connections); - printf("ds3_connection_release() END: head[%d] tail[%d]\n", pool->queue_head, pool->queue_tail); g_cond_signal(&pool->available_connection_notifier); g_mutex_unlock(&pool->mutex); } diff --git a/test/put_directory.cpp b/test/put_directory.cpp index fd23098f..b14e3227 100644 --- a/test/put_directory.cpp +++ b/test/put_directory.cpp @@ -25,78 +25,12 @@ #include "ds3_utils.h" #include "test.h" -/* -BOOST_AUTO_TEST_CASE( put_directory) { - printf("-----Testing PUT all objects in a directory-------\n"); - - const char* dir_path = getenv("DS3_TEST_DIRECTORY"); - if (dir_path == NULL) { - printf("ENV[DS3_TEST_DIRECTORY] unset - Skipping put_directory test.\n"); - return; - } - - const char* bucket_name = "test_bulk_put_directory"; - printf(" Putting all files in [%s] to bucket [%s]\n", dir_path, bucket_name); - - ds3_client* client = get_client(); - int client_thread=1; - ds3_client_register_logging(client, DS3_DEBUG, test_log, (void*)&client_thread); // Use DEBUG level logging - - ds3_error* error = create_bucket_with_data_policy(client, bucket_name, ids.data_policy_id->value); - - char* objects_list[100]; - uint64_t num_objs = 0; - GDir* dir_info = g_dir_open(dir_path, 0, NULL); - for (char* current_obj = (char*)g_dir_read_name(dir_info); current_obj != NULL; current_obj = (char*)g_dir_read_name(dir_info)) { - objects_list[num_objs++] = current_obj; - printf(" obj[%" PRIu64 "][%s]\n", num_objs, objects_list[num_objs-1]); - } - - ds3_bulk_object_list_response* bulk_object_list = ds3_convert_file_list_with_basepath((const char**)objects_list, num_objs, dir_path); - - ds3_request* request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, bulk_object_list); - ds3_master_object_list_response* mol; - error = ds3_put_bulk_job_spectra_s3_request(client, request, &mol); - ds3_request_free(request); - ds3_bulk_object_list_response_free(bulk_object_list); - handle_error(error); - - // Allocate cache - ds3_master_object_list_response* chunks_list = ensure_available_chunks(client, mol->job_id); - - // Use helper functions from test.cpp - GPtrArray* put_dir_args = new_put_chunks_threads_args(client, NULL, dir_path, bucket_name, mol, chunks_list, 1, True); // Last param indicates verbose logging in the spawned thread - - // capture test start time - struct timespec start_time_t, end_time_t; - double elapsed_t; - clock_gettime(CLOCK_MONOTONIC, &start_time_t); - - GThread* put_dir_xfer_thread = g_thread_new("put_dir_xfer_thread", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 0)); - - // Block and cleanup GThread(s) - g_thread_join(put_dir_xfer_thread); - - // find elapsed CPU and real time - clock_gettime(CLOCK_MONOTONIC, &end_time_t); - elapsed_t = timespec_to_seconds(&end_time_t) - timespec_to_seconds(&start_time_t); - ds3_log_message(client->log, DS3_INFO, " Elapsed time[%f]", elapsed_t); - - g_dir_close(dir_info); - ds3_master_object_list_response_free(chunks_list); - ds3_master_object_list_response_free(mol); - put_chunks_threads_args_free(put_dir_args); - clear_bucket(client, bucket_name); - free_client(client); -} -*/ - BOOST_AUTO_TEST_CASE( put_directory_4_threads) { printf("-----Testing PUT all objects in a directory with 4 threads-------\n"); const char* dir_path = getenv("DS3_TEST_DIRECTORY"); if (dir_path == NULL) { - printf("ENV[DS3_TEST_DIRECTORY] unset - Skipping put_directory test.\n"); + printf("ENV[DS3_TEST_DIRECTORY] unset - Skipping put_directory_4_threads test.\n"); return; } diff --git a/test/test.cpp b/test/test.cpp index 8a4509cf..b6f67cf2 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -45,12 +45,13 @@ struct BoostTestFixture { BOOST_GLOBAL_FIXTURE( BoostTestFixture ); -void log_timestamp(char* string_buff, long buff_size) +static void _log_timestamp(char* string_buff, long buff_size) { time_t ltime; struct tm result; struct timeval tv; char usec_buff[8]; + int millisec; gettimeofday(&tv, NULL); millisec = lrint(tv.tv_usec/1000.0); // Round to nearest millisec @@ -58,15 +59,15 @@ void log_timestamp(char* string_buff, long buff_size) ltime = time(NULL); localtime_r(<ime, &result); - strftime(string_buff, buff_size, "%Y:%m:%dT%H:%M:%S", tm); + strftime(string_buff, buff_size, "%Y:%m:%dT%H:%M:%S", &result); strcat(string_buff, "."); - sprintf(usec_buff,"%d", (int)tmnow.tv_usec); + sprintf(usec_buff,"%03d", millisec); strcat(string_buff, usec_buff); } void test_log(const char* message, void* user_data) { char timebuffer[32]; - log_timestamp(timebuffer, 32); + _log_timestamp(timebuffer, 32); if (user_data) { int client_num = *((int*)user_data); fprintf(stderr, "%s Client[%d] %s\n", timebuffer, client_num, message); From 7783a2db7c0301d305e52dd997fd1d8dabd3014f Mon Sep 17 00:00:00 2001 From: DenverM80 Date: Mon, 24 Jul 2017 16:50:50 -0600 Subject: [PATCH 9/9] Uncomment all tests, update connection tests for test.h helper function changes --- test/CMakeLists.txt | 26 +++++++++++++------------- test/connection_tests.cpp | 28 +++++++++++++--------------- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 68e1245f..33cedd9c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -64,19 +64,19 @@ else(WIN32) # POSIX endif(WIN32) add_executable(ds3_c_tests - #bucket_tests.cpp - #bulk_get.cpp - #bulk_put.cpp - #checksum.cpp - #deletes_test.cpp - #get_physical_placement.cpp - #job_tests.cpp - #metadata_tests.cpp - #multimap_tests.cpp - #negative_tests.cpp - #search_tests.cpp - #service_tests.cpp - #connection_tests.cpp + bucket_tests.cpp + bulk_get.cpp + bulk_put.cpp + checksum.cpp + deletes_test.cpp + get_physical_placement.cpp + job_tests.cpp + metadata_tests.cpp + multimap_tests.cpp + negative_tests.cpp + search_tests.cpp + service_tests.cpp + connection_tests.cpp put_directory.cpp test.cpp) diff --git a/test/connection_tests.cpp b/test/connection_tests.cpp index fe8ccb3b..1bf7d357 100644 --- a/test/connection_tests.cpp +++ b/test/connection_tests.cpp @@ -25,7 +25,7 @@ BOOST_AUTO_TEST_CASE( ds3_client_create_free ) { printf("-----Testing ds3_client create and free-------\n"); ds3_client* client = get_client(); - BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 1); + BOOST_CHECK(client->connection_pool != NULL); ds3_creds_free(client->creds); ds3_client_free(client); } @@ -34,10 +34,8 @@ BOOST_AUTO_TEST_CASE( ds3_connection_pool_copy ) { printf("-----Testing ds3_copy_client-------\n"); ds3_client* client = get_client(); - BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 1); ds3_client* client_copy = ds3_copy_client(client); - BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 2); BOOST_CHECK_EQUAL(client->endpoint->value, client_copy->endpoint->value); if (client->proxy) { BOOST_CHECK_EQUAL(client->proxy->value, client_copy->proxy->value); @@ -53,7 +51,7 @@ BOOST_AUTO_TEST_CASE( ds3_connection_pool_copy ) { ds3_creds_free(client->creds); ds3_client_free(client); - BOOST_CHECK_EQUAL(client_copy->connection_pool->ref_count, 1); + BOOST_CHECK(client_copy->connection_pool != NULL); ds3_creds_free(client_copy->creds); ds3_client_free(client_copy); } @@ -63,10 +61,10 @@ BOOST_AUTO_TEST_CASE( create_bucket_with_copied_client ) { ds3_client* client = get_client(); ds3_connection_pool* cp = client->connection_pool; - BOOST_CHECK_EQUAL(cp->ref_count, 1); + BOOST_CHECK(cp != NULL); ds3_client* client_copy = ds3_copy_client(client); - BOOST_CHECK_EQUAL(cp->ref_count, 2); + BOOST_CHECK_EQUAL(cp, client_copy->connection_pool); const char* client_bucket_name = "create_bucket_from_original_client"; ds3_error* error = create_bucket_with_data_policy(client, client_bucket_name, ids.data_policy_id->value); @@ -74,7 +72,7 @@ BOOST_AUTO_TEST_CASE( create_bucket_with_copied_client ) { clear_bucket(client, client_bucket_name); ds3_creds_free(client->creds); ds3_client_free(client); - BOOST_CHECK_EQUAL(cp->ref_count, 1); + BOOST_CHECK(client_copy->connection_pool != NULL); const char* copied_client_bucket_name = "create_bucket_from_copied_client"; error = create_bucket_with_data_policy(client_copy, copied_client_bucket_name, ids.data_policy_id->value); @@ -162,7 +160,7 @@ BOOST_AUTO_TEST_CASE( bulk_put_200_very_small_files_multithreaded ) { ds3_master_object_list_response* chunk_response = ensure_available_chunks(client, bulk_response->job_id); - GPtrArray* put_objs_args_array = new_put_chunks_threads_args(client, object_name, bucket_name, bulk_response, chunk_response, num_threads, False); + GPtrArray* put_objs_args_array = new_put_chunks_threads_args(client, object_name, NULL, bucket_name, bulk_response, chunk_response, num_threads, False); GThread* chunks_thread_0 = g_thread_new("objects_0", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_objs_args_array, 0)); GThread* chunks_thread_1 = g_thread_new("objects_1", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_objs_args_array, 1)); @@ -204,7 +202,7 @@ BOOST_AUTO_TEST_CASE( sequential_vs_parallel_xfer ) { ds3_master_object_list_response* sequential_chunks = ensure_available_chunks(client, mol->job_id); - GPtrArray* put_sequential_objs_threads_array = new_put_chunks_threads_args(client, obj_name, sequential_bucket_name, mol, sequential_chunks, 1, False); + GPtrArray* put_sequential_objs_threads_array = new_put_chunks_threads_args(client, obj_name, NULL, sequential_bucket_name, mol, sequential_chunks, 1, False); // capture sequential test start time clock_gettime(CLOCK_MONOTONIC, &start_time_t); @@ -238,7 +236,7 @@ BOOST_AUTO_TEST_CASE( sequential_vs_parallel_xfer ) { ds3_master_object_list_response* parallel_chunks = ensure_available_chunks(client, mol->job_id); - GPtrArray* put_parallel_objs_threads_array = new_put_chunks_threads_args(client, obj_name, parallel_bucket_name, mol, parallel_chunks, 4, False); + GPtrArray* put_parallel_objs_threads_array = new_put_chunks_threads_args(client, obj_name, NULL, parallel_bucket_name, mol, parallel_chunks, 4, False); // capture sequential test start time clock_gettime(CLOCK_MONOTONIC, &start_time_t); @@ -315,8 +313,8 @@ BOOST_AUTO_TEST_CASE( multiple_client_xfer ) { ds3_master_object_list_response* client1_chunks = ensure_available_chunks(client1, mol1->job_id); ds3_master_object_list_response* client2_chunks = ensure_available_chunks(client2, mol2->job_id); - GPtrArray* client1_put_objs_args = new_put_chunks_threads_args(client1, obj_name, client1_bucket_name, mol1, client1_chunks, 1, True); - GPtrArray* client2_put_objs_args = new_put_chunks_threads_args(client2, obj_name, client2_bucket_name, mol2, client2_chunks, 1, True); + GPtrArray* client1_put_objs_args = new_put_chunks_threads_args(client1, obj_name, NULL, client1_bucket_name, mol1, client1_chunks, 1, True); + GPtrArray* client2_put_objs_args = new_put_chunks_threads_args(client2, obj_name, NULL, client2_bucket_name, mol2, client2_chunks, 1, True); // capture sequential test start time clock_gettime(CLOCK_MONOTONIC, &start_time_t); @@ -407,9 +405,9 @@ BOOST_AUTO_TEST_CASE( performance_bulk_put ) { ds3_master_object_list_response* chunks_response2 = ensure_available_chunks(client2, bulk_response2->job_id); ds3_master_object_list_response* chunks_response3 = ensure_available_chunks(client3, bulk_response3->job_id); - GPtrArray* put_perf_objs_threads_array1 = new_put_chunks_threads_args(client1, obj_prefix, bucket_name1, bulk_response1, chunks_response1, 1, True); - GPtrArray* put_perf_objs_threads_array2 = new_put_chunks_threads_args(client2, obj_prefix, bucket_name2, bulk_response2, chunks_response2, 1, True); - GPtrArray* put_perf_objs_threads_array3 = new_put_chunks_threads_args(client3, obj_prefix, bucket_name3, bulk_response3, chunks_response3, 1, True); + GPtrArray* put_perf_objs_threads_array1 = new_put_chunks_threads_args(client1, obj_prefix, NULL, bucket_name1, bulk_response1, chunks_response1, 1, True); + GPtrArray* put_perf_objs_threads_array2 = new_put_chunks_threads_args(client2, obj_prefix, NULL, bucket_name2, bulk_response2, chunks_response2, 1, True); + GPtrArray* put_perf_objs_threads_array3 = new_put_chunks_threads_args(client3, obj_prefix, NULL, bucket_name3, bulk_response3, chunks_response3, 1, True); // capture sequential test start time struct timespec start_time_t, end_time_t;