<?php
declare(strict_types=1);
namespace HF\Api\Tools\Console\Command\EventStore;
use Ds\Set;
use HF\Api\Infrastructure\Repository\ARStreamDomainName;
use HF\Api\Infrastructure\Repository\ARStreamName;
use HF\Api\Tools\Console\Command\AbstractCommand;
use Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Pdo\Projection\PdoEventStoreProjector;
use Prooph\EventStore\Projection\ProjectionManager;
use Prooph\SnapshotStore\SnapshotStore;
use Prooph\Snapshotter\SnapshotReadModel;
use Prooph\Snapshotter\StreamSnapshotProjection;
use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Input\InputDefinition;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
/**
 * Console command that repeatedly runs snapshot projections for the selected
 * aggregate-root streams until it is told to stop via a POSIX signal or until a
 * projection invocation reports a problem.
 *
 * NOTE(review): `$this->output` and the `EXIT_*` constants are not defined in this
 * class; they are presumably provided by AbstractCommand — confirm.
 */
class RunSnapshotterCommand extends AbstractCommand
{
    /**
     * @var ContainerInterface
     */
    private $container;

    /**
     * @var SnapshotStore
     */
    private $snapshotStore;

    /**
     * @var EventStore
     */
    private $eventStore;

    /**
     * @var ProjectionManager
     */
    private $projectionManager;

    /**
     * Aggregate repository configuration entries; each entry is read for the keys
     * `stream_name`, `repository_class` and `aggregate_type`.
     *
     * @var array
     */
    private $arRepositoryConfig;

    /**
     * Projection created by the most recent invokeProjection() call, or null when
     * none has been created yet. Declared explicitly so it is not a dynamic property.
     *
     * @var \Prooph\EventStore\Projection\ReadModelProjector|null
     */
    private $runningProjection;

    /**
     * Set by signalHandler(); makes the main loop in execute() terminate.
     *
     * @var bool
     */
    private $stopping = false;

    public function __construct(
        ContainerInterface $container,
        SnapshotStore $snapshotStore,
        EventStore $eventStore,
        ProjectionManager $projectionManager,
        array $arRepositoryConfig
    ) {
        $this->container = $container;
        $this->snapshotStore = $snapshotStore;
        $this->eventStore = $eventStore;
        $this->projectionManager = $projectionManager;
        $this->arRepositoryConfig = $arRepositoryConfig;

        parent::__construct();
    }

    /**
     * Registers the command name, description and its three options.
     */
    protected function configure()
    {
        $this
            ->setName('event-store:run-snapshotter')
            ->setDescription('Runs a long living process to create snapshots of aggregate roots')
            ->setDefinition(
                new InputDefinition([
                    new InputOption('domain', 'd', InputOption::VALUE_OPTIONAL, 'Stream from a particalar domain'),
                    new InputOption('streamName', 's', InputOption::VALUE_OPTIONAL, 'Name of stream'),
                    // NOTE(review): 'oneRunner' is defined but never read by this command.
                    new InputOption('oneRunner', 'o', InputOption::VALUE_OPTIONAL, 'use a single runner', true),
                ])
            );
    }

    /**
     * Resolves the requested streams, installs signal handlers and loops over the
     * streams, invoking one projection run per stream and sleeping 5 seconds
     * between rounds.
     *
     * @return int one of the EXIT_* codes
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $streamOption = $input->getOption('streamName');
        $domainOption = $input->getOption('domain');

        $streamNames = new Set();

        if ($streamOption) {
            try {
                $streamNames->add(ARStreamName::byName(strtoupper($streamOption)));
            } catch (\InvalidArgumentException $e) {
                $this->output->writeln(sprintf(
                    'Unknown stream name <error>`%s`</error>. Try one of <info>%s</info>',
                    $streamOption,
                    implode(', ', array_map('strtolower', ARStreamName::getNames()))
                ));

                return self::EXIT_PROBLEM;
            }
        }

        if ($domainOption) {
            try {
                $domain = ARStreamDomainName::byName(strtoupper($domainOption));

                // The domain value is walked recursively, so nested arrays of stream names are flattened.
                $domainStreams = new \RecursiveIteratorIterator(new \RecursiveArrayIterator($domain->getValue()));
                foreach ($domainStreams as $domainStream) {
                    $streamNames->add(ARStreamName::byName(strtoupper($domainStream)));
                }
            } catch (\InvalidArgumentException $e) {
                $this->output->writeln(sprintf(
                    'Unknown domain <error>`%s`</error>. Try one of <info>%s</info>',
                    $domainOption,
                    implode(', ', array_map('strtolower', ARStreamDomainName::getNames()))
                ));

                return self::EXIT_PROBLEM;
            }
        }

        // not for fs
        $streamNames->remove(ARStreamName::FILESYSTEM_FILE());

        // Checked after the removal above: an empty set would otherwise make the
        // loop below sleep forever without ever invoking a projection.
        if (! $streamNames->count()) {
            $this->output->writeln('<error>No streams where selected!</error>');

            return self::EXIT_CANCELED;
        }

        $this->output->writeln(sprintf(
            'Starting snapshotter for <info>`%s`</info>',
            implode(', ', array_map('strtolower', $streamNames->toArray()))
        ));

        if (function_exists('pcntl_signal')) {
            // Deliver signals asynchronously so the handler also fires while a
            // projection is busy, instead of only when something happens to call
            // pcntl_signal_dispatch().
            if (function_exists('pcntl_async_signals')) {
                pcntl_async_signals(true);
            }

            pcntl_signal(SIGTERM, [$this, 'signalHandler']);
            pcntl_signal(SIGHUP, [$this, 'signalHandler']);
            pcntl_signal(SIGINT, [$this, 'signalHandler']);
        }

        $stopRunning = 0;
        while (! $stopRunning && ! $this->stopping) {
            foreach ($streamNames as $streamName) {
                // A truthy return value from invokeProjection() ends the whole run.
                $stopRunning = $this->invokeProjection((string) $streamName);

                if ($stopRunning || $this->stopping) {
                    break;
                }
            }

            if (! $stopRunning && ! $this->stopping) {
                sleep(5);
            }
        }

        $this->output->writeln('Stopped...');

        return self::EXIT_OK;
    }

    /**
     * Builds a snapshot read-model projection for the given stream and runs it once
     * (the projection is invoked with `false`).
     *
     * @param string $streamName name of the stream to snapshot
     *
     * @return int EXIT_OK after the projection ran, EXIT_PROBLEM when no repository
     *             configuration entry was selected for the stream
     */
    public function invokeProjection(string $streamName): int
    {
        $config = null;
        foreach ($this->arRepositoryConfig as $candidate) {
            // NOTE(review): an entry WITHOUT 'stream_name' is accepted for any stream;
            // confirm this fallback is intended and not an inverted condition.
            if (! isset($candidate['stream_name']) || $candidate['stream_name'] === $streamName) {
                $config = $candidate;
                break;
            }
        }

        if (null === $config) {
            $this->output->writeln(sprintf('<error>Configuration problem for stream \'%s\'</error>', $streamName));

            return self::EXIT_PROBLEM;
        }

        // The container service id is taken to be the last interface reported by
        // class_implements() for the configured repository class.
        $repositoryInterfaces = class_implements($config['repository_class']);
        $repositoryServiceId = array_pop($repositoryInterfaces);

        $snapshotReadModel = new SnapshotReadModel(
            $this->container->get($repositoryServiceId),
            new AggregateTranslator(),
            $this->snapshotStore,
            [$config['aggregate_type']]
        );

        $readModelProjection = $this->projectionManager->createReadModelProjection(
            sprintf('snapshot:%s', $streamName),
            $snapshotReadModel,
            [
                PdoEventStoreProjector::OPTION_CACHE_SIZE => 1000, /* caches stream names from event store */
                PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE => 1000, /* size of handled events before persisting */
                PdoEventStoreProjector::OPTION_SLEEP => 0,
                PdoEventStoreProjector::OPTION_PCNTL_DISPATCH => function_exists('pcntl_signal'),
            ]
        );

        // Remembered so signalHandler() can stop the projection that is currently running.
        $this->runningProjection = $readModelProjection;

        $projection = new StreamSnapshotProjection(
            $readModelProjection,
            $streamName
        );

        $projection(false);

        return self::EXIT_OK;
    }

    /**
     * Signal callback: reports the signal, stops the current projection (if any)
     * and flags the main loop to terminate.
     *
     * @param int $signal number of the received signal
     */
    public function signalHandler(int $signal): void
    {
        $this->output->writeln(sprintf('Signal %s recieved, stopping...', $signal));

        // Mark as stopping first so the main loop ends even if stop() fails.
        $this->stopping = true;

        // A signal may arrive before the first projection has been created.
        if (null !== $this->runningProjection) {
            $this->runningProjection->stop();
        }
    }
}
When the projection is running I cannot stop it with Ctrl-C, not even after a persist block (`OPTION_PERSIST_BLOCK_SIZE` events) has been handled.
Only when it is idle (notice I'm running with `$projection(false);`) does my stop handler kick in.