Skip to content

Commit

Permalink
Bug fixes and update Legion version (#1287)
Browse files Browse the repository at this point in the history
* bug fixes and update Legion version

* fix

* bug fix

* update legion

* fix arithmetic error due to num_devices uninitialized

* update legion version

* update ci

* fix

* debugging ci

* Revert "debugging ci"

This reverts commit 0b3148e.

* update mapper interface

* add ncclFinalize

* Only delete nccl communications for training jobs

---------

Co-authored-by: Zhihao Jia <[email protected]>
  • Loading branch information
goliaro and jiazhihao authored Jan 27, 2024
1 parent abf9fb8 commit d21ed66
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 80 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/gpu-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ jobs:
CONDA: "3"
needs: inference-tests
container:
image: ghcr.io/flexflow/flexflow-environment-cuda:latest
image: ghcr.io/flexflow/flexflow-environment-cuda-11.8:latest
options: --gpus all --shm-size=8192m
steps:
- name: Install updated git version
Expand All @@ -243,7 +243,7 @@ jobs:

- name: Build and Install FlexFlow
run: |
export PATH=/opt/conda/bin:$PATH
export PATH=$CONDA_PREFIX/bin:$PATH
export FF_HOME=$(pwd)
export FF_BUILD_ALL_EXAMPLES=ON
export FF_BUILD_ALL_INFERENCE_EXAMPLES=ON
Expand All @@ -252,18 +252,18 @@ jobs:
- name: Check FlexFlow Python interface (pip)
run: |
export PATH=/opt/conda/bin:$PATH
export PATH=$CONDA_PREFIX/bin:$PATH
export FF_HOME=$(pwd)
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/conda/lib
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CONDA_PREFIX/lib
./tests/python_interface_test.sh after-installation
- name: Run multi-gpu tests
run: |
export PATH=/opt/conda/bin:$PATH
export PATH=$CONDA_PREFIX/bin:$PATH
export CUDNN_DIR=/usr/local/cuda
export CUDA_DIR=/usr/local/cuda
export FF_HOME=$(pwd)
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/conda/lib
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CONDA_PREFIX/lib
# C++ tests
./tests/cpp_gpu_tests.sh 4
# Python tests
Expand Down
8 changes: 5 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ if(NOT BUILD_LEGION_ONLY)

# python related
if (FF_USE_PYTHON)
find_package(Python COMPONENTS Interpreter Development)
# create flexflow_cffi_header.py
add_custom_command(TARGET flexflow
PRE_BUILD
Expand All @@ -424,13 +425,13 @@ if(NOT BUILD_LEGION_ONLY)
# generate the Legion Python bindings library. When building from pip, we need to do this post-install to prevent Legion from overwriting the path to the Legion shared library
add_custom_command(TARGET flexflow
POST_BUILD
COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/deps/legion/bindings/python/setup.py build --cmake-build-dir ${Legion_BINARY_DIR}/runtime --prefix ${Legion_BINARY_DIR} --build-lib=${Legion_BINARY_DIR}/bindings/python ${Legion_PYTHON_EXTRA_INSTALL_ARGS}
COMMAND ${Python_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/deps/legion/bindings/python/setup.py build --cmake-build-dir ${Legion_BINARY_DIR}/runtime --prefix ${Legion_BINARY_DIR} --build-lib=${Legion_BINARY_DIR}/bindings/python ${Legion_PYTHON_EXTRA_INSTALL_ARGS}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/deps/legion/bindings/python
)
# create flexflow_python interpreter. When building from pip, we install the FF_HOME/python/flexflow_python script instead.
add_custom_command(TARGET flexflow
PRE_BUILD
COMMAND ${PYTHON_EXECUTABLE} ${FLEXFLOW_ROOT}/python/flexflow_python_build.py --build-dir ${CMAKE_BINARY_DIR}
COMMAND ${Python_EXECUTABLE} ${FLEXFLOW_ROOT}/python/flexflow_python_build.py --build-dir ${CMAKE_BINARY_DIR}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMENT "Creating flexflow_python interpreter..."
)
Expand Down Expand Up @@ -567,7 +568,8 @@ if(NOT BUILD_LEGION_ONLY)
install(TARGETS flexflow DESTINATION ${LIB_DEST})
# install python
if (FF_USE_PYTHON)
execute_process(COMMAND ${PYTHON_EXECUTABLE} -c "import site, os; print([pkg for func in (site.getsitepackages(), site.getusersitepackages()) for pkg in ([func] if isinstance(func, str) else func) if os.access(pkg, os.W_OK)][0])" OUTPUT_VARIABLE PY_DEST OUTPUT_STRIP_TRAILING_WHITESPACE)
find_package(Python COMPONENTS Interpreter Development)
execute_process(COMMAND ${Python_EXECUTABLE} -c "import site, os; print([pkg for func in (site.getsitepackages(), site.getusersitepackages()) for pkg in ([func] if isinstance(func, str) else func) if os.access(pkg, os.W_OK)][0])" OUTPUT_VARIABLE PY_DEST OUTPUT_STRIP_TRAILING_WHITESPACE)
if (NOT FF_BUILD_FROM_PYPI)
install(
DIRECTORY ${FLEXFLOW_ROOT}/python/flexflow/
Expand Down
4 changes: 2 additions & 2 deletions cmake/pip_install/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Use setup.py script to re-install the Python bindings library with the right library paths
if (FF_USE_PYTHON)
execute_process(COMMAND ${PYTHON_EXECUTABLE} -c "import site, os; print([pkg for func in (site.getsitepackages(), site.getusersitepackages()) for pkg in ([func] if isinstance(func, str) else func) if os.access(pkg, os.W_OK)][0])" OUTPUT_VARIABLE PY_DEST OUTPUT_STRIP_TRAILING_WHITESPACE)
execute_process(COMMAND ${Python_EXECUTABLE} -c "import site, os; print([pkg for func in (site.getsitepackages(), site.getusersitepackages()) for pkg in ([func] if isinstance(func, str) else func) if os.access(pkg, os.W_OK)][0])" OUTPUT_VARIABLE PY_DEST OUTPUT_STRIP_TRAILING_WHITESPACE)
if(FF_BUILD_FROM_PYPI)
install(CODE "execute_process(COMMAND ${CMAKE_COMMAND} -E echo \"Editing path to Legion library using path: ${PY_DEST}/flexflow/lib \")")
# CMAKE_CURRENT_SOURCE_DIR=/usr/FlexFlow/cmake/pip_install
# Legion_BINARY_DIR=/usr/FlexFlow/build/<something>/deps/legion
install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/../../deps/legion/bindings/python/setup.py install --cmake-build-dir ${Legion_BINARY_DIR}/runtime --prefix ${PY_DEST}/flexflow ${Legion_PYTHON_EXTRA_INSTALL_ARGS} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../deps/legion/bindings/python)")
install(CODE "execute_process(COMMAND ${Python_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/../../deps/legion/bindings/python/setup.py install --cmake-build-dir ${Legion_BINARY_DIR}/runtime --prefix ${PY_DEST}/flexflow ${Legion_PYTHON_EXTRA_INSTALL_ARGS} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../deps/legion/bindings/python)")
endif()
endif()
2 changes: 1 addition & 1 deletion deps/legion
Submodule legion updated from 626b55 to 24e8c4
9 changes: 4 additions & 5 deletions include/flexflow/mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,10 @@ class FFMapper : public NullMapper {
Task const &task,
MapTaskInput const &input,
MapTaskOutput &output);
virtual void map_replicate_task(const MapperContext ctx,
Task const &task,
MapTaskInput const &input,
MapTaskOutput const &default_output,
MapReplicateTaskOutput &output);
virtual void replicate_task(const MapperContext ctx,
Task const &task,
ReplicateTaskInput const &input,
ReplicateTaskOutput &output);
virtual void select_task_variant(const MapperContext ctx,
Task const &task,
SelectVariantInput const &input,
Expand Down
2 changes: 2 additions & 0 deletions include/flexflow/model.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ enum TaskIDs {
// NCCL tasks
NCCL_GETUNIQUEID_TASK_ID,
NCCL_INIT_COMMS_TASK_ID,
NCCL_FINISH_COMMS_TASK_ID,
// Search
STRATEGY_SEARCH_TASK_ID,
// Graph
Expand Down Expand Up @@ -397,6 +398,7 @@ std::vector<ParallelTensorShape>
class FFModel {
public:
FFModel(FFConfig &config, bool cpu_offload = false);
~FFModel();

static constexpr float PROPAGATION_CHANCE = 0.25;
static constexpr float CONTINUE_PROPAGATION_CHANCE = 0.75;
Expand Down
5 changes: 5 additions & 0 deletions include/flexflow/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,11 @@ class Op {
std::vector<Legion::PhysicalRegion> const &regions,
Legion::Context ctx,
Legion::Runtime *runtime);
static void
finish_nccl_comms_task(Legion::Task const *task,
std::vector<Legion::PhysicalRegion> const &regions,
Legion::Context ctx,
Legion::Runtime *runtime);
#endif
protected:
void set_argumentmap_for_init(FFModel const &ff, Legion::ArgumentMap &argmap);
Expand Down
1 change: 0 additions & 1 deletion include/flexflow/request_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class InferenceManager {
public:
std::unordered_map<ParallelTensor, std::vector<ParallelTensor>> tensor_buffer;
std::unordered_map<FFModel *, FileDataLoader *> model_weights_loaders;
int num_devices;
};

struct Request {
Expand Down
47 changes: 20 additions & 27 deletions src/mapper/mapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -661,44 +661,37 @@ void FFMapper::map_task(const MapperContext ctx,
} // for idx
}

void FFMapper::map_replicate_task(const MapperContext ctx,
Task const &task,
MapTaskInput const &input,
MapTaskOutput const &default_output,
MapReplicateTaskOutput &output) {
void FFMapper::replicate_task(const MapperContext ctx,
Task const &task,
ReplicateTaskInput const &input,
ReplicateTaskOutput &output) {
// Should only be replicated for the top-level task
assert((task.get_depth() == 0) && (task.regions.size() == 0));
const Processor::Kind target_kind = task.target_proc.kind();
VariantID chosen_variant;
VariantID vid;
{
std::vector<VariantID> variant_ids;
runtime->find_valid_variants(
ctx, task.task_id, variant_ids, task.target_proc.kind());
runtime->find_valid_variants(ctx, task.task_id, variant_ids, target_kind);
// Currently assume there is exactly one variant
assert(variant_ids.size() == 1);
chosen_variant = variant_ids[0];
output.chosen_variant = variant_ids[0];
}
std::vector<Processor> const &all_procs = all_procs_by_kind(target_kind);
// Place on replicate on each node by default
output.task_mappings.resize(total_nodes, default_output);
// Assume default_output does not include any target_procs
assert(default_output.target_procs.size() == 0);
for (std::vector<Processor>::const_iterator it = all_procs.begin();
it != all_procs.end();
output.target_processors.resize(total_nodes);
std::vector<bool> handled(total_nodes, false);
size_t count = 0;
Machine::ProcessorQuery procs(machine);
procs.only_kind(target_kind);
for (Machine::ProcessorQuery::iterator it = procs.begin(); it != procs.end();
it++) {
AddressSpace space = it->address_space();
assert(space < output.task_mappings.size());
// Add *it as a target_proc if we haven't found one
if (output.task_mappings[space].target_procs.size() == 0) {
output.task_mappings[space].target_procs.push_back(*it);
const AddressSpace space = it->address_space();
if (handled[space]) {
continue;
}
output.target_processors[space] = *it;
handled[space] = true;
count++;
}
output.control_replication_map.resize(total_nodes);
for (int idx = 0; idx < total_nodes; idx++) {
output.task_mappings[idx].chosen_variant = chosen_variant;
output.control_replication_map[idx] =
output.task_mappings[idx].target_procs[0];
}
assert(count == total_nodes);
}

void FFMapper::select_task_variant(const MapperContext ctx,
Expand Down
8 changes: 2 additions & 6 deletions src/ops/linear.cc
Original file line number Diff line number Diff line change
Expand Up @@ -467,12 +467,8 @@ OpMeta *Linear::init_task_with_dim(Task const *task,
ctx,
runtime,
false /*readOutput*/);
// TensorAccessorW<WT, NDIM> acc_kernel(regions[2],
// task->regions[2],
// FID_DATA,
// ctx,
// runtime,
// false /*readOutput*/);
TensorAccessorR<WT, NDIM> acc_kernel(
regions[2], task->regions[2], FID_DATA, ctx, runtime);

// TensorAccessorR<float, 1> acc_bias(
// regions[3], task->regions[3], FID_DATA, ctx, runtime);
Expand Down
30 changes: 1 addition & 29 deletions src/runtime/inference_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,7 @@ using namespace Legion;
LegionRuntime::Logger::Category log_inf_mgr("InferenceManager");
LegionRuntime::Logger::Category log_offload("Offloading");

InferenceManager::InferenceManager() {
#ifdef DEADCODE
num_devices = ff_config.workersPerNode * ff_config.numNodes;
// Check parallelization degrees
assert(ff_config.data_parallelism_degree <= num_devices &&
"Data parallelism degree exceeds number of available devices");
assert(num_devices % ff_config.data_parallelism_degree == 0 &&
"Number of available devices is not divisible by data parallelism "
"degree");
assert(ff_config.tensor_parallelism_degree <= num_devices &&
"Tensor parallelism degree exceeds number of available devices");
assert(num_devices % ff_config.tensor_parallelism_degree == 0 &&
"Number of available devices is not divisible by tensor parallelism "
"degree");
assert(ff_config.pipeline_parallelism_degree <= num_devices &&
"Pipeline parallelism degree exceeds number of available devices");
assert(num_devices % ff_config.pipeline_parallelism_degree == 0 &&
"Number of available devices is not divisible by pipeline parallelism "
"degree");
assert(ff_config.data_parallelism_degree *
ff_config.tensor_parallelism_degree *
ff_config.pipeline_parallelism_degree ==
num_devices &&
"Product of data, tensor, and pipeline parallelism degrees does not "
"match the number of available devices");
#endif
}
InferenceManager::InferenceManager() {}

InferenceManager *inference_manager_singleton = nullptr;

Expand Down Expand Up @@ -296,8 +270,6 @@ void InferenceManager::compile_model_and_allocate_buffer(FFModel *model) {
void InferenceManager::init_operators_inference(FFModel *model) {
for (int batch_index = 0; batch_index < model->config.data_parallelism_degree;
batch_index++) {
int expert_device_index = 0;
int device_index = batch_index % num_devices;
for (size_t o = 0; o < model->operators.size(); o++) {
Op *op = model->operators[o];
if (op->op_type == OP_WEIGHT) {
Expand Down
63 changes: 63 additions & 0 deletions src/runtime/model.cc
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,15 @@ ncclComm_t Op::init_nccl_comms_task(Task const *task,
// ncclComm, allRanks, myRank, ncclId);
return ncclComm;
}

void Op::finish_nccl_comms_task(Task const *task,
std::vector<PhysicalRegion> const &regions,
Context ctx,
Runtime *runtime) {
ncclComm_t comm = *((ncclComm_t *)task->local_args);
checkNCCL(ncclCommFinalize(comm));
checkNCCL(ncclCommDestroy(comm));
}
#endif

/**
Expand Down Expand Up @@ -1578,6 +1587,45 @@ FFModel::FFModel(FFConfig &_config, bool cpu_offload)
model_id = model_counter++;
}

FFModel::~FFModel() {
// Destroy nccl communication groups
#ifdef FF_USE_NCCL
if (config.computationMode == COMP_MODE_TRAINING) {
Context ctx = config.lg_ctx;
Runtime *runtime = config.lg_hlr;
for (auto const &comm : view_hash_to_nccl_comms) {
// Find the machine view that has the hash
MachineView view;
for (size_t l = 0; l < operators.size(); l++) {
view = operators[l]->outputs[0]->machine_view;
if (view.hash() == comm.first) {
break;
}
}
assert(view.hash() == comm.first && "Cannot find the machine view");
IndexSpace task_is = get_or_create_task_is(view);
Domain domain = runtime->get_index_space_domain(ctx, task_is);
ArgumentMap argmap;
int idx = 0;
for (Domain::DomainPointIterator it(domain); it; it++, idx++) {
argmap.set_point(*it,
TaskArgument(&comm.second[idx], sizeof(ncclComm_t)));
}
IndexLauncher index_launcher(NCCL_FINISH_COMMS_TASK_ID,
task_is,
TaskArgument(nullptr, 0),
argmap,
Predicate::TRUE_PRED,
false /*must*/,
0 /*mapper_id*/,
comm.first);
FutureMap fm = runtime->execute_index_space(ctx, index_launcher);
fm.wait_all_results();
}
}
#endif
}

void FFModel::clear_graph_search_cache() {
this->graph_search->clear_cache();
this->search->clear_cache();
Expand Down Expand Up @@ -6853,6 +6901,21 @@ void register_flexflow_internal_tasks(Runtime *runtime,
registrar);
}
}
{
TaskVariantRegistrar registrar(NCCL_FINISH_COMMS_TASK_ID,
"NCCL Finish Communicators");
registrar.add_constraint(ProcessorConstraint(Processor::TOC_PROC));
registrar.set_leaf();
if (pre_register) {
Runtime::preregister_task_variant<Op::finish_nccl_comms_task>(
registrar, "NCCL Finish Communicators Task");
} else {
if (enable_control_replication) {
registrar.global_registration = false;
}
runtime->register_task_variant<Op::finish_nccl_comms_task>(registrar);
}
}
#endif
// Search
{
Expand Down

0 comments on commit d21ed66

Please sign in to comment.