Skip to content

Commit

Permalink
Track authentication mode per-auth-token
Browse files Browse the repository at this point in the history
Previously, we taught Janus to accept authentication tokens presented in
HTTP requests as either `DAP-Auth-Token: <token>` or `Authorization:
Bearer <token>`. However, we also want a Janus leader to be able to work
with a DAP helper that expects only one or the other authentication
mode.

The `task_aggregator_auth_tokens` and `task_collector_auth_tokens`
tables now have columns that track the type of auth token, which is
either `DAP_AUTH` or `BEARER`. `send_request_to_helper` will now consult
this value to decide how it should authenticate requests.

We didn't strictly need this column in `task_collector_auth_tokens` as
the leader's collection job handlers will still accept either kind of
token, but it's nice to have consistent handling of these types across
the two tables.

The distinction between DAP-Auth-Token and bearer tokens is plumbed up
from `janus_core` through `janus_aggregator` by combining
`janus_core::task::AuthenticationToken` with
`janus_collector::Authentication` into an enum that distingishes between
the two authentication modes and can produce suitable HTTP header names
and values.

Note that we track authentication mode per-token and not per-task. This
means that it's possible for a task to use two different kinds of
tokens.

A task defined in YAML and provisioned via `janus_cli` may use Bearer
tokens, but this isn't yet possible via `janus_aggregator_api`, which
still assumes it is receiving or generating `DAP-Auth-Token`-form
tokens. The aggregator API for creating tasks will have to change to
accomodate the distinction between token types, and that will arrive in
a later change so that it can be coordinated with `divviup-api`.

Part of #472
  • Loading branch information
tgeoghegan committed May 22, 2023
1 parent d8498ab commit a3f657e
Show file tree
Hide file tree
Showing 17 changed files with 443 additions and 359 deletions.
9 changes: 3 additions & 6 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use janus_core::test_util::dummy_vdaf;
use janus_core::{
hpke::{self, HpkeApplicationInfo, Label},
http::response_to_problem_details,
task::{AuthenticationToken, VdafInstance, DAP_AUTH_HEADER, PRIO3_VERIFY_KEY_LENGTH},
task::{AuthenticationToken, VdafInstance, PRIO3_VERIFY_KEY_LENGTH},
time::{Clock, DurationExt, IntervalExt, TimeExt},
};
use janus_messages::{
Expand Down Expand Up @@ -2438,16 +2438,13 @@ async fn send_request_to_helper<T: Encode>(
) -> Result<Bytes, Error> {
let domain = url.domain().unwrap_or_default().to_string();
let request_body = request.get_encoded();
let (auth_header, auth_value) = auth_token.request_authentication();

let start = Instant::now();
let response_result = http_client
.request(method, url)
.header(CONTENT_TYPE, content_type)
// TODO(#472): We want to be able to communicate with new Janus (prefers bearer token but
// supports `DAP-Auth-Token`) as well as older Janus and Daphne (which require
// `DAP-Auth-Token`) so for the moment, we send `DAP-Auth-Token`. But eventually we should
// determine the appropriate token header to send for a given task.
.header(DAP_AUTH_HEADER, auth_token.as_ref())
.header(auth_header, auth_value)
.body(request_body)
.send()
.await;
Expand Down
21 changes: 11 additions & 10 deletions aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use janus_aggregator_core::{
task::{test_util::TaskBuilder, QueryType, Task},
};
use janus_core::{
task::{VdafInstance, DAP_AUTH_HEADER},
task::{AuthenticationToken, VdafInstance, DAP_AUTH_HEADER},
test_util::{dummy_vdaf, install_test_trace_subscriber, run_vdaf, VdafTranscript},
time::{Clock, MockClock, TimeExt as _},
};
Expand Down Expand Up @@ -176,22 +176,23 @@ pub(crate) async fn put_aggregation_job(
}

#[tokio::test]
async fn aggregation_job_init_authorization_bearer_header() {
async fn aggregation_job_init_authorization_dap_auth_token() {
let test_case = setup_aggregate_init_test_without_sending_request().await;
// Find a DapAuthToken among the task's aggregator auth tokens
let (auth_header, auth_value) = test_case
.task
.aggregator_auth_tokens()
.iter()
.find(|auth| matches!(auth, AuthenticationToken::DapAuth(_)))
.unwrap()
.request_authentication();

let response = put(test_case
.task
.aggregation_job_uri(&test_case.aggregation_job_id)
.unwrap()
.path())
// Authenticate using an "Authorization: Bearer <token>" header instead of "DAP-Auth-Token"
.with_request_header(
KnownHeaderName::Authorization,
test_case
.task
.primary_aggregator_auth_token()
.bearer_token(),
)
.with_request_header(auth_header, auth_value)
.with_request_header(
KnownHeaderName::ContentType,
AggregationJobInitializeReq::<TimeInterval>::MEDIA_TYPE,
Expand Down
10 changes: 6 additions & 4 deletions aggregator/src/aggregator/collection_job_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ impl CollectionJobTestCase {
.collection_job_uri(collection_job_id)
.unwrap()
.path());
if let Some(token) = auth_token {
test_conn = test_conn.with_request_header("DAP-Auth-Token", token.as_ref().to_owned())
if let Some(auth) = auth_token {
let (header, value) = auth.request_authentication();
test_conn = test_conn.with_request_header(header, value);
}

test_conn
Expand Down Expand Up @@ -94,8 +95,9 @@ impl CollectionJobTestCase {
.unwrap()
.path(),
);
if let Some(token) = auth_token {
test_conn = test_conn.with_request_header("DAP-Auth-Token", token.as_ref().to_owned())
if let Some(auth) = auth_token {
let (header, value) = auth.request_authentication();
test_conn = test_conn.with_request_header(header, value);
}
test_conn.run_async(&self.handler).await
}
Expand Down
36 changes: 12 additions & 24 deletions aggregator/src/aggregator/http_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use janus_aggregator_api::instrumented;
use janus_aggregator_core::datastore::Datastore;
use janus_core::{
http::extract_bearer_token,
task::{AuthenticationToken, DAP_AUTH_HEADER},
task::{AuthenticationToken, DapAuthToken, DAP_AUTH_HEADER},
time::Clock,
};
use janus_messages::{
Expand Down Expand Up @@ -526,32 +526,21 @@ fn parse_collection_job_id(captures: &Captures) -> Result<CollectionJobId, Error
.map_err(|_| Error::BadRequest("invalid CollectionJobId".to_owned()))
}

/// Get the authorization token header from the request.
/// Get an [`AuthenticationToken`] from the request.
fn parse_auth_token(task_id: &TaskId, conn: &Conn) -> Result<Option<AuthenticationToken>, Error> {
// Prefer a bearer token, then fall back to DAP-Auth-Token
let bearer_token =
extract_bearer_token(conn).map_err(|_| Error::UnauthorizedRequest(*task_id))?;
if bearer_token.is_some() {
return bearer_token
.map(AuthenticationToken::try_from)
.transpose()
.map_err(|_| {
Error::BadRequest(
"Authorization: Bearer value decodes to an authentication token containing \
unsafe bytes"
.to_string(),
)
});
if let Some(bearer_token) =
extract_bearer_token(conn).map_err(|_| Error::UnauthorizedRequest(*task_id))?
{
return Ok(Some(AuthenticationToken::Bearer(bearer_token)));
}

conn.request_headers()
.get(DAP_AUTH_HEADER)
.map(|value| {
value.as_ref().to_owned().try_into().map_err(|_| {
Error::BadRequest(
"DAP-Auth-Header value is not a valid HTTP header value".to_string(),
)
})
DapAuthToken::try_from(value.as_ref().to_vec())
.map(AuthenticationToken::DapAuth)
.map_err(|e| Error::BadRequest(format!("bad DAP-Auth-Token header: {e}")))
})
.transpose()
}
Expand Down Expand Up @@ -1211,6 +1200,8 @@ mod tests {

datastore.put_task(&task).await.unwrap();

let (wrong_auth_header, wrong_auth_value) =
random::<AuthenticationToken>().request_authentication();
let request = AggregationJobInitializeReq::new(
Vec::new(),
PartialBatchSelector::new_time_interval(),
Expand All @@ -1225,10 +1216,7 @@ mod tests {
.aggregation_job_uri(&aggregation_job_id)
.unwrap()
.path())
.with_request_header(
"DAP-Auth-Token",
random::<AuthenticationToken>().as_ref().to_owned(),
)
.with_request_header(wrong_auth_header, wrong_auth_value)
.with_request_header(
KnownHeaderName::ContentType,
AggregationJobInitializeReq::<TimeInterval>::MEDIA_TYPE,
Expand Down
7 changes: 2 additions & 5 deletions aggregator/src/aggregator/problem_details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ mod tests {
use assert_matches::assert_matches;
use futures::future::join_all;
use http::Method;
use janus_core::{
task::AuthenticationToken,
time::{Clock, RealClock},
};
use janus_core::time::{Clock, RealClock};
use janus_messages::{
problem_type::{DapProblemType, DapProblemTypeParseError},
Duration, HpkeConfigId, Interval, ReportIdChecksum,
Expand Down Expand Up @@ -242,7 +239,7 @@ mod tests {
"test",
"text/plain",
(),
&AuthenticationToken::try_from("auth".as_bytes().to_vec()).unwrap(),
&random(),
&request_histogram,
)
.await
Expand Down
64 changes: 39 additions & 25 deletions aggregator_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use janus_aggregator_core::{
SecretBytes,
};
use janus_core::{
hpke::generate_hpke_config_and_private_key, http::extract_bearer_token,
task::AuthenticationToken, time::Clock,
hpke::generate_hpke_config_and_private_key,
http::extract_bearer_token,
task::{AuthenticationToken, DapAuthToken},
time::Clock,
};
use janus_messages::{Duration, HpkeAeadId, HpkeKdfId, HpkeKemId, Role, TaskId, Time};
use models::{GetTaskMetricsResp, TaskResp};
Expand Down Expand Up @@ -170,30 +172,42 @@ async fn post_task<C: Clock>(
Status::BadRequest,
)
})?;
Vec::from([AuthenticationToken::try_from(token_bytes).map_err(|_| {
Error::new(
"Invalid HTTP header value in aggregator_auth_token".to_string(),
Status::BadRequest,
)
})?])
Vec::from([
// TODO(#472): Each token in the PostTaskReq should indicate whether it is a bearer
// token or a DAP-Auth-Token. For now, assume the latter.
DapAuthToken::try_from(token_bytes)
.map(AuthenticationToken::DapAuth)
.map_err(|_| {
Error::new(
"Invalid HTTP header value in aggregator_auth_token".to_string(),
Status::BadRequest,
)
})?,
])
} else {
Vec::from([random()])
// TODO(#472): switch to generating bearer tokens by default
Vec::from([AuthenticationToken::DapAuth(random())])
};
let collector_auth_tokens = match (req.role, req.collector_auth_token) {
(Role::Leader, None) => Vec::from([random()]),
// TODO(#472): switch to generating bearer tokens by default
(Role::Leader, None) => Vec::from([AuthenticationToken::DapAuth(random())]),
(Role::Leader, Some(encoded)) => {
let token_bytes = URL_SAFE_NO_PAD.decode(encoded).map_err(|err| {
Error::new(
format!("Invalid base64 value for collector_auth_token: {err}"),
Status::BadRequest,
)
})?;
Vec::from([AuthenticationToken::try_from(token_bytes).map_err(|_| {
Error::new(
"Invalid HTTP header value in collector_auth_token".to_string(),
Status::BadRequest,
)
})?])
// TODO(#472): Each token in the PostTaskReq should indicate whether it is a bearer
// token or a DAP-Auth-Token. For now, assume the latter.
Vec::from([DapAuthToken::try_from(token_bytes)
.map(AuthenticationToken::DapAuth)
.map_err(|_| {
Error::new(
"Invalid HTTP header value in collector_auth_token".to_string(),
Status::BadRequest,
)
})?])
}
(Role::Helper, None) => Vec::new(),
(Role::Helper, Some(_)) => {
Expand Down Expand Up @@ -486,7 +500,7 @@ mod tests {
};
use janus_core::{
hpke::{generate_hpke_config_and_private_key, HpkeKeypair, HpkePrivateKey},
task::{AuthenticationToken, VdafInstance},
task::{AuthenticationToken, DapAuthToken, VdafInstance},
test_util::{
dummy_vdaf::{self, AggregationParam},
install_test_trace_subscriber,
Expand Down Expand Up @@ -712,8 +726,8 @@ mod tests {

let vdaf_verify_key =
SecretBytes::new(thread_rng().sample_iter(Standard).take(16).collect());
let aggregator_auth_token = random::<AuthenticationToken>();
let collector_auth_token = random::<AuthenticationToken>();
let aggregator_auth_token = AuthenticationToken::DapAuth(random());
let collector_auth_token = AuthenticationToken::DapAuth(random());

// Verify: posting a task creates a new task which matches the request.
let req = PostTaskReq {
Expand Down Expand Up @@ -1354,12 +1368,12 @@ mod tests {
HpkeAeadId::Aes128Gcm,
HpkePublicKey::from([0u8; 32].to_vec()),
),
Vec::from([
AuthenticationToken::try_from("aggregator-12345678".as_bytes().to_vec()).unwrap(),
]),
Vec::from([
AuthenticationToken::try_from("collector-abcdef00".as_bytes().to_vec()).unwrap(),
]),
Vec::from([AuthenticationToken::DapAuth(
DapAuthToken::try_from(b"aggregator-12345678".to_vec()).unwrap(),
)]),
Vec::from([AuthenticationToken::DapAuth(
DapAuthToken::try_from(b"collector-abcdef00".to_vec()).unwrap(),
)]),
[(HpkeKeypair::new(
HpkeConfig::new(
HpkeConfigId::from(13),
Expand Down
Loading

0 comments on commit a3f657e

Please sign in to comment.