Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed builders from eventhubs; removed impl Into<String> from eventhubs and AMQP; Fixed time conversion issue with 1-1-0001 #1892

Merged
merged 11 commits into from
Nov 8, 2024
23 changes: 12 additions & 11 deletions sdk/core/azure_core_amqp/src/cbs.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.
// cspell: words amqp sasl
// cspell: words amqp sasl sastoken

use azure_core::error::Result;
use std::fmt::Debug;

use super::session::AmqpSession;

Expand Down Expand Up @@ -37,9 +36,10 @@ pub trait AmqpClaimsBasedSecurityApis {
///
/// # Parameters
///
/// - `path`: A `String` reference representing the AMQP path to be authorized.
/// - `secret`: An implementor of `Into<String>` representing the secret used for authorization. This is typically a JSON Web token.
/// - `expires_on`: A `time::OffsetDateTime` representing the expiration time of the authorization.
/// - `path`: A string representing the AMQP path to be authorized.
/// - `token_type`: An optional string representing the type of token used for authorization. This is either "servicebus.windows.net:sastoken" or "jwt". If it is not supplied, "jwt" is assumed.
/// - `secret`: A string representing the secret used for authorization. This is typically a JSON Web token.
/// - `expires_on`: The expiration time of the authorization.
///
/// # Returns
///
Expand All @@ -49,13 +49,13 @@ pub trait AmqpClaimsBasedSecurityApis {
///
fn authorize_path(
&self,
path: impl Into<String> + Debug,
secret: impl Into<String>,
path: String,
token_type: Option<String>,
secret: String,
expires_on: time::OffsetDateTime,
) -> impl std::future::Future<Output = Result<()>>;
}

#[derive(Debug)]
pub struct AmqpClaimsBasedSecurity<'a> {
implementation: CbsImplementation<'a>,
}
Expand All @@ -71,12 +71,13 @@ impl<'a> AmqpClaimsBasedSecurity<'a> {
impl<'a> AmqpClaimsBasedSecurityApis for AmqpClaimsBasedSecurity<'a> {
async fn authorize_path(
&self,
path: impl Into<String> + Debug,
secret: impl Into<String>,
path: String,
token_type: Option<String>,
secret: String,
expires_on: time::OffsetDateTime,
) -> Result<()> {
self.implementation
.authorize_path(path, secret, expires_on)
.authorize_path(path, token_type, secret, expires_on)
.await
}
async fn attach(&self) -> Result<()> {
Expand Down
50 changes: 25 additions & 25 deletions sdk/core/azure_core_amqp/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,60 +30,60 @@ impl AmqpConnectionOptions {
pub fn builder() -> builders::AmqpConnectionOptionsBuilder {
builders::AmqpConnectionOptionsBuilder::new()
}
pub fn max_frame_size(&self) -> Option<u32> {
self.max_frame_size
pub fn max_frame_size(&self) -> Option<&u32> {
self.max_frame_size.as_ref()
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved
}
pub fn channel_max(&self) -> Option<u16> {
self.channel_max
pub fn channel_max(&self) -> Option<&u16> {
self.channel_max.as_ref()
}
pub fn idle_timeout(&self) -> Option<Duration> {
self.idle_timeout
pub fn idle_timeout(&self) -> Option<&Duration> {
self.idle_timeout.as_ref()
}
pub fn outgoing_locales(&self) -> Option<Vec<String>> {
self.outgoing_locales.clone()
pub fn outgoing_locales(&self) -> Option<&Vec<String>> {
self.outgoing_locales.as_ref()
}
pub fn incoming_locales(&self) -> Option<Vec<String>> {
self.incoming_locales.clone()
pub fn incoming_locales(&self) -> Option<&Vec<String>> {
self.incoming_locales.as_ref()
}
pub fn offered_capabilities(&self) -> Option<Vec<AmqpSymbol>> {
self.offered_capabilities.clone()
pub fn offered_capabilities(&self) -> Option<&Vec<AmqpSymbol>> {
self.offered_capabilities.as_ref()
}
pub fn desired_capabilities(&self) -> Option<Vec<AmqpSymbol>> {
self.desired_capabilities.clone()
pub fn desired_capabilities(&self) -> Option<&Vec<AmqpSymbol>> {
self.desired_capabilities.as_ref()
}
pub fn properties(&self) -> Option<AmqpOrderedMap<AmqpSymbol, AmqpValue>> {
self.properties.clone()
pub fn properties(&self) -> Option<&AmqpOrderedMap<AmqpSymbol, AmqpValue>> {
self.properties.as_ref()
}
pub fn buffer_size(&self) -> Option<usize> {
self.buffer_size
pub fn buffer_size(&self) -> Option<&usize> {
self.buffer_size.as_ref()
}
}

pub trait AmqpConnectionApis {
fn open(
&self,
name: impl Into<String>,
name: String,
url: Url,
options: Option<AmqpConnectionOptions>,
) -> impl std::future::Future<Output = Result<()>>;
fn close(&self) -> impl std::future::Future<Output = Result<()>>;
fn close_with_error(
&self,
condition: impl Into<AmqpSymbol>,
condition: AmqpSymbol,
description: Option<String>,
info: Option<AmqpOrderedMap<AmqpSymbol, AmqpValue>>,
) -> impl std::future::Future<Output = Result<()>>;
}

#[derive(Debug, Default)]
#[derive(Default)]
pub struct AmqpConnection {
pub(crate) implementation: ConnectionImplementation,
}

impl AmqpConnectionApis for AmqpConnection {
fn open(
&self,
name: impl Into<String>,
name: String,
url: Url,
options: Option<AmqpConnectionOptions>,
) -> impl std::future::Future<Output = Result<()>> {
Expand All @@ -94,7 +94,7 @@ impl AmqpConnectionApis for AmqpConnection {
}
fn close_with_error(
&self,
condition: impl Into<AmqpSymbol>,
condition: AmqpSymbol,
description: Option<String>,
info: Option<AmqpOrderedMap<AmqpSymbol, AmqpValue>>,
) -> impl std::future::Future<Output = Result<()>> {
Expand Down Expand Up @@ -156,8 +156,8 @@ pub mod builders {
}
pub fn with_properties<K, V>(mut self, properties: impl Into<AmqpOrderedMap<K, V>>) -> Self
where
K: Into<AmqpSymbol> + Debug + Default + PartialEq,
V: Into<AmqpValue> + Debug + Default,
K: Into<AmqpSymbol> + Debug + Clone + PartialEq,
V: Into<AmqpValue> + Debug + Clone,
{
let properties_map: AmqpOrderedMap<K, V> = properties.into();
let properties_map = properties_map
Expand Down
14 changes: 7 additions & 7 deletions sdk/core/azure_core_amqp/src/fe2o3/cbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ use azure_core::error::Result;
use fe2o3_amqp_cbs::token::CbsToken;
use fe2o3_amqp_types::primitives::Timestamp;
use std::borrow::BorrowMut;
use std::{fmt::Debug, sync::OnceLock};
use std::sync::OnceLock;
use tracing::{debug, trace};

#[derive(Debug)]
pub(crate) struct Fe2o3ClaimsBasedSecurity<'a> {
cbs: OnceLock<Mutex<fe2o3_amqp_cbs::client::CbsClient>>,
session: &'a AmqpSession,
Expand Down Expand Up @@ -67,8 +66,9 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'_> {

async fn authorize_path(
&self,
path: impl Into<String> + Debug,
secret: impl Into<String>,
path: String,
token_type: Option<String>,
secret: String,
expires_at: time::OffsetDateTime,
) -> Result<()> {
trace!(
Expand All @@ -77,8 +77,8 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'_> {
expires_at
);
let cbs_token = CbsToken::new(
secret.into(),
"jwt",
secret,
token_type.unwrap_or("jwt".to_string()),
Some(Timestamp::from(
expires_at
.to_offset(time::UtcOffset::UTC)
Expand All @@ -103,7 +103,7 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'_> {
.lock()
.await
.borrow_mut()
.put_token(path.into(), cbs_token)
.put_token(path, cbs_token)
.await
.map_err(AmqpManagement::from)?;
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions sdk/core/azure_core_amqp/src/fe2o3/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Drop for Fe2o3AmqpConnection {
impl AmqpConnectionApis for Fe2o3AmqpConnection {
async fn open(
&self,
id: impl Into<String>,
id: String,
url: Url,
options: Option<AmqpConnectionOptions>,
) -> Result<()> {
Expand Down Expand Up @@ -133,7 +133,7 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection {
}
async fn close_with_error(
&self,
condition: impl Into<AmqpSymbol>,
condition: AmqpSymbol,
description: Option<String>,
info: Option<AmqpOrderedMap<AmqpSymbol, AmqpValue>>,
) -> Result<()> {
Expand All @@ -152,7 +152,7 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection {
.borrow_mut()
.close_with_error(fe2o3_amqp::types::definitions::Error::new(
fe2o3_amqp::types::definitions::ErrorCondition::Custom(
fe2o3_amqp_types::primitives::Symbol::from(condition.into()),
fe2o3_amqp_types::primitives::Symbol::from(condition),
),
description,
info.map(|i| i.into()),
Expand Down
11 changes: 5 additions & 6 deletions sdk/core/azure_core_amqp/src/fe2o3/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{
session::AmqpSession,
value::{AmqpOrderedMap, AmqpValue},
};

use async_std::sync::Mutex;
use azure_core::{
credentials::AccessToken,
Expand Down Expand Up @@ -40,15 +39,15 @@ impl Drop for Fe2o3AmqpManagement {
impl Fe2o3AmqpManagement {
pub fn new(
session: AmqpSession,
client_node_name: impl Into<String>,
client_node_name: String,
access_token: AccessToken,
) -> Result<Self> {
// Session::get() returns a clone of the underlying session handle.
let session = session.implementation.get()?;

Ok(Self {
access_token,
client_node_name: client_node_name.into(),
client_node_name,
session,
management: OnceLock::new(),
})
Expand Down Expand Up @@ -85,7 +84,7 @@ impl AmqpManagementApis for Fe2o3AmqpManagement {

async fn call(
&self,
operation_type: impl Into<String>,
operation_type: String,
application_properties: AmqpOrderedMap<String, AmqpValue>,
) -> Result<AmqpOrderedMap<String, AmqpValue>> {
let mut management = self
Expand Down Expand Up @@ -122,12 +121,12 @@ struct WithApplicationPropertiesRequest<'a> {

impl<'a> WithApplicationPropertiesRequest<'a> {
pub fn new(
entity_type: impl Into<String>,
entity_type: String,
access_token: &'a AccessToken,
application_properties: AmqpOrderedMap<String, AmqpValue>,
) -> Self {
Self {
entity_type: entity_type.into(),
entity_type,
access_token,
application_properties,
}
Expand Down
22 changes: 11 additions & 11 deletions sdk/core/azure_core_amqp/src/fe2o3/messaging/message_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl From<fe2o3_amqp_types::messaging::ApplicationProperties>
fn from(application_properties: fe2o3_amqp_types::messaging::ApplicationProperties) -> Self {
let mut properties = AmqpOrderedMap::<String, AmqpValue>::new();
for (key, value) in application_properties.0 {
properties.insert(key, value);
properties.insert(key, value.into());
}
AmqpApplicationProperties(properties)
}
Expand Down Expand Up @@ -278,7 +278,7 @@ impl From<fe2o3_amqp_types::messaging::Annotations> for AmqpAnnotations {
fn from(annotations: fe2o3_amqp_types::messaging::Annotations) -> Self {
let mut amqp_annotations = AmqpOrderedMap::<AmqpAnnotationKey, AmqpValue>::new();
for (key, value) in annotations {
amqp_annotations.insert(key, value);
amqp_annotations.insert(key.into(), value.into());
}
AmqpAnnotations(amqp_annotations)
}
Expand Down Expand Up @@ -363,11 +363,11 @@ impl From<fe2o3_amqp_types::messaging::Properties> for AmqpMessageProperties {
}
if let Some(content_type) = properties.content_type {
amqp_message_properties_builder =
amqp_message_properties_builder.with_content_type(content_type);
amqp_message_properties_builder.with_content_type(content_type.into());
}
if let Some(content_encoding) = properties.content_encoding {
amqp_message_properties_builder =
amqp_message_properties_builder.with_content_encoding(content_encoding);
amqp_message_properties_builder.with_content_encoding(content_encoding.into());
}
if let Some(absolute_expiry_time) = properties.absolute_expiry_time {
amqp_message_properties_builder =
Expand Down Expand Up @@ -480,17 +480,17 @@ fn test_properties_conversion() {

let properties = AmqpMessageProperties::builder()
.with_absolute_expiry_time(time_now)
.with_content_encoding("content_encoding")
.with_content_type("content_type")
.with_content_encoding(crate::value::AmqpSymbol("content_encoding".to_string()))
.with_content_type(crate::value::AmqpSymbol("content_type".to_string()))
.with_correlation_id("correlation_id")
.with_creation_time(time_now)
.with_group_id("group_id")
.with_group_id("group_id".to_string())
.with_group_sequence(3)
.with_message_id("test")
.with_reply_to("reply_to")
.with_reply_to_group_id("reply_to_group_id")
.with_subject("subject")
.with_to("to")
.with_reply_to("reply_to".to_string())
.with_reply_to_group_id("reply_to_group_id".to_string())
.with_subject("subject".to_string())
.with_to("to".to_string())
.with_user_id(vec![1, 2, 3])
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ fn message_source_conversion_fe2o3_amqp() {
#[test]
fn message_source_conversion_amqp_fe2o3() {
let amqp_source = AmqpSource::builder()
.with_address("test")
.with_address("test".to_string())
.with_durable(TerminusDurability::UnsettledState)
.with_expiry_policy(TerminusExpiryPolicy::SessionEnd)
.with_timeout(95)
Expand Down
14 changes: 7 additions & 7 deletions sdk/core/azure_core_amqp/src/fe2o3/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,8 @@ mod tests {
std::time::UNIX_EPOCH + std::time::Duration::from_millis(timestamp as u64);

let amqp_message = AmqpMessage::builder()
.add_application_property("abc", "23 skiddoo")
.add_application_property("What?", 29.5)
.add_application_property("abc".to_string(), "23 skiddoo")
.add_application_property("What?".to_string(), 29.5)
.with_body(AmqpValue::from("hello"))
.with_properties(
AmqpMessageProperties::builder()
Expand All @@ -531,13 +531,13 @@ mod tests {
.with_content_type(AmqpSymbol::from("text/plain"))
.with_correlation_id("abc")
.with_creation_time(timestamp)
.with_group_id(AmqpSymbol::from("group"))
.with_group_id("group".to_string())
.with_group_sequence(5)
.with_message_id("message")
.with_reply_to(AmqpSymbol::from("reply"))
.with_reply_to_group_id(AmqpSymbol::from("reply_group"))
.with_subject(AmqpSymbol::from("subject"))
.with_to(AmqpSymbol::from("to"))
.with_reply_to("reply".to_string())
.with_reply_to_group_id("reply_group".to_string())
.with_subject("subject".to_string())
.with_to("to".to_string())
.with_user_id(vec![39, 20, 54])
.build(),
)
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/azure_core_amqp/src/fe2o3/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::borrow::BorrowMut;
use std::sync::{Arc, OnceLock};
use tracing::trace;

#[derive(Debug, Default)]
#[derive(Default)]
pub(crate) struct Fe2o3AmqpReceiver {
receiver: OnceLock<Arc<Mutex<fe2o3_amqp::Receiver>>>,
}
Expand Down Expand Up @@ -84,7 +84,6 @@ impl AmqpReceiverApis for Fe2o3AmqpReceiver {
.max_message_size())
}

#[tracing::instrument]
async fn receive(&self) -> Result<AmqpMessage> {
let mut receiver = self
.receiver
Expand Down
Loading