Skip to content

Commit

Permalink
HPCC-31324 New IFileIO method to flush to underlying storage
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Feb 23, 2024
1 parent 5f9b805 commit 3c93403
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 100 deletions.
11 changes: 9 additions & 2 deletions common/remote/hooks/azure/azurefile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ class AzureFileReadIO : implements CInterfaceOf<IFileIO>
{
}
unsigned __int64 getStatistic(StatisticKind kind) override;

virtual void flushToStorage() override
{
flush();
}
protected:
size_t extractDataFromResult(size_t offset, size_t length, void * target);

Expand Down Expand Up @@ -116,7 +119,7 @@ class AzureFileWriteIO : implements CInterfaceOf<IFileIO>
virtual void flush() override;

virtual unsigned __int64 getStatistic(StatisticKind kind) override;

virtual void flushToStorage() override;
protected:
Linked<AzureFile> file;
CriticalSection cs;
Expand Down Expand Up @@ -422,6 +425,10 @@ unsigned __int64 AzureFileWriteIO::getStatistic(StatisticKind kind)
return stats.getStatistic(kind);
}

void AzureFileWriteIO::flushToStorage()
{
flush();
}
//---------------------------------------------------------------------------------------------------------------------

AzureFileAppendBlobWriteIO::AzureFileAppendBlobWriteIO(AzureFile * _file) : AzureFileWriteIO(_file)
Expand Down
20 changes: 12 additions & 8 deletions common/remote/hooks/git/gitfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class GitRepositoryFileIO : implements CSimpleInterfaceOf<IFileIO>
if (isLFSfile())
readLfsContents(gitDirectory, gitUser);
}
virtual size32_t read(offset_t pos, size32_t len, void * data)
virtual size32_t read(offset_t pos, size32_t len, void * data) override
{
if (pos >= buf.length())
return 0;
Expand All @@ -197,36 +197,40 @@ class GitRepositoryFileIO : implements CSimpleInterfaceOf<IFileIO>
memcpy_iflen(data, buf.toByteArray()+pos, len);
return len;
}
virtual offset_t size()
virtual offset_t size() override
{
return buf.length();
}
virtual void close()
virtual void close() override
{
}

// Write methods not implemented - this is a read-only file
virtual size32_t write(offset_t pos, size32_t len, const void * data)
virtual size32_t write(offset_t pos, size32_t len, const void * data) override
{
throwUnexpected();
}
virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1)
virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1) override
{
throwUnexpected();
}
virtual void setSize(offset_t size)
virtual void setSize(offset_t size) override
{
throwUnexpected();
}
virtual void flush()
virtual void flush() override
{
throwUnexpected();
}
unsigned __int64 getStatistic(StatisticKind kind)
unsigned __int64 getStatistic(StatisticKind kind) override
{
//This could be implemented, but not likely to be useful so currently return nothing.
return 0;
}
virtual void flushToStorage() override
{
throwUnexpected();
}

protected:
void readLfsContents(const char *gitDirectory, const char * gitUser)
Expand Down
20 changes: 12 additions & 8 deletions common/remote/hooks/libarchive/archive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class ArchiveFileIO : implements IFileIO, public CInterface
#endif
}

virtual size32_t read(offset_t pos, size32_t len, void * _data)
virtual size32_t read(offset_t pos, size32_t len, void * _data) override
{
// NOTE - we don't support multithreaded access (the sequential-only restriction would make that tricky anyway)
if (pos < lastPos)
Expand Down Expand Up @@ -259,35 +259,39 @@ class ArchiveFileIO : implements IFileIO, public CInterface
lastPos = pos;
return lenRequested - len;
}
virtual offset_t size()
virtual offset_t size() override
{
return fileSize;
}
virtual void close()
virtual void close() override
{
}

// Write methods not implemented - this is a read-only file
virtual size32_t write(offset_t pos, size32_t len, const void * data)
virtual size32_t write(offset_t pos, size32_t len, const void * data) override
{
throwUnexpected();
}
virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1)
virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1) override
{
throwUnexpected();
}
virtual void setSize(offset_t size)
virtual void setSize(offset_t size) override
{
throwUnexpected();
}
virtual void flush()
virtual void flush() override
{
throwUnexpected();
}
unsigned __int64 getStatistic(StatisticKind kind)
unsigned __int64 getStatistic(StatisticKind kind) override
{
return 0;
}
virtual void flushToStorage() override
{
throwUnexpected();
}
protected:
struct archive *archive = nullptr;
offset_t fileSize = 0;
Expand Down
10 changes: 9 additions & 1 deletion common/remote/hooks/s3/s3file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ class S3FileReadIO : implements CInterfaceOf<IFileIO>
//Could implement if we use the async version of the putObject call.
}
unsigned __int64 getStatistic(StatisticKind kind) override;
virtual void flushToStorage() override
{
flush();
}

protected:
size_t extractDataFromResult(size_t offset, size_t length, void * target);
Expand Down Expand Up @@ -169,7 +173,7 @@ class S3FileWriteIO : implements CInterfaceOf<IFileIO>
virtual size32_t write(offset_t pos, size32_t len, const void * data) override;
virtual void setSize(offset_t size) override;
virtual void flush() override;

virtual void flushToStorage() override;
virtual unsigned __int64 getStatistic(StatisticKind kind) override;

protected:
Expand Down Expand Up @@ -432,6 +436,10 @@ void S3FileWriteIO::flush()
{
}

void S3FileWriteIO::flushToStorage()
{
}

unsigned __int64 S3FileWriteIO::getStatistic(StatisticKind kind)
{
return stats.getStatistic(kind);
Expand Down
17 changes: 9 additions & 8 deletions dali/datest/datest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2900,21 +2900,22 @@ NULL
CSplitIFileIO() { }
void addIFileIO(IFileIO *iFileIO) { iFileIOs.append(*iFileIO); }
// IFileIO
virtual size32_t read(offset_t pos, size32_t len, void * data) { UNIMPLEMENTED; return 0; }
virtual offset_t size() { UNIMPLEMENTED; return 0; }
virtual size32_t write(offset_t pos, size32_t len, const void * data)
virtual size32_t read(offset_t pos, size32_t len, void * data) override { UNIMPLEMENTED; return 0; }
virtual offset_t size() override { UNIMPLEMENTED; return 0; }
virtual size32_t write(offset_t pos, size32_t len, const void * data) override
{
size32_t sz = iFileIOs.item(0).write(pos, len, data);
unsigned i=1;
for (i=1; i<iFileIOs.ordinality(); i++)
verifyex(sz == iFileIOs.item(i).write(pos, len, data));
return sz;
}
virtual unsigned __int64 getStatistic(StatisticKind kind) { return 0; }
virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=-1) { UNIMPLEMENTED; return 0; }
virtual void setSize(offset_t size) { UNIMPLEMENTED; }
virtual void flush() { }
virtual void close() { }
virtual unsigned __int64 getStatistic(StatisticKind kind) override { return 0; }
virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=-1) override { UNIMPLEMENTED; return 0; }
virtual void setSize(offset_t size) override { UNIMPLEMENTED; }
virtual void flush() override { }
virtual void flushToStorage() override { }
virtual void close() override { }
};

const char *newFileName = "xpathTests.out";
Expand Down
24 changes: 17 additions & 7 deletions fs/dafsclient/rmtfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1296,7 +1296,7 @@ class CRemoteFileIO : public CInterfaceOf<IFileIO>
parent->disconnect();
}

void close()
void close() override
{
if (handle)
{
Expand Down Expand Up @@ -1379,7 +1379,7 @@ class CRemoteFileIO : public CInterfaceOf<IFileIO>
}


offset_t size()
offset_t size() override
{
MemoryBuffer sendBuffer;
initSendBuffer(sendBuffer);
Expand All @@ -1393,7 +1393,7 @@ class CRemoteFileIO : public CInterfaceOf<IFileIO>
return ret;
}

virtual unsigned __int64 getStatistic(StatisticKind kind)
virtual unsigned __int64 getStatistic(StatisticKind kind) override
{
switch (kind)
{
Expand All @@ -1419,7 +1419,7 @@ class CRemoteFileIO : public CInterfaceOf<IFileIO>
return 0;
}

size32_t read(offset_t pos, size32_t len, void * data)
size32_t read(offset_t pos, size32_t len, void * data) override
{
size32_t got;
MemoryBuffer replyBuffer;
Expand All @@ -1442,8 +1442,13 @@ class CRemoteFileIO : public CInterfaceOf<IFileIO>
return got;
}

virtual void flush()
virtual void flush() override
{
}

virtual void flushToStorage() override
{
flush();
}

const void *doRead(offset_t pos, size32_t len, MemoryBuffer &replyBuffer, size32_t &got, void *dstbuf)
Expand Down Expand Up @@ -1514,7 +1519,7 @@ class CRemoteFileIO : public CInterfaceOf<IFileIO>
}


size32_t write(offset_t pos, size32_t len, const void * data)
size32_t write(offset_t pos, size32_t len, const void * data) override
{
unsigned tries=0;
size32_t ret = 0;
Expand Down Expand Up @@ -1580,7 +1585,7 @@ class CRemoteFileIO : public CInterfaceOf<IFileIO>
}


void setSize(offset_t size)
void setSize(offset_t size) override
{
MemoryBuffer sendBuffer;
initSendBuffer(sendBuffer);
Expand Down Expand Up @@ -2336,6 +2341,11 @@ class CRemoteFilteredFileIOBase : public CRemoteBase, implements IRemoteFileIO
*/
return 0;
}
virtual void flushToStorage() override
{
flush();
}

// IRemoteFileIO
virtual void addVirtualFieldMapping(const char *fieldName, const char *fieldValue) override
{
Expand Down
36 changes: 20 additions & 16 deletions roxie/ccd/ccdfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,15 @@ class CFailingFileIO : implements IFileIO, public CInterface

public:
IMPLEMENT_IINTERFACE;
virtual size32_t read(offset_t pos, size32_t len, void * data) { THROWNOTOPEN; }
virtual offset_t size() { THROWNOTOPEN; }
virtual void flush() { THROWNOTOPEN; }
virtual size32_t write(offset_t pos, size32_t len, const void * data) { THROWNOTOPEN; }
virtual void setSize(offset_t size) { UNIMPLEMENTED; }
virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; return 0; }
virtual void close() { }
virtual unsigned __int64 getStatistic(StatisticKind kind) { return 0; }
virtual size32_t read(offset_t pos, size32_t len, void * data) override { THROWNOTOPEN; }
virtual offset_t size() override { THROWNOTOPEN; }
virtual void flush() override { THROWNOTOPEN; }
virtual size32_t write(offset_t pos, size32_t len, const void * data) override { THROWNOTOPEN; }
virtual void setSize(offset_t size) override { UNIMPLEMENTED; }
virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) override { UNIMPLEMENTED; return 0; }
virtual void close() override { }
virtual unsigned __int64 getStatistic(StatisticKind kind) override { return 0; }
virtual void flushToStorage() override { THROWNOTOPEN; }
} failure;

class CRoxieLazyFileIO : implements ILazyFileIO, implements IDelayedFile, public CInterface
Expand Down Expand Up @@ -198,7 +199,7 @@ class CRoxieLazyFileIO : implements ILazyFileIO, implements IDelayedFile, public
return lastAccess;
}

virtual void close()
virtual void close() override
{
CriticalBlock b(crit);
setFailure();
Expand Down Expand Up @@ -345,7 +346,7 @@ class CRoxieLazyFileIO : implements ILazyFileIO, implements IDelayedFile, public
}
}

virtual size32_t read(offset_t pos, size32_t len, void * data)
virtual size32_t read(offset_t pos, size32_t len, void * data) override
{
unsigned activeIdx;
Owned<IFileIO> active = getCheckOpen(activeIdx);
Expand Down Expand Up @@ -386,7 +387,7 @@ class CRoxieLazyFileIO : implements ILazyFileIO, implements IDelayedFile, public
}
}

virtual void flush()
virtual void flush() override
{
Linked<IFileIO> active;
{
Expand All @@ -397,23 +398,26 @@ class CRoxieLazyFileIO : implements ILazyFileIO, implements IDelayedFile, public
active->flush();
}

virtual offset_t size()
virtual offset_t size() override
{
unsigned activeIdx;
Owned<IFileIO> active = getCheckOpen(activeIdx);
lastAccess = msTick();
return active->size();
}

virtual unsigned __int64 getStatistic(StatisticKind kind)
virtual unsigned __int64 getStatistic(StatisticKind kind) override
{
unsigned __int64 v = fileStats.getStatisticValue(kind);
CriticalBlock b(crit); // don't bother with linking current and performing getStatistic outside of crit, because getStatistic is very quick
return v + current->getStatistic(kind);
}

virtual size32_t write(offset_t pos, size32_t len, const void * data) { throwUnexpected(); }
virtual void setSize(offset_t size) { throwUnexpected(); }
virtual void flushToStorage() override
{
flush();
}
virtual size32_t write(offset_t pos, size32_t len, const void * data) override { throwUnexpected(); }
virtual void setSize(offset_t size) override { throwUnexpected(); }
virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { throwUnexpected(); return 0; }

virtual const char *queryFilename() const { return logical->queryFilename(); }
Expand Down
Loading

0 comments on commit 3c93403

Please sign in to comment.