Skip to content

Commit

Permalink
Merge branch 'main' into streamable-files
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Caswell committed Sep 8, 2023
2 parents 99a7dc4 + aa60f41 commit 9a95661
Show file tree
Hide file tree
Showing 955 changed files with 1,953 additions and 1,947 deletions.
11 changes: 3 additions & 8 deletions sdk/core/src/http_client/reqwest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,9 @@ impl HttpClient for ::reqwest::Client {

let reqwest_request = match body {
Body::Bytes(bytes) => req.body(bytes).build(),
Body::SeekableStream(mut seekable_stream) => {
seekable_stream.reset().await.context(
ErrorKind::Other,
"failed to reset body stream when building request",
)?;
req.body(::reqwest::Body::wrap_stream(seekable_stream))
.build()
}
Body::SeekableStream(seekable_stream) => req
.body(::reqwest::Body::wrap_stream(seekable_stream))
.build(),
}
.context(ErrorKind::Other, "failed to build `reqwest` request")?;

Expand Down
11 changes: 6 additions & 5 deletions sdk/core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
/// ```
#[macro_export]
macro_rules! setters {
(@single $name:ident : $typ:ty => $transform:expr) => {
(@single $(#[$meta:meta])* $name:ident : $typ:ty => $transform:expr) => {
#[allow(clippy::redundant_field_names)]
#[allow(clippy::needless_update)]
#[allow(missing_docs)]
#[must_use]
$(#[$meta])*
pub fn $name<P: ::std::convert::Into<$typ>>(self, $name: P) -> Self {
let $name: $typ = $name.into();
Self {
Expand All @@ -41,12 +42,12 @@ macro_rules! setters {
// Terminal condition
(@recurse) => {};
// Recurse without transform
(@recurse $name:ident : $typ:ty, $($tokens:tt)*) => {
$crate::setters! { @recurse $name: $typ => $name, $($tokens)* }
(@recurse $(#[$meta:meta])* $name:ident : $typ:ty, $($tokens:tt)*) => {
$crate::setters! { @recurse $(#[$meta])* $name: $typ => $name, $($tokens)* }
};
// Recurse with transform
(@recurse $name:ident : $typ:ty => $transform:expr, $($tokens:tt)*) => {
$crate::setters! { @single $name : $typ => $transform }
(@recurse $(#[$meta:meta])* $name:ident : $typ:ty => $transform:expr, $($tokens:tt)*) => {
$crate::setters! { @single $(#[$meta])* $name : $typ => $transform }
$crate::setters! { @recurse $($tokens)* }
};
($($tokens:tt)*) => {
Expand Down
6 changes: 5 additions & 1 deletion sdk/core/src/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,11 @@ pub struct FixedRetryOptions {

impl FixedRetryOptions {
setters! {
#[doc = "Set the delay between retry attempts."]
delay: Duration => delay,
#[doc = "Set the maximum number of retry attempts before giving up."]
max_retries: u32 => max_retries,
#[doc = "Set the maximum permissible elapsed time since starting to retry."]
max_total_elapsed: Duration => max_total_elapsed,
}
}
Expand All @@ -268,12 +271,13 @@ impl Default for FixedRetryOptions {
/// Telemetry options.
#[derive(Clone, Debug, Default)]
pub struct TelemetryOptions {
/// Optional application ID to telemeter.
/// Optional application ID to telemetry.
pub(crate) application_id: Option<String>,
}

impl TelemetryOptions {
setters! {
#[doc = "Set the application ID to telemetry."]
application_id: String => Some(application_id),
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Pipeline {
&self.pipeline
}

pub async fn send(&self, ctx: &mut Context, request: &mut Request) -> crate::Result<Response> {
pub async fn send(&self, ctx: &Context, request: &mut Request) -> crate::Result<Response> {
self.pipeline[0]
.send(ctx, request, &self.pipeline[1..])
.await
Expand Down
8 changes: 7 additions & 1 deletion sdk/core/src/policies/retry_policies/retry_policy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error::{Error, ErrorKind, HttpError};
use crate::error::{Error, ErrorKind, HttpError, ResultExt};
use crate::policies::{Policy, PolicyResult, Request};
use crate::sleep::sleep;
use crate::{Context, StatusCode};
Expand Down Expand Up @@ -60,6 +60,12 @@ where
let mut start = None;

loop {
if retry_count > 0 {
request.body.reset().await.context(
ErrorKind::Other,
"failed to reset body stream before retrying request",
)?;
}
let result = next[0].send(ctx, request, &next[1..]).await;
// only start keeping track of time after the first request is made
let start = start.get_or_insert_with(OffsetDateTime::now_utc);
Expand Down
7 changes: 7 additions & 0 deletions sdk/core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ impl Body {
pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub(crate) async fn reset(&mut self) -> crate::Result<()> {
match self {
Body::Bytes(_) => Ok(()),
Body::SeekableStream(stream) => stream.reset().await,
}
}
}

impl<B> From<B> for Body
Expand Down
6 changes: 6 additions & 0 deletions sdk/core/src/request_options/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ impl std::fmt::Display for LeaseId {
}
}

impl From<Uuid> for LeaseId {
fn from(value: Uuid) -> Self {
Self(value)
}
}

impl std::str::FromStr for LeaseId {
type Err = <Uuid as FromStr>::Err;

Expand Down
24 changes: 13 additions & 11 deletions sdk/core/src/seekable_stream.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,35 @@
use bytes::Bytes;
use futures::io::AsyncRead;
use futures::stream::Stream;
use futures::task::Poll;
use dyn_clone::DynClone;
use futures::{io::AsyncRead, stream::Stream, task::Poll};
use std::{pin::Pin, task::Context};

/// Amount of the stream to buffer in memory during streaming uploads
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 1024 * 64;

/// Enable a type implementing `AsyncRead` to be consumed as if it were
/// a `Stream` of `Bytes`.
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait SeekableStream:
AsyncRead + Unpin + std::fmt::Debug + Send + Sync + dyn_clone::DynClone
{
pub trait SeekableStream: AsyncRead + Unpin + std::fmt::Debug + Send + Sync + DynClone {
async fn reset(&mut self) -> crate::error::Result<()>;
fn len(&self) -> usize;

fn is_empty(&self) -> bool {
self.len() == 0
}

fn buffer_size(&self) -> usize {
DEFAULT_BUFFER_SIZE
}
}

dyn_clone::clone_trait_object!(SeekableStream);

impl Stream for dyn SeekableStream {
type Item = crate::error::Result<Bytes>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let mut buffer = vec![0_u8; 1024 * 64];
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buffer = vec![0_u8; self.buffer_size()];

match self.poll_read(cx, &mut buffer) {
Poll::Ready(Ok(0)) => Poll::Ready(None),
Expand Down
2 changes: 1 addition & 1 deletion sdk/iot_hub/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ impl ServiceClient {
/// send the request via the request pipeline
pub async fn send(
&self,
context: &mut Context,
context: &Context,
request: &mut Request,
) -> azure_core::Result<Response> {
self.pipeline.send(context, request).await
Expand Down
4 changes: 2 additions & 2 deletions sdk/iot_hub/src/service/operations/apply_on_edge_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ operation! {

impl ApplyOnEdgeDeviceBuilder {
/// Performs the apply on edge device request
pub fn into_future(mut self) -> ApplyOnEdgeDevice {
pub fn into_future(self) -> ApplyOnEdgeDevice {
Box::pin(async move {
let uri = format!(
"https://{}.azure-devices.net/devices/{}/applyConfigurationContent?api-version={}",
Expand All @@ -32,7 +32,7 @@ impl ApplyOnEdgeDeviceBuilder {
let body = azure_core::to_json(&body)?;
request.set_body(body);

self.client.send(&mut self.context, &mut request).await?;
self.client.send(&self.context, &mut request).await?;

Ok(())
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl CreateOrUpdateConfigurationBuilder {
}

/// Performs the create or update request on the device identity
pub fn into_future(mut self) -> CreateOrUpdateConfiguration {
pub fn into_future(self) -> CreateOrUpdateConfiguration {
Box::pin(async move {
let uri = format!(
"https://{}.azure-devices.net/configurations/{}?api-version={}",
Expand Down Expand Up @@ -101,7 +101,7 @@ impl CreateOrUpdateConfigurationBuilder {
let body = azure_core::to_json(&body)?;
request.set_body(body);

let response = self.client.send(&mut self.context, &mut request).await?;
let response = self.client.send(&self.context, &mut request).await?;

CreateOrUpdateConfigurationResponse::try_from(response).await
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl CreateOrUpdateDeviceIdentityBuilder {
}

/// Performs the create or update request on the device identity
pub fn into_future(mut self) -> CreateOrUpdateDeviceIdentity {
pub fn into_future(self) -> CreateOrUpdateDeviceIdentity {
Box::pin(async move {
let uri = format!(
"https://{}.azure-devices.net/devices/{}?api-version={}",
Expand Down Expand Up @@ -66,7 +66,7 @@ impl CreateOrUpdateDeviceIdentityBuilder {
let body = azure_core::to_json(&body)?;
request.set_body(body);

let response = self.client.send(&mut self.context, &mut request).await?;
let response = self.client.send(&self.context, &mut request).await?;

CreateOrUpdateDeviceIdentityResponse::try_from(response).await
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ azure_core::operation! {

impl CreateOrUpdateModuleIdentityBuilder {
/// Performs the create or update request on the device identity
pub fn into_future(mut self) -> CreateOrUpdateModuleIdentity {
pub fn into_future(self) -> CreateOrUpdateModuleIdentity {
Box::pin(async move {
let uri = format!(
"https://{}.azure-devices.net/devices/{}/modules/{}?api-version={}",
Expand Down Expand Up @@ -50,7 +50,7 @@ impl CreateOrUpdateModuleIdentityBuilder {
let body = azure_core::to_json(&body)?;
request.set_body(body);

let response = self.client.send(&mut self.context, &mut request).await?;
let response = self.client.send(&self.context, &mut request).await?;

CreateOrUpdateModuleIdentityResponse::try_from(response).await
})
Expand Down
4 changes: 2 additions & 2 deletions sdk/iot_hub/src/service/operations/delete_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ azure_core::operation! {

impl DeleteConfigurationBuilder {
/// Execute the request to delete the configuration.
pub fn into_future(mut self) -> DeleteConfiguration {
pub fn into_future(self) -> DeleteConfiguration {
Box::pin(async move {
let uri = format!(
"https://{}.azure-devices.net/configurations/{}?api-version={}",
Expand All @@ -25,7 +25,7 @@ impl DeleteConfigurationBuilder {

request.set_body(azure_core::EMPTY_BODY);

self.client.send(&mut self.context, &mut request).await?;
self.client.send(&self.context, &mut request).await?;

Ok(())
})
Expand Down
4 changes: 2 additions & 2 deletions sdk/iot_hub/src/service/operations/delete_identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ azure_core::operation! {

impl DeleteIdentityBuilder {
/// Execute the request to delete the module or device identity.
pub fn into_future(mut self) -> DeleteIdentity {
pub fn into_future(self) -> DeleteIdentity {
Box::pin(async move {
let uri = match &self.module_id {
Some(module_id) => format!(
Expand All @@ -31,7 +31,7 @@ impl DeleteIdentityBuilder {

request.set_body(azure_core::EMPTY_BODY);

self.client.send(&mut self.context, &mut request).await?;
self.client.send(&self.context, &mut request).await?;
Ok(())
})
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/iot_hub/src/service/operations/get_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ azure_core::operation! {

impl GetConfigurationBuilder {
/// Execute the request to get the configuration of a given identifier.
pub fn into_future(mut self) -> GetConfiguration {
pub fn into_future(self) -> GetConfiguration {
Box::pin(async move {
let uri = match self.configuration_id {
Some(val) => format!(
Expand All @@ -28,7 +28,7 @@ impl GetConfigurationBuilder {
let mut request = self.client.finalize_request(&uri, Method::Get)?;
request.set_body(azure_core::EMPTY_BODY);

let response = self.client.send(&mut self.context, &mut request).await?;
let response = self.client.send(&self.context, &mut request).await?;

GetConfigurationResponse::try_from(response).await
})
Expand Down
4 changes: 2 additions & 2 deletions sdk/iot_hub/src/service/operations/get_identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ azure_core::operation! {

impl GetIdentityBuilder {
/// Execute the request to get the identity of a device or module.
pub fn into_future(mut self) -> GetIdentity {
pub fn into_future(self) -> GetIdentity {
Box::pin(async move {
let uri = match self.module_id {
Some(module_id) => format!(
Expand All @@ -27,7 +27,7 @@ impl GetIdentityBuilder {
let mut request = self.client.finalize_request(&uri, Method::Get)?;
request.set_body(azure_core::EMPTY_BODY);

let response = self.client.send(&mut self.context, &mut request).await?;
let response = self.client.send(&self.context, &mut request).await?;

GetIdentityResponse::from_response(response).await
})
Expand Down
4 changes: 2 additions & 2 deletions sdk/iot_hub/src/service/operations/get_twin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ azure_core::operation! {

impl GetTwinBuilder {
/// Execute the request to get the twin of a module or device.
pub fn into_future(mut self) -> GetTwin {
pub fn into_future(self) -> GetTwin {
Box::pin(async move {
let uri = match self.module_id {
Some(val) => format!(
Expand All @@ -27,7 +27,7 @@ impl GetTwinBuilder {
let mut request = self.client.finalize_request(&uri, Method::Get)?;
request.set_body(azure_core::EMPTY_BODY);

let response = self.client.send(&mut self.context, &mut request).await?;
let response = self.client.send(&self.context, &mut request).await?;

GetTwinResponse::from_response(response).await
})
Expand Down
4 changes: 2 additions & 2 deletions sdk/iot_hub/src/service/operations/invoke_method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ azure_core::operation! {

impl InvokeMethodBuilder {
/// Turn the builder into a `Future`
pub fn into_future(mut self) -> InvokeMethod {
pub fn into_future(self) -> InvokeMethod {
Box::pin(async move {
let uri = match &self.module_id {
Some(module_id_value) => format!(
Expand All @@ -43,7 +43,7 @@ impl InvokeMethodBuilder {

request.set_body(body);

let response = self.client.send(&mut self.context, &mut request).await?;
let response = self.client.send(&self.context, &mut request).await?;

InvokeMethodResponse::try_from(response).await
})
Expand Down
4 changes: 2 additions & 2 deletions sdk/iot_hub/src/service/operations/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ azure_core::operation! {

impl QueryBuilder {
/// Invoke a qiven query on the `IoT` Hub
pub fn into_future(mut self) -> Query {
pub fn into_future(self) -> Query {
Box::pin(async move {
let uri = format!(
"https://{}.azure-devices.net/devices/query?api-version={}",
Expand All @@ -37,7 +37,7 @@ impl QueryBuilder {
request.add_mandatory_header(&self.max_item_count.unwrap_or_default());
request.set_body(body);

let response = self.client.send(&mut self.context, &mut request).await?;
let response = self.client.send(&self.context, &mut request).await?;

QueryResponse::try_from(response).await
})
Expand Down
Loading

0 comments on commit 9a95661

Please sign in to comment.