Skip to content

Commit

Permalink
#1003 Make ecs_run_pipeline multi-threaded
Browse files Browse the repository at this point in the history
* Add support for running multi-threaded custom pipelines using worker threads

* Add tests for multi-threaded pipeline

* Add missing tests to test/cpp_api/project.json

* Move ecs_pipeline_state_t typedef to private_types.h

* Remove calls to ecs_set_threads from ecs_set_pipeline

* Remove todo's about system time

* Move ecs_pipeline_state_t* from ecs_stage_t to ecs_world_t
  • Loading branch information
ZeroErrors authored Jul 18, 2023
1 parent 132764e commit b5241cc
Show file tree
Hide file tree
Showing 11 changed files with 431 additions and 496 deletions.
401 changes: 158 additions & 243 deletions flecs.c

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions flecs.h
Original file line number Diff line number Diff line change
Expand Up @@ -10550,9 +10550,6 @@ void ecs_reset_clock(
* default pipeline (either the builtin pipeline or the pipeline set with
* set_pipeline()). An application may run additional pipelines.
*
* Note: calling this function from an application currently only works in
* single threaded applications with a single stage.
*
* @param world The world.
* @param pipeline The pipeline to run.
*/
Expand Down
3 changes: 0 additions & 3 deletions include/flecs/addons/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,6 @@ void ecs_reset_clock(
* default pipeline (either the builtin pipeline or the pipeline set with
* set_pipeline()). An application may run additional pipelines.
*
* Note: calling this function from an application currently only works in
* single threaded applications with a single stage.
*
* @param world The world.
* @param pipeline The pipeline to run.
*/
Expand Down
181 changes: 112 additions & 69 deletions src/addons/pipeline/pipeline.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,75 @@ void ecs_run_pipeline(
pipeline = world->pipeline;
}

EcsPipeline *pq = (EcsPipeline*)ecs_get(world, pipeline, EcsPipeline);
flecs_pipeline_update(world, pq->state, true);
flecs_run_pipeline((ecs_world_t*)flecs_stage_from_world(&world),
pq->state, delta_time);
/* create any worker task threads request */
if (ecs_using_task_threads(world))
{
flecs_create_worker_threads(world);
}

EcsPipeline *p = (EcsPipeline*)ecs_get(world, pipeline, EcsPipeline);
flecs_workers_progress(world, p->state, delta_time);

if (ecs_using_task_threads(world))
{
/* task threads were temporary and may now be joined */
flecs_join_worker_threads(world);
}
}

int32_t flecs_run_pipeline_ops(
ecs_world_t* world,
ecs_stage_t* stage,
int32_t stage_index,
int32_t stage_count,
ecs_ftime_t delta_time,
bool main_thread)
{
ecs_pipeline_state_t* pq = world->pq;
ecs_pipeline_op_t* op = pq->cur_op;
int32_t i = pq->cur_i;

int32_t count = ecs_vec_count(&pq->systems);
ecs_entity_t* systems = ecs_vec_first_t(&pq->systems, ecs_entity_t);
int32_t ran_since_merge = i - op->offset;

for (; i < count; i++) {
/* Run system if:
* - this is the main thread, or if
* - the system is multithreaded
*/
if (main_thread || op->multi_threaded) {
ecs_entity_t system = systems[i];
const EcsPoly* poly = ecs_get_pair(world, system, EcsPoly, EcsSystem);
ecs_assert(poly != NULL, ECS_INTERNAL_ERROR, NULL);
ecs_system_t* sys = ecs_poly(poly->poly, ecs_system_t);

/* Keep track of the last frame for which the system has ran, so we
* know from where to resume the schedule in case the schedule
* changes during a merge. */
sys->last_frame = world->info.frame_count_total + 1;

ecs_stage_t* s = NULL;
if (!op->no_readonly) {
/* If system is no_readonly it operates on the actual world, not
* the stage. Only pass stage to system if it's readonly. */
s = stage;
}

ecs_run_intern(world, s, system, sys, stage_index,
stage_count, delta_time, 0, 0, NULL);
}

world->info.systems_ran_frame++;
ran_since_merge++;

if (ran_since_merge == op->count) {
/* Merge */
break;
}
}

return i;
}

void flecs_run_pipeline(
Expand All @@ -528,82 +593,67 @@ void flecs_run_pipeline(
int32_t stage_index = ecs_get_stage_id(stage->thread_ctx);
int32_t stage_count = ecs_get_stage_count(world);

if (!flecs_worker_begin(world, stage, pq, true)) {
return;
}
ecs_assert(!stage_index, ECS_INVALID_OPERATION, NULL);

ecs_time_t st = {0};
bool main_thread = !stage_index;
bool measure_time = main_thread && (world->flags & EcsWorldMeasureSystemTime);
ecs_pipeline_op_t *op = ecs_vec_first_t(&pq->ops, ecs_pipeline_op_t);
int32_t i = 0;
bool multi_threaded = ecs_get_stage_count(world) > 1;;

do {
int32_t count = ecs_vec_count(&pq->systems);
ecs_entity_t *systems = ecs_vec_first_t(&pq->systems, ecs_entity_t);
int32_t ran_since_merge = i - op->offset;
// Update the pipeline the workers will execute
world->pq = pq;

if (i == count) {
break;
}
// Update the pipeline before waking the workers.
flecs_pipeline_update(world, pq, true);

if (measure_time) {
ecs_time_measure(&st);
// If there are no operations to execute in the pipeline bail early,
// no need to wake the workers since they have nothing to do.
while (pq->cur_op != NULL) {
if (pq->cur_i == ecs_vec_count(&pq->systems)) {
flecs_pipeline_update(world, pq, false);
continue;
}

for (; i < count; i ++) {
/* Run system if:
* - this is the main thread, or if
* - the system is multithreaded
*/
if (main_thread || op->multi_threaded) {
ecs_entity_t system = systems[i];
const EcsPoly *poly = ecs_get_pair(world, system, EcsPoly, EcsSystem);
ecs_assert(poly != NULL, ECS_INTERNAL_ERROR, NULL);
ecs_system_t *sys = ecs_poly(poly->poly, ecs_system_t);

/* Keep track of the last frame for which the system has ran, so we
* know from where to resume the schedule in case the schedule
* changes during a merge. */
sys->last_frame = world->info.frame_count_total + 1;

ecs_stage_t *s = NULL;
if (!op->no_readonly) {
/* If system is no_readonly it operates on the actual world, not
* the stage. Only pass stage to system if it's readonly. */
s = stage;
}
bool no_readonly = pq->cur_op->no_readonly;
bool op_multi_threaded = multi_threaded && pq->cur_op->multi_threaded;

ecs_run_intern(world, s, system, sys, stage_index,
stage_count, delta_time, 0, 0, NULL);
}
pq->no_readonly = no_readonly;

world->info.systems_ran_frame ++;
ran_since_merge ++;
if (!no_readonly) {
ecs_readonly_begin(world);
}

if (ran_since_merge == op->count) {
/* Merge */
break;
}
ECS_BIT_COND(world->flags, EcsWorldMultiThreaded, op_multi_threaded);
ecs_assert(world->workers_waiting == 0, ECS_INTERNAL_ERROR, NULL);

if (op_multi_threaded) {
flecs_signal_workers(world);
}

ecs_time_t st = { 0 };
bool measure_time = world->flags & EcsWorldMeasureSystemTime;
if (measure_time) {
ecs_time_measure(&st);
}

const int32_t i = flecs_run_pipeline_ops(world, stage, stage_index, stage_count, delta_time, true);

if (measure_time) {
/* Don't include merge time in system time */
world->info.system_time_total +=
(ecs_ftime_t)ecs_time_measure(&st);
world->info.system_time_total += (ecs_ftime_t)ecs_time_measure(&st);
}

/* Synchronize workers, rebuild pipeline if necessary. Pass current op
* and system index to function, so we know where to resume from. */
} while (flecs_worker_sync(world, stage, pq, &op, &i));
if (op_multi_threaded) {
flecs_wait_for_sync(world);
}

if (measure_time) {
world->info.system_time_total += (ecs_ftime_t)ecs_time_measure(&st);
}
if (!no_readonly) {
ecs_readonly_end(world);
}

flecs_worker_end(world, stage);
/* Store the current state of the schedule after we synchronized the
* threads, to avoid race conditions. */
pq->cur_i = i;

return;
flecs_pipeline_update(world, pq, false);
}
}

static
Expand Down Expand Up @@ -721,14 +771,7 @@ void ecs_set_pipeline(
ecs_check( ecs_get(world, pipeline, EcsPipeline) != NULL,
ECS_INVALID_PARAMETER, "not a pipeline");

int32_t thread_count = ecs_get_stage_count(world);
if (thread_count > 1) {
ecs_set_threads(world, 1);
}
world->pipeline = pipeline;
if (thread_count > 1) {
ecs_set_threads(world, thread_count);
}
error:
return;
}
Expand Down
35 changes: 16 additions & 19 deletions src/addons/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ typedef struct ecs_pipeline_op_t {
bool no_readonly; /* Whether systems are staged or not */
} ecs_pipeline_op_t;

typedef struct ecs_pipeline_state_t {
struct ecs_pipeline_state_t {
ecs_query_t *query; /* Pipeline query */
ecs_vec_t ops; /* Pipeline schedule */
ecs_vec_t systems; /* Vector with system ids */
Expand All @@ -35,7 +35,7 @@ typedef struct ecs_pipeline_state_t {
int32_t cur_i; /* Index in current result */
int32_t ran_since_merge; /* Index in current op */
bool no_readonly; /* Is pipeline in readonly mode */
} ecs_pipeline_state_t;
};

typedef struct EcsPipeline {
/* Stable ptr so threads can safely access while entity/components move */
Expand All @@ -56,27 +56,18 @@ void flecs_run_pipeline(
ecs_pipeline_state_t *pq,
ecs_ftime_t delta_time);

int32_t flecs_run_pipeline_ops(
ecs_world_t* world,
ecs_stage_t* stage,
int32_t stage_index,
int32_t stage_count,
ecs_ftime_t delta_time,
bool main_thread);

////////////////////////////////////////////////////////////////////////////////
//// Worker API
////////////////////////////////////////////////////////////////////////////////

bool flecs_worker_begin(
ecs_world_t *world,
ecs_stage_t *stage,
ecs_pipeline_state_t *pq,
bool start_of_frame);

void flecs_worker_end(
ecs_world_t *world,
ecs_stage_t *stage);

bool flecs_worker_sync(
ecs_world_t *world,
ecs_stage_t *stage,
ecs_pipeline_state_t *pq,
ecs_pipeline_op_t **cur_op,
int32_t *cur_i);

void flecs_workers_progress(
ecs_world_t *world,
ecs_pipeline_state_t *pq,
Expand All @@ -88,4 +79,10 @@ void flecs_create_worker_threads(
bool flecs_join_worker_threads(
ecs_world_t *world);

void flecs_signal_workers(
ecs_world_t *world);

void flecs_wait_for_sync(
ecs_world_t *world);

#endif
Loading

0 comments on commit b5241cc

Please sign in to comment.