Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/concurrency #25

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
sudo: required

language: php
php:
- 7.2
- 7.3
- 7.4

jobs:
include:
# Combination PHP <7.4 and maglnet/composer-require-checker doesn't work with Composer 2
- php: 7.2
env: COMPOSER_VERSION=1.10.16
- php: 7.3
env: COMPOSER_VERSION=1.10.16
- php: 7.4
env: COMPOSER_VERSION=--stable

env:
- ESB_CONSOLE_PORT=8080 ESB_HTTP_SERVER_PORT=34981 ESB_BEANSTALKD_URL=tcp://127.0.0.1:11300 ES_BASE_URI=http://127.0.0.1:9200
global:
- ESB_CONSOLE_PORT=8080 ESB_HTTP_SERVER_PORT=34981 ESB_BEANSTALKD_URL=tcp://127.0.0.1:11300 ES_BASE_URI=http://127.0.0.1:9200

cache:
directories:
Expand All @@ -24,14 +31,16 @@ before_install:
- echo -e '-Ddiscovery.type=single-node\n-XX:+DisableExplicitGC\n-Djdk.io.permissionsUseCanonicalPath=true\n-Dlog4j.skipJansi=true\n-server\n' | sudo tee -a /etc/elasticsearch/jvm.options
- sudo chown -R elasticsearch:elasticsearch /etc/default/elasticsearch
- sudo systemctl start elasticsearch
- composer --verbose self-update $COMPOSER_VERSION
- composer --version

install:
- sudo apt-get update
- sudo apt-get install beanstalkd

before_script:
- composer install
- composer global require maglnet/composer-require-checker && $HOME/.composer/vendor/bin/composer-require-checker --config-file=composer-require-checker.json;
- composer global require maglnet/composer-require-checker && $(composer config home)/vendor/bin/composer-require-checker --config-file=composer-require-checker.json;
- vendor/bin/ecs check
- vendor/bin/phpstan analyse --no-progress -l max -c phpstan.neon src/

Expand Down
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@
"pda/pheanstalk": "^3.1",
"mikey179/vfsstream": "^1.6",
"amphp/artax": "^3.0",
"phpstan/phpstan": "^0.12",
"symplify/easy-coding-standard-prefixed": "^8.1"
"phpstan/phpstan": "^0.12 <=0.12.66",
"symplify/easy-coding-standard-prefixed": "^8.1 <8.3"
},
"scripts": {
"phpcs": "phpcs",
Expand Down
2 changes: 1 addition & 1 deletion src/Exception/ElasticSearch/JobNotFoundException.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class JobNotFoundException extends \RuntimeException
{
public function __construct(string $jobUuid, $code = 0, Throwable $previous = null)
public function __construct(string $jobUuid, int $code = 0, Throwable $previous = null)
{
parent::__construct(sprintf('Job with UUID "%s" has not been found.', $jobUuid), $code, $previous);
}
Expand Down
13 changes: 9 additions & 4 deletions src/Service/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -223,18 +223,23 @@ private function jobExists(string $jobUuid): Promise
private function processBatch(): \Generator
{
$this->logger->debug('Processing batch');
yield $this->elasticSearch->bulkIndexJobs($this->batch, $this->flowConfig->getTube());

foreach ($this->batch as $singleJob) {
// Store batch in a local variable and clear the shared property. Any other Producer instance may update and
// process $this->batch during a yield call within this method (emulating concurrency). This shouldn't cause
// duplicate inserts of the same job.
$batch = $this->batch;
$this->batch = [];

yield $this->elasticSearch->bulkIndexJobs($batch, $this->flowConfig->getTube());

foreach ($batch as $singleJob) {
yield $this->beanstalkClient->put(
$singleJob->getUuid(),
$singleJob->getTimeout(),
$singleJob->getDelay(),
$singleJob->getPriority()
);
}

$this->batch = [];
}

/**
Expand Down