Viewing file: DatabaseQueue.php (11.13 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
<?php
namespace Illuminate\Queue;
use Illuminate\Contracts\Queue\ClearableQueue; use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Database\Connection; use Illuminate\Queue\Jobs\DatabaseJob; use Illuminate\Queue\Jobs\DatabaseJobRecord; use Illuminate\Support\Carbon; use Illuminate\Support\Str; use PDO;
class DatabaseQueue extends Queue implements QueueContract, ClearableQueue { /** * The database connection instance. * * @var \Illuminate\Database\Connection */ protected $database;
/** * The database table that holds the jobs. * * @var string */ protected $table;
/** * The name of the default queue. * * @var string */ protected $default;
/** * The expiration time of a job. * * @var int|null */ protected $retryAfter = 60;
/** * Create a new database queue instance. * * @param \Illuminate\Database\Connection $database * @param string $table * @param string $default * @param int $retryAfter * @param bool $dispatchAfterCommit * @return void */ public function __construct(Connection $database, $table, $default = 'default', $retryAfter = 60, $dispatchAfterCommit = false) { $this->table = $table; $this->default = $default; $this->database = $database; $this->retryAfter = $retryAfter; $this->dispatchAfterCommit = $dispatchAfterCommit; }
/** * Get the size of the queue. * * @param string|null $queue * @return int */ public function size($queue = null) { return $this->database->table($this->table) ->where('queue', $this->getQueue($queue)) ->count(); }
/** * Push a new job onto the queue. * * @param string $job * @param mixed $data * @param string|null $queue * @return mixed */ public function push($job, $data = '', $queue = null) { return $this->enqueueUsing( $job, $this->createPayload($job, $this->getQueue($queue), $data), $queue, null, function ($payload, $queue) { return $this->pushToDatabase($queue, $payload); } ); }
/** * Push a raw payload onto the queue. * * @param string $payload * @param string|null $queue * @param array $options * @return mixed */ public function pushRaw($payload, $queue = null, array $options = []) { return $this->pushToDatabase($queue, $payload); }
/** * Push a new job onto the queue after a delay. * * @param \DateTimeInterface|\DateInterval|int $delay * @param string $job * @param mixed $data * @param string|null $queue * @return void */ public function later($delay, $job, $data = '', $queue = null) { return $this->enqueueUsing( $job, $this->createPayload($job, $this->getQueue($queue), $data), $queue, $delay, function ($payload, $queue, $delay) { return $this->pushToDatabase($queue, $payload, $delay); } ); }
/** * Push an array of jobs onto the queue. * * @param array $jobs * @param mixed $data * @param string|null $queue * @return mixed */ public function bulk($jobs, $data = '', $queue = null) { $queue = $this->getQueue($queue);
$availableAt = $this->availableAt();
return $this->database->table($this->table)->insert(collect((array) $jobs)->map( function ($job) use ($queue, $data, $availableAt) { return $this->buildDatabaseRecord($queue, $this->createPayload($job, $this->getQueue($queue), $data), $availableAt); } )->all()); }
/** * Release a reserved job back onto the queue. * * @param string $queue * @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job * @param int $delay * @return mixed */ public function release($queue, $job, $delay) { return $this->pushToDatabase($queue, $job->payload, $delay, $job->attempts); }
/** * Push a raw payload to the database with a given delay. * * @param string|null $queue * @param string $payload * @param \DateTimeInterface|\DateInterval|int $delay * @param int $attempts * @return mixed */ protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0) { return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord( $this->getQueue($queue), $payload, $this->availableAt($delay), $attempts )); }
/** * Create an array to insert for the given job. * * @param string|null $queue * @param string $payload * @param int $availableAt * @param int $attempts * @return array */ protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0) { return [ 'queue' => $queue, 'attempts' => $attempts, 'reserved_at' => null, 'available_at' => $availableAt, 'created_at' => $this->currentTime(), 'payload' => $payload, ]; }
/** * Pop the next job off of the queue. * * @param string|null $queue * @return \Illuminate\Contracts\Queue\Job|null * * @throws \Throwable */ public function pop($queue = null) { $queue = $this->getQueue($queue);
return $this->database->transaction(function () use ($queue) { if ($job = $this->getNextAvailableJob($queue)) { return $this->marshalJob($queue, $job); } }); }
/** * Get the next available job for the queue. * * @param string|null $queue * @return \Illuminate\Queue\Jobs\DatabaseJobRecord|null */ protected function getNextAvailableJob($queue) { $job = $this->database->table($this->table) ->lock($this->getLockForPopping()) ->where('queue', $this->getQueue($queue)) ->where(function ($query) { $this->isAvailable($query); $this->isReservedButExpired($query); }) ->orderBy('id', 'asc') ->first();
return $job ? new DatabaseJobRecord((object) $job) : null; }
/** * Get the lock required for popping the next job. * * @return string|bool */ protected function getLockForPopping() { $databaseEngine = $this->database->getPdo()->getAttribute(PDO::ATTR_DRIVER_NAME); $databaseVersion = $this->database->getConfig('version') ?? $this->database->getPdo()->getAttribute(PDO::ATTR_SERVER_VERSION);
if (Str::of($databaseVersion)->contains('MariaDB')) { $databaseEngine = 'mariadb'; $databaseVersion = Str::before(Str::after($databaseVersion, '5.5.5-'), '-'); }
if (($databaseEngine === 'mysql' && version_compare($databaseVersion, '8.0.1', '>=')) || ($databaseEngine === 'mariadb' && version_compare($databaseVersion, '10.6.0', '>=')) || ($databaseEngine === 'pgsql' && version_compare($databaseVersion, '9.5', '>='))) { return 'FOR UPDATE SKIP LOCKED'; }
return true; }
/** * Modify the query to check for available jobs. * * @param \Illuminate\Database\Query\Builder $query * @return void */ protected function isAvailable($query) { $query->where(function ($query) { $query->whereNull('reserved_at') ->where('available_at', '<=', $this->currentTime()); }); }
/** * Modify the query to check for jobs that are reserved but have expired. * * @param \Illuminate\Database\Query\Builder $query * @return void */ protected function isReservedButExpired($query) { $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();
$query->orWhere(function ($query) use ($expiration) { $query->where('reserved_at', '<=', $expiration); }); }
/** * Marshal the reserved job into a DatabaseJob instance. * * @param string $queue * @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job * @return \Illuminate\Queue\Jobs\DatabaseJob */ protected function marshalJob($queue, $job) { $job = $this->markJobAsReserved($job);
return new DatabaseJob( $this->container, $this, $job, $this->connectionName, $queue ); }
/** * Mark the given job ID as reserved. * * @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job * @return \Illuminate\Queue\Jobs\DatabaseJobRecord */ protected function markJobAsReserved($job) { $this->database->table($this->table)->where('id', $job->id)->update([ 'reserved_at' => $job->touch(), 'attempts' => $job->increment(), ]);
return $job; }
/** * Delete a reserved job from the queue. * * @param string $queue * @param string $id * @return void * * @throws \Throwable */ public function deleteReserved($queue, $id) { $this->database->transaction(function () use ($id) { if ($this->database->table($this->table)->lockForUpdate()->find($id)) { $this->database->table($this->table)->where('id', $id)->delete(); } }); }
/** * Delete a reserved job from the reserved queue and release it. * * @param string $queue * @param \Illuminate\Queue\Jobs\DatabaseJob $job * @param int $delay * @return void */ public function deleteAndRelease($queue, $job, $delay) { $this->database->transaction(function () use ($queue, $job, $delay) { if ($this->database->table($this->table)->lockForUpdate()->find($job->getJobId())) { $this->database->table($this->table)->where('id', $job->getJobId())->delete(); }
$this->release($queue, $job->getJobRecord(), $delay); }); }
/** * Delete all of the jobs from the queue. * * @param string $queue * @return int */ public function clear($queue) { return $this->database->table($this->table) ->where('queue', $this->getQueue($queue)) ->delete(); }
/** * Get the queue or return the default. * * @param string|null $queue * @return string */ public function getQueue($queue) { return $queue ?: $this->default; }
/** * Get the underlying database instance. * * @return \Illuminate\Database\Connection */ public function getDatabase() { return $this->database; } }
|