diff --git a/libvmaf/src/feature/feature_extractor.c b/libvmaf/src/feature/feature_extractor.c index 94a4e16ca..21f9e64e5 100644 --- a/libvmaf/src/feature/feature_extractor.c +++ b/libvmaf/src/feature/feature_extractor.c @@ -386,6 +386,8 @@ int vmaf_fex_ctx_pool_aquire(VmafFeatureExtractorContextPool *pool, } err = vmaf_feature_extractor_context_create(&f, entry->fex, d); if (err) goto unlock; + if (f->fex->flags & VMAF_FEATURE_FRAME_SYNC) + f->fex->framesync = (fex->framesync); } if (!entry->ctx_list[i].in_use) { entry->ctx_list[i].fex_ctx = *fex_ctx = f; diff --git a/libvmaf/src/feature/feature_extractor.h b/libvmaf/src/feature/feature_extractor.h index e34188c3b..574436e76 100644 --- a/libvmaf/src/feature/feature_extractor.h +++ b/libvmaf/src/feature/feature_extractor.h @@ -24,6 +24,7 @@ #include #include "dict.h" +#include "framesync.h" #include "feature_collector.h" #include "opt.h" @@ -36,6 +37,7 @@ enum VmafFeatureExtractorFlags { VMAF_FEATURE_EXTRACTOR_TEMPORAL = 1 << 0, VMAF_FEATURE_EXTRACTOR_CUDA = 1 << 1, + VMAF_FEATURE_FRAME_SYNC = 1 << 2, }; typedef struct VmafFeatureExtractor { @@ -94,6 +96,8 @@ typedef struct VmafFeatureExtractor { VmafCudaState *cu_state; ///< VmafCudaState, set by framework #endif + VmafFrameSyncContext *framesync; + } VmafFeatureExtractor; VmafFeatureExtractor *vmaf_get_feature_extractor_by_name(const char *name); diff --git a/libvmaf/src/feature/float_adm.c b/libvmaf/src/feature/float_adm.c index 488298514..60d1986ec 100644 --- a/libvmaf/src/feature/float_adm.c +++ b/libvmaf/src/feature/float_adm.c @@ -22,6 +22,7 @@ #include "dict.h" #include "feature_collector.h" +#include "framesync.h" #include "feature_extractor.h" #include "feature_name.h" diff --git a/libvmaf/src/framesync.c b/libvmaf/src/framesync.c new file mode 100644 index 000000000..5c4c57758 --- /dev/null +++ b/libvmaf/src/framesync.c @@ -0,0 +1,227 @@ +/** + * + * Copyright 2016-2023 Netflix, Inc. + * + * Licensed under the BSD+Patent License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSDplusPatent + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include +#include +#include +#include "framesync.h" + +enum { + BUF_FREE = 0, + BUF_ACQUIRED, + BUF_FILLED, + BUF_RETRIEVED, +}; + +typedef struct VmafFrameSyncBuf { + void *frame_data; + int buf_status; + signed long index; + struct VmafFrameSyncBuf *next; +} VmafFrameSyncBuf; + +typedef struct VmafFrameSyncContext { + VmafFrameSyncBuf *buf_que; + pthread_mutex_t acquire_lock; + pthread_mutex_t retrieve_lock; + pthread_cond_t retrieve; + unsigned buf_cnt; +} VmafFrameSyncContext; + +int vmaf_framesync_init(VmafFrameSyncContext **fs_ctx) +{ + VmafFrameSyncContext *const ctx = *fs_ctx = malloc(sizeof(VmafFrameSyncContext)); + if (!ctx) return -ENOMEM; + memset(ctx, 0, sizeof(VmafFrameSyncContext)); + ctx->buf_cnt = 1; + + pthread_mutex_init(&(ctx->acquire_lock), NULL); + pthread_mutex_init(&(ctx->retrieve_lock), NULL); + pthread_cond_init(&(ctx->retrieve), NULL); + + VmafFrameSyncBuf *buf_que = ctx->buf_que = malloc(sizeof(VmafFrameSyncBuf)); + + buf_que->frame_data = NULL; + buf_que->buf_status = BUF_FREE; + buf_que->index = -1; + buf_que->next = NULL; + + return 0; +} + +int vmaf_framesync_acquire_new_buf(VmafFrameSyncContext *fs_ctx, void **data, + unsigned data_sz, unsigned index) +{ + VmafFrameSyncBuf *buf_que = fs_ctx->buf_que; + *data = NULL; + + pthread_mutex_lock(&(fs_ctx->acquire_lock)); + + // traverse until a free buffer is found + for (unsigned i = 0; i < fs_ctx->buf_cnt; i++) { + if (buf_que->buf_status == BUF_FREE) { + buf_que->frame_data = *data = malloc(data_sz); + if (!buf_que->frame_data) + return -ENOMEM; + buf_que->buf_status = BUF_ACQUIRED; + buf_que->index = index; + break; + } + // move to next node + if (buf_que->next != NULL) + buf_que = buf_que->next; + } + + // create a new node if all nodes are occupied in the list and append to the tail + if (*data == NULL) { + VmafFrameSyncBuf *new_buf_node = malloc(sizeof(VmafFrameSyncBuf)); + buf_que->next = new_buf_node; + new_buf_node->buf_status = BUF_FREE; + new_buf_node->index = -1; + new_buf_node->next = NULL; + fs_ctx->buf_cnt++; + + new_buf_node->frame_data = *data = malloc(data_sz); + if (!new_buf_node->frame_data) + return -ENOMEM; + new_buf_node->buf_status = BUF_ACQUIRED; + new_buf_node->index = index; + } + + pthread_mutex_unlock(&(fs_ctx->acquire_lock)); + + return 0; +} + +int vmaf_framesync_submit_filled_data(VmafFrameSyncContext *fs_ctx, void *data, + unsigned index) +{ + VmafFrameSyncBuf *buf_que = fs_ctx->buf_que; + + pthread_mutex_lock(&(fs_ctx->retrieve_lock)); + + // loop until a matchng buffer is found + for (unsigned i = 0; i < fs_ctx->buf_cnt; i++) { + if ((buf_que->index == index) && (buf_que->buf_status == BUF_ACQUIRED)) { + buf_que->buf_status = BUF_FILLED; + if (data != buf_que->frame_data) + return -1; + break; + } + + // move to next node + if (NULL != buf_que->next) + buf_que = buf_que->next; + } + + pthread_cond_broadcast(&(fs_ctx->retrieve)); + pthread_mutex_unlock(&(fs_ctx->retrieve_lock)); + + return 0; +} + +int vmaf_framesync_retrieve_filled_data(VmafFrameSyncContext *fs_ctx, + void **data, unsigned index) +{ + *data = NULL; + + while (*data == NULL) { + VmafFrameSyncBuf *buf_que = fs_ctx->buf_que; + pthread_mutex_lock(&(fs_ctx->retrieve_lock)); + // loop until a free buffer is found + for (unsigned i = 0; i < fs_ctx->buf_cnt; i++) { + if ((buf_que->index == index) && (buf_que->buf_status == BUF_FILLED)) { + buf_que->buf_status = BUF_RETRIEVED; + *data = buf_que->frame_data; + break; + } + + // move to next node + if (NULL != buf_que->next) + buf_que = buf_que->next; + } + + if (*data == NULL) + pthread_cond_wait(&(fs_ctx->retrieve), &(fs_ctx->retrieve_lock)); + + pthread_mutex_unlock(&(fs_ctx->retrieve_lock)); + } + + return 0; +} + +int vmaf_framesync_release_buf(VmafFrameSyncContext *fs_ctx, void *data, + unsigned index) +{ + VmafFrameSyncBuf *buf_que = fs_ctx->buf_que; + + pthread_mutex_lock(&(fs_ctx->acquire_lock)); + // loop until a matching buffer is found + for (unsigned i = 0; i < fs_ctx->buf_cnt; i++) { + if ((buf_que->index == index) && (buf_que->buf_status == BUF_RETRIEVED)) { + if (data != buf_que->frame_data) + return -1; + + free(buf_que->frame_data); + buf_que->frame_data = NULL; + buf_que->buf_status = BUF_FREE; + buf_que->index = -1; + break; + } + + // move to next node + if (NULL != buf_que->next) + buf_que = buf_que->next; + } + + pthread_mutex_unlock(&(fs_ctx->acquire_lock)); + return 0; +} + +int vmaf_framesync_destroy(VmafFrameSyncContext *fs_ctx) +{ + VmafFrameSyncBuf *buf_que = fs_ctx->buf_que; + VmafFrameSyncBuf *buf_que_tmp; + + pthread_mutex_destroy(&(fs_ctx->acquire_lock)); + pthread_mutex_destroy(&(fs_ctx->retrieve_lock)); + pthread_cond_destroy(&(fs_ctx->retrieve)); + + //check for any data buffers which are not freed + for (unsigned i = 0; i < fs_ctx->buf_cnt; i++) { + if (NULL != buf_que->frame_data) { + free(buf_que->frame_data); + buf_que->frame_data = NULL; + } + + // move to next node + if (NULL != buf_que->next) { + buf_que_tmp = buf_que; + buf_que = buf_que->next; + free(buf_que_tmp); + } else { + free(buf_que); + } + } + + free(fs_ctx); + + return 0; +} diff --git a/libvmaf/src/framesync.h b/libvmaf/src/framesync.h new file mode 100644 index 000000000..08e9a543a --- /dev/null +++ b/libvmaf/src/framesync.h @@ -0,0 +1,46 @@ +/** + * + * Copyright 2016-2023 Netflix, Inc. + * + * Licensed under the BSD+Patent License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSDplusPatent + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef __VMAF_FRAME_SYNC_H__ +#define __VMAF_FRAME_SYNC_H__ + +#include +#include +#include +#include +#include "libvmaf/libvmaf.h" + +typedef struct VmafFrameSyncContext VmafFrameSyncContext; + +int vmaf_framesync_init(VmafFrameSyncContext **fs_ctx); + +int vmaf_framesync_acquire_new_buf(VmafFrameSyncContext *fs_ctx, void **data, + unsigned data_sz, unsigned index); + +int vmaf_framesync_submit_filled_data(VmafFrameSyncContext *fs_ctx, void *data, + unsigned index); + +int vmaf_framesync_retrieve_filled_data(VmafFrameSyncContext *fs_ctx, void **data, + unsigned index); + +int vmaf_framesync_release_buf(VmafFrameSyncContext *fs_ctx, void *data, + unsigned index); + +int vmaf_framesync_destroy(VmafFrameSyncContext *fs_ctx); + +#endif /* __VMAF_FRAME_SYNC_H__ */ diff --git a/libvmaf/src/libvmaf.c b/libvmaf/src/libvmaf.c index 3fbb050c5..fa3fe6787 100644 --- a/libvmaf/src/libvmaf.c +++ b/libvmaf/src/libvmaf.c @@ -56,6 +56,7 @@ typedef struct VmafContext { RegisteredFeatureExtractors registered_feature_extractors; VmafFeatureExtractorContextPool *fex_ctx_pool; VmafThreadPool *thread_pool; + VmafFrameSyncContext *framesync; #ifdef HAVE_CUDA struct { struct { @@ -99,8 +100,10 @@ int vmaf_init(VmafContext **vmaf, VmafConfiguration cfg) vmaf_set_log_level(cfg.log_level); - err = vmaf_feature_collector_init(&(v->feature_collector)); + err = vmaf_framesync_init(&(v->framesync)); if (err) goto free_v; + err = vmaf_feature_collector_init(&(v->feature_collector)); + if (err) goto free_framesync; err = feature_extractor_vector_init(&(v->registered_feature_extractors)); if (err) goto free_feature_collector; @@ -119,6 +122,8 @@ int vmaf_init(VmafContext **vmaf, VmafConfiguration cfg) feature_extractor_vector_destroy(&(v->registered_feature_extractors)); free_feature_collector: vmaf_feature_collector_destroy(v->feature_collector); +free_framesync: + vmaf_framesync_destroy(v->framesync); free_v: free(v); fail: @@ -235,11 +240,20 @@ static int set_fex_cuda_state(VmafFeatureExtractorContext *fex_ctx, #endif +static int set_fex_framesync(VmafFeatureExtractorContext *fex_ctx, + VmafContext *vmaf) +{ + if (fex_ctx->fex->flags & VMAF_FEATURE_FRAME_SYNC) + fex_ctx->fex->framesync = (vmaf->framesync); + return 0; +} + int vmaf_close(VmafContext *vmaf) { if (!vmaf) return -EINVAL; vmaf_thread_pool_wait(vmaf->thread_pool); + vmaf_framesync_destroy(vmaf->framesync); feature_extractor_vector_destroy(&(vmaf->registered_feature_extractors)); vmaf_feature_collector_destroy(vmaf->feature_collector); vmaf_thread_pool_destroy(vmaf->thread_pool); @@ -292,6 +306,7 @@ int vmaf_use_feature(VmafContext *vmaf, const char *feature_name, #ifdef HAVE_CUDA err |= set_fex_cuda_state(fex_ctx, vmaf); #endif + err |= set_fex_framesync(fex_ctx, vmaf); if (err) return err; RegisteredFeatureExtractors *rfe = &(vmaf->registered_feature_extractors); @@ -339,6 +354,7 @@ int vmaf_use_features_from_model(VmafContext *vmaf, VmafModel *model) #ifdef HAVE_CUDA err |= set_fex_cuda_state(fex_ctx, vmaf); #endif + err |= set_fex_framesync(fex_ctx, vmaf); if (err) return err; err = feature_extractor_vector_append(rfe, fex_ctx, 0); if (err) { @@ -405,6 +421,7 @@ static int threaded_read_pictures(VmafContext *vmaf, VmafPicture *ref, continue; } + fex->framesync = vmaf->framesync; VmafFeatureExtractorContext *fex_ctx; err = vmaf_fex_ctx_pool_aquire(vmaf->fex_ctx_pool, fex, opts_dict, &fex_ctx); diff --git a/libvmaf/src/meson.build b/libvmaf/src/meson.build index 623639d4e..065c59e20 100644 --- a/libvmaf/src/meson.build +++ b/libvmaf/src/meson.build @@ -461,6 +461,7 @@ libvmaf_sources = [ src_dir + 'read_json_model.c', src_dir + 'pdjson.c', src_dir + 'log.c', + src_dir + 'framesync.c', ] if is_cuda_enabled diff --git a/libvmaf/test/meson.build b/libvmaf/test/meson.build index 250272035..8ae58e512 100644 --- a/libvmaf/test/meson.build +++ b/libvmaf/test/meson.build @@ -122,6 +122,15 @@ test_psnr = executable('test_psnr', link_with : get_option('default_library') == 'both' ? libvmaf.get_static_lib() : libvmaf, ) +test_framesync = executable('test_framesync', + ['test.c', 'test_framesync.c'], + include_directories : [libvmaf_inc, test_inc, include_directories('../src/')], + link_with : get_option('default_library') == 'both' ? libvmaf.get_static_lib() : libvmaf, + c_args : vmaf_cflags_common, + cpp_args : vmaf_cflags_common, + dependencies : thread_lib, +) + if get_option('enable_cuda') test_ring_buffer = executable('test_ring_buffer', ['test.c', 'test_ring_buffer.c', '../src/cuda/ring_buffer.c', '../src/cuda/picture_cuda.c'], @@ -158,3 +167,4 @@ test('test_cambi', test_cambi) test('test_luminance_tools', test_luminance_tools) test('test_cli_parse', test_cli_parse) test('test_psnr', test_psnr) +test('test_framesync', test_framesync) diff --git a/libvmaf/test/test_framesync.c b/libvmaf/test/test_framesync.c new file mode 100644 index 000000000..f5e9997cc --- /dev/null +++ b/libvmaf/test/test_framesync.c @@ -0,0 +1,150 @@ +/** + * + * Copyright 2016-2020 Netflix, Inc. + * + * Licensed under the BSD+Patent License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSDplusPatent + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#ifdef _WIN32 +#include +#else +#include +#endif + +#include "framesync.h" +#include "test.h" +#include "thread_pool.h" + +#define NUM_TEST_FRAMES 10 +#define FRAME_BUF_LEN 1024 + +typedef struct ThreadData { + uint8_t *ref; + uint8_t *dist; + unsigned index; + VmafFrameSyncContext *fs_ctx; + int err; +} ThreadData; + +static void my_worker(void *data) +{ + int ctr; + struct ThreadData *thread_data = data; + uint8_t *shared_buf; + uint8_t *dependent_buf; + + //acquire new buffer from frame sync + vmaf_framesync_acquire_new_buf(thread_data->fs_ctx, (void*)&shared_buf, + FRAME_BUF_LEN, thread_data->index); + + //populate shared buffer with values + for (ctr = 0; ctr < FRAME_BUF_LEN; ctr++) + shared_buf[ctr] = thread_data->ref[ctr] + thread_data->dist[ctr] + 2; + + //submit filled buffer back to frame sync + vmaf_framesync_submit_filled_data(thread_data->fs_ctx, shared_buf, + thread_data->index); + + //sleep to simulate work load + const int sleep_seconds = 1; +#ifdef _WIN32 + Sleep(1000 * sleep_seconds); +#else + sleep(sleep_seconds); +#endif + + if (thread_data->index == 0) goto cleanup; + + //retrieve dependent buffer from frame sync + vmaf_framesync_retrieve_filled_data(thread_data->fs_ctx, + (void*)&dependent_buf, + thread_data->index - 1); + + for (ctr = 0; ctr < FRAME_BUF_LEN; ctr++) { + if (dependent_buf[ctr] != (thread_data->ref[ctr] + thread_data->dist[ctr])) { + fprintf(stderr, "verification error in frame index %d\n", + thread_data->index); + } + } + + //release dependent buffer from frame sync + vmaf_framesync_release_buf(thread_data->fs_ctx, dependent_buf, + thread_data->index - 1); + +cleanup: + free(thread_data->ref); + free(thread_data->dist); +} + +static char *test_framesync_create_process_and_destroy() +{ + int err, frame_index; + + VmafThreadPool *pool; + VmafFrameSyncContext *fs_ctx; + unsigned n_threads = 2; + + err = vmaf_thread_pool_create(&pool, n_threads); + mu_assert("problem during vmaf_thread_pool_init", !err); + + err = vmaf_framesync_init(&fs_ctx); + mu_assert("problem during vmaf_framesync_init", !err); + + fprintf(stderr, "\n"); + for (frame_index = 0; frame_index < NUM_TEST_FRAMES; frame_index++) { + uint8_t *pic_a = malloc(FRAME_BUF_LEN); + uint8_t *pic_b = malloc(FRAME_BUF_LEN); + + fprintf(stderr, "processing frame %d\r", frame_index); + + memset(pic_a, frame_index, FRAME_BUF_LEN); + memset(pic_b, frame_index, FRAME_BUF_LEN); + + struct ThreadData data = { + .ref = pic_a, + .dist = pic_b, + .index = frame_index, + .fs_ctx = fs_ctx, + .err = 0, + }; + + err = vmaf_thread_pool_enqueue(pool, my_worker, &data, sizeof(ThreadData)); + + mu_assert("problem during vmaf_thread_pool_enqueue with data", !err); + + //wait once in 2 frames + if ((frame_index >= 1) && (frame_index & 1)) { + err = vmaf_thread_pool_wait(pool); + mu_assert("problem during vmaf_thread_pool_wait", !err); + } + } + fprintf(stderr, "\n"); + + err = vmaf_thread_pool_wait(pool); + mu_assert("problem during vmaf_thread_pool_wait\n", !err); + err = vmaf_thread_pool_destroy(pool); + mu_assert("problem during vmaf_thread_pool_destroy\n", !err); + err = vmaf_framesync_destroy(fs_ctx); + mu_assert("problem during vmaf_framesync_destroy\n", !err); + + return NULL; +} + +char *run_tests() +{ + mu_run_test(test_framesync_create_process_and_destroy); + return NULL; +}