From f49acf11cf70f85d110c7baae8dd885338e63030 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 4 Jul 2024 17:32:33 +0100 Subject: [PATCH] fixed --- vortex-ipc/src/io/object_store.rs | 52 +++++++++++++++++++------------ 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/vortex-ipc/src/io/object_store.rs b/vortex-ipc/src/io/object_store.rs index 9e487b2030..d6ef0a353f 100644 --- a/vortex-ipc/src/io/object_store.rs +++ b/vortex-ipc/src/io/object_store.rs @@ -1,10 +1,11 @@ use std::future::Future; use std::io::Cursor; use std::ops::Range; +use std::{io, mem}; -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use object_store::path::Path; -use object_store::{ObjectStore, PutPayload}; +use object_store::{ObjectStore, WriteMultipart}; use vortex_buffer::io_buf::IoBuf; use vortex_buffer::Buffer; use vortex_error::VortexResult; @@ -20,7 +21,10 @@ pub trait ObjectStoreExt { fn vortex_reader(&self, location: &Path) -> impl VortexReadAt; - fn vortex_writer(&self, location: &Path) -> impl VortexWrite; + fn vortex_writer( + &self, + location: &Path, + ) -> impl Future>; } impl ObjectStoreExt for O { @@ -37,8 +41,11 @@ impl ObjectStoreExt for O { ObjectStoreReadAt::new(self, location) } - fn vortex_writer(&self, location: &Path) -> impl VortexWrite { - ObjectStoreWriter::new(self, location) + async fn vortex_writer(&self, location: &Path) -> VortexResult { + Ok(ObjectStoreWriter::new(WriteMultipart::new_with_chunk_size( + self.put_multipart(location).await?, + 10 * 1024 * 1024, + ))) } } @@ -68,36 +75,41 @@ impl<'a, 'b, O: ObjectStore> VortexReadAt for ObjectStoreReadAt<'a, 'b, O> { } } -pub struct ObjectStoreWriter<'a, 'b, O: ObjectStore> { - object_store: &'a O, - location: &'b Path, +pub struct ObjectStoreWriter { + multipart: Option, } -impl<'a, 'b, O: ObjectStore> ObjectStoreWriter<'a, 'b, O> { - pub fn new(object_store: &'a O, location: &'b Path) -> Self { +impl ObjectStoreWriter { + pub fn new(multipart: WriteMultipart) -> Self { Self { - object_store, - location, + multipart: Some(multipart), } } } -impl<'a, 'b, O: ObjectStore> VortexWrite for ObjectStoreWriter<'a, 'b, O> { +impl VortexWrite for ObjectStoreWriter { async fn write_all(&mut self, buffer: B) -> std::io::Result { - self.object_store - .put( - self.location, - PutPayload::from_bytes(Bytes::copy_from_slice(buffer.as_slice())), - ) - .await?; + self.multipart + .as_mut() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "multipart already finished")) + .map(|mp| mp.write(buffer.as_slice()))?; Ok(buffer) } async fn flush(&mut self) -> std::io::Result<()> { - Ok(()) + Ok(self + .multipart + .as_mut() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "multipart already finished")) + .map(|mp| mp.wait_for_capacity(0))? + .await?) } async fn shutdown(&mut self) -> std::io::Result<()> { + let mp = mem::take(&mut self.multipart); + mp.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "multipart already finished"))? + .finish() + .await?; Ok(()) } }