Skip to content

Commit

Permalink
#8264: Worker thread optimizations:
Browse files Browse the repository at this point in the history
  - Remove cross worker syncs, fastest worker now populates metadata
  - Use atomic flags to track if a tensor is populated instead of using a
    vector of booleans
  - Worker thread binding is now NUMA aware
  - Main thread + python/torch threads are also bound to cores not assigned
    to workers (more deterministic perf)
  - Improve multi-device synchronize to remove main thread overhead
  • Loading branch information
tt-asaigal committed May 30, 2024
1 parent 92a587a commit 92aafd5
Show file tree
Hide file tree
Showing 25 changed files with 972 additions and 743 deletions.
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ CHECK_COMPILERS()
find_package(Boost REQUIRED COMPONENTS thread filesystem system regex)
find_package(GTest REQUIRED)
find_package (Python3 COMPONENTS Interpreter Development)
# libnuma is required for NUMA-aware worker-thread core binding; fail the
# configure step early with a clear message rather than at link time.
find_library(NUMA_LIBRARY NAMES numa)
if (NOT NUMA_LIBRARY)
message(FATAL_ERROR "NUMA library not found")
endif()

############################################################################################################################
# Setting build type flags
Expand Down Expand Up @@ -84,7 +88,7 @@ set(CMAKE_INSTALL_DATAROOTDIR "${CMAKE_BINARY_DIR}/tmp/share")
############################################################################################################################
add_library(metal_common_libs INTERFACE)
target_link_libraries(metal_common_libs INTERFACE
dl z pthread atomic stdc++ # system libraries
dl z pthread atomic stdc++ numa # system libraries
Boost::thread Boost::filesystem Boost::system Boost::regex hwloc # hwloc has no cmake support, find_package won't find it
)

Expand Down
238 changes: 134 additions & 104 deletions tests/tt_eager/tensors/test_async_tensor_apis.cpp

Large diffs are not rendered by default.

129 changes: 46 additions & 83 deletions tt_eager/tensor/tensor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Tensor::Tensor(const Storage storage, const ttnn::Shape shape, DataType dtype, L
[&](auto&& storage) {
using StorageType = std::decay_t<decltype(storage)>;
if constexpr (std::is_same_v<StorageType, OwnedStorage>) {
this->tensor_attributes->tensor_populated = {true};
this->tensor_attributes->num_shards_to_be_populated = 1;
} else if constexpr (std::is_same_v<StorageType, DeviceStorage>) {
TT_ASSERT(storage.buffer->device() != nullptr);
workers = {storage.buffer->device()};
Expand All @@ -48,9 +48,9 @@ Tensor::Tensor(const Storage storage, const ttnn::Shape shape, DataType dtype, L
if (not this->workers.at(0)->in_main_thread()) {
this->tensor_attributes->main_thread_tensor = false;
}
this->tensor_attributes->tensor_populated = {true};
this->tensor_attributes->num_shards_to_be_populated = 1;
} else if constexpr (std::is_same_v<StorageType, BorrowedStorage>) {
this->tensor_attributes->tensor_populated = {true};
this->tensor_attributes->num_shards_to_be_populated = 1;
} else if constexpr (std::is_same_v<StorageType, MultiDeviceStorage>) {
workers.reserve(storage.num_buffers());
for (int i = 0; i < storage.ordered_device_ids.size(); i++) {
Expand All @@ -68,14 +68,16 @@ Tensor::Tensor(const Storage storage, const ttnn::Shape shape, DataType dtype, L
if (not this->workers.at(0)->in_main_thread()) {
this->tensor_attributes->main_thread_tensor = false;
}
this->tensor_attributes->tensor_populated = std::vector<bool>(storage.num_buffers(), true);
this->tensor_attributes->num_shards_to_be_populated = storage.num_buffers();
} else if constexpr (std::is_same_v<StorageType, MultiDeviceHostStorage>) {
this->tensor_attributes->tensor_populated = std::vector<bool>(storage.num_buffers(), true);
this->tensor_attributes->num_shards_to_be_populated = storage.num_buffers();
} else {
raise_unsupported_storage<StorageType>();
}
},
storage);
this->tensor_attributes->num_workers_completed = this->tensor_attributes->num_shards_to_be_populated;
this->tensor_attributes->metadata_populated = true;
}

Tensor::Tensor(const Storage storage, const Shape shape, DataType dtype, Layout layout) :
Expand Down Expand Up @@ -239,45 +241,6 @@ void Tensor::perform_cleanup_for_async_mode() {
}
}

// Main Thread - Wait for all workers in this tensor to populate the entire tensor
// Busy-waits on the per-shard populated flags. The mutex is re-acquired on every
// poll so reads of tensor_populated are synchronized with the worker threads
// that set the flags (and so workers are not starved of the lock).
void Tensor::wait_for_tensor_data_populated() const {
ZoneScoped;
// Stall until all the workers for this tensor
// have populated the full tensor
for (int i = 0; i < this->tensor_attributes->tensor_populated.size(); i++) {
while (true) {
// Lock scope is a single poll; released at the bottom of each loop iteration.
std::scoped_lock<std::mutex> lock(this->tensor_attributes->populated_mutex);
if (this->tensor_attributes->tensor_populated.at(i))
break;
}
}
}

// Main Thread - Wait for the first worker in this tensor to populate the global metadata fields
// Only entry 0 of tensor_populated is checked: the first worker owns the
// shape/dtype/layout metadata fields shared by all shards.
void Tensor::wait_for_tensor_metadata_populated() const {
ZoneScoped;
// First worker is responsible for updating all metadata fields
// Stall until this worker is done
while (true) {
// Re-acquire the lock per poll to synchronize with the worker writing the flag.
std::scoped_lock<std::mutex> lock(this->tensor_attributes->populated_mutex);
if (this->tensor_attributes->tensor_populated.at(0))
break;
};
}

// Worker Thread - Set populated flag to true, once worker has completed its task for this tensor
// Passing nullptr marks every shard populated in one call (single-worker /
// host-side paths); passing a specific worker marks only that worker's shard.
void Tensor::set_populated(Device* worker) {
// If worker is not specified, set entry for all workers to true
std::scoped_lock<std::mutex> lock(this->tensor_attributes->populated_mutex);
if (not worker) {
for (int i = 0; i < this->tensor_attributes->tensor_populated.size(); i++) {
this->tensor_attributes->tensor_populated.at(i) = true;
}
} else {
// NOTE(review): indexes tensor_populated by worker->id() — assumes the device id
// maps directly onto the shard index for this tensor; verify for multi-device
// configurations where device ids may not be dense from 0.
this->tensor_attributes->tensor_populated.at(worker->id()) = true;
}
}

void Tensor::deepcopy(const Tensor& other) {
ZoneScoped;
// Wait until the tensor being copied is populated
Expand All @@ -288,7 +251,8 @@ void Tensor::deepcopy(const Tensor& other) {
this->set_dtype(other.get_dtype());
this->set_layout(other.get_layout());
// Set metadata populated flag for getters
this->set_populated();
this->tensor_attributes->metadata_populated = true;
this->tensor_attributes->num_workers_completed++;
}

void Tensor::populate_buffers_and_metadata(const Tensor& other) {
Expand All @@ -304,17 +268,17 @@ void Tensor::populate_buffers_and_metadata(const Tensor& other) {
using StorageType = std::decay_t<decltype(storage)>;
if constexpr (std::is_same_v<StorageType, OwnedStorage> or std::is_same_v<StorageType, DeviceStorage>) {
std::get<StorageType>(this->tensor_attributes->storage).insert_buffer(storage.get_buffer());
this->tensor_attributes->tensor_populated = {true};
} else if constexpr (
std::is_same_v<StorageType, MultiDeviceHostStorage> or
std::is_same_v<StorageType, MultiDeviceStorage>) {
std::get<StorageType>(this->tensor_attributes->storage).buffers = storage.buffers;
std::get<StorageType>(this->tensor_attributes->storage).shapes = storage.shapes;
this->tensor_attributes->tensor_populated = std::vector<bool>(storage.buffers.size(), true);
}
},
other.get_storage()); // Non blocking storage query, since this is done for tensors that get created inside the
// worker thread
this->tensor_attributes->metadata_populated = true;
this->tensor_attributes->num_workers_completed++;
}

std::vector<Device*> Tensor::get_workers(bool blocking) const {
Expand Down Expand Up @@ -484,21 +448,20 @@ Tensor Tensor::to(const std::vector<Device*>& workers, const MemoryConfig& mem_c
uint32_t num_workers = workers_to_use.size();
for (int worker_index = 0; worker_index < workers_to_use.size(); ++worker_index) {
auto& worker = workers_to_use[worker_index];
worker->push_work([worker, *this, device_tensor, mem_config, num_workers, worker_index]() mutable {
auto shard = get_shard_for_device(*this, worker, worker_index);
if (shard.storage_type() == StorageType::OWNED) {
shard = tensor_impl::to_device_wrapper(shard, worker, mem_config, std::nullopt);
}
insert_buffer_and_shape_for_device(worker, shard, device_tensor, worker_index);
if (not worker->id()) {
device_tensor.set_shape(this->get_shape());
device_tensor.set_dtype(this->get_dtype());
device_tensor.set_layout(this->get_layout());
}
if (num_workers > 1)
device_tensor.set_populated(worker);
else
device_tensor.set_populated();
worker->push_work(
[worker, *this, device_tensor, mem_config, num_workers, worker_index] () mutable {
auto shard = get_shard_for_device(*this, worker, worker_index);
if (shard.storage_type() == StorageType::OWNED) {
shard = tensor_impl::to_device_wrapper(shard, worker, mem_config, std::nullopt);
}
insert_buffer_and_shape_for_device(worker, shard, device_tensor, worker_index);
uint32_t num_workers_completed = (device_tensor.tensor_attributes->num_workers_completed)++;
if (not num_workers_completed) {
device_tensor.set_shape(this->get_shape());
device_tensor.set_dtype(this->get_dtype());
device_tensor.set_layout(this->get_layout());
device_tensor.tensor_attributes->metadata_populated = true;
}
});
}
device_tensor.tensor_attributes->update_main_thread_ref_count(workers.at(0), device_tensor_ref_count);
Expand Down Expand Up @@ -528,22 +491,18 @@ Tensor Tensor::cpu(bool blocking) const {
auto shard = get_shard_for_device(*this, target_device);
shard = tensor_impl::to_host_wrapper(shard, blocking);
insert_buffer_and_shape_for_device(target_device, shard, host_tensor, worker_index);
if (not target_device->id() or workers.size() == 1) {
uint32_t num_workers_completed = (host_tensor.tensor_attributes->num_workers_completed)++;
if (not num_workers_completed) {
host_tensor.set_shape(this->get_shape());
host_tensor.set_dtype(this->get_dtype());
host_tensor.set_layout(this->get_layout());
}
if (workers.size() == 1) {
host_tensor.set_populated();
} else {
host_tensor.set_populated(target_device);
host_tensor.tensor_attributes->metadata_populated = true;
}
});
}

if (blocking) {
for (auto target_device : workers) {
target_device->synchronize();
}
detail::SynchronizeWorkerThreads(workers);
}
// Update main_thread_ref_count for tensor after pushing to queue.
this->tensor_attributes->update_main_thread_ref_count(workers.at(0), original_tensor_ref_count);
Expand Down Expand Up @@ -611,12 +570,13 @@ Tensor Tensor::to(Layout target_layout, DeviceMesh* device_mesh) const {
auto shard = get_shard_for_device(*this, worker, worker_index);
shard = tensor_impl::to_layout_wrapper(shard, target_layout);
insert_buffer_and_shape_for_device(worker, shard, tensor_modified_layout, worker_index);
if (not(worker->id())) {
uint32_t num_workers_completed = (tensor_modified_layout.tensor_attributes->num_workers_completed)++;
if (not num_workers_completed) {
tensor_modified_layout.set_shape(this->get_shape());
tensor_modified_layout.set_dtype(this->get_dtype());
tensor_modified_layout.set_layout(target_layout);
}
tensor_modified_layout.set_populated(worker);
tensor_modified_layout.tensor_attributes->metadata_populated = true;
};
});
}
return tensor_modified_layout;
Expand Down Expand Up @@ -985,15 +945,18 @@ Tensor allocate_tensor_on_device(

for (int worker_index = 0; worker_index < num_workers; ++worker_index) {
auto& worker = workers[worker_index];
worker->push_work([shape, data_type, layout, worker, memory_config, device_tensor, worker_index]() mutable {
auto local_tensor = create_device_tensor(shape.value(), data_type, layout, worker, memory_config);
insert_buffer_and_shape_for_device(worker, local_tensor, device_tensor, worker_index);
if (not worker->id()) {
device_tensor.set_shape(ttnn::Shape(shape));
device_tensor.set_dtype(data_type);
device_tensor.set_layout(layout);
}
device_tensor.set_populated(worker);
worker->push_work(
[shape, data_type, layout, worker, memory_config, device_tensor, worker_index] () mutable {
auto local_tensor = create_device_tensor(shape.value(), data_type, layout, worker, memory_config);
insert_buffer_and_shape_for_device(worker, local_tensor, device_tensor, worker_index);

uint32_t num_workers_completed = (device_tensor.tensor_attributes->num_workers_completed)++;
if (not num_workers_completed) {
device_tensor.set_shape(ttnn::Shape(shape));
device_tensor.set_dtype(data_type);
device_tensor.set_layout(layout);
device_tensor.tensor_attributes->metadata_populated = true;
}
});
}
device_tensor.tensor_attributes->update_main_thread_ref_count(workers.at(0), device_tensor_ref_count);
Expand Down
55 changes: 41 additions & 14 deletions tt_eager/tensor/tensor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ struct Tensor {
DataType dtype;
Layout layout;
std::mutex populated_mutex;
std::vector<bool> tensor_populated = {};
uint32_t num_shards_to_be_populated = 0;
uint32_t main_thread_ref_count = 0;
std::atomic<uint32_t> num_sibling_workers_sharing_tensor = 0;
std::atomic<bool> main_thread_tensor = true;
std::atomic<bool> metadata_populated = false;
std::atomic<int> num_workers_completed = 0;
bool deallocated = false; // Set to true if device side storage was deallocated
bool dynamic_storage = false; // Storage type can change, depending on op behaviour
bool track_ref_count = false;
Expand Down Expand Up @@ -155,7 +157,7 @@ struct Tensor {
std::get<MultiDeviceStorage>(this->tensor_attributes->storage).ordered_device_ids),
[](const Device *worker) { return worker->id(); });
}
this->tensor_attributes->tensor_populated = std::vector<bool>(workers.size(), false);
this->tensor_attributes->num_shards_to_be_populated = workers.size();
} else if (num_buffers) {
if (num_buffers == 1) {
this->tensor_attributes->storage = OwnedStorage();
Expand All @@ -167,7 +169,7 @@ struct Tensor {
std::get<MultiDeviceHostStorage>(this->tensor_attributes->storage).shapes =
std::vector<Shape>(num_buffers, this->tensor_attributes->shape.value());
}
this->tensor_attributes->tensor_populated = std::vector<bool>(num_buffers, false);
this->tensor_attributes->num_shards_to_be_populated = num_buffers;
}
}

Expand Down Expand Up @@ -286,19 +288,26 @@ struct Tensor {
const ttnn::Shape &get_shape() const;
const DataType &get_dtype() const;
const Layout &get_layout() const;

// ======================================================================================
// Non-Blocking Getters. Query attributes directly, without waiting for worker completion
// ======================================================================================
inline const Storage &storage() const { return this->tensor_attributes->storage; };
inline const Shape &legacy_shape() const { return this->tensor_attributes->shape.value(); };
inline const ttnn::Shape &shape() const { return this->tensor_attributes->shape; };
inline const DataType &dtype() const { return this->tensor_attributes->dtype; };
inline const Layout &layout() const { return this->tensor_attributes->layout; };

// ======================================================================================
// Setters
// ======================================================================================
void set_storage(const Storage &storage) { this->tensor_attributes->storage = storage; }
void set_shape(const ttnn::Shape &shape) { this->tensor_attributes->shape = shape; }
void set_dtype(const DataType &dtype) { this->tensor_attributes->dtype = dtype; }
void set_layout(const Layout &layout) { this->tensor_attributes->layout = layout; }
void set_populated(Device *worker = nullptr);
inline void set_storage(const Storage &storage) { this->tensor_attributes->storage = storage; }
inline void set_shape(const ttnn::Shape &shape) { this->tensor_attributes->shape = shape; }
inline void set_dtype(const DataType &dtype) { this->tensor_attributes->dtype = dtype; }
inline void set_layout(const Layout &layout) { this->tensor_attributes->layout = layout; }
// ======================================================================================
// Extra Helper Functions
// ======================================================================================
void wait_for_tensor_data_populated() const;
void wait_for_tensor_metadata_populated() const;
StorageType storage_type() const;
const Shape strides() const;
uint32_t volume() const;
Expand Down Expand Up @@ -355,13 +364,31 @@ struct Tensor {
static constexpr auto attribute_names = std::make_tuple("storage", "shape", "dtype", "layout");
const auto attribute_values() const {
return std::make_tuple(
std::cref(this->get_storage()),
std::cref(this->get_shape()),
std::cref(this->get_dtype()),
std::cref(this->get_layout()));
std::cref(this->tensor_attributes->storage),
std::cref(this->tensor_attributes->shape),
std::cref(this->tensor_attributes->dtype),
std::cref(this->tensor_attributes->layout));
}

std::vector<uint32_t> host_page_ordering();

// Main Thread - Wait for all workers in this tensor to populate the entire tensor
// Lock-free busy-wait: each worker increments the atomic num_workers_completed
// counter exactly once when its shard is written, so completion is reached when
// the counter catches up to num_shards_to_be_populated.
// NOTE(review): compares std::atomic<int> against uint32_t (signed/unsigned
// mix) and spins without yielding — confirm both are intended.
inline void wait_for_tensor_data_populated() const {
ZoneScoped;
// Stall until all the workers for this tensor
// have populated the full tensor
while (this->tensor_attributes->num_workers_completed < this->tensor_attributes->num_shards_to_be_populated) {
}
}

// Main Thread - Wait for the first worker in this tensor to populate the global metadata fields
// Spins on the atomic<bool> metadata_populated flag, which is set by whichever
// worker observes num_workers_completed == 0 when it finishes (i.e. the fastest
// worker publishes shape/dtype/layout for the whole tensor).
inline void wait_for_tensor_metadata_populated() const {
ZoneScoped;
// First worker is responsible for updating all metadata fields
// Stall until this worker is done
while (not this->tensor_attributes->metadata_populated) {
}
}
};

Tensor create_device_tensor(
Expand Down
5 changes: 2 additions & 3 deletions tt_eager/tensor/tensor_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,6 @@ inline Tensor to_host(const Tensor& tensor, bool blocking = true) {
host_tensor.set_dtype(tensor.get_dtype());
host_tensor.set_layout(tensor.get_layout());
insert_buffer_and_shape_for_device(device, shard, host_tensor, device_index);
host_tensor.set_populated(device);
}
return host_tensor;
} else {
Expand Down Expand Up @@ -942,7 +941,7 @@ inline std::string to_string(const Tensor& tensor, std::optional<DataType> origi
}

if (is_tensor_on_device(tensor)) {
return to_string<T>(to_host<T>(tensor));
return to_string<T>(tensor.cpu());
}

return std::visit(
Expand Down Expand Up @@ -985,7 +984,7 @@ inline std::string to_string(const Tensor& tensor, std::optional<DataType> origi
TT_THROW("Cannot print a device tensor!");
} else if constexpr (std::is_same_v<StorageType, MultiDeviceStorage>) {
auto devices = get_devices(tensor);
auto host_tensor = to_host<T>(tensor);
auto host_tensor = tensor.cpu();
auto device_index = 0;
std::stringstream ss;
apply(host_tensor, [&](const Tensor& device_tensor) {
Expand Down
Loading

0 comments on commit 92aafd5

Please sign in to comment.