Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy Files From External Storage To Local Storage for Import #149

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 117 additions & 44 deletions src/Snapshot.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
namespace Spatie\DbSnapshots;

use Carbon\Carbon;
use Illuminate\Filesystem\FilesystemAdapter as Disk;
use Illuminate\Contracts\Filesystem\Factory;
use Illuminate\Filesystem\FilesystemAdapter;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\LazyCollection;
use Spatie\DbSnapshots\Events\DeletedSnapshot;
use Spatie\DbSnapshots\Events\DeletingSnapshot;
use Spatie\DbSnapshots\Events\LoadedSnapshot;
use Spatie\DbSnapshots\Events\LoadingSnapshot;
use Spatie\TemporaryDirectory\TemporaryDirectory;

class Snapshot
{
public Disk $disk;
public FilesystemAdapter $disk;

public string $fileName;

Expand All @@ -25,10 +26,11 @@ class Snapshot

public const STREAM_BUFFER_SIZE = 16384;

public function __construct(Disk $disk, string $fileName)
protected Factory $filesystemFactory;

public function __construct(FilesystemAdapter $disk, string $fileName)
{
$this->disk = $disk;

$this->fileName = $fileName;

$pathinfo = pathinfo($fileName);
Expand All @@ -39,12 +41,12 @@ public function __construct(Disk $disk, string $fileName)
}

$this->name = pathinfo($fileName, PATHINFO_FILENAME);
$this->filesystemFactory = app(Factory::class);
}

public function useStream(): self
{
$this->useStream = true;

return $this;
}

Expand Down Expand Up @@ -73,6 +75,10 @@ protected function loadAsync(?string $connectionName = null): void
$dbDumpContents = gzdecode($dbDumpContents);
}

if (empty(trim($dbDumpContents))) {
return;
}

DB::connection($connectionName)->unprepared($dbDumpContents);
}

Expand All @@ -84,59 +90,126 @@ protected function isASqlComment(string $line): bool
protected function shouldIgnoreLine(string $line): bool
{
$line = trim($line);

return empty($line) || $this->isASqlComment($line);
}

protected function loadStream(?string $connectionName = null): void
{
LazyCollection::make(function () {
$stream = $this->compressionExtension === 'gz'
? gzopen($this->disk->path($this->fileName), 'r')
: $this->disk->readStream($this->fileName);

$statement = '';
while (! feof($stream)) {
$chunk = $this->compressionExtension === 'gz'
? gzread($stream, self::STREAM_BUFFER_SIZE)
: fread($stream, self::STREAM_BUFFER_SIZE);

$lines = explode("\n", $chunk);
foreach ($lines as $idx => $line) {
if ($this->shouldIgnoreLine($line)) {
continue;
}

$statement .= $line;

// Carry-over the last line to the next chunk since it
// is possible that this chunk finished mid-line right on
// a semi-colon.
if (count($lines) == $idx + 1) {
break;
}

if (str_ends_with(trim($statement), ';')) {
yield $statement;
$statement = '';
}
$temporaryDirectory = (new TemporaryDirectory(config('db-snapshots.temporary_directory_path')))->create();

$this->configureFilesystemDisk($temporaryDirectory->path());

$localDisk = $this->filesystemFactory->disk(self::class);

try {
$this->processStream($localDisk, $connectionName);
} finally {
$temporaryDirectory->delete();
}
}

private function configureFilesystemDisk(string $path): void
{
config([
'filesystems.disks.' . self::class => [
'driver' => 'local',
'root' => $path,
'throw' => false,
],
]);
}

private function processStream(FilesystemAdapter $localDisk, ?string $connectionName): void
{
$this->copyStreamToLocalDisk($localDisk);

$stream = $this->openStream($localDisk);

try {
$this->processStatements($stream, $connectionName);
} finally {
$this->closeStream($stream);
}
}

private function copyStreamToLocalDisk(FilesystemAdapter $localDisk): void
{
$localDisk->writeStream($this->fileName, $this->disk->readStream($this->fileName));
}

private function openStream(FilesystemAdapter $localDisk): mixed
{
$stream = $this->compressionExtension === 'gz'
? gzopen($localDisk->path($this->fileName), 'r')
: $localDisk->readStream($this->fileName);

if (!is_resource($stream)) {
throw new \RuntimeException("Failed to open stream for file: {$this->fileName}");
}

return $stream;
}

private function closeStream(mixed $stream): void
{
if (!is_resource($stream)) {
throw new \RuntimeException("Invalid stream provided for closing.");
}

$this->compressionExtension === 'gz' ? gzclose($stream) : fclose($stream);
}

private function processStatements(mixed $stream, ?string $connectionName): void
{
$statement = '';
while (!feof($stream)) {
$chunk = $this->readChunk($stream);
$lines = explode("\n", $chunk);

foreach ($lines as $idx => $line) {
if ($this->shouldIgnoreLine($line)) {
continue;
}
}

if (str_ends_with(trim($statement), ';')) {
yield $statement;
$statement .= $line;

if ($this->isLastLineOfChunk($lines, $idx)) {
break;
}

if ($this->isCompleteStatement($statement)) {
DB::connection($connectionName)->unprepared($statement);
$statement = '';
}
}
})->each(function (string $statement) use ($connectionName) {
}

if ($this->isCompleteStatement($statement)) {
DB::connection($connectionName)->unprepared($statement);
});
}
}

private function readChunk(mixed $stream): string
{
return $this->compressionExtension === 'gz'
? gzread($stream, self::STREAM_BUFFER_SIZE)
: fread($stream, self::STREAM_BUFFER_SIZE);
}

private function isLastLineOfChunk(array $lines, int $idx): bool
{
return count($lines) === $idx + 1;
}

private function isCompleteStatement(string $statement): bool
{
return str_ends_with(trim($statement), ';');
}

public function delete(): void
{
event(new DeletingSnapshot($this));

$this->disk->delete($this->fileName);

event(new DeletedSnapshot($this->fileName, $this->disk));
}

Expand Down
107 changes: 107 additions & 0 deletions tests/Commands/LoadTest.php
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
<?php

use Carbon\Carbon;
use Illuminate\Filesystem\FilesystemAdapter;
use Illuminate\Support\Facades\Artisan;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Event;
use Mockery as m;

use Spatie\DbSnapshots\Events\DeletedSnapshot;
use Spatie\DbSnapshots\Snapshot;
use function Pest\Laravel\assertDatabaseCount;
use function PHPUnit\Framework\assertEquals;
use function PHPUnit\Framework\assertNotEquals;
Expand Down Expand Up @@ -143,3 +148,105 @@ function getNameOfLoadedSnapshot(): string

assertSnapshotLoaded('snapshot4');
});

it('throws an error when snapshot does not exist', function () {
$this->expectException(Exception::class);

$disk = m::mock(FilesystemAdapter::class);
$disk->shouldReceive('exists')
->with('nonexistent.sql')
->andReturn(false);

$snapshot = new Snapshot($disk, 'nonexistent.sql');
$snapshot->load();
});

it('throws an error for invalid SQL in snapshot', function () {
$disk = m::mock(FilesystemAdapter::class);
$disk->shouldReceive('get')
->andReturn("INVALID SQL;\n");

$snapshot = new Snapshot($disk, 'invalid.sql');

$this->expectException(Exception::class);
$snapshot->load();
});

it('deletes the snapshot and triggers event', function () {
Event::fake();

$disk = m::mock(FilesystemAdapter::class);
$disk->shouldReceive('delete')
->once()
->with('snapshot.sql')
->andReturn(true);

$snapshot = new Snapshot($disk, 'snapshot.sql');
$snapshot->delete();

Event::assertDispatched(DeletedSnapshot::class, function ($event) use ($snapshot) {
return $event->fileName === $snapshot->fileName && $event->disk === $snapshot->disk;
});
});

it('returns the correct size of the snapshot', function () {
$disk = m::mock(FilesystemAdapter::class);
$disk->shouldReceive('size')
->andReturn(2048);

$snapshot = new Snapshot($disk, 'snapshot.sql');

assertEquals(2048, $snapshot->size());
});

it('returns the correct creation date of the snapshot', function () {
$timestamp = Carbon::now()->timestamp;

$disk = m::mock(FilesystemAdapter::class);
$disk->shouldReceive('lastModified')
->andReturn($timestamp);

$snapshot = new Snapshot($disk, 'snapshot.sql');

assertEquals(Carbon::createFromTimestamp($timestamp), $snapshot->createdAt());
});

it('handles empty snapshots gracefully', function () {
$disk = m::mock(FilesystemAdapter::class);
$disk->shouldReceive('get')
->andReturn("");

$snapshot = new Snapshot($disk, 'empty.sql');

$snapshot->load();

// Expect no SQL to be executed
DB::shouldReceive('unprepared')
->never();
});

it('drops all current tables when requested', function () {
// Mock SchemaBuilder
$schemaBuilderMock = m::mock();
$schemaBuilderMock->shouldReceive('dropAllTables')->once();

// Mock DB facade
DB::shouldReceive('connection')
->andReturnSelf(); // Returns the DB connection
DB::shouldReceive('getSchemaBuilder')
->andReturn($schemaBuilderMock); // Returns the mocked schema builder
DB::shouldReceive('getDefaultConnection')
->andReturn('testing'); // Returns a mock default connection
DB::shouldReceive('reconnect')->once();

// Instance of Snapshot
$snapshot = new Snapshot(m::mock(FilesystemAdapter::class), 'snapshot.sql');

// Access protected method via Reflection
$reflection = new ReflectionMethod(Snapshot::class, 'dropAllCurrentTables');
$reflection->setAccessible(true);

// Invoke the protected method
$reflection->invoke($snapshot);
});

Loading