Skip to content

Commit

Permalink
Changed AsyncTransformer to work with collections instead of records.
Browse files Browse the repository at this point in the history
Added collection types for async collections.
  • Loading branch information
Bilge committed Apr 26, 2018
1 parent bb363bb commit a44083d
Show file tree
Hide file tree
Showing 13 changed files with 247 additions and 57 deletions.
23 changes: 23 additions & 0 deletions src/Collection/AsyncFilteredRecords.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php
declare(strict_types=1);

namespace ScriptFUSION\Porter\Collection;

use Amp\Iterator;

class AsyncFilteredRecords extends AsyncRecordCollection
{
private $filter;

public function __construct(Iterator $records, AsyncRecordCollection $previousCollection, callable $filter)
{
parent::__construct($records, $previousCollection);

$this->filter = $filter;
}

public function getFilter(): callable
{
return $this->filter;
}
}
24 changes: 24 additions & 0 deletions src/Collection/AsyncPorterRecords.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php
declare(strict_types=1);

namespace ScriptFUSION\Porter\Collection;

use Amp\Iterator;
use ScriptFUSION\Porter\Specification\AsyncImportSpecification;

class AsyncPorterRecords extends AsyncRecordCollection
{
private $specification;

public function __construct(Iterator $records, AsyncImportSpecification $specification)
{
parent::__construct($records, $records);

$this->specification = $specification;
}

public function getSpecification(): AsyncImportSpecification
{
return $this->specification;
}
}
24 changes: 24 additions & 0 deletions src/Collection/AsyncProviderRecords.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php
declare(strict_types=1);

namespace ScriptFUSION\Porter\Collection;

use Amp\Iterator;
use ScriptFUSION\Porter\Provider\Resource\AsyncResource;

class AsyncProviderRecords extends AsyncRecordCollection
{
private $resource;

public function __construct(Iterator $records, AsyncResource $resource)
{
parent::__construct($records);

$this->resource = $resource;
}

public function getResource(): AsyncResource
{
return $this->resource;
}
}
44 changes: 44 additions & 0 deletions src/Collection/AsyncRecordCollection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php
declare(strict_types=1);

namespace ScriptFUSION\Porter\Collection;

use Amp\Iterator;
use Amp\Promise;

abstract class AsyncRecordCollection implements Iterator
{
private $records;

private $previousCollection;

public function __construct(Iterator $records, self $previousCollection = null)
{
$this->records = $records;
$this->previousCollection = $previousCollection;
}

public function advance(): Promise
{
return $this->records->advance();
}

public function getCurrent(): array
{
return $this->records->getCurrent();
}

public function getPreviousCollection(): ?AsyncRecordCollection
{
return $this->previousCollection;
}

public function findFirstCollection(): ?AsyncRecordCollection
{
do {
$previous = $nextPrevious ?? $this->getPreviousCollection();
} while ($previous && $nextPrevious = $previous->getPreviousCollection());

return $previous ?: $this;
}
}
18 changes: 18 additions & 0 deletions src/Collection/CountableAsyncPorterRecords.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php
declare(strict_types=1);

namespace ScriptFUSION\Porter\Collection;

use ScriptFUSION\Porter\Specification\AsyncImportSpecification;

class CountableAsyncPorterRecords extends AsyncPorterRecords implements \Countable
{
use CountableRecordsTrait;

public function __construct(AsyncRecordCollection $records, int $count, AsyncImportSpecification $specification)
{
parent::__construct($records, $specification);

$this->setCount($count);
}
}
19 changes: 19 additions & 0 deletions src/Collection/CountableAsyncProviderRecords.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php
declare(strict_types=1);

namespace ScriptFUSION\Porter\Collection;

use Amp\Iterator;
use ScriptFUSION\Porter\Provider\Resource\AsyncResource;

class CountableAsyncProviderRecords extends AsyncProviderRecords implements \Countable
{
use CountableRecordsTrait;

public function __construct(Iterator $records, int $count, AsyncResource $resource)
{
parent::__construct($records, $resource);

$this->setCount($count);
}
}
5 changes: 0 additions & 5 deletions src/Collection/CountablePorterRecords.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ class CountablePorterRecords extends PorterRecords implements \Countable
{
use CountableRecordsTrait;

/**
* @param RecordCollection $records
* @param int $count
* @param ImportSpecification $specification
*/
public function __construct(RecordCollection $records, int $count, ImportSpecification $specification)
{
parent::__construct($records, $specification);
Expand Down
1 change: 1 addition & 0 deletions src/Collection/RecordCollection.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public function __construct(\Iterator $records, self $previousCollection = null)
$this->previousCollection = $previousCollection;
}

// TODO: Consider throwing our own exception type for clarity, instead of relying on PHP's TypeError.
public function current(): array
{
return $this->records->current();
Expand Down
75 changes: 41 additions & 34 deletions src/Porter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
namespace ScriptFUSION\Porter;

use Amp\Iterator;
use Amp\Producer;
use Amp\Promise;
use Psr\Container\ContainerInterface;
use ScriptFUSION\Porter\Collection\AsyncPorterRecords;
use ScriptFUSION\Porter\Collection\AsyncProviderRecords;
use ScriptFUSION\Porter\Collection\AsyncRecordCollection;
use ScriptFUSION\Porter\Collection\CountableAsyncPorterRecords;
use ScriptFUSION\Porter\Collection\CountablePorterRecords;
use ScriptFUSION\Porter\Collection\CountableProviderRecords;
use ScriptFUSION\Porter\Collection\PorterRecords;
Expand Down Expand Up @@ -125,13 +128,19 @@ private function fetch(ImportSpecification $specification): \Iterator
return $resource->fetch(ImportConnectorFactory::create($connector, $specification));
}

public function importAsync(AsyncImportSpecification $specification): Iterator
public function importAsync(AsyncImportSpecification $specification): AsyncRecordCollection
{
$specification = clone $specification;

$records = $this->fetchAsync($specification);

return $this->transformAsync($records, $specification->getTransformers(), $specification->getContext());
if (!$records instanceof AsyncProviderRecords) {
$records = new AsyncProviderRecords($records, $specification->getAsyncResource());
}

$records = $this->transformAsync($records, $specification->getTransformers(), $specification->getContext());

return $this->createAsyncPorterRecords($records, $specification);
}

public function importOneAsync(AsyncImportSpecification $specification): Promise
Expand Down Expand Up @@ -201,38 +210,25 @@ private function transformRecords(RecordCollection $records, array $transformers
return $records;
}

private function transformAsync(Iterator $records, array $transformers, $context): Producer
{
return new Producer(function (\Closure $emit) use ($records, $transformers, $context) {
while (yield $records->advance()) {
$record = $records->getCurrent();

foreach ($transformers as $transformer) {
if (!$transformer instanceof AsyncTransformer) {
// TODO: Proper exception or separate async stack.
throw new \RuntimeException('Cannot use sync transformer.');
}
if ($transformer instanceof PorterAware) {
$transformer->setPorter($this);
}

$record = yield $transformer->transformAsync($record, $context);

if ($record === null) {
// Do not process more transformers.
break;
}
}

if ($record !== null) {
if (!\is_array($record)) {
throw new \RuntimeException('Unexpected type: record must be array or null.');
}

yield $emit($record);
}
private function transformAsync(
AsyncRecordCollection $records,
array $transformers,
$context
): AsyncRecordCollection {
foreach ($transformers as $transformer) {
if (!$transformer instanceof AsyncTransformer) {
// TODO: Proper exception or separate async stack.
throw new \RuntimeException('Cannot use sync transformer.');
}
});

if ($transformer instanceof PorterAware) {
$transformer->setPorter($this);
}

$records = $transformer->transformAsync($records, $context);
}

return $records;
}

private function createProviderRecords(\Iterator $records, ProviderResource $resource): ProviderRecords
Expand All @@ -253,6 +249,17 @@ private function createPorterRecords(RecordCollection $records, ImportSpecificat
return new PorterRecords($records, $specification);
}

private function createAsyncPorterRecords(
AsyncRecordCollection $records,
AsyncImportSpecification $specification
): AsyncPorterRecords {
if ($records instanceof \Countable) {
return new CountableAsyncPorterRecords($records, \count($records), $specification);
}

return new AsyncPorterRecords($records, $specification);
}

/**
* Gets the provider matching the specified name.
*
Expand Down
4 changes: 2 additions & 2 deletions src/Transform/AsyncTransformer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

namespace ScriptFUSION\Porter\Transform;

use Amp\Promise;
use ScriptFUSION\Porter\Collection\AsyncRecordCollection;

interface AsyncTransformer
{
public function transformAsync(array $record, $context): Promise;
public function transformAsync(AsyncRecordCollection $records, $context): AsyncRecordCollection;
}
24 changes: 20 additions & 4 deletions src/Transform/FilterTransformer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@

namespace ScriptFUSION\Porter\Transform;

use Amp\Promise;
use Amp\Success;
use Amp\Producer;
use ScriptFUSION\Porter\Collection\AsyncFilteredRecords;
use ScriptFUSION\Porter\Collection\AsyncRecordCollection;
use ScriptFUSION\Porter\Collection\FilteredRecords;
use ScriptFUSION\Porter\Collection\RecordCollection;

/**
* Filters a collection of records based on the specified predicate function.
*
* This simple transformer is bundled with Porter as an example reference implementation for other transformers.
*/
class FilterTransformer implements Transformer, AsyncTransformer
{
/**
Expand Down Expand Up @@ -36,8 +42,18 @@ public function transform(RecordCollection $records, $context): RecordCollection
return new FilteredRecords($filter($this->filter), $records, $filter);
}

public function transformAsync(array $record, $context): Promise
public function transformAsync(AsyncRecordCollection $records, $context): AsyncRecordCollection
{
return new Success(($this->filter)($record, $context) ? $record : null);
return new AsyncFilteredRecords(
new Producer(function (\Closure $emit) use ($records) {
while (yield $records->advance()) {
if (($this->filter)($record = $records->getCurrent())) {
yield $emit($record);
}
}
}),
$records,
$this->filter
);
}
}
Loading

0 comments on commit a44083d

Please sign in to comment.