Skip to content

Commit

Permalink
initial implementation of local_chunks (slice like chunks) iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
rdfriese committed Feb 26, 2024
1 parent 43e4201 commit 2293879
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 1 deletion.
7 changes: 7 additions & 0 deletions Cargo.toml
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pin-project = "1.0.12"
serde_with = "3.0.0"
pin-weak = "1.1.0"
async-lock = "2.8.0"
itertools = "0.12.1"


[dev-dependencies]
Expand Down Expand Up @@ -299,6 +300,8 @@ path="examples/active_message_examples/am_local.rs"
name="am_local_memregions"
path="examples/active_message_examples/am_local_memregions.rs"



##---------------Array Examples -------------------##
[[example]]
name="dist_array_reduce"
Expand Down Expand Up @@ -360,6 +363,10 @@ path="examples/array_examples/global_lock_array.rs"
#name="2d_array"
#path="examples/array_examples/2d_array.rs"

#[[example]]
#name="histo"
#path="examples/array_examples/histo.rs"

##------------ RDMA Examples -----------------##
[[example]]
name="rdma_put"
Expand Down
1 change: 1 addition & 0 deletions run_examples.sh
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ for toolchain in stable; do #nightly; do
cur_tasks=`squeue -u frie869 | grep " R " | wc -l`
sleep 5
done

done
cd ..
wait
Expand Down
1 change: 1 addition & 0 deletions src/array/local_lock_atomic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod iteration;
mod local_chunks;
pub(crate) mod operations;
mod rdma;
use crate::array::private::LamellarArrayPrivate;
Expand Down
226 changes: 226 additions & 0 deletions src/array/local_lock_atomic/local_chunks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
use crate::array::iterator::local_iterator::{
IndexedLocalIterator, LocalIterator, LocalIteratorLauncher,
};
use crate::array::iterator::{LamellarArrayIterators, LamellarArrayMutIterators, Schedule};
use crate::array::local_lock_atomic::*;
use crate::array::{operations::ArrayOps, AtomicArray, Distribution, LamellarArray, TeamFrom};
use crate::memregion::Dist;
use crate::LamellarTeamRT;

use crate::active_messaging::SyncSend;

use enum_dispatch::enum_dispatch;
use futures::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;

#[derive(Clone)]
pub struct LocalLockLocalChunks<T: Dist> {
// data: &'a [T],
chunk_size: usize,
index: usize, //global index within the array local data
end_index: usize, //global index within the array local data
array: LocalLockArray<T>,
lock: LocalRwDarc<()>,
lock_guard: Arc<RwLockReadGuardArc<Box<()>>>,
}

#[derive(Clone)]
pub struct LocalLockLocalChunksMut<T: Dist> {
// data: &'a mut [T],
chunk_size: usize,
index: usize, //global index within the array local data
end_index: usize, //global index within the array local data
array: LocalLockArray<T>,
lock: LocalRwDarc<()>,
lock_guard: Arc<RwLockWriteGuardArc<Box<()>>>,
}

#[derive(Debug)]
pub struct LocalLockMutChunkLocalData<'a, T: Dist> {
data: &'a mut [T],
_index: usize,
_lock_guard: Arc<RwLockWriteGuardArc<Box<()>>>,
}

impl<T: Dist> Deref for LocalLockMutChunkLocalData<'_, T> {
type Target = [T];
fn deref(&self) -> &Self::Target {
self.data
}
}
impl<T: Dist> DerefMut for LocalLockMutChunkLocalData<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.data
}
}

impl<T: Dist + 'static> LocalIterator for LocalLockLocalChunks<T> {
type Item = LocalLockLocalData<'static, T>;
type Array = LocalLockArray<T>;
fn init(&self, start_i: usize, cnt: usize) -> Self {
//these are with respect to the single elements, not chunk indexing and cnt
let end_i = std::cmp::min(
(start_i + cnt) * self.chunk_size,
self.array.num_elems_local(),
);
let new_start_i = start_i * self.chunk_size;
// println!(
// "start_i {} new_start_i {} end_i {} cnt: {}",
// start_i, new_start_i, end_i, cnt
// );
LocalLockLocalChunks {
chunk_size: self.chunk_size,
index: new_start_i,
end_index: end_i,
array: self.array.clone(),
lock: self.lock.clone(),
lock_guard: self.lock_guard.clone(),
}
}
fn array(&self) -> Self::Array {
self.array.clone()
}
fn next(&mut self) -> Option<Self::Item> {
// println!("next index {} end_index: {}", self.index, self.end_index);
if self.index < self.end_index {
let start_i = self.index;
self.index += self.chunk_size;
let end_i = std::cmp::min(self.index, self.end_index);
// println!(
// "start_i {} end_i {} self.index {} self.end_index {}",
// start_i, end_i, self.index, self.end_index
// );
Some(LocalLockLocalData {
array: self.array.clone(),
data: unsafe {
std::slice::from_raw_parts_mut(
self.array.array.local_as_mut_ptr().offset(start_i as isize),
end_i - start_i,
)
},
index: 0,
lock: self.lock.clone(),
lock_guard: self.lock_guard.clone(),
})
} else {
None
}
}
fn elems(&self, in_elems: usize) -> usize {
in_elems / self.chunk_size + (in_elems % self.chunk_size != 0) as usize
}

fn advance_index(&mut self, count: usize) {
self.index = std::cmp::min(self.index + count * self.chunk_size, self.end_index);
}
}

impl<T: Dist + 'static> IndexedLocalIterator for LocalLockLocalChunks<T> {
fn iterator_index(&self, index: usize) -> Option<usize> {
if index * self.chunk_size < self.array.len() {
Some(index) //everyone at this point as calculated the actual index (cause we are local only) so just return it
} else {
None
}
}
}

impl<T: Dist + 'static> LocalIterator for LocalLockLocalChunksMut<T> {
type Item = LocalLockMutChunkLocalData<'static, T>;
type Array = LocalLockArray<T>;
fn init(&self, start_i: usize, cnt: usize) -> Self {
let end_i = std::cmp::min(
(start_i + cnt) * self.chunk_size,
self.array.num_elems_local(),
);
let new_start_i = start_i * self.chunk_size;
// println!(
// "mut start_i {} new_start_i {} end_i {} cnt: {}",
// start_i, new_start_i, end_i, cnt
// );
LocalLockLocalChunksMut {
chunk_size: self.chunk_size,
index: new_start_i,
end_index: end_i,
array: self.array.clone(),
lock: self.lock.clone(),
lock_guard: self.lock_guard.clone(),
}
}
fn array(&self) -> Self::Array {
self.array.clone()
}
fn next(&mut self) -> Option<Self::Item> {
// println!(
// "mut next index {} end_index: {}",
// self.index, self.end_index
// );
if self.index < self.end_index {
let start_i = self.index;
self.index += self.chunk_size;
let end_i = std::cmp::min(self.index, self.end_index);
// println!(
// "mut start_i {} end_i {} self.index {} self.end_index {}",
// start_i, end_i, self.index, self.end_index
// );
Some(LocalLockMutChunkLocalData {
data: unsafe {
std::slice::from_raw_parts_mut(
self.array.array.local_as_mut_ptr().offset(start_i as isize),
end_i - start_i,
)
},
_index: 0,
_lock_guard: self.lock_guard.clone(),
})
} else {
None
}
}
fn elems(&self, in_elems: usize) -> usize {
in_elems / self.chunk_size + (in_elems % self.chunk_size != 0) as usize
}

fn advance_index(&mut self, count: usize) {
self.index = std::cmp::min(self.index + count * self.chunk_size, self.end_index);
}
}

impl<T: Dist + 'static> IndexedLocalIterator for LocalLockLocalChunksMut<T> {
fn iterator_index(&self, index: usize) -> Option<usize> {
if index * self.chunk_size < self.array.len() {
//hmm should this be local num elems?
Some(index) //everyone at this point as calculated the actual index (cause we are local only) so just return it
} else {
None
}
}
}

impl<T: Dist> LocalLockArray<T> {
pub fn read_local_chunks(&self, chunk_size: usize) -> LocalLockLocalChunks<T> {
let lock = Arc::new(self.array.block_on(self.lock.read()));
LocalLockLocalChunks {
chunk_size,
index: 0,
end_index: 0,
array: self.clone(),
lock: self.lock.clone(),
lock_guard: lock,
}
}

pub fn write_local_chunks(&self, chunk_size: usize) -> LocalLockLocalChunksMut<T> {
let lock = Arc::new(self.array.block_on(self.lock.write()));
LocalLockLocalChunksMut {
chunk_size,
index: 0,
end_index: 0,
array: self.clone(),
lock: self.lock.clone(),
lock_guard: lock,
}
}
}
2 changes: 1 addition & 1 deletion src/array/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ impl<'a, T: Dist> OpInput<'a, T> for &'a [T] {
}
}

impl<'a, T: Dist> OpInput<'a, T> for &'a mut dyn Iterator<Item = T> {
impl<'a, T: Dist> OpInput<'a, T> for &'a mut (dyn Iterator<Item = T> + 'a) {
fn as_op_input(self) -> (Vec<OpInputEnum<'a, T>>, usize) {
self.collect::<Vec<_>>().as_op_input()
}
Expand Down

0 comments on commit 2293879

Please sign in to comment.