Skip to content

Commit

Permalink
feature(turbo): Port Run Cache (#5707)
Browse files Browse the repository at this point in the history
### Description

Ports runcache along with the required dependencies like `PrefixWriter`,
`LogWriter`, etc.

### Testing Instructions

Unfortunately there aren't many tests to port for runcache. I'll work on
writing some tests for the dependencies

---------

Co-authored-by: nicholaslyang <Nicholas Yang>
  • Loading branch information
NicholasLYang authored Aug 17, 2023
1 parent 4389640 commit c8b9439
Show file tree
Hide file tree
Showing 26 changed files with 783 additions and 171 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/turborepo-api-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl APIClient {
&self,
hash: &str,
artifact_body: &[u8],
duration: u32,
duration: u64,
tag: Option<&str>,
token: &str,
) -> Result<()> {
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
turbopath = { workspace = true }
turborepo-api-client = { workspace = true }
turborepo-ui = { workspace = true }
zstd = "0.12.3"
4 changes: 2 additions & 2 deletions crates/turborepo-cache/src/async_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl AsyncCache {
anchor: AbsoluteSystemPathBuf,
key: String,
files: Vec<AnchoredSystemPathBuf>,
duration: u32,
duration: u64,
) {
if self.workers.len() >= self.max_workers {
let _ = self.workers.next().await.unwrap();
Expand All @@ -50,7 +50,7 @@ impl AsyncCache {
}

pub async fn fetch(
&mut self,
&self,
anchor: &AbsoluteSystemPath,
key: &str,
team_id: &str,
Expand Down
4 changes: 2 additions & 2 deletions crates/turborepo-cache/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub struct FSCache {
#[derive(Debug, Deserialize, Serialize)]
struct CacheMetadata {
hash: String,
duration: u32,
duration: u64,
}

impl CacheMetadata {
Expand Down Expand Up @@ -118,7 +118,7 @@ impl FSCache {
anchor: &AbsoluteSystemPath,
hash: &str,
files: &[AnchoredSystemPathBuf],
duration: u32,
duration: u64,
) -> Result<(), CacheError> {
let cache_path = self
.cache_directory
Expand Down
6 changes: 3 additions & 3 deletions crates/turborepo-cache/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl HTTPCache {
anchor: &AbsoluteSystemPath,
hash: &str,
files: &[AnchoredSystemPathBuf],
duration: u32,
duration: u64,
) -> Result<(), CacheError> {
let mut artifact_body = Vec::new();
self.write(&mut artifact_body, anchor, files).await?;
Expand Down Expand Up @@ -105,14 +105,14 @@ impl HTTPCache {
})
}

fn get_duration_from_response(response: &Response) -> Result<u32, CacheError> {
fn get_duration_from_response(response: &Response) -> Result<u64, CacheError> {
if let Some(duration_value) = response.headers().get("x-artifact-duration") {
let duration = duration_value
.to_str()
.map_err(|_| CacheError::InvalidDuration(Backtrace::capture()))?;

duration
.parse::<u32>()
.parse::<u64>()
.map_err(|_| CacheError::InvalidDuration(Backtrace::capture()))
} else {
Ok(0)
Expand Down
4 changes: 2 additions & 2 deletions crates/turborepo-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ pub enum CacheSource {

#[derive(Debug, Clone, PartialEq)]
pub struct CacheResponse {
source: CacheSource,
time_saved: u32,
pub source: CacheSource,
pub time_saved: u64,
}

#[derive(Debug, Default)]
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-cache/src/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl CacheMultiplexer {
anchor: &AbsoluteSystemPath,
key: &str,
files: &[AnchoredSystemPathBuf],
duration: u32,
duration: u64,
) -> Result<(), CacheError> {
self.fs
.as_ref()
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-cache/src/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl TestFile {

pub(crate) struct TestCase {
pub files: Vec<TestFile>,
pub duration: u32,
pub duration: u64,
pub hash: &'static str,
}

Expand Down
4 changes: 2 additions & 2 deletions crates/turborepo-lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::{anyhow, Result};
use camino::Utf8PathBuf;
use clap::{ArgAction, CommandFactory, Parser, Subcommand, ValueEnum};
use clap_complete::{generate, Shell};
use serde::Serialize;
use serde::{Deserialize, Serialize};
use tracing::{debug, error};
use turbopath::AbsoluteSystemPathBuf;
use turborepo_ui::UI;
Expand All @@ -23,7 +23,7 @@ use crate::{
// turbo can use it for package inference.
pub const INVOCATION_DIR_ENV_VAR: &str = "TURBO_INVOCATION_DIR";

#[derive(Copy, Clone, Debug, PartialEq, Serialize, ValueEnum)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize, ValueEnum)]
pub enum OutputLogsMode {
#[serde(rename = "full")]
Full,
Expand Down
7 changes: 1 addition & 6 deletions crates/turborepo-lib/src/commands/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@ use turborepo_api_client::{APIClient, CachingStatus, Space, Team};
use turborepo_ui::CYAN;
use turborepo_ui::{BOLD, GREY, UNDERLINE};

use crate::{
cli::LinkTarget,
commands::CommandBase,
config::{RawTurboJSON, SpacesJson},
rewrite_json,
};
use crate::{cli::LinkTarget, commands::CommandBase, rewrite_json};

#[derive(Clone)]
pub(crate) enum SelectedTeam<'a> {
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/commands/unlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::fs;
use anyhow::{Context, Result};
use turborepo_ui::GREY;

use crate::{cli::LinkTarget, commands::CommandBase, config::RawTurboJSON, rewrite_json};
use crate::{cli::LinkTarget, commands::CommandBase, rewrite_json};

enum UnlinkSpacesResult {
Unlinked,
Expand Down
26 changes: 13 additions & 13 deletions crates/turborepo-lib/src/config/turbo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ use turbopath::{AbsoluteSystemPath, RelativeUnixPathBuf};
use turborepo_cache::RemoteCacheOpts;

use crate::{
cli::OutputLogsMode,
config::Error,
package_json::PackageJson,
run::task_id::{TaskId, TaskName, ROOT_PKG_NAME},
task_graph::{
BookkeepingTaskDefinition, Pipeline, TaskDefinitionHashable, TaskOutputMode, TaskOutputs,
},
task_graph::{BookkeepingTaskDefinition, Pipeline, TaskDefinitionHashable, TaskOutputs},
};

#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone)]
Expand Down Expand Up @@ -92,7 +91,7 @@ struct RawTaskDefinition {
#[serde(skip_serializing_if = "Option::is_none")]
outputs: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
output_mode: Option<TaskOutputMode>,
output_mode: Option<OutputLogsMode>,
}

const CONFIG_FILE: &str = "turbo.json";
Expand Down Expand Up @@ -563,12 +562,13 @@ mod tests {

use super::RawTurboJSON;
use crate::{
cli::OutputLogsMode,
config::{turbo::RawTaskDefinition, TurboJson},
package_json::PackageJson,
run::task_id::TaskName,
task_graph::{
BookkeepingTaskDefinition, TaskDefinitionExperiments, TaskDefinitionHashable,
TaskOutputMode, TaskOutputs,
TaskOutputs,
},
};

Expand Down Expand Up @@ -729,7 +729,7 @@ mod tests {
outputs: Some(vec!["package/a/dist".to_string()]),
cache: Some(false),
inputs: Some(vec!["package/a/src/**".to_string()]),
output_mode: Some(TaskOutputMode::Full),
output_mode: Some(OutputLogsMode::Full),
persistent: Some(true),
},
BookkeepingTaskDefinition {
Expand Down Expand Up @@ -757,7 +757,7 @@ mod tests {
},
cache: false,
inputs: vec!["package/a/src/**".to_string()],
output_mode: TaskOutputMode::Full,
output_mode: OutputLogsMode::Full,
pass_through_env: vec!["AWS_SECRET_KEY".to_string()],
task_dependencies: vec!["cli#build".into()],
topological_dependencies: vec![],
Expand Down Expand Up @@ -823,13 +823,13 @@ mod tests {
assert_eq!(pruned_json, expected);
}

#[test_case("full", Some(TaskOutputMode::Full) ; "full")]
#[test_case("hash-only", Some(TaskOutputMode::HashOnly) ; "hash-only")]
#[test_case("new-only", Some(TaskOutputMode::NewOnly) ; "new-only")]
#[test_case("errors-only", Some(TaskOutputMode::ErrorsOnly) ; "errors-only")]
#[test_case("none", Some(TaskOutputMode::None) ; "none")]
#[test_case("full", Some(OutputLogsMode::Full) ; "full")]
#[test_case("hash-only", Some(OutputLogsMode::HashOnly) ; "hash-only")]
#[test_case("new-only", Some(OutputLogsMode::NewOnly) ; "new-only")]
#[test_case("errors-only", Some(OutputLogsMode::ErrorsOnly) ; "errors-only")]
#[test_case("none", Some(OutputLogsMode::None) ; "none")]
#[test_case("junk", None ; "invalid value")]
fn test_parsing_output_mode(output_mode: &str, expected: Option<TaskOutputMode>) {
fn test_parsing_output_mode(output_mode: &str, expected: Option<OutputLogsMode>) {
let json: Result<RawTurboJSON, _> = serde_json::from_value(serde_json::json!({
"pipeline": {
"build": {
Expand Down
7 changes: 3 additions & 4 deletions crates/turborepo-lib/src/daemon/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ pub mod proto {
tonic::include_proto!("turbodprotocol");
}

#[derive(Debug)]
pub struct DaemonClient<T> {
#[derive(Debug, Clone)]
pub struct DaemonClient<T: Clone> {
client: TurbodClient<tonic::transport::Channel>,
connect_settings: T,
}

impl<T> DaemonClient<T> {
impl<T: Clone> DaemonClient<T> {
/// Interrogate the server for its version.
pub(super) async fn handshake(&mut self) -> Result<(), DaemonError> {
let _ret = self
Expand Down Expand Up @@ -70,7 +70,6 @@ impl DaemonClient<DaemonConnector> {
self.stop().await?.connect().await.map_err(Into::into)
}

#[allow(dead_code)]
pub async fn get_changed_outputs(
&mut self,
hash: String,
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/daemon/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub enum ForkError {
Spawn(#[from] std::io::Error),
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DaemonConnector {
/// Whether the connector is allowed to start a daemon if it is not already
/// running.
Expand Down
20 changes: 16 additions & 4 deletions crates/turborepo-lib/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use turbopath::AnchoredSystemPathBuf;
use turborepo_cache::CacheOpts;

use crate::{
cli::{Command, DryRunMode, EnvMode, LogPrefix, RunArgs},
daemon::{DaemonClient, DaemonConnector},
cli::{Command, DryRunMode, EnvMode, LogPrefix, OutputLogsMode, RunArgs},
Args,
};

Expand All @@ -26,19 +25,32 @@ impl<'a> TryFrom<&'a Args> for Opts<'a> {
let run_opts = RunOpts::try_from(run_args.as_ref())?;
let cache_opts = CacheOpts::from(run_args.as_ref());
let scope_opts = ScopeOpts::try_from(run_args.as_ref())?;
let runcache_opts = RunCacheOpts::from(run_args.as_ref());

Ok(Self {
run_opts,
cache_opts,
scope_opts,
runcache_opts: RunCacheOpts::default(),
runcache_opts,
})
}
}

#[derive(Debug, Default)]
pub struct RunCacheOpts {
pub(crate) output_watcher: Option<DaemonClient<DaemonConnector>>,
pub(crate) skip_reads: bool,
pub(crate) skip_writes: bool,
pub(crate) task_output_mode_override: Option<OutputLogsMode>,
}

impl<'a> From<&'a RunArgs> for RunCacheOpts {
fn from(args: &'a RunArgs) -> Self {
RunCacheOpts {
skip_reads: args.force.flatten().is_some_and(|f| f),
skip_writes: args.no_cache,
task_output_mode_override: args.output_logs,
}
}
}

#[derive(Debug)]
Expand Down
37 changes: 37 additions & 0 deletions crates/turborepo-lib/src/package_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use turbopath::AbsoluteSystemPathBuf;

use crate::{
cli::EnvMode,
package_json::PackageJson,
task_graph::{TaskDefinition, TaskOutputs},
};

pub struct PackageTask {
pub task_id: TaskId<'static>,
pkg: PackageJson,
env_mode: EnvMode,
pub(crate) task_definition: TaskDefinition,
pub dir: AbsoluteSystemPathBuf,
command: String,
outputs: Vec<String>,
excluded_outputs: Vec<String>,
pub(crate) log_file: String,
hash: String,
}

impl PackageTask {
pub fn hashable_outputs(&self) -> TaskOutputs {
let mut inclusion_outputs = vec![format!(".turbo/turbo-{}.log", self.task)];
inclusion_outputs.extend_from_slice(&self.task_definition.outputs.inclusions[..]);

let mut hashable = TaskOutputs {
inclusions: inclusion_outputs,
exclusions: self.task_definition.outputs.exclusions.clone(),
};

hashable.inclusions.sort();
hashable.exclusions.sort();

hashable
}
}
Loading

0 comments on commit c8b9439

Please sign in to comment.