pdo-snapshot-store's Introduction

Prooph PDO Snapshot Store

Important

This library will receive support until December 31, 2019 and will then be deprecated.

For further information see the official announcement here: https://www.sasaprolic.com/2018/08/the-future-of-prooph-components.html

Overview

PDO implementation of snapshot store

Installation

You can install prooph/pdo-snapshot-store via Composer by adding "prooph/pdo-snapshot-store": "^1.0" as a requirement to your composer.json.

Upgrade

If you are upgrading from version 1.4.0, you are advised to manually update the table schema to fix an omitted primary key. You can either issue the following statements, or drop the snapshot table, recreate it from the provided scripts, and restart the projections.

MySQL

ALTER TABLE `snapshots` DROP INDEX `ix_aggregate_id`, ADD PRIMARY KEY(`aggregate_id`);

Postgres

ALTER TABLE "snapshots" DROP CONSTRAINT "snapshots_aggregate_id_key", ADD PRIMARY KEY ("aggregate_id");
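If you would rather apply the fix from PHP than from a database console, a minimal sketch could look like the following. The function name snapshotUpgradeStatement is hypothetical, and the sketch assumes your table is named snapshots as in the statements above; the SQL itself is copied unchanged from this section.

```php
<?php

declare(strict_types=1);

/**
 * Returns the schema fix from this section for a given PDO driver name.
 * The function name is hypothetical; the SQL is copied from the README text.
 */
function snapshotUpgradeStatement(string $driverName): string
{
    switch ($driverName) {
        case 'mysql':
            return 'ALTER TABLE `snapshots` DROP INDEX `ix_aggregate_id`, ADD PRIMARY KEY(`aggregate_id`);';
        case 'pgsql':
            return 'ALTER TABLE "snapshots" DROP CONSTRAINT "snapshots_aggregate_id_key", ADD PRIMARY KEY ("aggregate_id");';
        default:
            throw new InvalidArgumentException(
                sprintf('No upgrade statement for driver "%s"', $driverName)
            );
    }
}

// Usage against an existing connection (commented out so the sketch has no
// database dependency). PDO::ATTR_DRIVER_NAME yields "mysql" or "pgsql".
// $pdo->exec(snapshotUpgradeStatement($pdo->getAttribute(PDO::ATTR_DRIVER_NAME)));

echo snapshotUpgradeStatement('pgsql'), PHP_EOL;
```

Run it once per database, and back up the snapshot table first if you cannot afford to rebuild it.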

Support

Contribute

Please feel free to fork and extend existing plugins or add new ones, and send a pull request with your changes! To maintain consistent code quality, please provide unit tests for all your changes and adapt the documentation where needed.

License

Released under the New BSD License.

pdo-snapshot-store's People

Contributors

basz, codeliner, dragosprotung, fritz-gerneth, ghettovoice, prolic, sandrokeil

pdo-snapshot-store's Issues

Can't stop snapshotter when it's running

I'm running snapshot with the following script

<?php

declare(strict_types=1);

namespace HF\Api\Tools\Console\Command\EventStore;

use Ds\Set;
use HF\Api\Infrastructure\Repository\ARStreamDomainName;
use HF\Api\Infrastructure\Repository\ARStreamName;
use HF\Api\Tools\Console\Command\AbstractCommand;
use Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Pdo\Projection\PdoEventStoreProjector;
use Prooph\EventStore\Projection\ProjectionManager;
use Prooph\SnapshotStore\SnapshotStore;
use Prooph\Snapshotter\SnapshotReadModel;
use Prooph\Snapshotter\StreamSnapshotProjection;
use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Input\InputDefinition;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class RunSnapshotterCommand extends AbstractCommand
{
    /**
     * @var ContainerInterface
     */
    private $container;

    /**
     * @var SnapshotStore
     */
    private $snapshotStore;

    /**
     * @var EventStore
     */
    private $eventStore;

    /**
     * @var ProjectionManager
     */
    private $projectionManager;

    /**
     * @var array
     */
    private $arRepositoryConfig;

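    /**
     * @var bool
     */
    private $stopping = false;

    /**
     * The projection currently running, kept so the signal handler can stop it.
     */
    private $runningProjection;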

    public function __construct(
        ContainerInterface $container,
        SnapshotStore $snapshotStore,
        EventStore $eventStore,
        ProjectionManager $projectionManager,
        array $arRepositoryConfig
    ) {
        $this->container = $container;
        $this->snapshotStore = $snapshotStore;
        $this->eventStore = $eventStore;
        $this->projectionManager = $projectionManager;
        $this->arRepositoryConfig = $arRepositoryConfig;

        parent::__construct();
    }

    protected function configure()
    {
        $this
            ->setName('event-store:run-snapshotter')
            ->setDescription('Runs a long living process to create snapshots of aggregate roots')
            ->setDefinition(
                new InputDefinition([
                    new InputOption('domain', 'd', InputOption::VALUE_OPTIONAL, 'Stream from a particular domain'),
                    new InputOption('streamName', 's', InputOption::VALUE_OPTIONAL, 'Name of stream'),
                    new InputOption('oneRunner', 'o', InputOption::VALUE_OPTIONAL, 'use a single runner', true),
                ])
            );
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $options = $input->getArguments();

        $streamName = $input->getOption('streamName', false);
        $domain = $input->getOption('domain', false);
        $oneRunner = $input->getOption('oneRunner');

        $streamNames = new Set();

        if ($streamName) {
            try {
                $streamName = ARStreamName::byName(strtoupper($streamName));
                $streamNames->add($streamName);
            } catch (\InvalidArgumentException $e) {
                $this->output->writeln(sprintf(
                    'Unknown stream name <error>`%s`</error>. Try one of <info>%s</info>',
                    $streamName,
                    implode(', ', array_map('strtolower', ARStreamName::getNames()))
                ));

                return self::EXIT_PROBLEM;
            }
        }

        if ($domain) {
            try {
                $domain = ARStreamDomainName::byName(strtoupper($domain));

                foreach (iterator_to_array(new \RecursiveIteratorIterator(new \RecursiveArrayIterator($domain->getValue())), false) as $domainName) {
                    $streamNames->add(ARStreamName::byName(strtoupper($domainName)));
                }
            } catch (\InvalidArgumentException $e) {
                $this->output->writeln(sprintf(
                    'Unknown domain <error>`%s`</error>. Try one of <info>%s</info>',
                    $domain,
                    implode(', ', array_map('strtolower', ARStreamDomainName::getNames()))
                ));

                return self::EXIT_PROBLEM;
            }
        }

        if (! $streamNames->count()) {
            $this->output->writeln('<error>No streams were selected!</error>');

            return self::EXIT_CANCELED;
        }

        // not for fs
        $streamNames->remove(ARStreamName::FILESYSTEM_FILE());

        $this->output->writeln(sprintf(
            'Starting snapshotter for <info>`%s`</info>',
            implode(', ', array_map('strtolower', $streamNames->toArray()))
        ));

        if (function_exists('pcntl_signal')) {
            pcntl_signal(SIGTERM, [$this, 'signalHandler']);
            pcntl_signal(SIGHUP, [$this, 'signalHandler']);
            pcntl_signal(SIGINT, [$this, 'signalHandler']);
        }

        $stopRunning = 0;
        while (! $stopRunning) {
            foreach ($streamNames as $streamName) {
                $stopRunning = $this->invokeProjection((string) $streamName);

                if ($stopRunning || $this->stopping) {
                    break;
                }
            }
            if (! $stopRunning && ! $this->stopping) {
                sleep(5);
            }

            if ($this->stopping) {
                break;
            }
        }

        $this->output->writeln(sprintf('Stopped...'));

        return self::EXIT_OK;
    }

    public function invokeProjection(string $streamName): int
    {
        $config = null;

        foreach ($this->arRepositoryConfig as $key => $config) {
            if (! isset($config['stream_name']) || $config['stream_name'] === $streamName) {
                break;
            }

            $config = null;
        }

        if (null === $config) {
            $this->output->writeln(sprintf('<error>Configuration problem for stream \'%s\'</error>', $streamName));

            return self::EXIT_PROBLEM;
        }

        $repositoryClass = class_implements($config['repository_class']);
        $repositoryClass = array_pop($repositoryClass);

        $snapshotReadModel = new SnapshotReadModel(
            $this->container->get($repositoryClass),
            new AggregateTranslator(),
            $this->snapshotStore,
            [$config['aggregate_type']]
        );

        $this->runningProjection = $readModelProjection = $this->projectionManager->createReadModelProjection(
            sprintf('snapshot:%s', $streamName),
            $snapshotReadModel,
            [
                PdoEventStoreProjector::OPTION_CACHE_SIZE => 1000, /* caches stream names from event store */
                PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE => 1000, /* size of handled events before persisting */
                PdoEventStoreProjector::OPTION_SLEEP => 0,
                PdoEventStoreProjector::OPTION_PCNTL_DISPATCH => function_exists('pcntl_signal'),
            ]
        );

        $projection = new StreamSnapshotProjection(
            $readModelProjection,
            $streamName
        );

        $projection(false);

        return self::EXIT_OK;
    }

    public function signalHandler(int $signal): void
    {
        $this->output->writeln(sprintf('Signal %s received, stopping...', $signal));

        $this->runningProjection->stop();

        $this->stopping = true;
    }
}

When the projection is running I cannot stop it with Ctrl-C, not even after a persist block has been processed. When it is idle (note that I'm running with $projection(false);), my stop handler kicks in.

Shouldn't PdoEventStoreProjector::OPTION_PCNTL_DISPATCH also be emitted...
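For context, PHP only runs a handler registered with pcntl_signal once signals are dispatched, either by calling pcntl_signal_dispatch() inside the loop or by enabling pcntl_async_signals(true). The sketch below is a generic illustration of that mechanism, not the projector's implementation; it assumes the pcntl and posix extensions are available on the CLI and uses a plain counter as a stand-in for event handling.

```php
<?php

declare(strict_types=1);

// Assumption: the pcntl and posix extensions are loaded (CLI only).
$stopping = false;

// Let PHP run signal handlers as soon as a signal arrives, instead of
// requiring an explicit pcntl_signal_dispatch() call inside the loop.
pcntl_async_signals(true);

pcntl_signal(SIGINT, function (int $signal) use (&$stopping): void {
    // Only flip a flag here; the loop decides when it is safe to stop.
    $stopping = true;
});

$handled = 0;
while (! $stopping) {
    // Stand-in for handling one event of a long-running projection.
    $handled++;

    if ($handled === 3) {
        // Simulate Ctrl-C by sending SIGINT to this very process.
        posix_kill(posix_getpid(), SIGINT);
    }
}

echo "Stopped after {$handled} handled events", PHP_EOL;
```

Without the pcntl_async_signals(true) call, and without a pcntl_signal_dispatch() somewhere in the loop body, the handler would never run and the loop would not end, which matches the symptom of a busy process that ignores Ctrl-C.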

Snapshots are created over and over again (Postgres)

From the Docker log:

postgres_1         | 2018-03-25 14:42:45.795 UTC [26] ERROR:  duplicate key value violates unique constraint "projections_name_key"
postgres_1         | 2018-03-25 14:42:45.795 UTC [26] DETAIL:  Key (name)=(snapshot:customer_customer) already exists.
postgres_1         | 2018-03-25 14:42:45.795 UTC [26] STATEMENT:  INSERT INTO "projections" (name, position, state, status, locked_until)
postgres_1         |    VALUES ($1, '{}', '{}', $2, NULL);
postgres_1         | 2018-03-25 14:42:45.899 UTC [26] ERROR:  duplicate key value violates unique constraint "projections_name_key"
postgres_1         | 2018-03-25 14:42:45.899 UTC [26] DETAIL:  Key (name)=(snapshot:dossier_dossier) already exists.
postgres_1         | 2018-03-25 14:42:45.899 UTC [26] STATEMENT:  INSERT INTO "projections" (name, position, state, status, locked_until)
postgres_1         |    VALUES ($1, '{}', '{}', $2, NULL);
postgres_1         | 2018-03-25 14:42:45.992 UTC [26] ERROR:  duplicate key value violates unique constraint "projections_name_key"
postgres_1         | 2018-03-25 14:42:45.992 UTC [26] DETAIL:  Key (name)=(snapshot:dossier_order) already exists.
postgres_1         | 2018-03-25 14:42:45.992 UTC [26] STATEMENT:  INSERT INTO "projections" (name, position, state, status, locked_until)
postgres_1         |    VALUES ($1, '{}', '{}', $2, NULL);
postgres_1         | 2018-03-25 14:42:46.083 UTC [26] ERROR:  duplicate key value violates unique constraint "projections_name_key"
postgres_1         | 2018-03-25 14:42:46.083 UTC [26] DETAIL:  Key (name)=(snapshot:filesystem_file) already exists.
postgres_1         | 2018-03-25 14:42:46.083 UTC [26] STATEMENT:  INSERT INTO "projections" (name, position, state, status, locked_until)
postgres_1         |    VALUES ($1, '{}', '{}', $2, NULL);
postgres_1         | 2018-03-25 14:42:46.179 UTC [26] ERROR:  duplicate key value violates unique constraint "projections_name_key"
postgres_1         | 2018-03-25 14:42:46.179 UTC [26] DETAIL:  Key (name)=(snapshot:identity_user) already exists.
postgres_1         | 2018-03-25 14:42:46.179 UTC [26] STATEMENT:  INSERT INTO "projections" (name, position, state, status, locked_until)
postgres_1         |    VALUES ($1, '{}', '{}', $2, NULL);
postgres_1         | 2018-03-25 14:42:46.280 UTC [26] ERROR:  duplicate key value violates unique constraint "projections_name_key"
postgres_1         | 2018-03-25 14:42:46.280 UTC [26] DETAIL:  Key (name)=(snapshot:sop_workflow) already exists.
postgres_1         | 2018-03-25 14:42:46.280 UTC [26] STATEMENT:  INSERT INTO "projections" (name, position, state, status, locked_until)
postgres_1         |    VALUES ($1, '{}', '{}', $2, NULL);

At first I thought it was because of the : in the name; however, changing that to - didn't solve it.

Problem building on GitHub Actions

See, for example, this build: https://github.com/prooph/pdo-snapshot-store/actions/runs/3176685431/jobs/5176279517.

Loading composer repositories with package information
Info from https://repo.packagist.org: #StandWithUkraine
Updating dependencies
Your requirements could not be resolved to an installable set of packages.

  Problem 1
    - phpspec/prophecy v1.10.3 requires php ^5.3|^7.0 -> your php version (8.2.0RC3) does not satisfy that requirement.
    - phpspec/prophecy[1.11.0, ..., 1.11.1] require php ^7.2 -> your php version (8.2.0RC3) does not satisfy that requirement.
    - phpspec/prophecy[1.12.0, ..., 1.13.0] require php ^7.2 || ~8.0, <8.1 -> your php version (8.2.0RC3) does not satisfy that requirement.
    - phpspec/prophecy[1.14.0, ..., 1.x-dev] require php ^7.2 || ~8.0, <8.2 -> your php version (8.2.0RC3) does not satisfy that requirement.
    - Root composer.json requires phpspec/prophecy ^1.10.3 -> satisfiable by phpspec/prophecy[v1.10.3, ..., 1.x-dev].

@basz wanna look into this?

PdoSnapshotStoreFactory is trying to modify the 'config' object

Hello,
I'm trying to integrate this great package with my existing project.
I've encountered a problem with PdoSnapshotStoreFactory: on line 66, $config = &$config[$dimension]; makes the factory try to modify an existing object provided by the container.
Is this expected behavior? In my environment it raises "Indirect modification of overloaded element of Illuminate\Config\Repository has no effect" because of this modification attempt. In other environments, modifying the external object could cause other difficulties.

Is it possible to solve this issue without changing the config object provided by the container?
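To illustrate the request, here is a minimal sketch of walking config dimensions by value rather than by reference, so a container-provided object is only read. ConfigObject and readOptions are hypothetical names, ConfigObject is a stand-in for an ArrayAccess-based config such as Illuminate\Config\Repository, and the config keys are illustrative. This is not the factory's actual code.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a config object exposed through ArrayAccess.
final class ConfigObject implements ArrayAccess
{
    /** @var array */
    private $items;

    public function __construct(array $items)
    {
        $this->items = $items;
    }

    public function offsetExists($key): bool
    {
        return isset($this->items[$key]);
    }

    #[\ReturnTypeWillChange]
    public function offsetGet($key)
    {
        return $this->items[$key];
    }

    public function offsetSet($key, $value): void
    {
        $this->items[$key] = $value;
    }

    public function offsetUnset($key): void
    {
        unset($this->items[$key]);
    }
}

/**
 * Descends into $config one dimension at a time using plain assignment.
 * Taking a reference instead (&$config[$dimension]) is what triggers the
 * "Indirect modification of overloaded element" notice on such objects.
 *
 * @param array|ArrayAccess $config
 * @param string[]          $dimensions
 * @return mixed
 */
function readOptions($config, array $dimensions)
{
    foreach ($dimensions as $dimension) {
        if (! isset($config[$dimension])) {
            throw new InvalidArgumentException(sprintf('Missing config key "%s"', $dimension));
        }

        $config = $config[$dimension]; // read-only descent, no reference
    }

    return $config;
}

// Illustrative keys only; adjust them to your own configuration layout.
$config = new ConfigObject([
    'prooph' => ['pdo_snapshot_store' => ['default' => ['connection_service' => 'my_pdo']]],
]);

$options = readOptions($config, ['prooph', 'pdo_snapshot_store', 'default']);
echo $options['connection_service'], PHP_EOL;
```

Because each step copies the nested value into a local variable, the original object is never written to, whether it is a plain array or an ArrayAccess implementation.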
