nydusd: achieve dynamic dedup for nydusd.
The previous version of local CAS performed static dedup, which only
modified the chunk information in the bootstrap. This had a serious
problem: it could reuse chunks that the backend of the current image
cannot serve, leaving the container unable to load the corresponding
chunk data on demand at runtime.

To address this, dynamic dedup is introduced. When nydusd initializes
a blob cache, it reads the blob's backend configuration from the CAS
database, enabling the blob cache to read chunk data from other
backends.
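
In sketch form, the per-blob backend substitution works roughly as
below (condensed from the storage/src/device.rs changes in this
commit; the return type and error handling are simplified):

    // Sketch only: when a CAS database is configured, build each blob's cache
    // against the backend recorded for that blob instead of the image's own.
    fn build_blob_cache(
        cas_mgr: Option<&CasMgr>,
        config: &Arc<ConfigV2>,
        blob_info: &Arc<BlobInfo>,
    ) -> io::Result<Arc<dyn BlobCache>> {
        if let Some(mgr) = cas_mgr {
            // The CAS database maps blob_id -> serialized BackendConfigV2.
            let backend_json = mgr
                .get_backend_by_blob_id(&blob_info.blob_id(), true)
                .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "blob not in CAS db"))?;
            let mut per_blob = config.deref().clone();
            per_blob.backend = Some(serde_json::from_str(&backend_json)?);
            BLOB_FACTORY.new_blob_cache(&Arc::new(per_blob), blob_info)
        } else {
            BLOB_FACTORY.new_blob_cache(config, blob_info)
        }
    }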

Signed-off-by: xwb1136021767 <[email protected]>
xwb1136021767 committed Aug 11, 2023
1 parent 10b2aa6 commit dff2451
Showing 7 changed files with 199 additions and 24 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

65 changes: 65 additions & 0 deletions api/src/config.rs
@@ -2287,4 +2287,69 @@ mod tests {
let auth = registry.auth.unwrap();
assert_eq!(auth, test_auth);
}

#[test]
fn test_dedup_config() {
let content = r#"{
"enable": true,
"work_dir": "/tmp/nydus-cas"
}"#;
let config: DeduplicationConfig = serde_json::from_str(content).unwrap();
assert!(config.enable);
assert_eq!(config.work_dir, "/tmp/nydus-cas");
}
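
    // For reference, these tests assume a DeduplicationConfig along these
    // lines (a hypothetical sketch; the real definition lives earlier in
    // api/src/config.rs, outside this hunk):
    //
    //     #[derive(Clone, Debug, Default, Deserialize, Serialize)]
    //     pub struct DeduplicationConfig {
    //         #[serde(default)]
    //         pub enable: bool,
    //         #[serde(default)]
    //         pub work_dir: String,
    //     }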

#[test]
fn test_snapshotter_config_with_dedup() {
let content = r#"
{
"device": {
"backend": {
"type": "registry",
"config": {
"readahead": false,
"host": "localhost",
"repo": "vke/golang",
"auth": "",
"scheme": "https",
"proxy": {
"fallback": false
},
"timeout": 5,
"connect_timeout": 5,
"retry_limit": 2
}
},
"cache": {
"type": "blobcache",
"compressed": true,
"config": {
"work_dir": "/var/lib/containerd-nydus/cache",
"disable_indexed_map": false
}
},
"dedup": {
"work_dir": "/home/t4/containerd/io.containerd.snapshotter.v1.nydus"
}
},
"mode": "direct",
"digest_validate": false,
"enable_xattr": true,
"fs_prefetch": {
"enable": true,
"prefetch_all": true,
"threads_count": 8,
"merging_size": 1048576,
"bandwidth_rate": 0
}
}
"#;
let config = ConfigV2::from_str(content).unwrap();
assert_eq!(&config.id, "");
assert!(!config.dedup.as_ref().unwrap().enable);
assert_eq!(
&config.dedup.unwrap().work_dir,
"/home/t4/containerd/io.containerd.snapshotter.v1.nydus"
);
}
}
26 changes: 7 additions & 19 deletions builder/src/core/node.rs
@@ -118,7 +118,7 @@ impl NodeChunk {
}

/// Struct to host sharable fields of [Node].
#[derive(Clone, Default)]
#[derive(Clone, Default, Debug)]
pub struct NodeInfo {
/// Whether the explicit UID/GID feature is enabled or not.
pub explicit_uidgid: bool,
@@ -736,7 +736,7 @@ impl Node {
.with_context(|| format!("failed to get metadata of {}", self.path().display()))
}

pub fn get_chunk_ofs(&mut self, meta: &RafsSuperMeta) -> Result<(u64, u64)> {
fn get_chunk_ofs(&mut self, meta: &RafsSuperMeta) -> Result<(u64, u64)> {
if meta.version == RAFS_SUPER_VERSION_V6 {
self.get_chunk_ofs_v6(meta)
} else {
@@ -900,23 +900,6 @@ impl Node {
}
}

pub fn update_inode_digest_for_bootstrap(&self, writer: &mut dyn RafsIoWrite) -> Result<()> {
// Dump inode info
if let InodeWrapper::V5(raw_inode) = &self.inode {
let name = self.name();
let inode = RafsV5InodeWrapper {
name,
symlink: self.info.symlink.as_deref(),
inode: raw_inode,
};
inode
.store(writer)
.context("failed to dump inode to bootstrap")?;
}

Ok(())
}

fn dedup_bootstrap_v5(
&self,
build_ctx: &BuildContext,
@@ -1097,3 +1080,8 @@ impl Node {
self.info = Arc::new(info);
}
}

#[cfg(test)]
mod tests {
use super::*;
}
2 changes: 1 addition & 1 deletion rafs/Cargo.toml
@@ -20,7 +20,7 @@ serde = { version = "1.0.110", features = ["serde_derive", "rc"] }
serde_json = "1.0.53"
vm-memory = "0.10"
fuse-backend-rs = "^0.10.3"

bitvec = { version = "1", default-features = false, features = ["alloc", "atomic", "serde", "std"] }
nydus-api = { version = "0.3", path = "../api" }
nydus-storage = { version = "0.6", path = "../storage", features = ["backend-localfs"] }
nydus-utils = { version = "0.4", path = "../utils" }
23 changes: 23 additions & 0 deletions rafs/src/metadata/chunk.rs
@@ -449,3 +449,26 @@ pub fn convert_ref_to_rafs_v5_chunk_info(cki: &dyn BlobChunkInfo) -> RafsV5Chunk
let chunk = to_rafs_v5_chunk_info(as_blob_v5_chunk_info(cki.deref()));
chunk
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_set_deduped_for_chunk_v5() {
let mut chunk = ChunkWrapper::new(RafsVersion::V5);
assert!(!chunk.is_deduped());

chunk.set_deduped(true);
assert!(chunk.is_deduped());
}

#[test]
fn test_set_deduped_for_chunk_v6() {
let mut chunk = ChunkWrapper::new(RafsVersion::V6);
assert!(!chunk.is_deduped());

chunk.set_deduped(true);
assert!(chunk.is_deduped());
}
}
2 changes: 1 addition & 1 deletion storage/Cargo.toml
@@ -35,7 +35,7 @@ url = { version = "2.1.1", optional = true }
vm-memory = "0.10"
fuse-backend-rs = "^0.10.3"
gpt = { version = "3.0.0", optional = true }

bitvec = { version = "1", default-features = false, features = ["alloc", "atomic", "serde", "std"] }
nydus-api = { version = "0.3", path = "../api" }
nydus-utils = { version = "0.4", path = "../utils", features = ["encryption", "zran"] }

103 changes: 100 additions & 3 deletions storage/src/device.rs
@@ -32,11 +32,13 @@ use std::path::Path;
use std::sync::{Arc, Mutex};

use arc_swap::ArcSwap;
use bitvec::prelude::*;
use fuse_backend_rs::api::filesystem::ZeroCopyWriter;
use fuse_backend_rs::file_buf::FileVolatileSlice;
use fuse_backend_rs::file_traits::FileReadWriteVolatile;

use nydus_api::ConfigV2;
use nydus_api::{BackendConfigV2, ConfigV2};
use nydus_utils::cas::CasMgr;
use nydus_utils::compress;
use nydus_utils::crypt::{self, Cipher, CipherContext};
use nydus_utils::digest::{self, RafsDigest};
@@ -179,6 +181,9 @@ pub struct BlobInfo {
cipher_object: Arc<Cipher>,
/// Cipher context for encryption.
cipher_ctx: Option<CipherContext>,

/// Bitvector to indicate which chunks are deduplicated.
dedup_bitmap: Option<BitVec>,
}

fn serialize_cipher_object<S>(cipher_object: &Arc<Cipher>, serializer: S) -> Result<S::Ok, S::Error>
@@ -269,6 +274,7 @@ impl BlobInfo {
meta_path: Arc::new(Mutex::new(String::new())),
cipher_object: Default::default(),
cipher_ctx: None,
dedup_bitmap: None,
};

blob_info.compute_features();
@@ -621,6 +627,38 @@ impl BlobInfo {
self.cipher_ctx.clone(),
)
}

pub fn set_dedup_bitmap(&mut self, dedup_bitmap: Option<BitVec>) {
self.dedup_bitmap = dedup_bitmap;
}

    pub fn set_dedup_by_chunk_idx(&mut self, index: usize, value: bool) {
        if index >= self.chunk_count as usize {
            return;
        }
        if let Some(bitmap) = &mut self.dedup_bitmap {
            bitmap.set(index, value);
        }
    }

    pub fn get_dedup_by_chunk_idx(&self, index: usize) -> bool {
        match &self.dedup_bitmap {
            None => false,
            // A chunk index >= blob.chunk_count means the chunk comes from another blob.
            Some(_) if index >= self.chunk_count as usize => true,
            Some(bitmap) => bitmap.get(index).map_or(false, |v| *v),
        }
    }
}
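
// Usage sketch for the dedup bitmap (mirroring test_set_dedup_bitmap at the
// bottom of this file): a builder sizes the bitmap to the blob's chunk count
// and marks the chunks that were deduplicated away.
//
//     blob_info.set_dedup_bitmap(Some(bitvec!(0; chunk_count as usize)));
//     blob_info.set_dedup_by_chunk_idx(0, true);
//     assert!(blob_info.get_dedup_by_chunk_idx(0));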

bitflags! {
@@ -1143,9 +1181,22 @@ impl BlobDevice {
/// Create new blob device instance.
pub fn new(config: &Arc<ConfigV2>, blob_infos: &[Arc<BlobInfo>]) -> io::Result<BlobDevice> {
let mut blobs = Vec::with_capacity(blob_infos.len());
let dedup_config = config.get_dedup_config().ok();
let cas_mgr = dedup_config
.as_ref()
.filter(|config| config.get_enable() && config.get_work_dir().is_ok())
.and_then(|config| CasMgr::new(config.get_work_dir().unwrap()).ok());

for blob_info in blob_infos.iter() {
let blob = BLOB_FACTORY.new_blob_cache(config, blob_info)?;
blobs.push(blob);
            let blob = if let Some(cas_mgr) = &cas_mgr {
                // Rebuild this blob's cache against the backend recorded in the CAS database.
                Self::get_new_blob_config(cas_mgr, blob_info, config).and_then(|new_blob_config| {
                    BLOB_FACTORY.new_blob_cache(&new_blob_config, blob_info)
                })
            } else {
                BLOB_FACTORY.new_blob_cache(config, blob_info)
            };

            blobs.push(blob?);
}

Ok(BlobDevice {
@@ -1154,6 +1205,31 @@
})
}

fn get_new_blob_config(
cas_mgr: &CasMgr,
blob_info: &Arc<BlobInfo>,
config: &Arc<ConfigV2>,
) -> io::Result<Arc<ConfigV2>> {
let blob_id = blob_info.blob_id();
let blob_backend = Self::get_blob_backend_config(cas_mgr, &blob_id)?;
let mut blob_config = config.deref().clone();
blob_config.backend = Some(blob_backend);
let blob_config = Arc::new(blob_config);

Ok(blob_config)
}

    fn get_blob_backend_config(cas_mgr: &CasMgr, blob_id: &str) -> io::Result<BackendConfigV2> {
        let backend_content = cas_mgr
            .get_backend_by_blob_id(blob_id, true)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
            .ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("no backend configuration recorded for blob {}", blob_id),
                )
            })?;
        let backend = serde_json::from_str::<BackendConfigV2>(&backend_content)?;
        Ok(backend)
    }

/// Update configuration and storage backends of the blob device.
///
/// The `update()` method switch a new storage backend object according to the configuration
@@ -1738,4 +1814,25 @@ mod tests {
);
}
}

#[test]
fn test_set_dedup_bitmap() {
let mut blob_info = BlobInfo::new(
1,
"test1".to_owned(),
0x200000,
0x100000,
0x100000,
512,
BlobFeatures::_V5_NO_EXT_BLOB_TABLE,
);

blob_info.dedup_bitmap = Some(bitvec!(0 ; blob_info.chunk_count as usize));
for idx in 0..blob_info.chunk_count {
let idx = idx as usize;
assert!(!blob_info.get_dedup_by_chunk_idx(idx));
blob_info.set_dedup_by_chunk_idx(idx, true);
assert!(blob_info.get_dedup_by_chunk_idx(idx));
}
}
}
