Commit
Batched operations use variable-sized indices based on the length of the array
rdfriese committed Aug 26, 2023
1 parent 11eaaf8 commit 23c4276
Showing 42 changed files with 2,541 additions and 4,769 deletions.
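The bulk of the change is in impl/src/array_ops.rs, which is not rendered below, so here is a rough sketch of the idea in the commit title: pick the narrowest unsigned integer type that can index every element of an array, and pack batched indices at that width instead of always sending full usize values. The IndexSize enum and the index_size_for_len / pack_indices functions are hypothetical illustrations, not names taken from the Lamellar sources.

```rust
/// Illustrative only: the smallest unsigned integer type able to hold every
/// valid index of an array with a given length.
#[derive(Clone, Copy, Debug)]
enum IndexSize {
    U8,
    U16,
    U32,
    U64,
}

fn index_size_for_len(len: usize) -> IndexSize {
    // Valid indices are 0..len-1, so an array of up to 256 elements
    // can be indexed with a single byte, and so on.
    if len <= u8::MAX as usize + 1 {
        IndexSize::U8
    } else if len <= u16::MAX as usize + 1 {
        IndexSize::U16
    } else if len <= u32::MAX as usize + 1 {
        IndexSize::U32
    } else {
        IndexSize::U64
    }
}

/// Pack a batch of indices at the chosen width so that batched operations
/// on small arrays do not pay for 8-byte indices.
fn pack_indices(indices: &[usize], size: IndexSize) -> Vec<u8> {
    let mut buf = Vec::new();
    for &i in indices {
        match size {
            IndexSize::U8 => buf.push(i as u8),
            IndexSize::U16 => buf.extend_from_slice(&(i as u16).to_le_bytes()),
            IndexSize::U32 => buf.extend_from_slice(&(i as u32).to_le_bytes()),
            IndexSize::U64 => buf.extend_from_slice(&(i as u64).to_le_bytes()),
        }
    }
    buf
}

fn main() {
    let array_len = 50_000; // every index fits in a u16
    let indices: Vec<usize> = (0..1000).map(|i| (i * 37) % array_len).collect();
    let size = index_size_for_len(array_len);
    let packed = pack_indices(&indices, size);
    assert_eq!(packed.len(), indices.len() * 2); // 2 bytes per index, not 8
}
```

On a 64-bit system this cuts the wire size of a batch of one million indices into an array of fewer than 65,536 elements from 8 MB to 2 MB.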
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -28,7 +28,7 @@ rand = "0.8.5"
parking_lot = {version = "0.12.1", features = ["arc_lock", "send_guard", "serde"] }
indexmap = "1.9.1" #lamellar_alloc
core_affinity = "0.5.10"
log = "0.4.17"
log = "0.4.19"
simple_logger = "4.0.0"
async-task = "4.3.0"
async-trait = "0.1.58"
1,040 changes: 264 additions & 776 deletions impl/src/array_ops.rs

Large diffs are not rendered by default.

51 changes: 29 additions & 22 deletions src/array.rs
@@ -76,11 +76,11 @@ use crate::{active_messaging::*, LamellarTeamRT};
use async_trait::async_trait;
use enum_dispatch::enum_dispatch;
use futures_lite::Future;
use parking_lot::{Mutex, RwLock};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// use serde::de::DeserializeOwned;
@@ -111,10 +111,17 @@ pub mod prelude;

pub(crate) mod r#unsafe;
pub use r#unsafe::{
operations::{UnsafeArrayOpBuf,MultiValMultiIdxOps,MultiValSingleIdxOps,SingleValMultiIdxOps,BatchReturnType}, UnsafeArray, UnsafeByteArray, UnsafeByteArrayWeak,
operations::{
BatchReturnType, MultiValMultiIdxOps, MultiValSingleIdxOps, SingleValMultiIdxOps,
},
UnsafeArray, UnsafeByteArray, UnsafeByteArrayWeak,
};
pub(crate) mod read_only;
pub use read_only::{ReadOnlyArray, ReadOnlyArrayOpBuf, /*ReadOnlyArrayMultiMultiOps, ReadOnlyArrayMultiSingleOps,*/ ReadOnlyByteArray, ReadOnlyByteArrayWeak};
pub use read_only::{
ReadOnlyArray, ReadOnlyArrayOpBuf,
/*ReadOnlyArrayMultiMultiOps, ReadOnlyArrayMultiSingleOps,*/ ReadOnlyByteArray,
ReadOnlyByteArrayWeak,
};

// pub(crate) mod local_only;
// pub use local_only::LocalOnlyArray;
@@ -130,26 +137,24 @@ pub use atomic::{

pub(crate) mod generic_atomic;
pub use generic_atomic::{
operations::{GenericAtomicArrayOpBuf, /*GenericAtomicArrayMultiMultiOps, GenericAtomicArrayMultiSingleOps*/}, GenericAtomicArray, GenericAtomicByteArray,
GenericAtomicByteArrayWeak, GenericAtomicLocalData,
GenericAtomicArray, GenericAtomicByteArray, GenericAtomicByteArrayWeak, GenericAtomicLocalData,
};

pub(crate) mod native_atomic;
pub use native_atomic::{
operations::{NativeAtomicArrayOpBuf,/*NativeAtomicArrayMultiMultiOps, NativeAtomicArrayMultiSingleOps*/}, NativeAtomicArray, NativeAtomicByteArray,
NativeAtomicByteArrayWeak, NativeAtomicLocalData,
NativeAtomicArray, NativeAtomicByteArray, NativeAtomicByteArrayWeak, NativeAtomicLocalData,
};

pub(crate) mod local_lock_atomic;
pub use local_lock_atomic::{
operations::{LocalLockArrayOpBuf,/*LocalLockArrayMultiMultiOps, LocalLockArrayMultiSingleOps*/}, LocalLockArray, LocalLockByteArray, LocalLockByteArrayWeak,
LocalLockLocalData, LocalLockMutLocalData,
LocalLockArray, LocalLockByteArray, LocalLockByteArrayWeak, LocalLockLocalData,
LocalLockMutLocalData,
};

pub(crate) mod global_lock_atomic;
pub use global_lock_atomic::{
operations::{GlobalLockArrayOpBuf,/*GlobalLockArrayMultiMultiOps, GlobalLockArrayMultiSingleOps*/}, GlobalLockArray, GlobalLockByteArray,
GlobalLockByteArrayWeak, GlobalLockLocalData, GlobalLockMutLocalData,
GlobalLockArray, GlobalLockByteArray, GlobalLockByteArrayWeak, GlobalLockLocalData,
GlobalLockMutLocalData,
};

pub mod iterator;
@@ -190,8 +195,6 @@ crate::inventory::collect!(ReduceKey);
// lamellar_impl::generate_ops_for_type_rt!(true, true, u8,usize);
// impl Dist for bool {}



lamellar_impl::generate_reductions_for_type_rt!(true, u8, u16, u32, u64, usize);
lamellar_impl::generate_reductions_for_type_rt!(false, u128);
lamellar_impl::generate_ops_for_type_rt!(true, true, true, u8, u16, u32, u64, usize);
@@ -402,8 +405,8 @@ impl<T: Dist> TeamFrom<&LamellarArrayRdmaOutput<T>> for LamellarArrayRdmaOutput<
}
}

impl <T: Clone> TeamFrom<(&Vec<T>,Distribution)> for Vec<T> {
fn team_from(vals: (&Vec<T>,Distribution), _team: &Pin<Arc<LamellarTeamRT>>) -> Self {
impl<T: Clone> TeamFrom<(&Vec<T>, Distribution)> for Vec<T> {
fn team_from(vals: (&Vec<T>, Distribution), _team: &Pin<Arc<LamellarTeamRT>>) -> Self {
vals.0.to_vec()
}
}
@@ -434,14 +437,18 @@ pub enum LamellarByteArray {
GlobalLockArray(GlobalLockByteArray),
}

impl LamellarByteArray{
impl LamellarByteArray {
pub fn type_id(&self) -> std::any::TypeId {
match self{
match self {
LamellarByteArray::UnsafeArray(_) => std::any::TypeId::of::<UnsafeByteArray>(),
LamellarByteArray::ReadOnlyArray(_) => std::any::TypeId::of::<ReadOnlyByteArray>(),
LamellarByteArray::AtomicArray(_) => std::any::TypeId::of::<AtomicByteArray>(),
LamellarByteArray::NativeAtomicArray(_) => std::any::TypeId::of::<NativeAtomicByteArray>(),
LamellarByteArray::GenericAtomicArray(_) => std::any::TypeId::of::<GenericAtomicByteArray>(),
LamellarByteArray::NativeAtomicArray(_) => {
std::any::TypeId::of::<NativeAtomicByteArray>()
}
LamellarByteArray::GenericAtomicArray(_) => {
std::any::TypeId::of::<GenericAtomicByteArray>()
}
LamellarByteArray::LocalLockArray(_) => std::any::TypeId::of::<LocalLockByteArray>(),
LamellarByteArray::GlobalLockArray(_) => std::any::TypeId::of::<GlobalLockByteArray>(),
}
@@ -615,8 +622,8 @@ impl<T: Dist + AmDist + ElementComparePartialEqOps + 'static> LamellarArrayCompa
pub(crate) mod private {
use crate::active_messaging::*;
use crate::array::{
AtomicArray, GlobalLockArray,
/*NativeAtomicArray, GenericAtomicArray,*/ LamellarReadArray, LamellarWriteArray, LamellarByteArray,
AtomicArray, GlobalLockArray, LamellarByteArray,
/*NativeAtomicArray, GenericAtomicArray,*/ LamellarReadArray, LamellarWriteArray,
LocalLockArray, ReadOnlyArray, UnsafeArray,
};
use crate::lamellar_request::{LamellarMultiRequest, LamellarRequest};
40 changes: 0 additions & 40 deletions src/array/atomic/operations.rs
@@ -1,46 +1,6 @@
use crate::array::atomic::*;
use crate::array::*;

use std::any::TypeId;
use std::collections::HashMap;

type BufFn = fn(AtomicByteArrayWeak) -> Arc<dyn BufferOp>;
// type OpFn = fn(UnsafeByteArray,ArrayOpCmd2,Vec<u8>) -> LamellarArcAm;


lazy_static! {
pub(crate) static ref BUFOPS: HashMap<TypeId, BufFn> = {
let mut map = HashMap::new();
for op in crate::inventory::iter::<AtomicArrayOpBuf> {
map.insert(op.id.clone(), op.op);
}
map
};

// pub(crate) static ref NEWBUFOPS: HashMap<TypeId, OpFn> = {
// let mut map = HashMap::new();
// for op in crate::inventory::iter::<AtomicArrayOpBufNew> {
// map.insert(op.id.clone(), op.op);
// }
// map
// };
}

#[doc(hidden)]
pub struct AtomicArrayOpBuf {
pub id: TypeId,
pub op: BufFn,
}
// #[doc(hidden)]
// pub struct AtomicArrayOpBufNew {
// pub id: TypeId,
// pub op: OpFn,
// }

crate::inventory::collect!(AtomicArrayOpBuf);

// crate::inventory::collect!(AtomicArrayOpBufNew);

impl<T: ElementOps + 'static> ReadOnlyOps<T> for AtomicArray<T> {}

impl<T: ElementOps + 'static> AccessOps<T> for AtomicArray<T> {}
