From d73bba1212be19dd8b07e0e8f591b6db2fe4189d Mon Sep 17 00:00:00 2001 From: Gabriele Oliaro Date: Fri, 26 Jan 2024 11:41:51 -0500 Subject: [PATCH] Revert "Bug fixes and update Legion version" (#1286) --- .github/workflows/gpu-ci.yml | 12 +++--- CMakeLists.txt | 8 ++-- cmake/pip_install/CMakeLists.txt | 4 +- deps/legion | 2 +- include/flexflow/mapper.h | 9 +++-- include/flexflow/model.h | 2 - include/flexflow/operator.h | 5 --- include/flexflow/request_manager.h | 1 + src/mapper/mapper.cc | 47 +++++++++++++---------- src/ops/linear.cc | 8 +++- src/runtime/inference_manager.cc | 30 ++++++++++++++- src/runtime/model.cc | 61 ------------------------------ 12 files changed, 80 insertions(+), 109 deletions(-) diff --git a/.github/workflows/gpu-ci.yml b/.github/workflows/gpu-ci.yml index 48dcda157e..3901d6b5f7 100644 --- a/.github/workflows/gpu-ci.yml +++ b/.github/workflows/gpu-ci.yml @@ -222,7 +222,7 @@ jobs: CONDA: "3" needs: inference-tests container: - image: ghcr.io/flexflow/flexflow-environment-cuda-11.8:latest + image: ghcr.io/flexflow/flexflow-environment-cuda:latest options: --gpus all --shm-size=8192m steps: - name: Install updated git version @@ -243,7 +243,7 @@ jobs: - name: Build and Install FlexFlow run: | - export PATH=$CONDA_PREFIX/bin:$PATH + export PATH=/opt/conda/bin:$PATH export FF_HOME=$(pwd) export FF_BUILD_ALL_EXAMPLES=ON export FF_BUILD_ALL_INFERENCE_EXAMPLES=ON @@ -252,18 +252,18 @@ jobs: - name: Check FlexFlow Python interface (pip) run: | - export PATH=$CONDA_PREFIX/bin:$PATH + export PATH=/opt/conda/bin:$PATH export FF_HOME=$(pwd) - export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CONDA_PREFIX/lib + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/conda/lib ./tests/python_interface_test.sh after-installation - name: Run multi-gpu tests run: | - export PATH=$CONDA_PREFIX/bin:$PATH + export PATH=/opt/conda/bin:$PATH export CUDNN_DIR=/usr/local/cuda export CUDA_DIR=/usr/local/cuda export FF_HOME=$(pwd) - export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CONDA_PREFIX/lib + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/conda/lib # C++ tests ./tests/cpp_gpu_tests.sh 4 # Python tests diff --git a/CMakeLists.txt b/CMakeLists.txt index 43ce4f7044..acbe7e385f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -413,7 +413,6 @@ if(NOT BUILD_LEGION_ONLY) # python related if (FF_USE_PYTHON) - find_package(Python COMPONENTS Interpreter Development) # create flexflow_cffi_header.py add_custom_command(TARGET flexflow PRE_BUILD @@ -425,13 +424,13 @@ if(NOT BUILD_LEGION_ONLY) # generate the Legion Python bindings library. When building from pip, we need to do this post-install to prevent Legion from overwriting the path to the Legion shared library add_custom_command(TARGET flexflow POST_BUILD - COMMAND ${Python_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/deps/legion/bindings/python/setup.py build --cmake-build-dir ${Legion_BINARY_DIR}/runtime --prefix ${Legion_BINARY_DIR} --build-lib=${Legion_BINARY_DIR}/bindings/python ${Legion_PYTHON_EXTRA_INSTALL_ARGS} + COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/deps/legion/bindings/python/setup.py build --cmake-build-dir ${Legion_BINARY_DIR}/runtime --prefix ${Legion_BINARY_DIR} --build-lib=${Legion_BINARY_DIR}/bindings/python ${Legion_PYTHON_EXTRA_INSTALL_ARGS} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/deps/legion/bindings/python ) # create flexflow_python interpreter. When building from pip, we install the FF_HOME/python/flexflow_python script instead. add_custom_command(TARGET flexflow PRE_BUILD - COMMAND ${Python_EXECUTABLE} ${FLEXFLOW_ROOT}/python/flexflow_python_build.py --build-dir ${CMAKE_BINARY_DIR} + COMMAND ${PYTHON_EXECUTABLE} ${FLEXFLOW_ROOT}/python/flexflow_python_build.py --build-dir ${CMAKE_BINARY_DIR} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMENT "Creating flexflow_python interpreter..." ) @@ -568,8 +567,7 @@ if(NOT BUILD_LEGION_ONLY) install(TARGETS flexflow DESTINATION ${LIB_DEST}) # install python if (FF_USE_PYTHON) - find_package(Python COMPONENTS Interpreter Development) - execute_process(COMMAND ${Python_EXECUTABLE} -c "import site, os; print([pkg for func in (site.getsitepackages(), site.getusersitepackages()) for pkg in ([func] if isinstance(func, str) else func) if os.access(pkg, os.W_OK)][0])" OUTPUT_VARIABLE PY_DEST OUTPUT_STRIP_TRAILING_WHITESPACE) + execute_process(COMMAND ${PYTHON_EXECUTABLE} -c "import site, os; print([pkg for func in (site.getsitepackages(), site.getusersitepackages()) for pkg in ([func] if isinstance(func, str) else func) if os.access(pkg, os.W_OK)][0])" OUTPUT_VARIABLE PY_DEST OUTPUT_STRIP_TRAILING_WHITESPACE) if (NOT FF_BUILD_FROM_PYPI) install( DIRECTORY ${FLEXFLOW_ROOT}/python/flexflow/ diff --git a/cmake/pip_install/CMakeLists.txt b/cmake/pip_install/CMakeLists.txt index 105133a310..7ce38c4abc 100644 --- a/cmake/pip_install/CMakeLists.txt +++ b/cmake/pip_install/CMakeLists.txt @@ -1,10 +1,10 @@ # Use setup.py script to re-install the Python bindings library with the right library paths if (FF_USE_PYTHON) - execute_process(COMMAND ${Python_EXECUTABLE} -c "import site, os; print([pkg for func in (site.getsitepackages(), site.getusersitepackages()) for pkg in ([func] if isinstance(func, str) else func) if os.access(pkg, os.W_OK)][0])" OUTPUT_VARIABLE PY_DEST OUTPUT_STRIP_TRAILING_WHITESPACE) + execute_process(COMMAND ${PYTHON_EXECUTABLE} -c "import site, os; print([pkg for func in (site.getsitepackages(), site.getusersitepackages()) for pkg in ([func] if isinstance(func, str) else func) if os.access(pkg, os.W_OK)][0])" OUTPUT_VARIABLE PY_DEST OUTPUT_STRIP_TRAILING_WHITESPACE) if(FF_BUILD_FROM_PYPI) install(CODE "execute_process(COMMAND ${CMAKE_COMMAND} -E echo \"Editing path to Legion library using path: ${PY_DEST}/flexflow/lib \")") # CMAKE_CURRENT_SOURCE_DIR=/usr/FlexFlow/cmake/pip_install # Legion_BINARY_DIR=/usr/FlexFlow/build//deps/legion - install(CODE "execute_process(COMMAND ${Python_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/../../deps/legion/bindings/python/setup.py install --cmake-build-dir ${Legion_BINARY_DIR}/runtime --prefix ${PY_DEST}/flexflow ${Legion_PYTHON_EXTRA_INSTALL_ARGS} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../deps/legion/bindings/python)") + install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/../../deps/legion/bindings/python/setup.py install --cmake-build-dir ${Legion_BINARY_DIR}/runtime --prefix ${PY_DEST}/flexflow ${Legion_PYTHON_EXTRA_INSTALL_ARGS} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../deps/legion/bindings/python)") endif() endif() diff --git a/deps/legion b/deps/legion index 24e8c45234..626b55689c 160000 --- a/deps/legion +++ b/deps/legion @@ -1 +1 @@ -Subproject commit 24e8c452341dea41427e0ce61e154d61715e6835 +Subproject commit 626b55689c77848b246e1da19678c7ad58899f0c diff --git a/include/flexflow/mapper.h b/include/flexflow/mapper.h index e8337818ec..71be1892aa 100644 --- a/include/flexflow/mapper.h +++ b/include/flexflow/mapper.h @@ -83,10 +83,11 @@ class FFMapper : public NullMapper { Task const &task, MapTaskInput const &input, MapTaskOutput &output); - virtual void replicate_task(const MapperContext ctx, - Task const &task, - ReplicateTaskInput const &input, - ReplicateTaskOutput &output); + virtual void map_replicate_task(const MapperContext ctx, + Task const &task, + MapTaskInput const &input, + MapTaskOutput const &default_output, + MapReplicateTaskOutput &output); virtual void select_task_variant(const MapperContext ctx, Task const &task, SelectVariantInput const &input, diff --git a/include/flexflow/model.h b/include/flexflow/model.h index 95be9ab581..dd6dc76b4d 100644 --- a/include/flexflow/model.h +++ b/include/flexflow/model.h @@ -202,7 +202,6 @@ enum TaskIDs { // NCCL tasks NCCL_GETUNIQUEID_TASK_ID, NCCL_INIT_COMMS_TASK_ID, - NCCL_FINISH_COMMS_TASK_ID, // Search STRATEGY_SEARCH_TASK_ID, // Graph @@ -398,7 +397,6 @@ std::vector class FFModel { public: FFModel(FFConfig &config, bool cpu_offload = false); - ~FFModel(); static constexpr float PROPAGATION_CHANCE = 0.25; static constexpr float CONTINUE_PROPAGATION_CHANCE = 0.75; diff --git a/include/flexflow/operator.h b/include/flexflow/operator.h index 1b19bdb82f..73c2c3e092 100644 --- a/include/flexflow/operator.h +++ b/include/flexflow/operator.h @@ -406,11 +406,6 @@ class Op { std::vector const ®ions, Legion::Context ctx, Legion::Runtime *runtime); - static void - finish_nccl_comms_task(Legion::Task const *task, - std::vector const ®ions, - Legion::Context ctx, - Legion::Runtime *runtime); #endif protected: void set_argumentmap_for_init(FFModel const &ff, Legion::ArgumentMap &argmap); diff --git a/include/flexflow/request_manager.h b/include/flexflow/request_manager.h index 4763eb1ef3..50a51705cd 100644 --- a/include/flexflow/request_manager.h +++ b/include/flexflow/request_manager.h @@ -55,6 +55,7 @@ class InferenceManager { public: std::unordered_map> tensor_buffer; std::unordered_map model_weights_loaders; + int num_devices; }; struct Request { diff --git a/src/mapper/mapper.cc b/src/mapper/mapper.cc index d46bfc2877..bc26a79d3e 100644 --- a/src/mapper/mapper.cc +++ b/src/mapper/mapper.cc @@ -661,37 +661,44 @@ void FFMapper::map_task(const MapperContext ctx, } // for idx } -void FFMapper::replicate_task(const MapperContext ctx, - Task const &task, - ReplicateTaskInput const &input, - ReplicateTaskOutput &output) { +void FFMapper::map_replicate_task(const MapperContext ctx, + Task const &task, + MapTaskInput const &input, + MapTaskOutput const &default_output, + MapReplicateTaskOutput &output) { // Should only be replicated for the top-level task assert((task.get_depth() == 0) && (task.regions.size() == 0)); const Processor::Kind target_kind = task.target_proc.kind(); - VariantID vid; + VariantID chosen_variant; { std::vector variant_ids; - runtime->find_valid_variants(ctx, task.task_id, variant_ids, target_kind); + runtime->find_valid_variants( + ctx, task.task_id, variant_ids, task.target_proc.kind()); // Currently assume there is exactly one variant assert(variant_ids.size() == 1); - output.chosen_variant = variant_ids[0]; + chosen_variant = variant_ids[0]; } - output.target_processors.resize(total_nodes); - std::vector handled(total_nodes, false); - size_t count = 0; - Machine::ProcessorQuery procs(machine); - procs.only_kind(target_kind); - for (Machine::ProcessorQuery::iterator it = procs.begin(); it != procs.end(); + std::vector const &all_procs = all_procs_by_kind(target_kind); + // Place on replicate on each node by default + output.task_mappings.resize(total_nodes, default_output); + // Assume default_output does not include any target_procs + assert(default_output.target_procs.size() == 0); + for (std::vector::const_iterator it = all_procs.begin(); + it != all_procs.end(); it++) { - const AddressSpace space = it->address_space(); - if (handled[space]) { - continue; + AddressSpace space = it->address_space(); + assert(space < output.task_mappings.size()); + // Add *it as a target_proc if we haven't found one + if (output.task_mappings[space].target_procs.size() == 0) { + output.task_mappings[space].target_procs.push_back(*it); } - output.target_processors[space] = *it; - handled[space] = true; - count++; } - assert(count == total_nodes); + output.control_replication_map.resize(total_nodes); + for (int idx = 0; idx < total_nodes; idx++) { + output.task_mappings[idx].chosen_variant = chosen_variant; + output.control_replication_map[idx] = + output.task_mappings[idx].target_procs[0]; + } } void FFMapper::select_task_variant(const MapperContext ctx, diff --git a/src/ops/linear.cc b/src/ops/linear.cc index 0c7a0f78fe..03c9e48af8 100644 --- a/src/ops/linear.cc +++ b/src/ops/linear.cc @@ -467,8 +467,12 @@ OpMeta *Linear::init_task_with_dim(Task const *task, ctx, runtime, false /*readOutput*/); - TensorAccessorR acc_kernel( - regions[2], task->regions[2], FID_DATA, ctx, runtime); + // TensorAccessorW acc_kernel(regions[2], + // task->regions[2], + // FID_DATA, + // ctx, + // runtime, + // false /*readOutput*/); // TensorAccessorR acc_bias( // regions[3], task->regions[3], FID_DATA, ctx, runtime); diff --git a/src/runtime/inference_manager.cc b/src/runtime/inference_manager.cc index 2a94df8b4d..6588cbceeb 100644 --- a/src/runtime/inference_manager.cc +++ b/src/runtime/inference_manager.cc @@ -28,7 +28,33 @@ using namespace Legion; LegionRuntime::Logger::Category log_inf_mgr("InferenceManager"); LegionRuntime::Logger::Category log_offload("Offloading"); -InferenceManager::InferenceManager() {} +InferenceManager::InferenceManager() { +#ifdef DEADCODE + num_devices = ff_config.workersPerNode * ff_config.numNodes; + // Check parallelization degrees + assert(ff_config.data_parallelism_degree <= num_devices && + "Data parallelism degree exceeds number of available devices"); + assert(num_devices % ff_config.data_parallelism_degree == 0 && + "Number of available devices is not divisible by data parallelism " + "degree"); + assert(ff_config.tensor_parallelism_degree <= num_devices && + "Tensor parallelism degree exceeds number of available devices"); + assert(num_devices % ff_config.tensor_parallelism_degree == 0 && + "Number of available devices is not divisible by tensor parallelism " + "degree"); + assert(ff_config.pipeline_parallelism_degree <= num_devices && + "Pipeline parallelism degree exceeds number of available devices"); + assert(num_devices % ff_config.pipeline_parallelism_degree == 0 && + "Number of available devices is not divisible by pipeline parallelism " + "degree"); + assert(ff_config.data_parallelism_degree * + ff_config.tensor_parallelism_degree * + ff_config.pipeline_parallelism_degree == + num_devices && + "Product of data, tensor, and pipeline parallelism degrees does not " + "match the number of available devices"); +#endif +} InferenceManager *inference_manager_singleton = nullptr; @@ -270,6 +296,8 @@ void InferenceManager::compile_model_and_allocate_buffer(FFModel *model) { void InferenceManager::init_operators_inference(FFModel *model) { for (int batch_index = 0; batch_index < model->config.data_parallelism_degree; batch_index++) { + int expert_device_index = 0; + int device_index = batch_index % num_devices; for (size_t o = 0; o < model->operators.size(); o++) { Op *op = model->operators[o]; if (op->op_type == OP_WEIGHT) { diff --git a/src/runtime/model.cc b/src/runtime/model.cc index f9763627c8..c07c33efca 100644 --- a/src/runtime/model.cc +++ b/src/runtime/model.cc @@ -606,15 +606,6 @@ ncclComm_t Op::init_nccl_comms_task(Task const *task, // ncclComm, allRanks, myRank, ncclId); return ncclComm; } - -void Op::finish_nccl_comms_task(Task const *task, - std::vector const ®ions, - Context ctx, - Runtime *runtime) { - ncclComm_t comm = *((ncclComm_t *)task->local_args); - checkNCCL(ncclCommFinalize(comm)); - checkNCCL(ncclCommDestroy(comm)); -} #endif /** @@ -1587,43 +1578,6 @@ FFModel::FFModel(FFConfig &_config, bool cpu_offload) model_id = model_counter++; } -FFModel::~FFModel() { - // Destroy nccl communication groups -#ifdef FF_USE_NCCL - Context ctx = config.lg_ctx; - Runtime *runtime = config.lg_hlr; - for (auto const &comm : view_hash_to_nccl_comms) { - // Find the machine view that has the hash - MachineView view; - for (size_t l = 0; l < operators.size(); l++) { - view = operators[l]->outputs[0]->machine_view; - if (view.hash() == comm.first) { - break; - } - } - assert(view.hash() == comm.first && "Cannot find the machine view"); - IndexSpace task_is = get_or_create_task_is(view); - Domain domain = runtime->get_index_space_domain(ctx, task_is); - ArgumentMap argmap; - int idx = 0; - for (Domain::DomainPointIterator it(domain); it; it++, idx++) { - argmap.set_point(*it, - TaskArgument(&comm.second[idx], sizeof(ncclComm_t))); - } - IndexLauncher index_launcher(NCCL_FINISH_COMMS_TASK_ID, - task_is, - TaskArgument(nullptr, 0), - argmap, - Predicate::TRUE_PRED, - false /*must*/, - 0 /*mapper_id*/, - comm.first); - FutureMap fm = runtime->execute_index_space(ctx, index_launcher); - fm.wait_all_results(); - } -#endif -} - void FFModel::clear_graph_search_cache() { this->graph_search->clear_cache(); this->search->clear_cache(); @@ -6899,21 +6853,6 @@ void register_flexflow_internal_tasks(Runtime *runtime, registrar); } } - { - TaskVariantRegistrar registrar(NCCL_FINISH_COMMS_TASK_ID, - "NCCL Finish Communicators"); - registrar.add_constraint(ProcessorConstraint(Processor::TOC_PROC)); - registrar.set_leaf(); - if (pre_register) { - Runtime::preregister_task_variant( - registrar, "NCCL Finish Communicators Task"); - } else { - if (enable_control_replication) { - registrar.global_registration = false; - } - runtime->register_task_variant(registrar); - } - } #endif // Search {