Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUGFIX #317] Make queueJob threadsafe #342

Open
wants to merge 1 commit into
base: 4
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 113 additions & 55 deletions src/Services/QueuedJobService.php
Original file line number Diff line number Diff line change
Expand Up @@ -235,64 +235,30 @@ public function queueJob(QueuedJob $job, $startAfter = null, $userId = null, $qu
{
$signature = $job->getSignature();

// see if we already have this job in a queue
$filter = [
'Signature' => $signature,
'JobStatus' => [
QueuedJob::STATUS_NEW,
QueuedJob::STATUS_INIT,
],
];

$existing = QueuedJobDescriptor::get()
->filter($filter)
->first();

if ($existing && $existing->ID) {
return $existing->ID;
}

$jobDescriptor = new QueuedJobDescriptor();
$jobDescriptor->JobTitle = $job->getTitle();
$jobDescriptor->JobType = $queueName ? $queueName : $job->getJobType();
$jobDescriptor->Signature = $signature;
$jobDescriptor->Implementation = get_class($job);
$jobDescriptor->StartAfter = $startAfter;

// no user provided - fallback to job user default
if ($userId === null && $job instanceof UserContextInterface) {
$userId = $job->getRunAsMemberID();
}
// Create the initial object
$jobDescriptor = $this->createJobDescriptor($job, $signature, $startAfter, $userId, $queueName);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of weird creating the job and then passing it to findOrMakeJobDescriptorFromSignature

Seems like we should call createJobDescriptor() inside findOrMakeJobDescriptorFromSignature on if we were unable to find a jobDescriptor from the signature


// still no user - fallback to current user
if ($userId === null) {
if (Security::getCurrentUser() && Security::getCurrentUser()->exists()) {
// current user available
$runAsID = Security::getCurrentUser()->ID;
} else {
// current user unavailable
$runAsID = 0;
}
} else {
$runAsID = $userId;
try {
return $this->findOrMakeJobDescriptorFromSignature($signature, $job, $jobDescriptor, $startAfter);
} catch (\Throwable $e) {
// note that error here may not be an issue as failing to acquire a job lock is a valid state
// which happens when other process claimed the job lock first
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comments should be updated to describe the job signature, not the job lock as that is a different feature. Please also update the log message so this feature is not confused with the job locking.

$this->getLogger()->debug(
sprintf(
'[%s] - Queued Jobs - Failed to acquire job lock %s %d %s',
DBDatetime::now()->Rfc2822(),
$e->getMessage(),
$signature,
PHP_EOL
),
[
'file' => __FILE__,
'line' => __LINE__,
]
);
}

$jobDescriptor->RunAsID = $runAsID;

// use this to populate custom data columns before job is queued
// note: you can pass arbitrary data to your job and then move it to job descriptor
// this is useful if you need some data that needs to be exposed as a separate
// DB column as opposed to serialised data
$this->extend('updateJobDescriptorBeforeQueued', $jobDescriptor, $job);

// copy data
$this->copyJobToDescriptor($job, $jobDescriptor);

$jobDescriptor->write();

$this->startJob($jobDescriptor, $startAfter);

return $jobDescriptor->ID;
return false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return false;
return 0;

PHPDOC function signature is to return an int

}

/**
Expand Down Expand Up @@ -334,6 +300,98 @@ public function isAtMaxJobs()
return false;
}

/**
* Using a job signature, returns the JobDescriptor ID and whether the
* job descriptor is new or existing
*
* @param string $signature
* @param QueuedJob $job
* @param QueuedJobDescriptor $jobDescriptor
* @param null|string $startAfter
* @return int|null
*/
protected function findOrMakeJobDescriptorFromSignature($signature, $job, $jobDescriptor, $startAfter)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be made private to reduce the API surface. There's an extension hook available if it needs to be extended.

{
// Start a transaction which will hold until we have a lock on this signature.
return DB::get_conn()->withTransaction(function () use ($signature, $job, $jobDescriptor, $startAfter) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the return? Database::withTransaction() doesn't have a return value?

$query = 'SELECT "ID" FROM "QueuedJobDescriptor" WHERE "Signature" = ? FOR UPDATE';
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bit of an edge case, though should read the tableName of QueuedJobDescriptor first incase it's been changed (table names are configuraable

$tableName = Config::inst()->get(QueuedJobDescriptor::class, 'table_name', Config::UNINHERITED);


// Retrieve first record
$result = DB::prepared_query($query, [$signature]);

if ($result === null) {
throw new Exception('Failed to execute query to retrieve job signature');
}

$ID = $result->value();

// If the record does not exist
if (!$ID) {
// use this to populate custom data columns before job is queued
// note: you can pass arbitrary data to your job and then move it to job descriptor
// this is useful if you need some data that needs to be exposed as a separate
// DB column as opposed to serialised data
$this->extend('updateJobDescriptorBeforeQueued', $jobDescriptor, $job);

// copy data
$this->copyJobToDescriptor($job, $jobDescriptor);

// Write the record
$jobDescriptorID = $jobDescriptor->write();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$jobDescriptorID variable is unused?


$this->startJob($jobDescriptor, $startAfter);
} else {
$jobDescriptorID = $ID;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$jobDescriptorID appears to be unused, so we can just omit this?

}
});
}

/**
* @param QueuedJob $job
* @param string $signature
* @param null $startAfter
* @param null $userId
* @param null $queueName
* @return QueuedJobDescriptor
*/
protected function createJobDescriptor(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should change this from protected to private

QueuedJob $job,
$signature,
$startAfter = null,
$userId = null,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
$userId = null,
$userID = null,

Convention is to capitilise the d in ID

Also update the references in this function to $userID

$queueName = null
)
{
$jobDescriptor = QueuedJobDescriptor::create();
$jobDescriptor->JobTitle = $job->getTitle();
$jobDescriptor->JobType = $queueName ? $queueName : $job->getJobType();
$jobDescriptor->Signature = $signature;
$jobDescriptor->Implementation = get_class($job);
$jobDescriptor->StartAfter = $startAfter;

// no user provided - fallback to job user default
if ($userId === null && $job instanceof UserContextInterface) {
$userId = $job->getRunAsMemberID();
}

// still no user - fallback to current user
if ($userId === null) {
if (Security::getCurrentUser() && Security::getCurrentUser()->exists()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should put this to a variable so that it doesn't need to be called 3 separate times times
$member = Security::getCurrentUser()

// current user available
$runAsID = Security::getCurrentUser()->ID;
} else {
// current user unavailable
$runAsID = 0;
}
} else {
$runAsID = $userId;
}

$jobDescriptor->RunAsID = $runAsID;

return $jobDescriptor;
}

/**
* Copies data from a job into a descriptor for persisting
*
Expand Down