Skip to content

Commit

Permalink
Merge pull request #207 from DenverM80/put_dir_test
Browse files Browse the repository at this point in the history
Fix ds3_connection_pool queue behavior
  • Loading branch information
rpmoore authored Jul 25, 2017
2 parents 878c7d3 + 7783a2d commit 743571a
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 51 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ Debug/
*.ilk
*.pdb
ds3.dll
/win32/output/bin
/win32/output/bin
.idea/
cmake-build-debug/
*.dylib
1 change: 1 addition & 0 deletions src/ds3.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "ds3.h"
#include "ds3_net.h"
#include "ds3_connection.h"
#include "ds3_request.h"
#include "ds3_string_multimap_impl.h"
#include "ds3_utils.h"
Expand Down
59 changes: 41 additions & 18 deletions src/ds3_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,35 @@
#include <curl/curl.h>
#include <glib.h>
#include <inttypes.h>
#include "ds3_net.h"

#include "ds3_connection.h"

//-- Opaque struct
struct _ds3_connection_pool{
ds3_connection** connections;
uint16_t num_connections; // the number of connections created
ds3_connection** connection_queue;
uint16_t max_connections; // max number of possible connections, which the connections and queue arrays will be initialized to
int queue_head;
int queue_tail;
ds3_mutex mutex;
ds3_condition available_connection_notifier;
uint16_t ref_count;
};

ds3_connection_pool* ds3_connection_pool_init(void) {
return ds3_connection_pool_init_with_size(CONNECTION_POOL_SIZE);
return ds3_connection_pool_init_with_size(DEFAULT_CONNECTION_POOL_SIZE);
}

ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size) {
ds3_connection_pool* pool = g_new0(ds3_connection_pool, 1);

pool->connections = g_new0(ds3_connection*, pool_size);
pool->num_connections = pool_size;
pool->connection_queue = g_new0(ds3_connection*, pool_size);

pool->max_connections = pool_size;

g_mutex_init(&pool->mutex);
g_cond_init(&pool->available_connections);
g_cond_init(&pool->available_connection_notifier);
pool->ref_count = 1;
return pool;
}
Expand All @@ -55,35 +71,39 @@ void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locke
}

g_free(pool->connections);
g_free(pool->connection_queue);
g_mutex_unlock(&pool->mutex);
g_mutex_clear(&pool->mutex); // an attempt to clear a locked mutex is undefined
g_cond_clear(&pool->available_connections);
g_cond_clear(&pool->available_connection_notifier);
}

static int _pool_inc(int index, uint16_t num_connections) {
return (index+1) % num_connections;
static int _queue_inc(int index, uint16_t size) {
return (index+1) % size;
}

static int _pool_full(ds3_connection_pool* pool) {
return (_pool_inc(pool->head, pool->num_connections) == pool->tail);
static int _queue_is_empty(ds3_connection_pool* pool) {
int queue_head = pool->queue_head;
return pool->queue_tail == queue_head && pool->connection_queue[queue_head] == NULL;
}

ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) {
ds3_connection* connection = NULL;

g_mutex_lock(&pool->mutex);
while (_pool_full(pool)) {
g_cond_wait(&pool->available_connections, &pool->mutex);
while (_queue_is_empty(pool) && pool->num_connections >= pool->max_connections) {
g_cond_wait(&pool->available_connection_notifier, &pool->mutex);
}

if (pool->connections[pool->head] == NULL) {
if (_queue_is_empty(pool)) {
connection = curl_easy_init();

pool->connections[pool->head] = connection;
pool->connections[pool->num_connections] = connection;
pool->num_connections++;
} else {
connection = pool->connections[pool->head];
connection = pool->connection_queue[pool->queue_tail];
pool->connection_queue[pool->queue_tail] = NULL;
pool->queue_tail = _queue_inc(pool->queue_tail, pool->max_connections);
}
pool->head = _pool_inc(pool->head, pool->num_connections);

g_mutex_unlock(&pool->mutex);

Expand All @@ -94,10 +114,13 @@ void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connectio
g_mutex_lock(&pool->mutex);

curl_easy_reset(connection);
pool->tail = _pool_inc(pool->tail, pool->num_connections);

pool->connection_queue[pool->queue_head] = connection;

pool->queue_head = _queue_inc(pool->queue_head, pool->max_connections);

g_cond_signal(&pool->available_connection_notifier);
g_mutex_unlock(&pool->mutex);
g_cond_signal(&pool->available_connections);
}

void ds3_connection_pool_inc_ref(ds3_connection_pool* pool) {
Expand Down
14 changes: 3 additions & 11 deletions src/ds3_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,16 @@ extern "C" {

#include <curl/curl.h>
#include <glib.h>
#include "ds3.h"

#define CONNECTION_POOL_SIZE 10
#define DEFAULT_CONNECTION_POOL_SIZE 10

typedef GMutex ds3_mutex;
typedef GCond ds3_condition;

typedef CURL ds3_connection;

//-- Opaque struct
struct _ds3_connection_pool{
ds3_connection** connections;
uint16_t num_connections;
int head;
int tail;
ds3_mutex mutex;
ds3_condition available_connections;
uint16_t ref_count;
};
typedef struct _ds3_connection_pool ds3_connection_pool;

ds3_connection_pool* ds3_connection_pool_init(void);
ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size);
Expand Down
2 changes: 0 additions & 2 deletions src/ds3_net.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
#include <curl/curl.h>

#include "ds3_request.h"
#include "ds3.h"
#include "ds3_net.h"
#include "ds3_utils.h"
#include "ds3_string_multimap.h"
#include "ds3_string_multimap_impl.h"
#include "ds3_connection.h"

Expand Down
1 change: 0 additions & 1 deletion src/ds3_net.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ extern "C" {

#include "ds3.h"
#include "ds3_string_multimap.h"
#include "ds3_connection.h"

char* escape_url(const char* url);
char* escape_url_extended(const char* url, const char** delimiters, uint32_t num_delimiters);
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ add_executable(ds3_c_tests
search_tests.cpp
service_tests.cpp
connection_tests.cpp
put_directory.cpp
test.cpp)

add_test(regression_tests ds3_c_tests)
Expand Down
28 changes: 13 additions & 15 deletions test/connection_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ BOOST_AUTO_TEST_CASE( ds3_client_create_free ) {
printf("-----Testing ds3_client create and free-------\n");

ds3_client* client = get_client();
BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 1);
BOOST_CHECK(client->connection_pool != NULL);
ds3_creds_free(client->creds);
ds3_client_free(client);
}
Expand All @@ -34,10 +34,8 @@ BOOST_AUTO_TEST_CASE( ds3_connection_pool_copy ) {
printf("-----Testing ds3_copy_client-------\n");

ds3_client* client = get_client();
BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 1);

ds3_client* client_copy = ds3_copy_client(client);
BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 2);
BOOST_CHECK_EQUAL(client->endpoint->value, client_copy->endpoint->value);
if (client->proxy) {
BOOST_CHECK_EQUAL(client->proxy->value, client_copy->proxy->value);
Expand All @@ -53,7 +51,7 @@ BOOST_AUTO_TEST_CASE( ds3_connection_pool_copy ) {
ds3_creds_free(client->creds);
ds3_client_free(client);

BOOST_CHECK_EQUAL(client_copy->connection_pool->ref_count, 1);
BOOST_CHECK(client_copy->connection_pool != NULL);
ds3_creds_free(client_copy->creds);
ds3_client_free(client_copy);
}
Expand All @@ -63,18 +61,18 @@ BOOST_AUTO_TEST_CASE( create_bucket_with_copied_client ) {

ds3_client* client = get_client();
ds3_connection_pool* cp = client->connection_pool;
BOOST_CHECK_EQUAL(cp->ref_count, 1);
BOOST_CHECK(cp != NULL);

ds3_client* client_copy = ds3_copy_client(client);
BOOST_CHECK_EQUAL(cp->ref_count, 2);
BOOST_CHECK_EQUAL(cp, client_copy->connection_pool);

const char* client_bucket_name = "create_bucket_from_original_client";
ds3_error* error = create_bucket_with_data_policy(client, client_bucket_name, ids.data_policy_id->value);
handle_error(error);
clear_bucket(client, client_bucket_name);
ds3_creds_free(client->creds);
ds3_client_free(client);
BOOST_CHECK_EQUAL(cp->ref_count, 1);
BOOST_CHECK(client_copy->connection_pool != NULL);

const char* copied_client_bucket_name = "create_bucket_from_copied_client";
error = create_bucket_with_data_policy(client_copy, copied_client_bucket_name, ids.data_policy_id->value);
Expand Down Expand Up @@ -162,7 +160,7 @@ BOOST_AUTO_TEST_CASE( bulk_put_200_very_small_files_multithreaded ) {

ds3_master_object_list_response* chunk_response = ensure_available_chunks(client, bulk_response->job_id);

GPtrArray* put_objs_args_array = new_put_chunks_threads_args(client, object_name, bucket_name, bulk_response, chunk_response, num_threads, False);
GPtrArray* put_objs_args_array = new_put_chunks_threads_args(client, object_name, NULL, bucket_name, bulk_response, chunk_response, num_threads, False);

GThread* chunks_thread_0 = g_thread_new("objects_0", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_objs_args_array, 0));
GThread* chunks_thread_1 = g_thread_new("objects_1", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_objs_args_array, 1));
Expand Down Expand Up @@ -204,7 +202,7 @@ BOOST_AUTO_TEST_CASE( sequential_vs_parallel_xfer ) {

ds3_master_object_list_response* sequential_chunks = ensure_available_chunks(client, mol->job_id);

GPtrArray* put_sequential_objs_threads_array = new_put_chunks_threads_args(client, obj_name, sequential_bucket_name, mol, sequential_chunks, 1, False);
GPtrArray* put_sequential_objs_threads_array = new_put_chunks_threads_args(client, obj_name, NULL, sequential_bucket_name, mol, sequential_chunks, 1, False);

// capture sequential test start time
clock_gettime(CLOCK_MONOTONIC, &start_time_t);
Expand Down Expand Up @@ -238,7 +236,7 @@ BOOST_AUTO_TEST_CASE( sequential_vs_parallel_xfer ) {

ds3_master_object_list_response* parallel_chunks = ensure_available_chunks(client, mol->job_id);

GPtrArray* put_parallel_objs_threads_array = new_put_chunks_threads_args(client, obj_name, parallel_bucket_name, mol, parallel_chunks, 4, False);
GPtrArray* put_parallel_objs_threads_array = new_put_chunks_threads_args(client, obj_name, NULL, parallel_bucket_name, mol, parallel_chunks, 4, False);

// capture sequential test start time
clock_gettime(CLOCK_MONOTONIC, &start_time_t);
Expand Down Expand Up @@ -315,8 +313,8 @@ BOOST_AUTO_TEST_CASE( multiple_client_xfer ) {
ds3_master_object_list_response* client1_chunks = ensure_available_chunks(client1, mol1->job_id);
ds3_master_object_list_response* client2_chunks = ensure_available_chunks(client2, mol2->job_id);

GPtrArray* client1_put_objs_args = new_put_chunks_threads_args(client1, obj_name, client1_bucket_name, mol1, client1_chunks, 1, True);
GPtrArray* client2_put_objs_args = new_put_chunks_threads_args(client2, obj_name, client2_bucket_name, mol2, client2_chunks, 1, True);
GPtrArray* client1_put_objs_args = new_put_chunks_threads_args(client1, obj_name, NULL, client1_bucket_name, mol1, client1_chunks, 1, True);
GPtrArray* client2_put_objs_args = new_put_chunks_threads_args(client2, obj_name, NULL, client2_bucket_name, mol2, client2_chunks, 1, True);

// capture sequential test start time
clock_gettime(CLOCK_MONOTONIC, &start_time_t);
Expand Down Expand Up @@ -407,9 +405,9 @@ BOOST_AUTO_TEST_CASE( performance_bulk_put ) {
ds3_master_object_list_response* chunks_response2 = ensure_available_chunks(client2, bulk_response2->job_id);
ds3_master_object_list_response* chunks_response3 = ensure_available_chunks(client3, bulk_response3->job_id);

GPtrArray* put_perf_objs_threads_array1 = new_put_chunks_threads_args(client1, obj_prefix, bucket_name1, bulk_response1, chunks_response1, 1, True);
GPtrArray* put_perf_objs_threads_array2 = new_put_chunks_threads_args(client2, obj_prefix, bucket_name2, bulk_response2, chunks_response2, 1, True);
GPtrArray* put_perf_objs_threads_array3 = new_put_chunks_threads_args(client3, obj_prefix, bucket_name3, bulk_response3, chunks_response3, 1, True);
GPtrArray* put_perf_objs_threads_array1 = new_put_chunks_threads_args(client1, obj_prefix, NULL, bucket_name1, bulk_response1, chunks_response1, 1, True);
GPtrArray* put_perf_objs_threads_array2 = new_put_chunks_threads_args(client2, obj_prefix, NULL, bucket_name2, bulk_response2, chunks_response2, 1, True);
GPtrArray* put_perf_objs_threads_array3 = new_put_chunks_threads_args(client3, obj_prefix, NULL, bucket_name3, bulk_response3, chunks_response3, 1, True);

// capture sequential test start time
struct timespec start_time_t, end_time_t;
Expand Down
98 changes: 98 additions & 0 deletions test/put_directory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* ******************************************************************************
* Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
* this file except in compliance with the License. A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* ****************************************************************************
*/

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <glib.h>
#include <sys/stat.h>
#include <boost/test/unit_test.hpp>
#include <inttypes.h>
#include "ds3.h"
#include "ds3_net.h"
#include "ds3_utils.h"
#include "test.h"

BOOST_AUTO_TEST_CASE( put_directory_4_threads) {
printf("-----Testing PUT all objects in a directory with 4 threads-------\n");

const char* dir_path = getenv("DS3_TEST_DIRECTORY");
if (dir_path == NULL) {
printf("ENV[DS3_TEST_DIRECTORY] unset - Skipping put_directory_4_threads test.\n");
return;
}

const char* bucket_name = "test_bulk_put_directory";
printf(" Putting all files in [%s] to bucket [%s]\n", dir_path, bucket_name);

ds3_client* client = get_client_at_loglvl(DS3_DEBUG);
int client_thread=1;
ds3_client_register_logging(client, DS3_DEBUG, test_log, (void*)&client_thread); // Use DEBUG level logging

ds3_error* error = create_bucket_with_data_policy(client, bucket_name, ids.data_policy_id->value);

char* objects_list[100];
uint64_t num_objs = 0;
GDir* dir_info = g_dir_open(dir_path, 0, NULL);
for (char* current_obj = (char*)g_dir_read_name(dir_info); current_obj != NULL; current_obj = (char*)g_dir_read_name(dir_info)) {
objects_list[num_objs++] = current_obj;
printf(" obj[%" PRIu64 "][%s]\n", num_objs, objects_list[num_objs-1]);
}

ds3_bulk_object_list_response* bulk_object_list = ds3_convert_file_list_with_basepath((const char**)objects_list, num_objs, dir_path);

ds3_request* request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, bulk_object_list);
ds3_master_object_list_response* mol;
error = ds3_put_bulk_job_spectra_s3_request(client, request, &mol);
ds3_request_free(request);
ds3_bulk_object_list_response_free(bulk_object_list);
handle_error(error);

// Allocate cache
ds3_master_object_list_response* chunks_list = ensure_available_chunks(client, mol->job_id);

// Use helper functions from test.cpp
const uint8_t num_threads = 4;
GPtrArray* put_dir_args = new_put_chunks_threads_args(client, NULL, dir_path, bucket_name, mol, chunks_list, num_threads, True); // Last param indicates verbose logging in the spawned thread


// capture test start time
struct timespec start_time_t, end_time_t;
double elapsed_t;
clock_gettime(CLOCK_MONOTONIC, &start_time_t);

GThread* put_dir_xfer_thread_0 = g_thread_new("put_dir_xfer_thread_0", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 0));
GThread* put_dir_xfer_thread_1 = g_thread_new("put_dir_xfer_thread_1", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 1));
GThread* put_dir_xfer_thread_2 = g_thread_new("put_dir_xfer_thread_2", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 2));
GThread* put_dir_xfer_thread_3 = g_thread_new("put_dir_xfer_thread_3", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 3));

// Block and cleanup GThread(s)
g_thread_join(put_dir_xfer_thread_0);
g_thread_join(put_dir_xfer_thread_1);
g_thread_join(put_dir_xfer_thread_2);
g_thread_join(put_dir_xfer_thread_3);

// find elapsed CPU and real time
clock_gettime(CLOCK_MONOTONIC, &end_time_t);
elapsed_t = timespec_to_seconds(&end_time_t) - timespec_to_seconds(&start_time_t);
ds3_log_message(client->log, DS3_INFO, " Elapsed time[%f]", elapsed_t);

g_dir_close(dir_info);
ds3_master_object_list_response_free(chunks_list);
ds3_master_object_list_response_free(mol);
put_chunks_threads_args_free(put_dir_args);
clear_bucket(client, bucket_name);
free_client(client);
}
Loading

0 comments on commit 743571a

Please sign in to comment.