Skip to content

Commit

Permalink
feat(migrate-from-gcs): concurrent processing
Browse files Browse the repository at this point in the history
  • Loading branch information
alextes committed Oct 15, 2023
1 parent 540ad74 commit 1298b3f
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 114 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "migrate-payload-archive"
version = "0.1.0"
version = "0.2.0"
edition = "2021"

[dependencies]
Expand All @@ -24,7 +24,10 @@ object_store = { version = "0.7.1", default-features = false, features = [
"aws",
"gcp",
] }
serde = { version = "1.0.189", default-features = false, features = ["derive", "std"] }
serde = { version = "1.0.189", default-features = false, features = [
"derive",
"std",
] }
serde_json = { version = "1.0.107", default-features = false, features = [
"alloc",
"std",
Expand Down
264 changes: 153 additions & 111 deletions src/bin/migrate-from-gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,49 @@ use std::{
fs::File,
io::{Read, Write},
path::Path,
time::{Duration, SystemTime, UNIX_EPOCH},
sync::{atomic::AtomicU64, Arc, Mutex},
time::{Duration, SystemTime},
};

use chrono::{DateTime, Datelike, Timelike, Utc};
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use futures::{FutureExt, TryStreamExt};
use futures::{channel::mpsc::channel, FutureExt, SinkExt, StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use migrate_payload_archive::{env::ENV_CONFIG, log};
use object_store::{
aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder, ObjectMeta, ObjectStore, RetryConfig,
};
use serde::Serialize;
use tokio::time::interval;
use tokio_util::io::StreamReader;
use tracing::{debug, info};
use tokio::{task::spawn_blocking, time::interval};
use tokio_util::io::{StreamReader, SyncIoBridge};
use tracing::{debug, info, trace};

const PROGRESS_FILE_PATH: &str = "progress.json";

// NOTE(review): this span looks like a rendered diff in which the lines of the old
// `read_last_file` and the new `read_progress` are interleaved (two `fn` headers share one
// body), so it is not compilable exactly as shown — confirm against the real file.
//
// Loads the resume state from `PROGRESS_FILE_PATH`.
// The `read_progress` lines return `Ok(None)` when the file does not exist, and otherwise the
// file contents split on ':' into a `(last_file, progress_id)` pair.
// File open/read failures are propagated with `?`.
fn read_last_file() -> anyhow::Result<String> {
fn read_progress() -> anyhow::Result<Option<(String, String)>> {
let progress_file_path = Path::new(PROGRESS_FILE_PATH);
if !progress_file_path.exists() {
// No progress file found, returning empty string
return Ok(String::new());
info!("no progress file found");
return Ok(None);
}
let mut file = File::open(progress_file_path)?;
let mut last_file = String::new();
file.read_to_string(&mut last_file)?;
Ok(last_file)
// Only the first two ':'-separated parts are used. Each `unwrap` panics when a part is
// missing, e.g. when the file holds no ':' at all.
// NOTE(review): a location that itself contains ':' would be split in the wrong place —
// confirm that object locations never contain ':'.
let mut iter = last_file.split(':');
let progress = (
iter.next().unwrap().to_string(),
iter.next().unwrap().to_string(),
);
info!(last_file = %progress.0, progress_id = %progress.1, "found progress file");
Ok(Some(progress))
}

// NOTE(review): this span looks like a rendered diff in which the lines of the old
// `write_last_file` and the new `write_progress` are interleaved (two `fn` headers share one
// body), so it is not compilable exactly as shown — confirm against the real file.
//
// Persists the resume state to `PROGRESS_FILE_PATH`.
// The `write_progress` lines write "{location}:{payload_id}"; the `write_last_file` line wrote
// only the location. File creation/write failures are propagated with `?`.
fn write_last_file(last_file: &ObjectMeta) -> anyhow::Result<()> {
fn write_progress(last_file: &ObjectMeta, payload_id: &str) -> anyhow::Result<()> {
info!(last_file = %last_file.location, payload_id, "writing progress");
// `File::create` truncates an existing file, so each call replaces the previous state.
let mut file = File::create(Path::new(PROGRESS_FILE_PATH))?;
file.write_all(last_file.location.to_string().as_bytes())?;
// ':' is the separator that `read_progress` splits on when the state is read back.
let progress = format!("{}:{}", last_file.location.to_string(), payload_id);
file.write_all(progress.as_bytes())?;
Ok(())
}

Expand Down Expand Up @@ -70,6 +80,7 @@ fn get_ovh_object_store() -> anyhow::Result<impl ObjectStore> {
let s3_store = AmazonS3Builder::from_env()
.with_bucket_name(s3_bucket)
.with_retry(RetryConfig::default())
.with_retry(RetryConfig::default())
.build()?;
Ok(s3_store)
}
Expand Down Expand Up @@ -140,99 +151,6 @@ impl fmt::Debug for ExecutionPayload {
}
}

async fn migrate_bundle(
gcs: &impl ObjectStore,
ovh: &impl ObjectStore,
object: &ObjectMeta,
) -> anyhow::Result<()> {
info!(object = %object.location, size_mib=object.size / 1_000_000, "migrating bundle");

// Introduce a counter and a timer
let mut payloads_migrated = 0u64;
let mut timer = interval(Duration::from_secs(10));
let start_time = SystemTime::now();

let payload_stream = gcs.get(&object.location).await?.into_stream();
let reader = StreamReader::new(payload_stream);

let (decoded_tx, decoded_rx) = std::sync::mpsc::sync_channel(16);

let handle = tokio::task::spawn_blocking(move || {
let reader_sync = tokio_util::io::SyncIoBridge::new(reader);
let decoder = GzDecoder::new(reader_sync);
let mut csv_reader = csv::Reader::from_reader(decoder);
let mut iter = csv_reader.byte_records();

while let Some(record) = iter.next().transpose().unwrap() {
let execution_payload = {
unsafe {
ExecutionPayload {
block_hash: String::from_utf8_unchecked(record[4].into()),
id: String::from_utf8_unchecked(record[0].into()),
inserted_at: String::from_utf8_unchecked(record[1].into()),
payload: serde_json::from_slice(&record[6]).unwrap(),
proposer_pubkey: String::from_utf8_unchecked(record[3].into()),
slot: std::str::from_utf8_unchecked(&record[2])
.parse()
.map(Slot)
.unwrap(),
version: String::from_utf8_unchecked(record[5].into()),
}
}
};

decoded_tx.send(execution_payload).unwrap();
}
});

while let Ok(payload) = decoded_rx.recv() {
let timestamp_micros = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("expect duration since UNIX_EPOCH to be positive regardless of clock shift")
.as_micros();

let block_hash = &payload.block_hash;

let slot = &payload.slot;
let slot_date_time = slot.date_time();
let year = slot_date_time.year();
let month = slot_date_time.month();
let day = slot_date_time.day();
let hour = slot_date_time.hour();
let minute = slot_date_time.minute();

let path_string =
format!("old_formats/gcs/{year}/{month:02}/{day:02}/{hour:02}/{minute:02}/{slot}/{timestamp_micros}-{block_hash}.json.gz");
let path = object_store::path::Path::from(path_string);

let bytes = serde_json::to_vec(&payload).unwrap();
let bytes_gz = Vec::new();
let mut encoder = GzEncoder::new(bytes_gz, Compression::default());
encoder.write_all(&bytes).unwrap();
let bytes_gz = encoder.finish().unwrap();

ovh.put(&path, bytes_gz.into()).await.unwrap();

debug!(object = %path, "migrated");

payloads_migrated += 1; // Increment the counter for each payload migrated

// Check if it's time to report the migration rate
if timer.tick().now_or_never().is_some() {
let elapsed = SystemTime::now()
.duration_since(start_time)
.unwrap()
.as_secs();

print_migration_rate(payloads_migrated, elapsed);
}
}

handle.await?;

Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
log::init();
Expand All @@ -241,17 +159,22 @@ async fn main() -> anyhow::Result<()> {
let gcs = get_gcs_object_store()?;

// Initialize OVH object store
let ovh = get_ovh_object_store()?;
let ovh = &get_ovh_object_store()?;
debug!("initialized object stores");

debug!("listing day bundles");
let day_bundle_meta_stream = gcs.list(None).await?;
let mut day_bundle_metas = day_bundle_meta_stream.try_collect::<Vec<_>>().await?;
debug!("found {} day bundles", day_bundle_metas.len());

let last_file = read_last_file()?;
let progress = read_progress()?;

day_bundle_metas.retain(|file| file.location.to_string() > last_file);
if let Some((last_file, last_id)) = progress.as_ref() {
info!(last_file = %last_file, last_id, "resuming migration");
day_bundle_metas.retain(|file| file.location.to_string() >= *last_file);
} else {
info!("starting migration from scratch");
}

if day_bundle_metas.is_empty() {
info!("no new files to process");
Expand All @@ -261,15 +184,134 @@ async fn main() -> anyhow::Result<()> {
// Sort the files by name to make sure we process them in order
day_bundle_metas.sort_by(|a, b| a.location.to_string().cmp(&b.location.to_string()));

for file in &day_bundle_metas {
migrate_bundle(&gcs, &ovh, file).await?;
for object_meta in &day_bundle_metas {
info!(object = %object_meta.location, size_mib=object_meta.size / 1_000_000, "migrating bundle");

// Introduce a counter and a timer
let payloads_migrated_counter = &AtomicU64::new(0);
let interval_10_seconds = &Arc::new(Mutex::new(interval(Duration::from_secs(10))));
let start_time = SystemTime::now();

let payload_stream = gcs.get(&object_meta.location).await?.into_stream();
let reader = StreamReader::new(payload_stream);

const DECODED_BUFFER_SIZE: usize = 32;
let (mut decoded_tx, decoded_rx) = channel(DECODED_BUFFER_SIZE);

let handle = spawn_blocking(move || {
let reader_sync = SyncIoBridge::new(reader);
let decoder = GzDecoder::new(reader_sync);
let mut csv_reader = csv::Reader::from_reader(decoder);
let mut iter = csv_reader.byte_records();

while let Some(record) = iter.next().transpose().unwrap() {
let execution_payload = {
unsafe {
ExecutionPayload {
block_hash: String::from_utf8_unchecked(record[4].into()),
id: String::from_utf8_unchecked(record[0].into()),
inserted_at: String::from_utf8_unchecked(record[1].into()),
payload: serde_json::from_slice(&record[6]).unwrap(),
proposer_pubkey: String::from_utf8_unchecked(record[3].into()),
slot: std::str::from_utf8_unchecked(&record[2])
.parse()
.map(Slot)
.unwrap(),
version: String::from_utf8_unchecked(record[5].into()),
}
}
};

futures::executor::block_on(decoded_tx.send(execution_payload)).unwrap();
}
});

const CONCURRENT_PUT_LIMIT: usize = 1;

// Skip payloads that have already been processed.
decoded_rx
.skip_while(|payload| {
match progress.as_ref() {
// If there was previous progress
Some((_last_file, last_id)) => {
// And the current payload matches our last progress, process remaining payloads in
// the stream.
if payload.id == *last_id {
debug!(payload_id = %payload.id, "found last processed payload");
futures::future::ready(false)
} else {
// Otherwise, skip this one.
trace!(payload_id = %payload.id, "skipping payload");
futures::future::ready(true)
}
}
// If there was no previous progress (first run), process all payloads in the stream.
None => futures::future::ready(false),
}
})
.map(Ok)
.try_for_each_concurrent(CONCURRENT_PUT_LIMIT, |payload| async move {
let block_hash = payload.block_hash.clone();
let payload_id = payload.id.clone();

debug!(block_hash, payload_id, "storing payload");

let slot = &payload.slot;
let slot_date_time = slot.date_time();
let year = slot_date_time.year();
let month = slot_date_time.month();
let day = slot_date_time.day();
let hour = slot_date_time.hour();
let minute = slot_date_time.minute();

let path_string =
format!("old_formats/gcs/{year}/{month:02}/{day:02}/{hour:02}/{minute:02}/{slot}/{payload_id}-{block_hash}.json.gz");
let path = object_store::path::Path::from(path_string);

let payload_id = payload.id.clone();

let bytes_gz = spawn_blocking(move || {
let bytes = serde_json::to_vec(&payload).unwrap();
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(&bytes).unwrap();
encoder.finish().unwrap()
})
.await
.unwrap();

ovh.put(&path, bytes_gz.into()).await.unwrap();

let payloads_migrated_count = payloads_migrated_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

// Check if it's time to report the migration rate
if interval_10_seconds.lock().unwrap().tick().now_or_never().is_some() {
let elapsed = SystemTime::now()
.duration_since(start_time)
.unwrap()
.as_secs();

print_migration_rate(payloads_migrated_count, elapsed);

// Payloads are stored concurrently, so after a sudden shutdown some payloads that precede
// this one in the stream may not have been stored yet; resuming from this id would skip them.
write_progress(&object_meta, &payload_id)?;
}


write_last_file(file)?;
debug!(block_hash, payload_id, "payload stored");

Ok::<_, anyhow::Error>(())
})
.await?;

handle.await?;
}

// Migration complete, clean up the progress file
if last_file == day_bundle_metas.last().unwrap().location.to_string() {
cleanup_last_file()?;
if let Some((last_file, _row)) = progress {
if last_file == day_bundle_metas.last().unwrap().location.to_string() {
cleanup_last_file()?;
}
}

Ok(())
Expand Down

0 comments on commit 1298b3f

Please sign in to comment.