diff --git a/denokv/main.rs b/denokv/main.rs
index d4ab7ef..00ddba1 100644
--- a/denokv/main.rs
+++ b/denokv/main.rs
@@ -1,5 +1,4 @@
 use std::borrow::Cow;
-use std::num::NonZeroU32;
 use std::path::Path;
 use std::sync::Arc;
 
@@ -24,18 +23,14 @@ use chrono::Duration;
 use clap::Parser;
 use config::Config;
 use constant_time_eq::constant_time_eq;
+use denokv_proto::convert::ConvertError;
 use denokv_proto::datapath as pb;
-use denokv_proto::decode_value;
-use denokv_proto::encode_value;
+use denokv_proto::time::utc_now;
 use denokv_proto::AtomicWrite;
-use denokv_proto::Check;
 use denokv_proto::Consistency;
 use denokv_proto::DatabaseMetadata;
 use denokv_proto::EndpointInfo;
-use denokv_proto::Enqueue;
 use denokv_proto::MetadataExchangeRequest;
-use denokv_proto::Mutation;
-use denokv_proto::MutationKind;
 use denokv_proto::ReadRange;
 use denokv_proto::SnapshotReadOptions;
 use denokv_sqlite::Connection;
@@ -45,13 +40,10 @@ use log::info;
 use prost::DecodeError;
 use rand::SeedableRng;
 use thiserror::Error;
-use time::utc_now;
 use tokio::sync::Notify;
 use uuid::Uuid;
 
 mod config;
-mod limits;
-mod time;
 
 #[derive(Clone)]
 struct AppState {
@@ -204,33 +196,8 @@ async fn snapshot_read_endpoint(
   State(state): State<AppState>,
   Protobuf(snapshot_read): Protobuf<pb::SnapshotRead>,
 ) -> Result<Protobuf<pb::SnapshotReadOutput>, ApiError> {
-  if snapshot_read.ranges.len() > limits::MAX_READ_RANGES {
-    return Err(ApiError::TooManyReadRanges);
-  }
-  let mut requests = Vec::with_capacity(snapshot_read.ranges.len());
-  let mut total_limit: usize = 0;
-  for range in snapshot_read.ranges {
-    let limit: NonZeroU32 = u32::try_from(range.limit)
-      .map_err(|_| ApiError::InvalidReadRangeLimit)?
-      .try_into()
-      .map_err(|_| ApiError::InvalidReadRangeLimit)?;
-    if range.start.len() > limits::MAX_READ_KEY_SIZE_BYTES {
-      return Err(ApiError::KeyTooLong);
-    }
-    if range.end.len() > limits::MAX_READ_KEY_SIZE_BYTES {
-      return Err(ApiError::KeyTooLong);
-    }
-    total_limit += limit.get() as usize;
-    requests.push(ReadRange {
-      start: range.start,
-      end: range.end,
-      reverse: range.reverse,
-      limit,
-    });
-  }
-  if total_limit > limits::MAX_READ_ENTRIES {
-    return Err(ApiError::ReadRangeTooLarge);
-  }
+  let requests: Vec<ReadRange> = snapshot_read.try_into()?;
+
   let options = SnapshotReadOptions {
     consistency: Consistency::Strong,
   };
@@ -244,29 +211,8 @@ async fn snapshot_read_endpoint(
       ApiError::InternalServerError
     })?;
 
-  let mut ranges = Vec::with_capacity(result_ranges.len());
-  for range in result_ranges {
-    let values = range
-      .entries
-      .into_iter()
-      .map(|entry| {
-        let (value, encoding) = encode_value(&entry.value);
-        pb::KvEntry {
-          key: entry.key,
-          value: value.into_owned(),
-          encoding: encoding as i32,
-          versionstamp: entry.versionstamp.to_vec(),
-        }
-      })
-      .collect();
-    ranges.push(pb::ReadRangeOutput { values });
-  }
-
-  Ok(Protobuf(pb::SnapshotReadOutput {
-    ranges,
-    read_disabled: false,
-    read_is_strongly_consistent: true,
-  }))
+  let res = result_ranges.into();
+  Ok(Protobuf(res))
 }
 
 #[debug_handler]
@@ -274,188 +220,12 @@ async fn atomic_write_endpoint(
   State(state): State<AppState>,
   Protobuf(atomic_write): Protobuf<pb::AtomicWrite>,
 ) -> Result<Protobuf<pb::AtomicWriteOutput>, ApiError> {
-  if atomic_write.checks.len() > limits::MAX_CHECKS {
-    return Err(ApiError::TooManyChecks);
-  }
-  if atomic_write.mutations.len() + atomic_write.enqueues.len()
-    > limits::MAX_MUTATIONS
-  {
-    return Err(ApiError::TooManyMutations);
-  }
-
-  let mut total_payload_size = 0;
-
-  let mut checks = Vec::with_capacity(atomic_write.checks.len());
-  for check in atomic_write.checks {
-    if check.key.len() > limits::MAX_READ_KEY_SIZE_BYTES {
-      return Err(ApiError::KeyTooLong);
-    }
-    total_payload_size += check.key.len();
-    checks.push(Check {
-      key: check.key,
-      versionstamp: match check.versionstamp.len() {
-        0 => None,
-        10 => {
-          let mut versionstamp = [0; 10];
-          versionstamp.copy_from_slice(&check.versionstamp);
-          Some(versionstamp)
-        }
-        _ => return Err(ApiError::InvalidVersionstamp),
-      },
-    });
-  }
-
-  let mut mutations = Vec::with_capacity(atomic_write.mutations.len());
-  for mutation in atomic_write.mutations {
-    if mutation.key.len() > limits::MAX_WRITE_KEY_SIZE_BYTES {
-      return Err(ApiError::KeyTooLong);
-    }
-    total_payload_size += mutation.key.len();
-    let value_size = mutation.value.as_ref().map(|v| v.data.len()).unwrap_or(0);
-    if value_size > limits::MAX_VALUE_SIZE_BYTES {
-      return Err(ApiError::ValueTooLong);
-    }
-    total_payload_size += value_size;
-
-    let kind = match (mutation.mutation_type(), mutation.value) {
-      (pb::MutationType::MSet, Some(value)) => {
-        let value = decode_value(value.data, value.encoding as i64)
-          .ok_or_else(|| {
-            error!(
-              "Failed to decode value with invalid encoding {}",
-              value.encoding
-            );
-            ApiError::InternalServerError
-          })?;
-        MutationKind::Set(value)
-      }
-      (pb::MutationType::MDelete, _) => MutationKind::Delete,
-      (pb::MutationType::MSum, Some(value)) => {
-        let value = decode_value(value.data, value.encoding as i64)
-          .ok_or_else(|| {
-            error!(
-              "Failed to decode value with invalid encoding {}",
-              value.encoding
-            );
-            ApiError::InternalServerError
-          })?;
-        MutationKind::Sum(value)
-      }
-      (pb::MutationType::MMin, Some(value)) => {
-        let value = decode_value(value.data, value.encoding as i64)
-          .ok_or_else(|| {
-            error!(
-              "Failed to decode value with invalid encoding {}",
-              value.encoding
-            );
-            ApiError::InternalServerError
-          })?;
-        MutationKind::Min(value)
-      }
-      (pb::MutationType::MMax, Some(value)) => {
-        let value = decode_value(value.data, value.encoding as i64)
-          .ok_or_else(|| {
-            error!(
-              "Failed to decode value with invalid encoding {}",
-              value.encoding
-            );
-            ApiError::InternalServerError
-          })?;
-        MutationKind::Max(value)
-      }
-      _ => return Err(ApiError::InvalidMutationKind),
-    };
-    let expire_at = match mutation.expire_at_ms {
-      -1 | 0 => None,
-      millis @ 1.. => Some(
-        chrono::DateTime::UNIX_EPOCH
-          + std::time::Duration::from_millis(
-            millis
-              .try_into()
-              .map_err(|_| ApiError::InvalidMutationExpireAt)?,
-          ),
-      ),
-      _ => return Err(ApiError::InvalidMutationExpireAt),
-    };
-    mutations.push(Mutation {
-      key: mutation.key,
-      expire_at,
-      kind,
-    })
-  }
-
-  let mut enqueues = Vec::with_capacity(atomic_write.enqueues.len());
-  for enqueue in atomic_write.enqueues {
-    if enqueue.payload.len() > limits::MAX_VALUE_SIZE_BYTES {
-      return Err(ApiError::ValueTooLong);
-    }
-    total_payload_size += enqueue.payload.len();
-    if enqueue.kv_keys_if_undelivered.len() > limits::MAX_QUEUE_UNDELIVERED_KEYS
-    {
-      return Err(ApiError::TooManyQueueUndeliveredKeys);
-    }
-    for key in &enqueue.kv_keys_if_undelivered {
-      if key.len() > limits::MAX_WRITE_KEY_SIZE_BYTES {
-        return Err(ApiError::KeyTooLong);
-      }
-      total_payload_size += key.len();
-    }
-    if enqueue.backoff_schedule.len() > limits::MAX_QUEUE_BACKOFF_INTERVALS {
-      return Err(ApiError::TooManyQueueBackoffIntervals);
-    }
-    for interval in &enqueue.backoff_schedule {
-      if *interval > limits::MAX_QUEUE_BACKOFF_MS {
-        return Err(ApiError::QueueBackoffIntervalTooLarge);
-      }
-      total_payload_size += 4;
-    }
-    let deadline = chrono::DateTime::UNIX_EPOCH
-      + std::time::Duration::from_millis(
-        enqueue
-          .deadline_ms
-          .try_into()
-          .map_err(|_| ApiError::InvalidMutationEnqueueDeadline)?,
-      );
-    if utc_now().signed_duration_since(deadline).num_milliseconds()
-      > limits::MAX_QUEUE_DELAY_MS as i64
-    {
-      return Err(ApiError::InvalidMutationEnqueueDeadline);
-    }
-    enqueues.push(Enqueue {
-      payload: enqueue.payload,
-      backoff_schedule: if enqueue.backoff_schedule.is_empty() {
-        None
-      } else {
-        Some(enqueue.backoff_schedule)
-      },
-      deadline,
-      keys_if_undelivered: enqueue.kv_keys_if_undelivered,
-    });
-  }
-
-  if total_payload_size > limits::MAX_TOTAL_MUTATION_SIZE_BYTES {
-    return Err(ApiError::AtomicWriteTooLarge);
-  }
-
-  let atomic_write = AtomicWrite {
-    checks,
-    mutations,
-    enqueues,
-  };
+  let atomic_write: AtomicWrite = atomic_write.try_into()?;
 
   let res = state.sqlite.atomic_write(atomic_write).await;
 
   match res {
-    Ok(None) => Ok(Protobuf(pb::AtomicWriteOutput {
-      status: pb::AtomicWriteStatus::AwCheckFailure as i32,
-      failed_checks: vec![], // todo!
-      ..Default::default()
-    })),
-    Ok(Some(commit_result)) => Ok(Protobuf(pb::AtomicWriteOutput {
-      status: pb::AtomicWriteStatus::AwSuccess as i32,
-      versionstamp: commit_result.versionstamp.to_vec(),
-      ..Default::default()
-    })),
+    Ok(res) => Ok(Protobuf(res.into())),
     Err(err) => {
      error!("Failed to write to database: {}", err);
      Err(ApiError::InternalServerError)
@@ -557,6 +327,39 @@ impl IntoResponse for ApiError {
   }
 }
 
+impl From<ConvertError> for ApiError {
+  fn from(err: ConvertError) -> ApiError {
+    match err {
+      ConvertError::KeyTooLong => ApiError::KeyTooLong,
+      ConvertError::ValueTooLong => ApiError::ValueTooLong,
+      ConvertError::ReadRangeTooLarge => ApiError::ReadRangeTooLarge,
+      ConvertError::AtomicWriteTooLarge => ApiError::AtomicWriteTooLarge,
+      ConvertError::TooManyReadRanges => ApiError::TooManyReadRanges,
+      ConvertError::TooManyChecks => ApiError::TooManyChecks,
+      ConvertError::TooManyMutations => ApiError::TooManyMutations,
+      ConvertError::TooManyQueueUndeliveredKeys => {
+        ApiError::TooManyQueueUndeliveredKeys
+      }
+      ConvertError::TooManyQueueBackoffIntervals => {
+        ApiError::TooManyQueueBackoffIntervals
+      }
+      ConvertError::QueueBackoffIntervalTooLarge => {
+        ApiError::QueueBackoffIntervalTooLarge
+      }
+      ConvertError::InvalidReadRangeLimit => ApiError::InvalidReadRangeLimit,
+      ConvertError::DecodeError => ApiError::InternalServerError,
+      ConvertError::InvalidVersionstamp => ApiError::InvalidVersionstamp,
+      ConvertError::InvalidMutationKind => ApiError::InvalidMutationKind,
+      ConvertError::InvalidMutationExpireAt => {
+        ApiError::InvalidMutationExpireAt
+      }
+      ConvertError::InvalidMutationEnqueueDeadline => {
+        ApiError::InvalidMutationEnqueueDeadline
+      }
+    }
+  }
+}
+
 struct Protobuf<T>(T);
 
 impl<T: prost::Message> IntoResponse for Protobuf<T> {
diff --git a/proto/convert.rs b/proto/convert.rs
new file mode 100644
index 0000000..b0680e5
--- /dev/null
+++ b/proto/convert.rs
@@ -0,0 +1,274 @@
+// Copyright 2023 the Deno authors. All rights reserved. MIT license.
+
+use std::num::NonZeroU32;
+
+use crate::datapath as pb;
+use crate::AtomicWrite;
+use crate::Check;
+use crate::CommitResult;
+use crate::Enqueue;
+use crate::Mutation;
+use crate::MutationKind;
+use crate::ReadRangeOutput;
+
+use crate::decode_value;
+use crate::encode_value;
+use crate::limits;
+use crate::time::utc_now;
+use crate::ReadRange;
+
+pub enum ConvertError {
+  KeyTooLong,
+  ValueTooLong,
+  ReadRangeTooLarge,
+  AtomicWriteTooLarge,
+  TooManyReadRanges,
+  TooManyChecks,
+  TooManyMutations,
+  TooManyQueueUndeliveredKeys,
+  TooManyQueueBackoffIntervals,
+  QueueBackoffIntervalTooLarge,
+  InvalidReadRangeLimit,
+  DecodeError,
+  InvalidVersionstamp,
+  InvalidMutationKind,
+  InvalidMutationExpireAt,
+  InvalidMutationEnqueueDeadline,
+}
+
+impl TryFrom<pb::SnapshotRead> for Vec<ReadRange> {
+  type Error = ConvertError;
+
+  fn try_from(
+    snapshot_read: pb::SnapshotRead,
+  ) -> Result<Vec<ReadRange>, ConvertError> {
+    if snapshot_read.ranges.len() > limits::MAX_READ_RANGES {
+      return Err(ConvertError::TooManyReadRanges);
+    }
+    let mut requests = Vec::with_capacity(snapshot_read.ranges.len());
+    let mut total_limit: usize = 0;
+    for range in snapshot_read.ranges {
+      let limit: NonZeroU32 = u32::try_from(range.limit)
+        .map_err(|_| ConvertError::InvalidReadRangeLimit)?
+        .try_into()
+        .map_err(|_| ConvertError::InvalidReadRangeLimit)?;
+      if range.start.len() > limits::MAX_READ_KEY_SIZE_BYTES {
+        return Err(ConvertError::KeyTooLong);
+      }
+      if range.end.len() > limits::MAX_READ_KEY_SIZE_BYTES {
+        return Err(ConvertError::KeyTooLong);
+      }
+      total_limit += limit.get() as usize;
+      requests.push(ReadRange {
+        start: range.start,
+        end: range.end,
+        reverse: range.reverse,
+        limit,
+      });
+    }
+    if total_limit > limits::MAX_READ_ENTRIES {
+      return Err(ConvertError::ReadRangeTooLarge);
+    }
+    Ok(requests)
+  }
+}
+
+impl TryFrom<pb::AtomicWrite> for AtomicWrite {
+  type Error = ConvertError;
+
+  fn try_from(
+    atomic_write: pb::AtomicWrite,
+  ) -> Result<AtomicWrite, ConvertError> {
+    if atomic_write.checks.len() > limits::MAX_CHECKS {
+      return Err(ConvertError::TooManyChecks);
+    }
+    if atomic_write.mutations.len() + atomic_write.enqueues.len()
+      > limits::MAX_MUTATIONS
+    {
+      return Err(ConvertError::TooManyMutations);
+    }
+
+    let mut total_payload_size = 0;
+
+    let mut checks = Vec::with_capacity(atomic_write.checks.len());
+    for check in atomic_write.checks {
+      if check.key.len() > limits::MAX_READ_KEY_SIZE_BYTES {
+        return Err(ConvertError::KeyTooLong);
+      }
+      total_payload_size += check.key.len();
+      checks.push(Check {
+        key: check.key,
+        versionstamp: match check.versionstamp.len() {
+          0 => None,
+          10 => {
+            let mut versionstamp = [0; 10];
+            versionstamp.copy_from_slice(&check.versionstamp);
+            Some(versionstamp)
+          }
+          _ => return Err(ConvertError::InvalidVersionstamp),
+        },
+      });
+    }
+
+    let mut mutations = Vec::with_capacity(atomic_write.mutations.len());
+    for mutation in atomic_write.mutations {
+      if mutation.key.len() > limits::MAX_WRITE_KEY_SIZE_BYTES {
+        return Err(ConvertError::KeyTooLong);
+      }
+      total_payload_size += mutation.key.len();
+      let value_size =
+        mutation.value.as_ref().map(|v| v.data.len()).unwrap_or(0);
+      if value_size > limits::MAX_VALUE_SIZE_BYTES {
+        return Err(ConvertError::ValueTooLong);
+      }
+      total_payload_size += value_size;
+
+      let kind = match (mutation.mutation_type(), mutation.value) {
+        (pb::MutationType::MSet, Some(value)) => {
+          let value = decode_value(value.data, value.encoding as i64)
+            .ok_or(ConvertError::DecodeError)?;
+          MutationKind::Set(value)
+        }
+        (pb::MutationType::MDelete, _) => MutationKind::Delete,
+        (pb::MutationType::MSum, Some(value)) => {
+          let value = decode_value(value.data, value.encoding as i64)
+            .ok_or(ConvertError::DecodeError)?;
+          MutationKind::Sum(value)
+        }
+        (pb::MutationType::MMin, Some(value)) => {
+          let value = decode_value(value.data, value.encoding as i64)
+            .ok_or(ConvertError::DecodeError)?;
+          MutationKind::Min(value)
+        }
+        (pb::MutationType::MMax, Some(value)) => {
+          let value = decode_value(value.data, value.encoding as i64)
+            .ok_or(ConvertError::DecodeError)?;
+          MutationKind::Max(value)
+        }
+        _ => return Err(ConvertError::InvalidMutationKind),
+      };
+      let expire_at = match mutation.expire_at_ms {
+        -1 | 0 => None,
+        millis @ 1.. => Some(
+          chrono::DateTime::UNIX_EPOCH
+            + std::time::Duration::from_millis(
+              millis
+                .try_into()
+                .map_err(|_| ConvertError::InvalidMutationExpireAt)?,
+            ),
+        ),
+        _ => return Err(ConvertError::InvalidMutationExpireAt),
+      };
+      mutations.push(Mutation {
+        key: mutation.key,
+        expire_at,
+        kind,
+      })
+    }
+
+    let mut enqueues = Vec::with_capacity(atomic_write.enqueues.len());
+    for enqueue in atomic_write.enqueues {
+      if enqueue.payload.len() > limits::MAX_VALUE_SIZE_BYTES {
+        return Err(ConvertError::ValueTooLong);
+      }
+      total_payload_size += enqueue.payload.len();
+      if enqueue.kv_keys_if_undelivered.len()
+        > limits::MAX_QUEUE_UNDELIVERED_KEYS
+      {
+        return Err(ConvertError::TooManyQueueUndeliveredKeys);
+      }
+      for key in &enqueue.kv_keys_if_undelivered {
+        if key.len() > limits::MAX_WRITE_KEY_SIZE_BYTES {
+          return Err(ConvertError::KeyTooLong);
+        }
+        total_payload_size += key.len();
+      }
+      if enqueue.backoff_schedule.len() > limits::MAX_QUEUE_BACKOFF_INTERVALS {
+        return Err(ConvertError::TooManyQueueBackoffIntervals);
+      }
+      for interval in &enqueue.backoff_schedule {
+        if *interval > limits::MAX_QUEUE_BACKOFF_MS {
+          return Err(ConvertError::QueueBackoffIntervalTooLarge);
+        }
+        total_payload_size += 4;
+      }
+      let deadline = chrono::DateTime::UNIX_EPOCH
+        + std::time::Duration::from_millis(
+          enqueue
+            .deadline_ms
+            .try_into()
+            .map_err(|_| ConvertError::InvalidMutationEnqueueDeadline)?,
+        );
+      if utc_now().signed_duration_since(deadline).num_milliseconds()
+        > limits::MAX_QUEUE_DELAY_MS as i64
+      {
+        return Err(ConvertError::InvalidMutationEnqueueDeadline);
+      }
+      enqueues.push(Enqueue {
+        payload: enqueue.payload,
+        backoff_schedule: if enqueue.backoff_schedule.is_empty() {
+          None
+        } else {
+          Some(enqueue.backoff_schedule)
+        },
+        deadline,
+        keys_if_undelivered: enqueue.kv_keys_if_undelivered,
+      });
+    }
+
+    if total_payload_size > limits::MAX_TOTAL_MUTATION_SIZE_BYTES {
+      return Err(ConvertError::AtomicWriteTooLarge);
+    }
+
+    Ok(AtomicWrite {
+      checks,
+      mutations,
+      enqueues,
+    })
+  }
+}
+
+impl From<Vec<ReadRangeOutput>> for pb::SnapshotReadOutput {
+  fn from(result_ranges: Vec<ReadRangeOutput>) -> pb::SnapshotReadOutput {
+    let mut ranges = Vec::with_capacity(result_ranges.len());
+    for range in result_ranges {
+      let values = range
+        .entries
+        .into_iter()
+        .map(|entry| {
+          let (value, encoding) = encode_value(&entry.value);
+          pb::KvEntry {
+            key: entry.key,
+            value: value.into_owned(),
+            encoding: encoding as i32,
+            versionstamp: entry.versionstamp.to_vec(),
+          }
+        })
+        .collect();
+      ranges.push(pb::ReadRangeOutput { values });
+    }
+
+    pb::SnapshotReadOutput {
+      ranges,
+      read_disabled: false,
+      read_is_strongly_consistent: true,
+    }
+  }
+}
+
+impl From<Option<CommitResult>> for pb::AtomicWriteOutput {
+  fn from(commit_result: Option<CommitResult>) -> pb::AtomicWriteOutput {
+    match commit_result {
+      None => pb::AtomicWriteOutput {
+        status: pb::AtomicWriteStatus::AwCheckFailure as i32,
+        failed_checks: vec![], // todo!
+        ..Default::default()
+      },
+      Some(commit_result) => pb::AtomicWriteOutput {
+        status: pb::AtomicWriteStatus::AwSuccess as i32,
+        versionstamp: commit_result.versionstamp.to_vec(),
+        ..Default::default()
+      },
+    }
+  }
+}
diff --git a/proto/lib.rs b/proto/lib.rs
index 39d40ae..b7c2f3e 100644
--- a/proto/lib.rs
+++ b/proto/lib.rs
@@ -1,8 +1,11 @@
 // Copyright 2023 the Deno authors. All rights reserved. MIT license.
 
 mod codec;
+pub mod convert;
 mod interface;
+mod limits;
 mod protobuf;
+pub mod time;
 
 pub use crate::codec::decode_key;
 pub use crate::codec::encode_key;
diff --git a/denokv/limits.rs b/proto/limits.rs
similarity index 89%
rename from denokv/limits.rs
rename to proto/limits.rs
index 953241d..e6be825 100644
--- a/denokv/limits.rs
+++ b/proto/limits.rs
@@ -1,3 +1,5 @@
+// Copyright 2023 the Deno authors. All rights reserved. MIT license.
+
 pub const MAX_WRITE_KEY_SIZE_BYTES: usize = 2048;
 pub const MAX_READ_KEY_SIZE_BYTES: usize = MAX_WRITE_KEY_SIZE_BYTES + 1;
 pub const MAX_VALUE_SIZE_BYTES: usize = 65536;
diff --git a/denokv/time.rs b/proto/time.rs
similarity index 100%
rename from denokv/time.rs
rename to proto/time.rs
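
For illustration, a minimal sketch of how downstream code can exercise the conversions this patch centralizes in proto/convert.rs (the helper function and its name are hypothetical; the `TryFrom` impl and `ConvertError` come from the patch itself):

```rust
use denokv_proto::convert::ConvertError;
use denokv_proto::datapath as pb;
use denokv_proto::ReadRange;

// Hypothetical helper: convert a protobuf SnapshotRead into validated
// domain ReadRange requests. The TryFrom impl in proto/convert.rs
// enforces MAX_READ_RANGES, the per-key size limits, and
// MAX_READ_ENTRIES, so callers receive either fully validated requests
// or a ConvertError naming the violated limit.
fn validate_read(
  snapshot_read: pb::SnapshotRead,
) -> Result<Vec<ReadRange>, ConvertError> {
  snapshot_read.try_into()
}
```

In denokv/main.rs the same conversion is driven directly by `?` in the handlers, with the new `From<ConvertError> for ApiError` impl mapping each limit violation onto the corresponding API error response.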