Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Make IpldDag and Ipns async_traits #285

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 29 additions & 35 deletions src/dag.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
use crate::error::Error;
use crate::path::{IpfsPath, IpfsPathError, SubPath};
use crate::repo::RepoTypes;
use crate::Ipfs;
use crate::repo::{Repo, RepoTypes};
use async_trait::async_trait;
use bitswap::Block;
use cid::{Cid, Codec, Version};
use libipld::block::{decode_ipld, encode_ipld};
use libipld::ipld::Ipld;

#[derive(Clone, Debug)]
pub struct IpldDag<Types: RepoTypes> {
ipfs: Ipfs<Types>,
}
#[async_trait]
pub trait IpldDag {
async fn put_dag(&self, data: Ipld, codec: Codec) -> Result<Cid, Error>;

impl<Types: RepoTypes> IpldDag<Types> {
pub fn new(ipfs: Ipfs<Types>) -> Self {
IpldDag { ipfs }
}
async fn get_dag(&self, path: IpfsPath) -> Result<Ipld, Error>;
}

pub async fn put(&self, data: Ipld, codec: Codec) -> Result<Cid, Error> {
#[async_trait]
impl<T: RepoTypes> IpldDag for Repo<T> {
async fn put_dag(&self, data: Ipld, codec: Codec) -> Result<Cid, Error> {
let bytes = encode_ipld(&data, codec)?;
let hash = multihash::Sha2_256::digest(&bytes);
let version = if codec == Codec::DagProtobuf {
Expand All @@ -27,24 +26,24 @@ impl<Types: RepoTypes> IpldDag<Types> {
};
let cid = Cid::new(version, codec, hash)?;
let block = Block::new(bytes, cid);
let (cid, _) = self.ipfs.repo.put_block(block).await?;
let (cid, _) = self.put_block(block).await?;
Ok(cid)
}

pub async fn get(&self, path: IpfsPath) -> Result<Ipld, Error> {
async fn get_dag(&self, path: IpfsPath) -> Result<Ipld, Error> {
let cid = match path.root().cid() {
Some(cid) => cid,
None => return Err(anyhow::anyhow!("expected cid")),
};
let mut ipld = decode_ipld(&cid, self.ipfs.repo.get_block(&cid).await?.data())?;
let mut ipld = decode_ipld(&cid, self.get_block(&cid).await?.data())?;
for sub_path in path.iter() {
if !can_resolve(&ipld, sub_path) {
let path = sub_path.to_owned();
return Err(IpfsPathError::ResolveError { ipld, path }.into());
}
ipld = resolve(ipld, sub_path);
ipld = match ipld {
Ipld::Link(cid) => decode_ipld(&cid, self.ipfs.repo.get_block(&cid).await?.data())?,
Ipld::Link(cid) => decode_ipld(&cid, self.get_block(&cid).await?.data())?,
ipld => ipld,
};
}
Expand Down Expand Up @@ -100,21 +99,19 @@ mod tests {
#[async_std::test]
async fn test_resolve_root_cid() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);
let data = ipld!([1, 2, 3]);
let cid = dag.put(data.clone(), Codec::DagCBOR).await.unwrap();
let res = dag.get(IpfsPath::from(cid)).await.unwrap();
let cid = ipfs.put_dag(data.clone()).await.unwrap();
let res = ipfs.get_dag(IpfsPath::from(cid)).await.unwrap();
assert_eq!(res, data);
}

#[async_std::test]
async fn test_resolve_array_elem() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);
let data = ipld!([1, 2, 3]);
let cid = dag.put(data.clone(), Codec::DagCBOR).await.unwrap();
let res = dag
.get(IpfsPath::from(cid).sub_path("1").unwrap())
let cid = ipfs.put_dag(data.clone()).await.unwrap();
let res = ipfs
.get_dag(IpfsPath::from(cid).sub_path("1").unwrap())
.await
.unwrap();
assert_eq!(res, ipld!(2));
Expand All @@ -123,11 +120,10 @@ mod tests {
#[async_std::test]
async fn test_resolve_nested_array_elem() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);
let data = ipld!([1, [2], 3,]);
let cid = dag.put(data, Codec::DagCBOR).await.unwrap();
let res = dag
.get(IpfsPath::from(cid).sub_path("1/0").unwrap())
let cid = ipfs.put_dag(data).await.unwrap();
let res = ipfs
.get_dag(IpfsPath::from(cid).sub_path("1/0").unwrap())
.await
.unwrap();
assert_eq!(res, ipld!(2));
Expand All @@ -136,13 +132,12 @@ mod tests {
#[async_std::test]
async fn test_resolve_object_elem() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);
let data = ipld!({
"key": false,
});
let cid = dag.put(data, Codec::DagCBOR).await.unwrap();
let res = dag
.get(IpfsPath::from(cid).sub_path("key").unwrap())
let cid = ipfs.put_dag(data).await.unwrap();
let res = ipfs
.get_dag(IpfsPath::from(cid).sub_path("key").unwrap())
.await
.unwrap();
assert_eq!(res, ipld!(false));
Expand All @@ -151,13 +146,12 @@ mod tests {
#[async_std::test]
async fn test_resolve_cid_elem() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);
let data1 = ipld!([1]);
let cid1 = dag.put(data1, Codec::DagCBOR).await.unwrap();
let cid1 = ipfs.put_dag(data1).await.unwrap();
let data2 = ipld!([cid1]);
let cid2 = dag.put(data2, Codec::DagCBOR).await.unwrap();
let res = dag
.get(IpfsPath::from(cid2).sub_path("0/0").unwrap())
let cid2 = ipfs.put_dag(data2).await.unwrap();
let res = ipfs
.get_dag(IpfsPath::from(cid2).sub_path("0/0").unwrap())
.await
.unwrap();
assert_eq!(res, ipld!(1));
Expand Down
33 changes: 17 additions & 16 deletions src/ipns/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#![allow(dead_code)]
use crate::error::Error;
use crate::path::{IpfsPath, PathRoot};
use crate::repo::RepoTypes;
use crate::Ipfs;
use crate::repo::{Repo, RepoTypes};
use async_trait::async_trait;
use libp2p::PeerId;

mod dns;
Expand All @@ -11,22 +11,23 @@ mod ipns_pb {
include!(concat!(env!("OUT_DIR"), "/ipns_pb.rs"));
}

#[derive(Clone, Debug)]
pub struct Ipns<Types: RepoTypes> {
ipfs: Ipfs<Types>,
}
#[async_trait]
pub trait Ipns {
async fn resolve_ipns(&self, path: &IpfsPath) -> Result<IpfsPath, Error>;

impl<Types: RepoTypes> Ipns<Types> {
pub fn new(ipfs: Ipfs<Types>) -> Self {
Ipns { ipfs }
}
async fn publish_ipns(&self, key: &PeerId, path: &IpfsPath) -> Result<IpfsPath, Error>;

async fn cancel_ipns(&self, key: &PeerId) -> Result<(), Error>;
}

#[async_trait]
impl<Types: RepoTypes> Ipns for Repo<Types> {
/// Resolves a ipns path to an ipld path.
pub async fn resolve(&self, path: &IpfsPath) -> Result<IpfsPath, Error> {
async fn resolve_ipns(&self, path: &IpfsPath) -> Result<IpfsPath, Error> {
let path = path.to_owned();
match path.root() {
PathRoot::Ipld(_) => Ok(path),
PathRoot::Ipns(peer_id) => match self.ipfs.repo.get_ipns(peer_id).await? {
PathRoot::Ipns(peer_id) => match self.get_ipns(peer_id).await? {
Some(path) => Ok(path),
None => Err(anyhow::anyhow!("unimplemented")),
},
Expand All @@ -35,8 +36,8 @@ impl<Types: RepoTypes> Ipns<Types> {
}

/// Publishes an ipld path.
pub async fn publish(&self, key: &PeerId, path: &IpfsPath) -> Result<IpfsPath, Error> {
let future = self.ipfs.repo.put_ipns(key, path);
async fn publish_ipns(&self, key: &PeerId, path: &IpfsPath) -> Result<IpfsPath, Error> {
let future = self.put_ipns(key, path);
let key = key.to_owned();
let mut path = path.to_owned();
future.await?;
Expand All @@ -45,8 +46,8 @@ impl<Types: RepoTypes> Ipns<Types> {
}

/// Cancel an ipns path.
pub async fn cancel(&self, key: &PeerId) -> Result<(), Error> {
self.ipfs.repo.remove_ipns(key).await?;
async fn cancel_ipns(&self, key: &PeerId) -> Result<(), Error> {
self.remove_ipns(key).await?;
Ok(())
}
}
32 changes: 13 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,6 @@ impl<Types: IpfsTypes> std::ops::Deref for Ipfs<Types> {
}

impl<Types: IpfsTypes> Ipfs<Types> {
fn dag(&self) -> IpldDag<Types> {
IpldDag::new(self.clone())
}

fn ipns(&self) -> Ipns<Types> {
Ipns::new(self.clone())
}

/// Puts a block into the ipfs repo.
pub async fn put_block(&self, block: Block) -> Result<Cid, Error> {
self.repo
Expand Down Expand Up @@ -401,31 +393,30 @@ impl<Types: IpfsTypes> Ipfs<Types> {

/// Puts an ipld dag node into the ipfs repo.
pub async fn put_dag(&self, ipld: Ipld) -> Result<Cid, Error> {
self.dag()
.put(ipld, Codec::DagCBOR)
self.repo
.put_dag(ipld, Codec::DagCBOR)
.instrument(self.span.clone())
.await
}

/// Gets an ipld dag node from the ipfs repo.
pub async fn get_dag(&self, path: IpfsPath) -> Result<Ipld, Error> {
self.dag().get(path).instrument(self.span.clone()).await
self.repo.get_dag(path).instrument(self.span.clone()).await
}

/// Adds a file into the ipfs repo.
pub async fn add(&self, path: PathBuf) -> Result<Cid, Error> {
let dag = self.dag();
let file = File::new(path).await?;
let path = file
.put_unixfs_v1(&dag)
.put_unixfs_v1(&self.repo)
.instrument(self.span.clone())
.await?;
Ok(path)
}

/// Gets a file from the ipfs repo.
pub async fn get(&self, path: IpfsPath) -> Result<File, Error> {
File::get_unixfs_v1(&self.dag(), path)
File::get_unixfs_v1(&self.repo, path)
.instrument(self.span.clone())
.await
}
Expand All @@ -450,23 +441,26 @@ impl<Types: IpfsTypes> Ipfs<Types> {

/// Resolves a ipns path to an ipld path.
pub async fn resolve_ipns(&self, path: &IpfsPath) -> Result<IpfsPath, Error> {
self.ipns()
.resolve(path)
self.repo
.resolve_ipns(path)
.instrument(self.span.clone())
.await
}

/// Publishes an ipld path.
pub async fn publish_ipns(&self, key: &PeerId, path: &IpfsPath) -> Result<IpfsPath, Error> {
self.ipns()
.publish(key, path)
self.repo
.publish_ipns(key, path)
.instrument(self.span.clone())
.await
}

/// Cancel an ipns path.
pub async fn cancel_ipns(&self, key: &PeerId) -> Result<(), Error> {
self.ipns().cancel(key).instrument(self.span.clone()).await
self.repo
.cancel_ipns(key)
.instrument(self.span.clone())
.await
}

pub async fn connect<T: Into<ConnectionTarget>>(&self, target: T) -> Result<(), Error> {
Expand Down
13 changes: 6 additions & 7 deletions src/unixfs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::dag::IpldDag;
use crate::error::Error;
use crate::path::IpfsPath;
use crate::repo::RepoTypes;
use crate::repo::{Repo, RepoTypes};
use async_std::fs;
use async_std::io::ReadExt;
use async_std::path::PathBuf;
Expand Down Expand Up @@ -31,20 +31,20 @@ impl File {
}

pub async fn get_unixfs_v1<T: RepoTypes>(
dag: &IpldDag<T>,
repo: &Repo<T>,
path: IpfsPath,
) -> Result<Self, Error> {
let ipld = dag.get(path).await?;
let ipld = repo.get_dag(path).await?;
let pb_node: PbNode = (&ipld).try_into()?;
Ok(File { data: pb_node.data })
}

pub async fn put_unixfs_v1<T: RepoTypes>(&self, dag: &IpldDag<T>) -> Result<Cid, Error> {
pub async fn put_unixfs_v1<T: RepoTypes>(&self, repo: &Repo<T>) -> Result<Cid, Error> {
let links: Vec<Ipld> = vec![];
let mut pb_node = BTreeMap::<String, Ipld>::new();
pb_node.insert("Data".to_string(), self.data.clone().into());
pb_node.insert("Links".to_string(), links.into());
dag.put(pb_node.into(), Codec::DagProtobuf).await
repo.put_dag(pb_node.into(), Codec::DagProtobuf).await
}
}

Expand Down Expand Up @@ -77,11 +77,10 @@ mod tests {
#[async_std::test]
async fn test_file_cid() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);
let file = File::from("\u{8}\u{2}\u{12}\u{12}Here is some data\n\u{18}\u{12}");
let cid = Cid::try_from("QmSy5pnHk1EnvE5dmJSyFKG5unXLGjPpBuJJCBQkBTvBaW").unwrap();

let cid2 = file.put_unixfs_v1(&dag).await.unwrap();
let cid2 = file.put_unixfs_v1(&ipfs.repo).await.unwrap();
assert_eq!(cid.to_string(), cid2.to_string());
}
}