Viewing file: Consumer.php (4.28 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
<?php
/* * This file is part of the Predis package. * * (c) 2009-2020 Daniele Alessandri * (c) 2021-2025 Till Krüss * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. */
namespace Predis\PubSub;
use Predis\ClientException; use Predis\ClientInterface; use Predis\Command\Command; use Predis\Connection\Cluster\ClusterInterface; use Predis\NotSupportedException;
/** * PUB/SUB consumer. */ class Consumer extends AbstractConsumer { protected $client; protected $options;
/** * @param ClientInterface $client Client instance used by the consumer. * @param array|null $options Options for the consumer initialization. */ public function __construct(ClientInterface $client, ?array $options = null) { $this->checkCapabilities($client);
$this->options = $options ?: []; $this->client = $client;
$this->genericSubscribeInit('subscribe'); $this->genericSubscribeInit('psubscribe'); }
/** * Returns the underlying client instance used by the pub/sub iterator. * * @return ClientInterface */ public function getClient() { return $this->client; }
/** * Checks if the client instance satisfies the required conditions needed to * initialize a PUB/SUB consumer. * * @param ClientInterface $client Client instance used by the consumer. * * @throws NotSupportedException */ protected function checkCapabilities(ClientInterface $client) { if ($client->getConnection() instanceof ClusterInterface) { throw new NotSupportedException( 'Cannot initialize a PUB/SUB consumer over cluster connections.' ); }
$commands = ['publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe'];
if (!$client->getCommandFactory()->supports(...$commands)) { throw new NotSupportedException( 'PUB/SUB commands are not supported by the current command factory.' ); } }
/** * This method shares the logic to handle both SUBSCRIBE and PSUBSCRIBE. * * @param string $subscribeAction Type of subscription. */ protected function genericSubscribeInit($subscribeAction) { if (isset($this->options[$subscribeAction])) { $this->$subscribeAction($this->options[$subscribeAction]); } }
/** * {@inheritdoc} */ protected function writeRequest($method, $arguments) { $this->client->getConnection()->writeRequest( $this->client->createCommand($method, Command::normalizeArguments($arguments) ) ); }
/** * {@inheritdoc} */ protected function disconnect() { $this->client->disconnect(); }
/** * {@inheritdoc} */ protected function getValue() { $response = $this->client->getConnection()->read();
switch ($response[0]) { case self::SUBSCRIBE: case self::UNSUBSCRIBE: case self::PSUBSCRIBE: case self::PUNSUBSCRIBE: if ($response[2] === 0) { $this->invalidate(); } // The missing break here is intentional as we must process // subscriptions and unsubscriptions as standard messages. // no break
case self::MESSAGE: return (object) [ 'kind' => $response[0], 'channel' => $response[1], 'payload' => $response[2], ];
case self::PMESSAGE: return (object) [ 'kind' => $response[0], 'pattern' => $response[1], 'channel' => $response[2], 'payload' => $response[3], ];
case self::PONG: return (object) [ 'kind' => $response[0], 'payload' => $response[1], ];
default: throw new ClientException( "Unknown message type '{$response[0]}' received in the PUB/SUB context." ); } } }
|