Viewing file: Batch.php (12.44 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
<?php
namespace Illuminate\Bus;
use Carbon\CarbonImmutable; use Closure; use Illuminate\Contracts\Queue\Factory as QueueFactory; use Illuminate\Contracts\Support\Arrayable; use Illuminate\Queue\CallQueuedClosure; use Illuminate\Support\Arr; use Illuminate\Support\Collection; use JsonSerializable; use Throwable;
class Batch implements Arrayable, JsonSerializable { /** * The queue factory implementation. * * @var \Illuminate\Contracts\Queue\Factory */ protected $queue;
/** * The repository implementation. * * @var \Illuminate\Bus\BatchRepository */ protected $repository;
/** * The batch ID. * * @var string */ public $id;
/** * The batch name. * * @var string */ public $name;
/** * The total number of jobs that belong to the batch. * * @var int */ public $totalJobs;
/** * The total number of jobs that are still pending. * * @var int */ public $pendingJobs;
/** * The total number of jobs that have failed. * * @var int */ public $failedJobs;
/** * The IDs of the jobs that have failed. * * @var array */ public $failedJobIds;
/** * The batch options. * * @var array */ public $options;
/** * The date indicating when the batch was created. * * @var \Carbon\CarbonImmutable */ public $createdAt;
/** * The date indicating when the batch was cancelled. * * @var \Carbon\CarbonImmutable|null */ public $cancelledAt;
/** * The date indicating when the batch was finished. * * @var \Carbon\CarbonImmutable|null */ public $finishedAt;
/** * Create a new batch instance. * * @param \Illuminate\Contracts\Queue\Factory $queue * @param \Illuminate\Bus\BatchRepository $repository * @param string $id * @param string $name * @param int $totalJobs * @param int $pendingJobs * @param int $failedJobs * @param array $failedJobIds * @param array $options * @param \Carbon\CarbonImmutable $createdAt * @param \Carbon\CarbonImmutable|null $cancelledAt * @param \Carbon\CarbonImmutable|null $finishedAt */ public function __construct( QueueFactory $queue, BatchRepository $repository, string $id, string $name, int $totalJobs, int $pendingJobs, int $failedJobs, array $failedJobIds, array $options, CarbonImmutable $createdAt, ?CarbonImmutable $cancelledAt = null, ?CarbonImmutable $finishedAt = null, ) { $this->queue = $queue; $this->repository = $repository; $this->id = $id; $this->name = $name; $this->totalJobs = $totalJobs; $this->pendingJobs = $pendingJobs; $this->failedJobs = $failedJobs; $this->failedJobIds = $failedJobIds; $this->options = $options; $this->createdAt = $createdAt; $this->cancelledAt = $cancelledAt; $this->finishedAt = $finishedAt; }
/** * Get a fresh instance of the batch represented by this ID. * * @return self */ public function fresh() { return $this->repository->find($this->id); }
/** * Add additional jobs to the batch. * * @param \Illuminate\Support\Enumerable|object|array $jobs * @return self */ public function add($jobs) { $count = 0;
$jobs = Collection::wrap($jobs)->map(function ($job) use (&$count) { $job = $job instanceof Closure ? CallQueuedClosure::create($job) : $job;
if (is_array($job)) { $count += count($job);
return with($this->prepareBatchedChain($job), function ($chain) { return $chain->first() ->allOnQueue($this->options['queue'] ?? null) ->allOnConnection($this->options['connection'] ?? null) ->chain($chain->slice(1)->values()->all()); }); } else { $job->withBatchId($this->id);
$count++; }
return $job; });
$this->repository->transaction(function () use ($jobs, $count) { $this->repository->incrementTotalJobs($this->id, $count);
$this->queue->connection($this->options['connection'] ?? null)->bulk( $jobs->all(), $data = '', $this->options['queue'] ?? null ); });
return $this->fresh(); }
/** * Prepare a chain that exists within the jobs being added. * * @param array $chain * @return \Illuminate\Support\Collection */ protected function prepareBatchedChain(array $chain) { return (new Collection($chain))->map(function ($job) { $job = $job instanceof Closure ? CallQueuedClosure::create($job) : $job;
return $job->withBatchId($this->id); }); }
/** * Get the total number of jobs that have been processed by the batch thus far. * * @return int */ public function processedJobs() { return $this->totalJobs - $this->pendingJobs; }
/** * Get the percentage of jobs that have been processed (between 0-100). * * @return int */ public function progress() { return $this->totalJobs > 0 ? round(($this->processedJobs() / $this->totalJobs) * 100) : 0; }
/** * Record that a job within the batch finished successfully, executing any callbacks if necessary. * * @param string $jobId * @return void */ public function recordSuccessfulJob(string $jobId) { $counts = $this->decrementPendingJobs($jobId);
if ($this->hasProgressCallbacks()) { $batch = $this->fresh();
(new Collection($this->options['progress']))->each(function ($handler) use ($batch) { $this->invokeHandlerCallback($handler, $batch); }); }
if ($counts->pendingJobs === 0) { $this->repository->markAsFinished($this->id); }
if ($counts->pendingJobs === 0 && $this->hasThenCallbacks()) { $batch = $this->fresh();
(new Collection($this->options['then']))->each(function ($handler) use ($batch) { $this->invokeHandlerCallback($handler, $batch); }); }
if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) { $batch = $this->fresh();
(new Collection($this->options['finally']))->each(function ($handler) use ($batch) { $this->invokeHandlerCallback($handler, $batch); }); } }
/** * Decrement the pending jobs for the batch. * * @param string $jobId * @return \Illuminate\Bus\UpdatedBatchJobCounts */ public function decrementPendingJobs(string $jobId) { return $this->repository->decrementPendingJobs($this->id, $jobId); }
/** * Determine if the batch has finished executing. * * @return bool */ public function finished() { return ! is_null($this->finishedAt); }
/** * Determine if the batch has "progress" callbacks. * * @return bool */ public function hasProgressCallbacks() { return isset($this->options['progress']) && ! empty($this->options['progress']); }
/** * Determine if the batch has "success" callbacks. * * @return bool */ public function hasThenCallbacks() { return isset($this->options['then']) && ! empty($this->options['then']); }
/** * Determine if the batch allows jobs to fail without cancelling the batch. * * @return bool */ public function allowsFailures() { return Arr::get($this->options, 'allowFailures', false) === true; }
/** * Determine if the batch has job failures. * * @return bool */ public function hasFailures() { return $this->failedJobs > 0; }
/** * Record that a job within the batch failed to finish successfully, executing any callbacks if necessary. * * @param string $jobId * @param \Throwable $e * @return void */ public function recordFailedJob(string $jobId, $e) { $counts = $this->incrementFailedJobs($jobId);
if ($counts->failedJobs === 1 && ! $this->allowsFailures()) { $this->cancel(); }
if ($this->hasProgressCallbacks() && $this->allowsFailures()) { $batch = $this->fresh();
(new Collection($this->options['progress']))->each(function ($handler) use ($batch, $e) { $this->invokeHandlerCallback($handler, $batch, $e); }); }
if ($counts->failedJobs === 1 && $this->hasCatchCallbacks()) { $batch = $this->fresh();
(new Collection($this->options['catch']))->each(function ($handler) use ($batch, $e) { $this->invokeHandlerCallback($handler, $batch, $e); }); }
if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) { $batch = $this->fresh();
(new Collection($this->options['finally']))->each(function ($handler) use ($batch, $e) { $this->invokeHandlerCallback($handler, $batch, $e); }); } }
/** * Increment the failed jobs for the batch. * * @param string $jobId * @return \Illuminate\Bus\UpdatedBatchJobCounts */ public function incrementFailedJobs(string $jobId) { return $this->repository->incrementFailedJobs($this->id, $jobId); }
/** * Determine if the batch has "catch" callbacks. * * @return bool */ public function hasCatchCallbacks() { return isset($this->options['catch']) && ! empty($this->options['catch']); }
/** * Determine if the batch has "finally" callbacks. * * @return bool */ public function hasFinallyCallbacks() { return isset($this->options['finally']) && ! empty($this->options['finally']); }
/** * Cancel the batch. * * @return void */ public function cancel() { $this->repository->cancel($this->id); }
/** * Determine if the batch has been cancelled. * * @return bool */ public function canceled() { return $this->cancelled(); }
/** * Determine if the batch has been cancelled. * * @return bool */ public function cancelled() { return ! is_null($this->cancelledAt); }
/** * Delete the batch from storage. * * @return void */ public function delete() { $this->repository->delete($this->id); }
/** * Invoke a batch callback handler. * * @param callable $handler * @param \Illuminate\Bus\Batch $batch * @param \Throwable|null $e * @return void */ protected function invokeHandlerCallback($handler, Batch $batch, ?Throwable $e = null) { try { $handler($batch, $e); } catch (Throwable $e) { if (function_exists('report')) { report($e); } } }
/** * Convert the batch to an array. * * @return array */ public function toArray() { return [ 'id' => $this->id, 'name' => $this->name, 'totalJobs' => $this->totalJobs, 'pendingJobs' => $this->pendingJobs, 'processedJobs' => $this->processedJobs(), 'progress' => $this->progress(), 'failedJobs' => $this->failedJobs, 'options' => $this->options, 'createdAt' => $this->createdAt, 'cancelledAt' => $this->cancelledAt, 'finishedAt' => $this->finishedAt, ]; }
/** * Get the JSON serializable representation of the object. * * @return array */ public function jsonSerialize(): array { return $this->toArray(); }
/** * Dynamically access the batch's "options" via properties. * * @param string $key * @return mixed */ public function __get($key) { return $this->options[$key] ?? null; } }
|