refactor: extract datapath/interface converters
Igor Zinkovsky committed Nov 7, 2023
1 parent 24d4f3b commit 38a34e1
Showing 5 changed files with 320 additions and 238 deletions.
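At a glance: in denokv/main.rs, the hand-rolled protobuf-to-interface conversions in the snapshot-read and atomic-write handlers are replaced by `TryFrom`/`From` conversions imported from `denokv_proto`, and a new `From<ConvertError> for ApiError` impl maps conversion failures onto the existing API error variants.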
denokv/main.rs (279 changes: 41 additions & 238 deletions)
@@ -1,5 +1,4 @@
use std::borrow::Cow;
use std::num::NonZeroU32;
use std::path::Path;
use std::sync::Arc;

@@ -24,18 +23,14 @@ use chrono::Duration;
use clap::Parser;
use config::Config;
use constant_time_eq::constant_time_eq;
use denokv_proto::convert::ConvertError;
use denokv_proto::datapath as pb;
use denokv_proto::decode_value;
use denokv_proto::encode_value;
use denokv_proto::time::utc_now;
use denokv_proto::AtomicWrite;
use denokv_proto::Check;
use denokv_proto::Consistency;
use denokv_proto::DatabaseMetadata;
use denokv_proto::EndpointInfo;
use denokv_proto::Enqueue;
use denokv_proto::MetadataExchangeRequest;
use denokv_proto::Mutation;
use denokv_proto::MutationKind;
use denokv_proto::ReadRange;
use denokv_proto::SnapshotReadOptions;
use denokv_sqlite::Connection;
@@ -45,13 +40,10 @@ use log::info;
use prost::DecodeError;
use rand::SeedableRng;
use thiserror::Error;
use time::utc_now;
use tokio::sync::Notify;
use uuid::Uuid;

mod config;
mod limits;
mod time;

#[derive(Clone)]
struct AppState {
@@ -204,33 +196,8 @@ async fn snapshot_read_endpoint(
State(state): State<AppState>,
Protobuf(snapshot_read): Protobuf<pb::SnapshotRead>,
) -> Result<Protobuf<pb::SnapshotReadOutput>, ApiError> {
if snapshot_read.ranges.len() > limits::MAX_READ_RANGES {
return Err(ApiError::TooManyReadRanges);
}
let mut requests = Vec::with_capacity(snapshot_read.ranges.len());
let mut total_limit: usize = 0;
for range in snapshot_read.ranges {
let limit: NonZeroU32 = u32::try_from(range.limit)
.map_err(|_| ApiError::InvalidReadRangeLimit)?
.try_into()
.map_err(|_| ApiError::InvalidReadRangeLimit)?;
if range.start.len() > limits::MAX_READ_KEY_SIZE_BYTES {
return Err(ApiError::KeyTooLong);
}
if range.end.len() > limits::MAX_READ_KEY_SIZE_BYTES {
return Err(ApiError::KeyTooLong);
}
total_limit += limit.get() as usize;
requests.push(ReadRange {
start: range.start,
end: range.end,
reverse: range.reverse,
limit,
});
}
if total_limit > limits::MAX_READ_ENTRIES {
return Err(ApiError::ReadRangeTooLarge);
}
let requests: Vec<ReadRange> = snapshot_read.try_into()?;

let options = SnapshotReadOptions {
consistency: Consistency::Strong,
};
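The deleted validation above now happens behind `try_into()`. A minimal sketch of what the extracted converter in `denokv_proto::convert` plausibly looks like, reconstructed from the deleted handler code; the impl signature is an assumption, and the error variants match the `ConvertError` mapping added further down:

// Sketch only: assumes the converter and the limits constants now live in
// denokv_proto (note `mod limits;` is removed from main.rs in this commit).
impl TryFrom<pb::SnapshotRead> for Vec<ReadRange> {
  type Error = ConvertError;

  fn try_from(req: pb::SnapshotRead) -> Result<Self, Self::Error> {
    if req.ranges.len() > limits::MAX_READ_RANGES {
      return Err(ConvertError::TooManyReadRanges);
    }
    let mut requests = Vec::with_capacity(req.ranges.len());
    let mut total_limit: usize = 0;
    for range in req.ranges {
      // Reject limits that are negative, zero, or too large for a u32.
      let limit = u32::try_from(range.limit)
        .ok()
        .and_then(NonZeroU32::new)
        .ok_or(ConvertError::InvalidReadRangeLimit)?;
      if range.start.len() > limits::MAX_READ_KEY_SIZE_BYTES
        || range.end.len() > limits::MAX_READ_KEY_SIZE_BYTES
      {
        return Err(ConvertError::KeyTooLong);
      }
      total_limit += limit.get() as usize;
      requests.push(ReadRange {
        start: range.start,
        end: range.end,
        reverse: range.reverse,
        limit,
      });
    }
    if total_limit > limits::MAX_READ_ENTRIES {
      return Err(ConvertError::ReadRangeTooLarge);
    }
    Ok(requests)
  }
}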
@@ -244,218 +211,21 @@ async fn snapshot_read_endpoint(
ApiError::InternalServerError
})?;

let mut ranges = Vec::with_capacity(result_ranges.len());
for range in result_ranges {
let values = range
.entries
.into_iter()
.map(|entry| {
let (value, encoding) = encode_value(&entry.value);
pb::KvEntry {
key: entry.key,
value: value.into_owned(),
encoding: encoding as i32,
versionstamp: entry.versionstamp.to_vec(),
}
})
.collect();
ranges.push(pb::ReadRangeOutput { values });
}

Ok(Protobuf(pb::SnapshotReadOutput {
ranges,
read_disabled: false,
read_is_strongly_consistent: true,
}))
let res = result_ranges.into();
Ok(Protobuf(res))
}
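On the response side, `result_ranges.into()` replaces the encoding loop deleted above. A sketch of the assumed `From` impl, mirroring that loop; taking the interface-level element type to be `ReadRangeOutput` is an assumption:

// Sketch only: mirrors the removed inline encoding of entries into
// pb::KvEntry values, one pb::ReadRangeOutput per requested range.
impl From<Vec<ReadRangeOutput>> for pb::SnapshotReadOutput {
  fn from(result_ranges: Vec<ReadRangeOutput>) -> pb::SnapshotReadOutput {
    let mut ranges = Vec::with_capacity(result_ranges.len());
    for range in result_ranges {
      let values = range
        .entries
        .into_iter()
        .map(|entry| {
          let (value, encoding) = encode_value(&entry.value);
          pb::KvEntry {
            key: entry.key,
            value: value.into_owned(),
            encoding: encoding as i32,
            versionstamp: entry.versionstamp.to_vec(),
          }
        })
        .collect();
      ranges.push(pb::ReadRangeOutput { values });
    }
    pb::SnapshotReadOutput {
      ranges,
      read_disabled: false,
      read_is_strongly_consistent: true,
    }
  }
}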

#[debug_handler]
async fn atomic_write_endpoint(
State(state): State<AppState>,
Protobuf(atomic_write): Protobuf<pb::AtomicWrite>,
) -> Result<Protobuf<pb::AtomicWriteOutput>, ApiError> {
if atomic_write.checks.len() > limits::MAX_CHECKS {
return Err(ApiError::TooManyChecks);
}
if atomic_write.mutations.len() + atomic_write.enqueues.len()
> limits::MAX_MUTATIONS
{
return Err(ApiError::TooManyMutations);
}

let mut total_payload_size = 0;

let mut checks = Vec::with_capacity(atomic_write.checks.len());
for check in atomic_write.checks {
if check.key.len() > limits::MAX_READ_KEY_SIZE_BYTES {
return Err(ApiError::KeyTooLong);
}
total_payload_size += check.key.len();
checks.push(Check {
key: check.key,
versionstamp: match check.versionstamp.len() {
0 => None,
10 => {
let mut versionstamp = [0; 10];
versionstamp.copy_from_slice(&check.versionstamp);
Some(versionstamp)
}
_ => return Err(ApiError::InvalidVersionstamp),
},
});
}

let mut mutations = Vec::with_capacity(atomic_write.mutations.len());
for mutation in atomic_write.mutations {
if mutation.key.len() > limits::MAX_WRITE_KEY_SIZE_BYTES {
return Err(ApiError::KeyTooLong);
}
total_payload_size += mutation.key.len();
let value_size = mutation.value.as_ref().map(|v| v.data.len()).unwrap_or(0);
if value_size > limits::MAX_VALUE_SIZE_BYTES {
return Err(ApiError::ValueTooLong);
}
total_payload_size += value_size;

let kind = match (mutation.mutation_type(), mutation.value) {
(pb::MutationType::MSet, Some(value)) => {
let value = decode_value(value.data, value.encoding as i64)
.ok_or_else(|| {
error!(
"Failed to decode value with invalid encoding {}",
value.encoding
);
ApiError::InternalServerError
})?;
MutationKind::Set(value)
}
(pb::MutationType::MDelete, _) => MutationKind::Delete,
(pb::MutationType::MSum, Some(value)) => {
let value = decode_value(value.data, value.encoding as i64)
.ok_or_else(|| {
error!(
"Failed to decode value with invalid encoding {}",
value.encoding
);
ApiError::InternalServerError
})?;
MutationKind::Sum(value)
}
(pb::MutationType::MMin, Some(value)) => {
let value = decode_value(value.data, value.encoding as i64)
.ok_or_else(|| {
error!(
"Failed to decode value with invalid encoding {}",
value.encoding
);
ApiError::InternalServerError
})?;
MutationKind::Min(value)
}
(pb::MutationType::MMax, Some(value)) => {
let value = decode_value(value.data, value.encoding as i64)
.ok_or_else(|| {
error!(
"Failed to decode value with invalid encoding {}",
value.encoding
);
ApiError::InternalServerError
})?;
MutationKind::Max(value)
}
_ => return Err(ApiError::InvalidMutationKind),
};
let expire_at = match mutation.expire_at_ms {
-1 | 0 => None,
millis @ 1.. => Some(
chrono::DateTime::UNIX_EPOCH
+ std::time::Duration::from_millis(
millis
.try_into()
.map_err(|_| ApiError::InvalidMutationExpireAt)?,
),
),
_ => return Err(ApiError::InvalidMutationExpireAt),
};
mutations.push(Mutation {
key: mutation.key,
expire_at,
kind,
})
}

let mut enqueues = Vec::with_capacity(atomic_write.enqueues.len());
for enqueue in atomic_write.enqueues {
if enqueue.payload.len() > limits::MAX_VALUE_SIZE_BYTES {
return Err(ApiError::ValueTooLong);
}
total_payload_size += enqueue.payload.len();
if enqueue.kv_keys_if_undelivered.len() > limits::MAX_QUEUE_UNDELIVERED_KEYS
{
return Err(ApiError::TooManyQueueUndeliveredKeys);
}
for key in &enqueue.kv_keys_if_undelivered {
if key.len() > limits::MAX_WRITE_KEY_SIZE_BYTES {
return Err(ApiError::KeyTooLong);
}
total_payload_size += key.len();
}
if enqueue.backoff_schedule.len() > limits::MAX_QUEUE_BACKOFF_INTERVALS {
return Err(ApiError::TooManyQueueBackoffIntervals);
}
for interval in &enqueue.backoff_schedule {
if *interval > limits::MAX_QUEUE_BACKOFF_MS {
return Err(ApiError::QueueBackoffIntervalTooLarge);
}
total_payload_size += 4;
}
let deadline = chrono::DateTime::UNIX_EPOCH
+ std::time::Duration::from_millis(
enqueue
.deadline_ms
.try_into()
.map_err(|_| ApiError::InvalidMutationEnqueueDeadline)?,
);
if utc_now().signed_duration_since(deadline).num_milliseconds()
> limits::MAX_QUEUE_DELAY_MS as i64
{
return Err(ApiError::InvalidMutationEnqueueDeadline);
}
enqueues.push(Enqueue {
payload: enqueue.payload,
backoff_schedule: if enqueue.backoff_schedule.is_empty() {
None
} else {
Some(enqueue.backoff_schedule)
},
deadline,
keys_if_undelivered: enqueue.kv_keys_if_undelivered,
});
}

if total_payload_size > limits::MAX_TOTAL_MUTATION_SIZE_BYTES {
return Err(ApiError::AtomicWriteTooLarge);
}

let atomic_write = AtomicWrite {
checks,
mutations,
enqueues,
};
let atomic_write: AtomicWrite = atomic_write.try_into()?;

let res = state.sqlite.atomic_write(atomic_write).await;

match res {
Ok(None) => Ok(Protobuf(pb::AtomicWriteOutput {
status: pb::AtomicWriteStatus::AwCheckFailure as i32,
failed_checks: vec![], // todo!
..Default::default()
})),
Ok(Some(commit_result)) => Ok(Protobuf(pb::AtomicWriteOutput {
status: pb::AtomicWriteStatus::AwSuccess as i32,
versionstamp: commit_result.versionstamp.to_vec(),
..Default::default()
})),
Ok(res) => Ok(Protobuf(res.into())),
Err(err) => {
error!("Failed to write to database: {}", err);
Err(ApiError::InternalServerError)
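Similarly, `res.into()` above stands in for the removed match on the commit result. A plausible shape for that converter, assuming the interface type is `CommitResult`:

// Sketch only: None means a check failed; Some carries the versionstamp
// of the successful commit, exactly as in the removed match arms.
impl From<Option<CommitResult>> for pb::AtomicWriteOutput {
  fn from(res: Option<CommitResult>) -> pb::AtomicWriteOutput {
    match res {
      None => pb::AtomicWriteOutput {
        status: pb::AtomicWriteStatus::AwCheckFailure as i32,
        failed_checks: vec![], // still a todo in the original
        ..Default::default()
      },
      Some(commit_result) => pb::AtomicWriteOutput {
        status: pb::AtomicWriteStatus::AwSuccess as i32,
        versionstamp: commit_result.versionstamp.to_vec(),
        ..Default::default()
      },
    }
  }
}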
@@ -557,6 +327,39 @@ impl IntoResponse for ApiError {
}
}

impl From<ConvertError> for ApiError {
fn from(err: ConvertError) -> ApiError {
match err {
ConvertError::KeyTooLong => ApiError::KeyTooLong,
ConvertError::ValueTooLong => ApiError::ValueTooLong,
ConvertError::ReadRangeTooLarge => ApiError::ReadRangeTooLarge,
ConvertError::AtomicWriteTooLarge => ApiError::AtomicWriteTooLarge,
ConvertError::TooManyReadRanges => ApiError::TooManyReadRanges,
ConvertError::TooManyChecks => ApiError::TooManyChecks,
ConvertError::TooManyMutations => ApiError::TooManyMutations,
ConvertError::TooManyQueueUndeliveredKeys => {
ApiError::TooManyQueueUndeliveredKeys
}
ConvertError::TooManyQueueBackoffIntervals => {
ApiError::TooManyQueueBackoffIntervals
}
ConvertError::QueueBackoffIntervalTooLarge => {
ApiError::QueueBackoffIntervalTooLarge
}
ConvertError::InvalidReadRangeLimit => ApiError::InvalidReadRangeLimit,
ConvertError::DecodeError => ApiError::InternalServerError,
ConvertError::InvalidVersionstamp => ApiError::InvalidVersionstamp,
ConvertError::InvalidMutationKind => ApiError::InvalidMutationKind,
ConvertError::InvalidMutationExpireAt => {
ApiError::InvalidMutationExpireAt
}
ConvertError::InvalidMutationEnqueueDeadline => {
ApiError::InvalidMutationEnqueueDeadline
}
}
}
}
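This impl is what lets the handlers above apply `?` directly to the conversion results: `try_into()` yields a `ConvertError`, and the `?` operator routes it through `From<ConvertError> for ApiError` to produce the handler's error type.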

struct Protobuf<T: prost::Message>(T);

impl<T: prost::Message> IntoResponse for Protobuf<T> {