Skip to content

Commit

Permalink
fix
Browse files: browse the repository at this point in the history
  • Loading branch information
goliaro committed Nov 14, 2024
1 parent 366e7db commit 58e0061
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 43 deletions.
25 changes: 21 additions & 4 deletions include/flexflow/utils/file_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

using namespace std;
using namespace FlexFlow;
using namespace Legion;

class FileDataLoader {
public:
Expand All @@ -36,9 +37,22 @@ class FileDataLoader {
BatchConfig::TokenId *generate_requests(int num, int length);

template <typename DT>
void load_single_weight_tensor(FFModel *ff, Layer *l, int weight_idx);
void load_single_weight_tensor(FFModel *ff,
Layer *l,
int weight_idx,
size_t volume,
size_t num_replicas,
DT *weight,
Domain weight_domain);

void load_quantization_weight(FFModel *ff, Layer *l, int weight_idx);
void load_quantization_weight(FFModel *ff,
Layer *l,
int weight_idx,
size_t volume,
size_t num_replicas,
char *weight,
DataType data_type,
Domain weight_domain);

static void
load_weight_task(Legion::Task const *task,
Expand Down Expand Up @@ -68,12 +82,15 @@ struct WeightLoadTaskArgs {
FileDataLoader *loader;
Layer *layer;
int weight_idx;
size_t volume, num_replicas;
DataType data_type;
WeightLoadTaskArgs(FFModel *_ff,
FileDataLoader *_loader,
Layer *_l,
int _idx,
size_t _volume,
size_t _num_replicas,
DataType _data_type)
: ff(_ff), loader(_loader), layer(_l), weight_idx(_idx),
data_type(_data_type) {}
: ff(_ff), loader(_loader), layer(_l), weight_idx(_idx), volume(_volume),
num_replicas(_num_replicas), data_type(_data_type) {}
};
10 changes: 6 additions & 4 deletions inference/suffix_decoding/incr_dec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ void FlexFlow::top_level_task(Task const *task,

bool is_warmup_request = total_num_requests < num_warmup_requests;
double request_delay =
1000.0 * (request_per_second > 0 ? (1.0 / (double)request_per_second) : 0);
1000.0 *
(request_per_second > 0 ? (1.0 / (double)request_per_second) : 0);
double emission_time_ms =
is_warmup_request
? 0.0
Expand Down Expand Up @@ -415,9 +416,10 @@ void FlexFlow::top_level_task(Task const *task,
// terminate the request manager by stopping the background thread
rm->terminate_background_server();

std::string header = "llm,partition,max_requests_per_batch,max_tokens_per_"
"batch,request_per_second,is_warmup_request,request_guid,"
"request_step_idx,timestamp,num_generated_tokens";
std::string header =
"llm,partition,max_requests_per_batch,max_tokens_per_"
"batch,request_per_second,is_warmup_request,request_guid,"
"request_step_idx,timestamp,num_generated_tokens";
// csv filepath
// create csv filepath and add header if it doesn't exist

Expand Down
6 changes: 4 additions & 2 deletions inference/suffix_decoding/specinfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,8 @@ void FlexFlow::top_level_task(Task const *task,

bool is_warmup_request = total_num_requests < num_warmup_requests;
double request_delay =
1000.0 * (request_per_second > 0 ? (1.0 / (double)request_per_second) : 0);
1000.0 *
(request_per_second > 0 ? (1.0 / (double)request_per_second) : 0);
double emission_time_ms =
is_warmup_request
? 0.0
Expand Down Expand Up @@ -628,7 +629,8 @@ void FlexFlow::top_level_task(Task const *task,

std::string header =
"llm,ssm,partition,expansion_degree,max_tree_depth,max_tree_width,max_"
"requests_per_batch,max_tokens_per_batch,request_per_second,is_warmup_request,request_guid,"
"requests_per_batch,max_tokens_per_batch,request_per_second,is_warmup_"
"request,request_guid,"
"request_step_idx,"
"timestamp,speculation_start_timestamp,speculation_end_timestamp,num_"
"speculated_tokens,num_accepted_tokens,num_generated_tokens";
Expand Down
6 changes: 4 additions & 2 deletions inference/suffix_decoding/suffix_decoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,8 @@ void FlexFlow::top_level_task(Task const *task,

bool is_warmup_request = total_num_requests < num_warmup_requests;
double request_delay =
1000.0 * (request_per_second > 0 ? (1.0 / (double)request_per_second) : 0);
1000.0 *
(request_per_second > 0 ? (1.0 / (double)request_per_second) : 0);
double emission_time_ms =
is_warmup_request
? 0.0
Expand Down Expand Up @@ -527,7 +528,8 @@ void FlexFlow::top_level_task(Task const *task,

std::string header =
"llm,partition,max_tree_depth,online_tree_update,matching_strategy,max_"
"requests_per_batch,max_tokens_per_batch,request_per_second,is_warmup_request,request_guid,"
"requests_per_batch,max_tokens_per_batch,request_per_second,is_warmup_"
"request,request_guid,"
"request_step_idx,"
"timestamp,num_speculated_tokens,num_accepted_tokens,prefix_length,"
"speculation_score,num_generated_tokens";
Expand Down
123 changes: 94 additions & 29 deletions src/runtime/file_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -621,14 +621,21 @@ void load_from_quantized_file(char *ptr,

void FileDataLoader::load_quantization_weight(FFModel *ff,
Layer *l,
int weight_idx) {
Tensor weight = l->weights[weight_idx];
size_t volume = 1;
int weight_idx,
size_t volume,
size_t num_replicas,
char *weight,
DataType data_type,
Domain weight_domain) {
// Tensor weight = l->weights[weight_idx];
size_t volume_ = 1;
std::vector<int> dims_vec;
for (int i = 0; i < weight->num_dims; i++) {
dims_vec.push_back(weight->dims[i]);
volume *= weight->dims[i];
for (int i = 0; i < weight_domain.get_dim(); i++) {
int dim_i = weight_domain.hi()[i] - weight_domain.lo()[i] + 1;
dims_vec.push_back(dim_i);
volume_ *= dim_i;
}
assert(volume_ == volume * num_replicas);
char *data = (char *)malloc(sizeof(char) * volume);

std::string weight_filename = removeGuidOperatorName(std::string(l->name));
Expand All @@ -643,7 +650,7 @@ void FileDataLoader::load_quantization_weight(FFModel *ff,
head_dim,
weight_filename,
weights_folder,
weight->data_type,
data_type,
use_full_precision);
}
// else {
Expand All @@ -665,31 +672,41 @@ void FileDataLoader::load_quantization_weight(FFModel *ff,
load_from_quantized_file(data,
volume,
join_path({weights_folder, weight_filename}),
weight->data_type,
data_type,
use_full_precision);
}

ParallelTensor weight_pt;
ff->get_parallel_tensor_from_tensor(weight, weight_pt);
weight_pt->set_tensor<char>(ff, dims_vec, data);
// ParallelTensor weight_pt;
// ff->get_parallel_tensor_from_tensor(weight, weight_pt);
// weight_pt->set_tensor<char>(ff, dims_vec, data);
char *ptr = weight;
for (size_t i = 0; i < num_replicas; i++) {
memcpy(ptr, data, volume * sizeof(char));
ptr += volume;
}

free(data);
}

template <typename DT>
void FileDataLoader::load_single_weight_tensor(FFModel *ff,
Layer *l,
int weight_idx) {
Tensor weight = l->weights[weight_idx];
int weight_idx,
size_t volume,
size_t num_replicas,
DT *weight,
Domain weight_domain) {

// Create a buffer to store weight data from the file
size_t volume = 1;
size_t volume_ = 1;
std::vector<int> dims_vec;
for (int i = 0; i < weight->num_dims; i++) {
dims_vec.push_back(weight->dims[i]);
volume *= weight->dims[i];
for (int i = 0; i < weight_domain.get_dim(); i++) {
int dim_i = weight_domain.hi()[i] - weight_domain.lo()[i] + 1;
dims_vec.push_back(dim_i);
volume_ *= dim_i;
}
assert(data_type_size(weight->data_type) == sizeof(DT));
assert(volume_ == volume * num_replicas);
// assert(data_type_size(weight->data_type) == sizeof(DT));
DT *data = (DT *)malloc(sizeof(DT) * volume);

std::string weight_filename = removeGuidOperatorName(std::string(l->name));
Expand Down Expand Up @@ -759,10 +776,12 @@ void FileDataLoader::load_single_weight_tensor(FFModel *ff,
}
}

// Copy the weight data from the buffer to the weight's ParallelTensor
ParallelTensor weight_pt;
ff->get_parallel_tensor_from_tensor(weight, weight_pt);
weight_pt->set_tensor<DT>(ff, dims_vec, data);
// Copy the weight data from the buffer to the weight
DT *ptr = weight;
for (size_t i = 0; i < num_replicas; i++) {
memcpy(ptr, data, volume * sizeof(DT));
ptr += volume;
}

// Free buffer memory
free(data);
Expand All @@ -775,21 +794,44 @@ void FileDataLoader::load_weight_task(
Legion::Runtime *runtime) {
WeightLoadTaskArgs const *args = (WeightLoadTaskArgs const *)task->args;

assert(task->regions.size() == regions.size());
assert(regions.size() == 1); // one weight only
GenericTensorAccessorW weight = helperGetGenericTensorAccessorWO(
args->data_type, regions[0], task->regions[0], FID_DATA, ctx, runtime);
Domain weight_domain = runtime->get_index_space_domain(
ctx, task->regions[0].region.get_index_space());

switch (args->data_type) {
case DT_HALF: {
args->loader->load_single_weight_tensor<half>(
args->ff, args->layer, args->weight_idx);
args->loader->load_single_weight_tensor<half>(args->ff,
args->layer,
args->weight_idx,
args->volume,
args->num_replicas,
weight.get_half_ptr(),
weight_domain);
break;
}
case DT_FLOAT: {
args->loader->load_single_weight_tensor<float>(
args->ff, args->layer, args->weight_idx);
args->loader->load_single_weight_tensor<float>(args->ff,
args->layer,
args->weight_idx,
args->volume,
args->num_replicas,
weight.get_float_ptr(),
weight_domain);
break;
}
case DT_INT4:
case DT_INT8: {
args->loader->load_quantization_weight(
args->ff, args->layer, args->weight_idx);
args->loader->load_quantization_weight(args->ff,
args->layer,
args->weight_idx,
args->volume,
args->num_replicas,
weight.get_byte_ptr(),
args->data_type,
weight_domain);
break;
}
default:
Expand Down Expand Up @@ -818,10 +860,33 @@ void FileDataLoader::load_weights_parallel(FFModel *ff,
assert(false && "Unsupported data type");
}

ParallelTensor weight_pt;
ff->get_parallel_tensor_from_tensor(weight, weight_pt);

// Create task arguments
WeightLoadTaskArgs args(ff, this, l, i, weight->data_type);
size_t volume = 1, num_replicas = 1;
if (weight_pt->sync_type == ParameterSyncType::NCCL) {
for (int i = 0; i < weight_pt->num_dims; i++) {
if (weight_pt->dims[i].is_replica_dim) {
num_replicas *= weight_pt->dims[i].size;
}
}
} else if (weight_pt->sync_type == ParameterSyncType::PS) {
num_replicas = 1;
} else {
num_replicas = 1;
}
for (int i = 0; i < weight->num_dims; i++) {
volume *= weight->dims[i];
}
WeightLoadTaskArgs args(
ff, this, l, i, volume, num_replicas, weight->data_type);
// launch task asynchronously
TaskLauncher launcher(LOAD_WEIGHT_TASK_ID,
TaskArgument(&args, sizeof(WeightLoadTaskArgs)));
launcher.add_region_requirement(RegionRequirement(
weight_pt->region, WRITE_ONLY, EXCLUSIVE, weight_pt->region));
launcher.add_field(0, FID_DATA);
futures.push_back(runtime->execute_task(ctx, launcher));
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/runtime/parallel_tensor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ template <typename T>
bool ParallelTensorBase::set_tensor(FFModel const *ff,
std::vector<int> const &dim_sizes,
T const *data) {
Context ctx = ff->config.lg_ctx;
Context ctx = Legion::Runtime::get_context();
Runtime *runtime = ff->config.lg_hlr;
// TODO: check data type matches
// TODO: Currently we use a task launch, change to index launch for NCCL
Expand Down Expand Up @@ -760,7 +760,7 @@ template <typename T>
bool ParallelTensorBase::get_tensor(FFModel const *ff,
T *data,
bool get_gradients) {
Context ctx = ff->config.lg_ctx;
Context ctx = Legion::Runtime::get_context();
Runtime *runtime = ff->config.lg_hlr;
LogicalRegion weight_lr = LogicalRegion::NO_REGION;
if (sync_type == ParameterSyncType::PS) {
Expand Down

0 comments on commit 58e0061

Please sign in to comment.