Akka.Persistence.SqlServer

Akka Persistence journal and snapshot store backed by SQL Server database.

Configuration

Both the journal and the snapshot store share the same configuration keys. However, they reside in separate scopes, so each key is defined separately for the journal and for the snapshot store.

Remember that a connection string must be provided separately for the journal and for the snapshot store.

Please also note that unless circuit breaker settings are configured, the defaults from Akka.Persistence will be used. If these defaults are shorter than the database connection timeout (default or provided in the connection string) plus the provided command timeout, warnings will be logged when the journal or snapshot store initializes.

akka.persistence{
	journal {
	        plugin = "akka.persistence.journal.sql-server"
		sql-server {
			# qualified type name of the SQL Server persistence journal actor
			class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"

			# dispatcher used to drive journal actor
			plugin-dispatcher = "akka.actor.default-dispatcher"

			# connection string used for database access
			connection-string = ""

			# default SQL commands timeout
			connection-timeout = 30s

			# SQL Server schema name of the table corresponding with the persistent journal
			schema-name = dbo

			# SQL server table corresponding with persistent journal
			table-name = EventJournal

			# should corresponding journal table be initialized automatically
			auto-initialize = off

			# timestamp provider used for generation of journal entries timestamps
			timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"

			# metadata table
			metadata-table-name = Metadata
			
			# Recommended: change default circuit breaker settings
			# By uncommenting below and using Connection Timeout + Command Timeout
			# circuit-breaker.call-timeout=30s
		}
	}

	snapshot-store {
	        plugin = "akka.persistence.snapshot-store.sql-server"
		sql-server {

			# qualified type name of the SQL Server persistence snapshot store actor
			class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"

			# dispatcher used to drive snapshot store actor
			plugin-dispatcher = "akka.actor.default-dispatcher"

			# connection string used for database access
			connection-string = ""

			# default SQL commands timeout
			connection-timeout = 30s

			# SQL Server schema name of the table corresponding with the snapshot store
			schema-name = dbo

			# SQL Server table corresponding with the snapshot store
			table-name = SnapshotStore

			# should corresponding snapshot store table be initialized automatically
			auto-initialize = off
			
			# Recommended: change default circuit breaker settings
			# By uncommenting below and using Connection Timeout + Command Timeout
			# circuit-breaker.call-timeout=30s
		}
	}
}
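
The circuit breaker recommendation from the comments above could look roughly like the following sketch. It assumes a 15s connection timeout coming from the connection string (an assumption for illustration, not a value stated in this README) added to the 30s command timeout shown above; adjust the sum to your own timeouts.

```hocon
akka.persistence {
  journal.sql-server {
    # command timeout, as in the configuration above
    connection-timeout = 30s

    # assumed: 15s connection timeout + 30s command timeout
    circuit-breaker.call-timeout = 45s
  }

  snapshot-store.sql-server {
    # command timeout, as in the configuration above
    connection-timeout = 30s

    # assumed: 15s connection timeout + 30s command timeout
    circuit-breaker.call-timeout = 45s
  }
}
```

The value is set in both scopes because the journal and the snapshot store are configured separately.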

Batching journal

Since version 1.1.3, an alternative, experimental journal type has been available, known as the batching journal. It is optimized for concurrent writes made by multiple persistent actors, thanks to its ability to batch multiple SQL operations so that they execute within the same database connection. In some of those situations we've noticed an improvement of over an order of magnitude in event write speed.

To use the batching journal, simply change akka.persistence.journal.sql-server.class to Akka.Persistence.SqlServer.Journal.BatchingSqlServerJournal, Akka.Persistence.SqlServer.
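
As a minimal sketch, the class switch described above might look like this in HOCON; all other journal settings stay as they were.

```hocon
akka.persistence.journal {
  plugin = "akka.persistence.journal.sql-server"
  sql-server {
    # switch from SqlServerJournal to the batching journal
    class = "Akka.Persistence.SqlServer.Journal.BatchingSqlServerJournal, Akka.Persistence.SqlServer"
  }
}
```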

In addition to the existing settings, the batching journal introduces a few more:

  • isolation-level defines the isolation level for transactions used within event reads/writes. Possible options: unspecified (default), chaos, read-committed, read-uncommitted, repeatable-read, serializable or snapshot.
  • max-concurrent-operations limits the maximum number of database connections used by this journal. You can use it in situations where you want to partition the same ADO.NET pool between multiple components. Current default: 64.
  • max-batch-size defines the maximum number of SQL operations that are allowed to be executed using the same connection. When there are more operations, they will be chunked into subsequent connections. Current default: 100.
  • max-buffer-size defines the maximum buffer capacity for the requests sent to a journal. Once the buffer overflows, the journal will call the OnBufferOverflow method. By default it will reject all incoming requests until buffer space is freed. You can inherit from BatchingSqlServerJournal and override that method to provide a custom backpressure strategy. Current default: 500 000.
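
The settings listed above could be written out as in the sketch below. It assumes they live in the same akka.persistence.journal.sql-server scope as the other journal keys, and the values shown are simply the defaults quoted in the list.

```hocon
akka.persistence.journal.sql-server {
  class = "Akka.Persistence.SqlServer.Journal.BatchingSqlServerJournal, Akka.Persistence.SqlServer"

  # isolation level for transactions used within event reads/writes
  isolation-level = unspecified

  # upper bound on database connections used by this journal
  max-concurrent-operations = 64

  # maximum number of SQL operations executed on one connection
  max-batch-size = 100

  # maximum number of buffered requests before OnBufferOverflow is called
  max-buffer-size = 500000
}
```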

Table Schema

The SQL Server persistence plugin defines a default table schema used for the journal, snapshot store and metadata tables.

CREATE TABLE {your_journal_table_name} (
  Ordering BIGINT IDENTITY(1,1) NOT NULL,
  PersistenceID NVARCHAR(255) NOT NULL,
  SequenceNr BIGINT NOT NULL,
  Timestamp BIGINT NOT NULL,
  IsDeleted BIT NOT NULL,
  Manifest NVARCHAR(500) NOT NULL,
  Payload VARBINARY(MAX) NOT NULL,
  Tags NVARCHAR(100) NULL,
  SerializerId INTEGER NULL,
  CONSTRAINT PK_{your_journal_table_name} PRIMARY KEY (Ordering),
  CONSTRAINT QU_{your_journal_table_name} UNIQUE (PersistenceID, SequenceNr)
);

CREATE TABLE {your_snapshot_table_name} (
  PersistenceID NVARCHAR(255) NOT NULL,
  SequenceNr BIGINT NOT NULL,
  Timestamp DATETIME2 NOT NULL,
  Manifest NVARCHAR(500) NOT NULL,
  Snapshot VARBINARY(MAX) NOT NULL,
  SerializerId INTEGER NULL,
  CONSTRAINT PK_{your_snapshot_table_name} PRIMARY KEY (PersistenceID, SequenceNr)
);

CREATE TABLE {your_metadata_table_name} (
  PersistenceID NVARCHAR(255) NOT NULL,
  SequenceNr BIGINT NOT NULL,
  CONSTRAINT PK_{your_metadata_table_name} PRIMARY KEY (PersistenceID, SequenceNr)
);

Underneath, Akka.Persistence.SqlServer uses raw ADO.NET commands. You may choose not to use the built-in ones and instead create your own that better fit your use case. To do so, you have to create your own versions of IJournalQueryBuilder and IJournalQueryMapper (for custom journals) or ISnapshotQueryBuilder and ISnapshotQueryMapper (for custom snapshot stores) and then attach them inside the journal, just like in the example below:

class MyCustomSqlServerJournal: Akka.Persistence.SqlServer.Journal.SqlServerJournal
{
    public MyCustomSqlServerJournal() : base()
    {
        QueryBuilder = new MyCustomJournalQueryBuilder();
        QueryMapper = new MyCustomJournalQueryMapper();
    }
}

Migration

From 1.1.2 to 1.3.1

ALTER TABLE {your_journal_table_name} ADD SerializerId INTEGER NULL;
ALTER TABLE {your_snapshot_table_name} ADD SerializerId INTEGER NULL;

From 1.1.0 to 1.1.2

ALTER TABLE {your_journal_table_name} DROP CONSTRAINT PK_{your_journal_table_name};
ALTER TABLE {your_journal_table_name} ADD Ordering BIGINT IDENTITY(1,1) NOT NULL;
ALTER TABLE {your_journal_table_name} ADD CONSTRAINT PK_{your_journal_table_name} PRIMARY KEY (Ordering);
ALTER TABLE {your_journal_table_name} ADD CONSTRAINT QU_{your_journal_table_name} UNIQUE (PersistenceID, SequenceNr);

From 1.0.8 to 1.1.0

-- helper function to convert between DATETIME2 and BIGINT as .NET ticks
-- taken from: http://stackoverflow.com/questions/7386634/convert-sql-server-datetime-object-to-bigint-net-ticks
CREATE FUNCTION [dbo].[Ticks] (@dt DATETIME)
RETURNS BIGINT
WITH SCHEMABINDING
AS
BEGIN
DECLARE @year INT = DATEPART(yyyy, @dt)
DECLARE @month INT = DATEPART(mm, @dt)
DECLARE @day INT = DATEPART(dd, @dt)
DECLARE @hour INT = DATEPART(hh, @dt)
DECLARE @min INT = DATEPART(mi, @dt)
DECLARE @sec INT = DATEPART(ss, @dt)

DECLARE @days INT =
    CASE @month - 1
        WHEN 0 THEN 0
        WHEN 1 THEN 31
        WHEN 2 THEN 59
        WHEN 3 THEN 90
        WHEN 4 THEN 120
        WHEN 5 THEN 151
        WHEN 6 THEN 181
        WHEN 7 THEN 212
        WHEN 8 THEN 243
        WHEN 9 THEN 273
        WHEN 10 THEN 304
        WHEN 11 THEN 334
        WHEN 12 THEN 365
    END
    IF  @year % 4 = 0 AND (@year % 100  != 0 OR (@year % 100 = 0 AND @year % 400 = 0)) AND @month > 2 BEGIN
        SET @days = @days + 1
    END
RETURN CONVERT(bigint,
    ((((((((@year - 1) * 365) + ((@year - 1) / 4)) - ((@year - 1) / 100)) + ((@year - 1) / 400)) + @days) + @day) - 1) * 864000000000) +
    ((((@hour * 3600) + CONVERT(bigint, @min) * 60) + CONVERT(bigint, @sec)) * 10000000) + (CONVERT(bigint, DATEPART(ms, @dt)) * CONVERT(bigint,10000));

END;
ALTER TABLE {your_journal_table_name} ADD Timestamp_tmp BIGINT NULL;
UPDATE {your_journal_table_name} SET Timestamp_tmp = dbo.Ticks(Timestamp);
DROP INDEX [IX_EventJournal_Timestamp] ON {your_journal_table_name};
ALTER TABLE {your_journal_table_name} DROP COLUMN Timestamp;
ALTER TABLE {your_journal_table_name} ALTER COLUMN Timestamp_tmp BIGINT NOT NULL;
EXEC sp_RENAME '{your_journal_table_name}.Timestamp_tmp' , 'Timestamp', 'COLUMN';
CREATE NONCLUSTERED INDEX [IX_EventJournal_Timestamp] ON {your_journal_table_name}([Timestamp] ASC);
ALTER TABLE {your_journal_table_name} ADD Tags NVARCHAR(100) NULL;

From 1.0.6 to 1.0.8

CREATE TABLE {your_metadata_table_name} (
  PersistenceID NVARCHAR(255) NOT NULL,
  SequenceNr BIGINT NOT NULL,
  CONSTRAINT PK_Metadata PRIMARY KEY (PersistenceID, SequenceNr)
);

INSERT INTO {your_metadata_table_name} (PersistenceID, SequenceNr)
SELECT PersistenceID, MAX(SequenceNr) as SequenceNr FROM {your_journal_table_name} GROUP BY PersistenceID;

ALTER TABLE {your_journal_table_name} ALTER COLUMN PersistenceID NVARCHAR(255) NOT NULL;

From 1.0.4 to 1.0.5

ALTER TABLE dbo.EventJournal ADD Timestamp DATETIME2 NOT NULL DEFAULT GETDATE();
ALTER TABLE dbo.EventJournal DROP CONSTRAINT PK_EventJournal;
ALTER TABLE dbo.EventJournal DROP COLUMN CS_PID;
ALTER TABLE dbo.EventJournal ADD CONSTRAINT PK_EventJournal PRIMARY KEY (PersistenceID, SequenceNr);
EXEC sp_RENAME 'EventJournal.PayloadType', 'Manifest', 'COLUMN';
EXEC sp_RENAME 'SnapshotStore.PayloadType', 'Manifest', 'COLUMN';

Tests

The SqlServer tests are packaged and run as part of the default "All" build task.

In order to run the tests, you must do the following things:

  1. Download and install SQL Server Express 2014 from: http://www.microsoft.com/en-us/server-cloud/Products/sql-server-editions/sql-server-express.aspx
  2. Install SQL Server Express with the default settings.
  3. Create a new user called akkadotnet with the password akkadotnet and give them rights to create new databases on the server.
  4. The default connection string uses the following credentials: Data Source=localhost\SQLEXPRESS;Database=akka_persistence_tests;User Id=akkadotnet; Password=akkadotnet;
  5. A custom app.config file can be used; it must be placed in the same folder as the DLL.

akka.persistence.sqlserver's People

Contributors

aaronontheweb, alexcuse, alexvaluyskiy, arkatufus, danthar, davesearlegsa, dependabot-preview[bot], dependabot[bot], eaba, graemebradbury, havret, heynickc, horusiath, idesai1210, igorfedchenko, ingted, iosifa, irvindominin, ismaelhamed, izavala, lesscodetxm, mrrd, newmancodes, object, pmbanka, rogeralsing, to11mtm, vaclavk, zetanova

akka.persistence.sqlserver's Issues

Ensuring that important PRs are merged before Akka Persistence is released

Akka.NET Persistence is still in beta but AFAIK is planned for RTM during the summer. There are some PRs that are important to merge before the release, in particular #59 and #60.

#59 doesn't affect anything at runtime; it's about consistent and deterministic names.
#60 is quite important: we had to fork Akka.Persistence.SqlServer to apply it, otherwise streaming AllEventsByTag doesn't work properly.

Both PRs are shown as if they break the build but IMHO it's a false negative, they are good. Is there anything that can be done to ensure the pending PRs are applied before the release? Anything I could do?

Persistent actors are still stuck after network failure

This might be the same as #104 which is supposed to be fixed in 1.3.13. However we upgraded to 1.3.13 and it looks like this error occurs more often than before. Here are the symptoms:

  1. A temporary network failure occurs and lasts for a short time, we see in our logs some errors (for example an error from RabbitMQ).
  2. Around this time we see a number of "Failed to execute chunk for 1 requests" error messages from akka://Oddjob/system/akka.persistence.journal.sql-server.
  3. A few minutes after that (when the network is OK) we see a few hundred "Circuit Breaker is open; calls are failing fast" errors from our persistent actors that are trying to recover their state.
  4. Then there is a continuous stream of "Recovery timed out, didn't get event within 60s, highest sequence number seen 0." messages that lasts until we restart the node.
  5. After restarting the node things go back to normal.

akka.cluster and persistence delivering issue

In our cluster we have four nodes, composed of:

  • 2 seed nodes (backend)
  • 1 worker
  • 1 webapi on IIS

The cluster is joined, up and running. When I send a POST to the webapi:

  • IIS joins the cluster
  • the API receives the POST and sends the message with a Tell
  • the message is processed two or three times!
  • this happens only for the message sent when IIS joins; the following messages work fine

Here is my IIS config:

<akka>
  <hocon>
    <![CDATA[
            akka.loglevel = INFO
            akka.log-config-on-start = off
            akka.stdout-loglevel = INFO
            akka.actor {
                provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
                deployment {
                  /TheProcess {
                    router = round-robin-group
                    routees.paths = ["/user/TheProcess"] # path of routee on each node
                    # nr-of-instances = 3 # max number of total routees
                    cluster {
                        enabled = on
                        allow-local-routees = off
                        use-role = TheProcess
                    }
                  }                            
                }
                debug {
                  receive = on
                  autoreceive = on
                  lifecycle = on
                  event-stream = on
                  unhandled = on
                }
            }
            akka.remote {
                helios.tcp {
                    # transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
                    # applied-adapters = []
                    # transport-protocol = tcp
					          # public-hostname = "localhost"
                    # 0 or 46001-46010
                    port = 0
                    hostname = "localhost"
                }
                log-remote-lifecyclo-events = DEBUG
            }
            akka.cluster {
              seed-nodes = [
                "akka.tcp://ActorSystem@localhost:2551",
                "akka.tcp://ActorSystem@localhost:2552"
              ]
              roles = [TheSend]
              # auto-down-unreachable-after = 10s
              # how often should the node send out gossip information?
              # gossip-interval = 1s
              # discard incoming gossip messages if not handled within this duration
              # gossip-time-to-live = 2s              
            }
            # http://getakka.net/docs/persistence/at-least-once-delivery
            akka.persistence.at-least-once-delivery.redeliver-interval = 300s
            # akka.persistence.at-least-once-delivery.redelivery-burst-limit =
            # akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts =
            akka.persistence.at-least-once-delivery.max-unconfirmed-messages = 1000000
            akka.persistence.journal.plugin = "akka.persistence.journal.sql-server"
            akka.persistence.journal.publish-plugin-commands = on
            akka.persistence.journal.sql-server {
                class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
                plugin-dispatcher = "akka.actor.default-dispatcher"
                table-name = EventJournal
                schema-name = dbo
                auto-initialize = on
                connection-string-name = "AkkaPersistence"
                refresh-interval = 1s
                connection-timeout = 30s
                timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
                metadata-table-name = Metadata
            }
            akka.persistence.snapshot-store.plugin = ""akka.persistence.snapshot-store.sql-server""
            akka.persistence.snapshot-store.sql-server {
              class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
              plugin-dispatcher = ""akka.actor.default-dispatcher""
              connection-string-name = "AkkaPersistence"
              schema-name = dbo
              table-name = SnapshotStore
              auto-initialize = on
            }
      ]]>
  </hocon>
</akka>

And here is my backend config:

  <hocon>
    <![CDATA[
        akka.loglevel = INFO
        akka.log-config-on-start = on
        akka.stdout-loglevel = INFO
        akka.actor {
            provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
            debug {
              receive = on
              autoreceive = on
              lifecycle = on
              event-stream = on
              unhandled = on
            }
        }
        akka.remote {
          helios.tcp {
                # transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
                # applied-adapters = []
                # transport-protocol = tcp
				        # public-hostname = "localhost"
                # 
                # seed-node ports 2551 and 2552
                # non-seed-node port 0 or 46001-46010
                port = 2551
                hostname = "localhost"
            }
            log-remote-lifecyclo-events = INFO
        }
        akka.cluster {
          seed-nodes = [
            "akka.tcp://ActorSystem@localhost:2551",
            "akka.tcp://ActorSystem@localhost:2552"
          ]
          roles = [TheProcess]
          # auto-down-unreachable-after = 10s
        }
      ]]>
  </hocon>

I think the issue is akka persistence related, what can the issue be?

Example config appears to be incorrect

Hi there. The example HOCON file appears to contain an error in the 'akka.persistence.snapshot-store.sql-server.plugin-dispatcher' field:

            # dispatcher used to drive journal actor
            plugin-dispatcher = ""akka.actor.default-dispatcher""

Remove CS_PID field

The CS_PID field doesn't seem to work the way it was designed to. Therefore it should be dropped and the PersistenceID field should be used instead.

Ensure backwards compatibility with pre-v1.3.1 schema

Adding the column SerializerId per changes in v1.3.1 may not be sufficient as we are now possibly inserting NULL into the Manifest column per this line: https://github.com/akkadotnet/akka.net/blob/dev/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs#L654

If the serializer is set to not include manifest info, then we insert NULL, which is not allowed per the existing schema: https://github.com/akkadotnet/akka.net/blob/dev/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs#L654

CreateEventsJournalSql = $@"
            IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{configuration.SchemaName}' AND TABLE_NAME = '{configuration.JournalEventsTableName}')
            BEGIN
                CREATE TABLE {configuration.FullJournalTableName} (
                    {configuration.OrderingColumnName} BIGINT IDENTITY(1,1) PRIMARY KEY NOT NULL,
	                {configuration.PersistenceIdColumnName} NVARCHAR(255) NOT NULL,
	                {configuration.SequenceNrColumnName} BIGINT NOT NULL,
                    {configuration.TimestampColumnName} BIGINT NOT NULL,
                    {configuration.IsDeletedColumnName} BIT NOT NULL,
                    {configuration.ManifestColumnName} NVARCHAR(500) NOT NULL,
	                {configuration.PayloadColumnName} VARBINARY(MAX) NOT NULL,
                    {configuration.TagsColumnName} NVARCHAR(100) NULL,
                    CONSTRAINT UQ_{configuration.JournalEventsTableName} UNIQUE ({configuration.PersistenceIdColumnName}, {configuration.SequenceNrColumnName})
                );
                CREATE INDEX IX_{configuration.JournalEventsTableName}_{configuration.SequenceNrColumnName} ON {configuration.FullJournalTableName}({configuration.SequenceNrColumnName});
                CREATE INDEX IX_{configuration.JournalEventsTableName}_{configuration.TimestampColumnName} ON {configuration.FullJournalTableName}({configuration.TimestampColumnName});
            END
            ";

May need an additional ALTER COLUMN statement in README to make Manifest nullable for backwards compatibility with 1.1.1.7-beta

SqlServerPersistence.Init is missing -- Documentation out of date?

Hey guys,

It looks like in 1.0.5 and on "SqlServerPersistence.Init" is missing -- and there's no documented way to initialize this extension.

The README for this repo still says that this is the correct way to initialize the extension. -- Please update the readme, because whatever the new method is, it's not clear, and it means we can't use a newer version than 1.0.4. :(

Current nuget package fails to work with akka 1.3.7

Due to the changes to Akka.Persistence.Sql.Common QueryConfiguration and the addition of the extra parameter of useSequentialAccess this package does not operate with Akka 1.3.7

I have updated the code to support this parameter in #96

Error during snapshot store initialization

Trying to make an Akka.Persistence example with sql server and autoinit of tables. However, the snapshot table fails to create.

I'm using Akka.NET 1.3.13.

Here is my HOCON:

akka {
          persistence {
            journal {
              plugin = "akka.persistence.journal.sql-server"
              sql-server {
                class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
                schema-name = dbo
                auto-initialize = on
                connection-string = "Data Source=(LocalDB)\\MSSQLLocalDB;AttachDbFilename=\\\\Mac\\Dropbox\\Nastava\\2018-2019\\RS_18-19\\Vj07\\PersistentActorExample\\PersistentActorExample\\App_Data\\Persistence.mdf;Integrated Security=True"
              }
            } 
            snapshot-store {
              plugin = "akka.persistence.snapshot-store.sql-server"
              sql-server {
                class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
                schema-name = dbo
                auto-initialize = on
                connection-string = "Data Source=(LocalDB)\\MSSQLLocalDB;AttachDbFilename=\\\\Mac\\Dropbox\\Nastava\\2018-2019\\RS_18-19\\Vj07\\PersistentActorExample\\PersistentActorExample\\App_Data\\Persistence.mdf;Integrated Security=True"
              }
            }
          }   
        }

I'm working on OS X with Parallels running Windows 10.

Here is the error that occurs:

[ERROR][5/5/2019 9:56:15 PM][Thread 0009][[akka://persistence-example/system/akka.persistence.snapshot-store.sql-server#670725692]] Error during snapshot store initialization
Cause: System.Data.SqlClient.SqlException (0x80131904): Incorrect syntax near '('.
Incorrect syntax near '('.
Incorrect syntax near '('.
   at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
   at System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
   at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
   at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
   at System.Data.SqlClient.SqlCommand.InternalEndExecuteNonQuery(IAsyncResult asyncResult, String endMethod, Boolean isInternal)
   at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryInternal(IAsyncResult asyncResult)
   at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryAsync(IAsyncResult asyncResult)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.<CreateTableAsync>d__28.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Akka.Persistence.Sql.Common.Snapshot.SqlSnapshotStore.<Initialize>d__17.MoveNext()
ClientConnectionId:69ba6e78-21b1-4cda-999b-94b7b0f703a5
Error Number:102,State:1,Class:15

The tables EventJournal and Metadata are created.

Current Akka.Persistence.SqlServer version incompatible with 1.1.2.x

In follow up on the ordering issue on SQL server: akkadotnet/akka.net#2272

Please release a new nightly of package Akka.Persistence.SqlServer so it handles the new ordering column configuration. We're experiencing a missing method exception with the current 1.1.1.158-beta

System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. ---> System.MissingMethodException: Method not found: 'Void Akka.Persistence.Sql.Common.Journal.QueryConfiguration..ctor(System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.TimeSpan)'.
at Akka.Persistence.SqlServer.Journal.SqlServerJournal..ctor(Config journalConfig)
--- End of inner exception stack trace ---
at System.RuntimeMethodHandle.InvokeMethod(Object target, Object[] arguments, Signature sig, Boolean constructor)
at System.Reflection.RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
at System.RuntimeType.CreateInstanceImpl(BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes, StackCrawlMark& stackMark)
at System.Activator.CreateInstance(Type type, BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes)
at System.Activator.CreateInstance(Type type, Object[] args)
at Akka.Actor.Props.ActivatorProducer.Produce()
at Akka.Actor.Props.NewActor()
--- End of inner exception stack trace ---
at Akka.Actor.Props.NewActor()
at Akka.Actor.ActorCell.CreateNewActorInstance()
at Akka.Actor.ActorCell.<>c__DisplayClass118_0.b__0()
at Akka.Actor.ActorCell.UseThreadContext(Action action)
at Akka.Actor.ActorCell.NewActor()
at Akka.Actor.ActorCell.Create(Exception failure)
--- End of inner exception stack trace ---
at Akka.Actor.ActorCell.Create(Exception failure)
at Akka.Actor.ActorCell.SysMsgInvokeAll(EarliestFirstSystemMessageList messages, Int32 currentState)

Thanks :)

EventJournal slow and redeliver issue

In our cluster we have five nodes, composed of:

  • 2 seed nodes (backend)
  • 1 worker
  • 2 webapi on IIS

The cluster is joined, up and running, but when the EventJournal gets somewhat full (~1 million rows), inserts into the EventJournal become slow. Often (or always, as the row count grows), when a message must be redelivered because the redeliver interval has been reached, the message is not sent again to the backend.

How should I handle this issue?

Here is my IIS config:

<hocon>
  <![CDATA[
        akka.loglevel = INFO
        akka.log-config-on-start = off
        akka.stdout-loglevel = INFO
        akka.actor {
            provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
            deployment {
              /Process {
                router = round-robin-group
                routees.paths = ["/user/Process"] # path of routee on each node
                # nr-of-instances = 3 # max number of total routees
                cluster {
                    enabled = on
                    allow-local-routees = off
                    use-role = Process
                }
              }
            }
            debug {
              receive = on
              autoreceive = on
              lifecycle = on
              event-stream = on
              unhandled = on
            }
        }
        akka.remote {
            helios.tcp {
                # transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
                # applied-adapters = []
                # transport-protocol = tcp
                port = 0
                hostname = 172.16.1.8
            }
            log-remote-lifecyclo-events = DEBUG
        }
        akka.cluster {
          seed-nodes = [
            "akka.tcp://[email protected]:2551",
            "akka.tcp://[email protected]:2552"
          ]
          roles = [Send]
          auto-down-unreachable-after = 10s
          # how often should the node send out gossip information?
          gossip-interval = 1s
          # discard incoming gossip messages if not handled within this duration
          gossip-time-to-live = 2s
        }
        # http://getakka.net/docs/persistence/at-least-once-delivery
        akka.persistence.at-least-once-delivery.redeliver-interval = 300s
        # akka.persistence.at-least-once-delivery.redelivery-burst-limit =
        # akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts =
        akka.persistence.at-least-once-delivery.max-unconfirmed-messages = 1000000
        akka.persistence.journal.plugin = "akka.persistence.journal.sql-server"
        akka.persistence.journal.publish-plugin-commands = on
        akka.persistence.journal.sql-server {
            class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
            plugin-dispatcher = "akka.actor.default-dispatcher"
            table-name = EventJournal
            schema-name = dbo
            auto-initialize = on
            connection-string-name = "HubAkkaPersistence"
            refresh-interval = 1s
            connection-timeout = 30s
            timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
            metadata-table-name = Metadata
        }
        akka.persistence.snapshot-store.plugin = ""akka.persistence.snapshot-store.sql-server""
        akka.persistence.snapshot-store.sql-server {
          class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
          plugin-dispatcher = "akka.actor.default-dispatcher"
          connection-string-name = "HubAkkaPersistence"
          schema-name = dbo
          table-name = SnapshotStore
          auto-initialize = on
        }
  ]]>
</hocon>

and here is our backend config:

<hocon><![CDATA[
    akka.loglevel = INFO
    akka.log-config-on-start = on
    akka.stdout-loglevel = INFO
    akka.actor {
        provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
        debug {
          receive = on
          autoreceive = on
          lifecycle = on
          event-stream = on
          unhandled = on
        }
    }
    akka.remote {
      helios.tcp {
            # transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
            # applied-adapters = []
            # transport-protocol = tcp
            # 
            # seed-node ports 2551 and 2552
            # non-seed-node port 0
            port = 2551
            hostname = 172.16.1.8
        }
        log-remote-lifecycle-events = INFO
    }
    akka.cluster {
      seed-nodes = [
        "akka.tcp://[email protected]:2551",
        "akka.tcp://[email protected]:2552"
      ]
      roles = [Process]
      auto-down-unreachable-after = 10s
    }
  ]]></hocon>

The issue is present with Akka 1.3.2, but I have also seen it with previous versions.

Need to separate Powershell for Docker startup into a separate build step

We wanted to have a Mono build in CI to validate that the plugin builds on Mono, but in order to trigger the Docker-related PowerShell scripts from within build.fsx, we had to import System.Management.Automation, which throws here: http://petabridge-ci.cloudapp.net/viewLog.html?buildId=24898&tab=buildResultsDiv&buildTypeId=AkkaNet_AkkaPersistenceImplementations_AkkaPersistenceSqlServer_AkkaP

The other plugins spin up containers in separate build steps - with the building and running tests done inside of build.fsx. Think this is a better approach, to decouple the Docker infrastructure from the build scripts.

SQL BeginExecuteReader error occurs when replaying events

When I am trying to replay events, the error message below shows up.

BeginExecuteReader requires the command to have a transaction when the connection assigned to the command is in a pending local transaction. The Transaction property of the command has not been initialized.

Below is the summary of the program flow.

ActorA > ForwardCommand > ActorB > PublishEvent > ActorC > Tell > ActorD > ForwardCommand > ActorE

Plain actors (actor base):
ActorA, ActorD

Persistent actors:
ActorB, ActorE

Receive actors:
ActorC

When ActorE tries to replay events, the error occurs.
Below is the stack trace I grabbed from the RecoveryFailure object.

   at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at System.Threading.Tasks.Task`1.GetResultCore(Boolean waitCompletionNotification)
   at System.Threading.Tasks.Task`1.get_Result()
   at Akka.Persistence.Sql.Common.Journal.JournalDbEngine.<>c__DisplayClass1.<ReplayMessagesAsync>b__0(Task`1 task)
   at System.Threading.Tasks.ContinuationTaskFromResultTask`1.InnerInvoke()
   at System.Threading.Tasks.Task.Execute()

The error happens at the line below:
https://github.com/rambo406/Akka.NET-SQL-Server-issue-example/blob/master/Actors/Aggregates/Accountant.cs#L28

Get rid of dependency on Powershell in build.fsx

As we moved the other plugins to container databases, we started running any Docker-related scripts outside of the actual build process. This is better, especially for Mono, because the PowerShell dependency currently breaks the build on Linux, which looks for System.Management.Automation.

Racy spec: EventsByPersistenceIdSpec.Sql_query_EventsByPersistenceId_should_return_empty_stream_for_cleaned_journal_from_0_to_MaxLong

This error

Xunit.Sdk.TrueException
Failed: Expected a message of type Akka.Streams.TestKit.TestSubscriber+OnComplete, but received {TestSubscriber.OnNext(g1-1)} (type Akka.Streams.TestKit.TestSubscriber+OnNext`1[System.Object]) instead  from [akka://test/user/StreamSupervisor-11/Flow-0-0-select#838415787]
Expected: True
Actual:   False
   at Akka.TestKit.Xunit2.XunitAssertions.Fail(String format, Object[] args)
   at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelope[T](Nullable`1 timeout, Action`2 assert, String hint, Boolean shouldLog)
   at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelope[T](Nullable`1 timeout, Action`1 msgAssert, Action`1 senderAssert, String hint)
   at Akka.TestKit.TestKitBase.InternalExpectMsg[T](Nullable`1 timeout, Action`1 msgAssert, String hint)
   at Akka.TestKit.TestKitBase.ExpectMsg[T](Nullable`1 duration, String hint)
   at Akka.Persistence.Sql.TestKit.EventsByPersistenceIdSpec.Sql_query_EventsByPersistenceId_should_return_empty_stream_for_cleaned_journal_from_0_to_MaxLong()

[INFO][01.08.2016 14:39:40][Thread 0041][akka://test/user/testActor143] Message OnError from akka://test/user/StreamSupervisor-11/Flow-0-0-select to akka://test/user/testActor143 was not delivered. 1 dead letters encountered.
[INFO][01.08.2016 14:39:40][Thread 0027][akka://test/user/StreamSupervisor-11/Flow-0-0-select] Message Terminate from akka://test/user/StreamSupervisor-11/Flow-0-0-select to akka://test/user/StreamSupervisor-11/Flow-0-0-select was not delivered. 2 dead letters encountered.

Journal reader missing events

In our application we are using Akka.net, with event sourcing. The persistent actors save their events in an SQL Server database.
We also have view actors, which subscribe to these events using a journal reader/persistence query to create materialised views. We have a table in the database that has a row for every view actor. This row contains the name of the view actor and the offset of the last event processed.
At first sight, this is working smoothly. Sometimes however, when we run a test that results in thousands of events, the journal reader is missing some events.

A view actor is a ReceiveActor. When started, it retrieves the last handled event offset from the database (called from the actor's constructor). The offset is piped to self in an OffsetMessage.
On receiving the OffsetMessage the view actor initialises the journal reader. On receiving events (in EventEnvelope messages), the views are updated.

The action that is run from the journal reader, first writes a line to the log. That line contains the event offset.
The EventEnvelope receive handler also writes a line to the log. That line also contains the event offset.

We have a test that results in 9635 events being inserted into the journal. Sometimes the journal reader and the EventEnvelope receive handler log fewer than 9635 events.
They both log the same numbers, so it seems the events are missed by the journal reader. The events missing from the log correspond to the missing items in the views.
We run the test on an empty database. Logging is at the debug level and does not show exceptions. The missing events (we have seen between 1 and 4) can be among the first, middle or last events. This is different every time.

So far we have no idea what is causing this problem, or how it can be solved.

Following are fragments of our code. The view actors all inherit from a base class: ViewActorBase.

internal abstract class ViewActorBase : ReceiveActor, ILogReceive
{
    public ViewActorBase()
    {
        // Some initialisation code
        ....

        this.Receive<OffsetMessage>(this.HandleOffsetMessage);
        this.ReceiveAsync<EventEnvelope>(this.UpdateState);

        var sender = this.Sender;
        var self = this.Self;
        this.GetViewActorOffset(self, sender);
    }

    private void HandleOffsetMessage(OffsetMessage offsetMessage)
    {
        this.InitialiseJournalReader(offsetMessage.Offset);
    }

    private void InitialiseJournalReader(long offset)
    {
        // obtain read journal by plugin id
        var readJournal = PersistenceQuery.Get(Context.System).ReadJournalFor<SqlReadJournal>($"akka.persistence.query");

        // materialize stream, consuming events
        var materializer = ActorMaterializer.Create(Context.System);

        // issue query to journal
        Source<EventEnvelope, NotUsed> source = readJournal.EventsByTag(this.QueryEventTag, new Sequence(offset));

        var self = this.Self;
        source.RunForeach(envelope => { this.Logger.Debug("{Date:HH:mm:ss.fffff} JournalReader.Tell {Offset}", DateTime.Now, (envelope.Offset as Sequence).Value); self.Tell(envelope); }, materializer);
    }

    private void GetViewActorOffset(IActorRef self, IActorRef sender)
    {
        // Initialise repository
        ....

        repository.GetViewActorOffset(this.GetViewName()).PipeTo(self, sender, offset => new OffsetMessage(offset));
    }
}

internal class MyViewActor : ViewActorBase
{
    protected override async Task UpdateState(EventEnvelope envelope)
    {
        var offset = (envelope.Offset as Sequence).Value;

        this.Logger.Debug("{Date:HH:mm:ss.fffff} {MethodName} {Offset}", DateTime.Now, $"{this.GetType().Name}.UpdateState", offset);

        // Update views
        ....
    }
}

Is there something wrong in our code or architecture? Are there better solutions?

Additional information
We have run some tests with SQL Server profiler monitoring the queries to the database.

A query was executed on the event journal, asking for 100 events, starting at offset 204743. The result contained 61 rows.

<Event id="10" name="RPC:Completed">
  <Column id="1" name="TextData">exec sp_executesql N'
        SELECT TOP (@Take)
        e.PersistenceId as PersistenceId, 
        e.SequenceNr as SequenceNr, 
        e.Timestamp as Timestamp, 
        e.IsDeleted as IsDeleted, 
        e.Manifest as Manifest, 
        e.Payload as Payload,
        e.SerializerId as SerializerId,
        e.Ordering as Ordering
        FROM dbo.EventJournal e
        WHERE e.Ordering &gt; @Ordering AND e.Tags LIKE @Tag
        ORDER BY Ordering ASC
        ',N'@Tag nvarchar(10),@Ordering bigint,@Take bigint',@Tag=N'%;Module;%',@Ordering=204743,@Take=100</Column>
  <Column id="9" name="ClientProcessID">1169425116</Column>
  <Column id="10" name="ApplicationName">Core .Net SqlClient Data Provider</Column>
  <Column id="12" name="SPID">82</Column>
  <Column id="13" name="Duration">353890</Column>
  <Column id="14" name="StartTime">2018-08-30T16:32:32.927+02:00</Column>
  <Column id="15" name="EndTime">2018-08-30T16:32:33.28+02:00</Column>
  <Column id="16" name="Reads">326</Column>
  <Column id="17" name="Writes">0</Column>
  <Column id="18" name="CPU">0</Column>
  <Column id="48" name="RowCounts">61</Column>
</Event>

We expected the next query to start at 204804 (204743 + 61). However, it started at 204810. Why is it skipping (or missing) 6 events?
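One thing that may be worth ruling out before treating the 6-value jump as lost events: the profiled query filters on Tags but pages on Ordering, so 61 matching rows do not have to occupy 61 consecutive Ordering values. The sketch below is only an illustration under assumptions, not the plugin's code: `JournalRow`, `Page` and `NextOffset` are hypothetical names, the journal is an in-memory list, and it assumes that the reader continues from the Ordering of the last row returned and that Ordering is shared by all events in the table regardless of tag.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a journal row; not a type from the plugin.
public sealed class JournalRow
{
    public JournalRow(long ordering, string tags)
    {
        Ordering = ordering;
        Tags = tags;
    }

    public long Ordering { get; }
    public string Tags { get; }
}

public static class OffsetSketch
{
    // Same shape as the profiled query: Ordering > offset, tag match, TOP (take), ORDER BY Ordering.
    public static List<JournalRow> Page(IEnumerable<JournalRow> journal, long offset, string tag, int take)
    {
        return journal
            .Where(r => r.Ordering > offset && r.Tags.Contains(";" + tag + ";"))
            .OrderBy(r => r.Ordering)
            .Take(take)
            .ToList();
    }

    // Assumption: the next page starts after the highest Ordering returned,
    // not after offset + number of rows.
    public static long NextOffset(long offset, IReadOnlyList<JournalRow> page)
    {
        return page.Count == 0 ? offset : page[page.Count - 1].Ordering;
    }
}
```

With 67 rows after offset 204743, of which every eleventh carries a different tag, `Page(journal, 204743, "Module", 100)` returns 61 rows and `NextOffset` yields 204810. A jump of 67 for 61 rows is therefore consistent with 6 interleaved rows that do not match the tag filter, or with unused Ordering values. This does not explain the events that are missing from the views; it only shows that offset + row count is not a reliable expectation for the next offset.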

Getting an error after updating the NuGet package Akka.Persistence.SqlServer

After updating the NuGet package to version 1.0.6.3 I'm getting an error:

[Thread 0014][akka://TaskManagementSystem/system/akka.persistence.journal.sql-server] Object reference not set to an instance of an object.
Cause: [akka://TaskManagementSystem/system/akka.persistence.journal.sql-server]: Akka.Actor.ActorInitializationException: Exception during creation ---> System.NullReferenceException: Object reference not set to an instance of an object.

Without changing anything else in the project, I downgraded the package to 1.0.6.0 and my system started working properly again.

Strange error on SaveSnapshotFailure

I'm getting a strange error when my persistent actor fails to save a snapshot:

Error in saving snapshot SnapshotMetadata<pid: TaskManager, seqNr: 218, timestamp: 0001-01-01>
Cause: System.NotSupportedException: There is no active ActorContext, this is most likely due to use of async operations from within this actor.

How can I avoid this kind of SaveSnapshotFailure?

BatchingSqlServerJournal - stuck persistent actor

During maintenance of our production SQL Server database hosting the event journal (active/passive setup), we encountered a problem where the whole application became unresponsive. Only restarting the actor systems on all nodes in the cluster via pbm helped. We are using Akka.Persistence.SqlServer 1.3.7 and BatchingSqlServerJournal.

I managed to reproduce the problem locally by running a load test and taking the local journal DB offline.
Even after bringing the DB back online, the actors are still stuck.

The problem seems to lie between the expectations of ReceivePersistentActor (Eventsourced parent class) and the implementation of BatchingSqlServerJournal. When Persist is called in a persistent actor, a WriteMessages message is sent to the journal actor and the persistent actor changes its behaviour: it waits for a sequence of WriteMessageSuccess/WriteMessageRejected/WriteMessageFailure messages and finally a WriteMessagesSuccessful message, while stashing all other messages. The problem arises when the journal actor does not reply: the persistent actor has no timeout mechanism that would treat the persist as failed, so it blocks indefinitely. This is the case with BatchingSqlServerJournal, which has some code paths that do not reply to journal requests (connection open error, circuit breaker opened and maybe more).

The consequences are quite severe: the persistent actor can only be unblocked by stopping it externally or by restarting the whole actor system.

How to delete Metadata after deleting a snapshot?

I have an actor inherited from ReceivePersistentActor.
It saves snapshots and, once a snapshot has been saved successfully, deletes the older snapshots using the following method:

DeleteSnapshots(new SnapshotSelectionCriteria(@event.Metadata.SequenceNr - 1));

Looks like this method doesn't delete relevant metadata records from MetaData table.

Is there any way to delete those records from MetaData table?

akka packages:

  Akka                        1.2.0
  Akka.DI.AutoFac             1.0.8
  Akka.DI.Core                1.0.8
  Akka.Persistence            1.1.2.30-beta
  Akka.Persistence.Sql.Common 1.1.2.30-beta
  Akka.Persistence.SqlServer  1.1.1.7-beta
  Akka.Remote                 1.2.0

Snapshots not getting played back

I have created a sample application which creates a snapshot every 25 events and removes all event messages up to that point. But when it reloads the actor, only the events are recovered and not the snapshot.

I am using Akka.Persistence.SqlServer version 1.1.1.7-beta and Akka.Persistence.Sql.Common version 1.2.0.36-beta.

I have also observed that the metadata table never gets any records. I am not sure whether that is by design.

Snapshots are not replaying while running on mono with a Sql Server Docker container

I created a simple example of an akka.persistence program (basically a carbon copy of the example on Pluralsight) running it on a Mac and connecting to a Sql Server database hosted in a Docker container (here is the link of how I set that up). And it will save Journal Events and play them back just fine, but if I try to do the same with a snapshot, it will save the snapshot to the database, but it does not play it back. I tried the exact same code on a Windows virtual machine (pointed to a sql server database that was running locally on Windows, NOT in a Docker container) and it worked perfectly. So, I am guessing this is an issue with Mono? Just wanted to let you guys know.

Error when trying to persist actor

New transaction is not allowed because there are other threads running in the session.

   at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
   at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
   at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
   at System.Data.SqlClient.TdsParser.Run(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj)
   at System.Data.SqlClient.TdsParser.TdsExecuteTransactionManagerRequest(Byte[] buffer, TransactionManagerRequestType request, String transactionName, TransactionManagerIsolationLevel isoLevel, Int32 timeout, SqlInternalTransaction transaction, TdsParserStateObject stateObj, Boolean isDelegateControlRequest)
   at System.Data.SqlClient.SqlInternalConnectionTds.ExecuteTransactionYukon(TransactionRequest transactionRequest, String transactionName, IsolationLevel iso, SqlInternalTransaction internalTransaction, Boolean isDelegateControlRequest)
   at System.Data.SqlClient.SqlInternalConnection.BeginSqlTransaction(IsolationLevel iso, String transactionName, Boolean shouldReconnect)
   at System.Data.SqlClient.SqlConnection.BeginTransaction(IsolationLevel iso, String transactionName)
   at System.Data.SqlClient.SqlConnection.BeginDbTransaction(IsolationLevel isolationLevel)
   at Akka.Persistence.Sql.Common.Journal.JournalDbEngine.<InsertInTransactionAsync>d__10.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Akka.Persistence.Sql.Common.Journal.JournalDbEngine.<WriteMessagesAsync>d__6.MoveNext()

Error when trying to persist actor state, version of Akka.Persistence.SqlServer 1.0.6

After I run my application I get the error:

[WARNING][21-Apr-16 1:00:56 PM][Thread 0021][[akka://TaskManagementSystem/user/tenantRouter/TaskManagementDeliveryActor-45bcbf03-756f-4821-9c27-15035fe10621]] Rejected to persist event type [TaskManagement.Messages.CreateTaskMessage] with sequence number [1] for persistenceId [TaskManager] due to [Cannot insert the value NULL into column 'IsDeleted', table 'TaskManagment.dbo.EventJournal'; column does not allow nulls. INSERT fails.
The statement has been terminated.].

My table schema is the same as in the documentation.

PersistenceID vs PersistenceId on case sensitive DB

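I have one DB with a case-sensitive collation (set on the database, not on the column), and persistence on this DB throws errors (the Polish message in the log below means "Invalid column name PersistenceId"):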

{ "Severity": "ERROR", "message": "Persistence failure when replaying events for persistenceId [ADER-636032988151227420-04]. Last known sequence number [0]", "timestamp": "2016-07-05 07:10:16.9931", "logger": "Offers.OfferActor", "stack": "System.Data.SqlClient.SqlException (0x80131904): Nieprawid\u0142owa nazwa kolumny PersistenceId.\r\n at System.Data.SqlClient.SqlCommand.<>c.b__167_0(Task1 result)\r\n at System.Threading.Tasks.ContinuationResultTaskFromResultTask2.InnerInvoke()\r\n at System.Threading.Tasks.Task.Execute()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n at Akka.Persistence.Sql.Common.Journal.JournalDbEngine.d__23.MoveNext()\r\nClientConnectionId:b2fc7f28-29d8-4882-b365-d358a25b1d91\r\nError Number:207,State:1,Class:16", "appdomain": "0001:Offers.Node.Service.exe", "machinename": "O360NODE01 ", "Hostname": "Offers.Node.Service" }

I've noticed that in some places the Akka.Persistence.SqlServer code uses PersistenceId instead of PersistenceID as the column name. Such a small typo, and so much trouble 😄

Events not snapshotted/deleted

In my system I'm using Akka.Persistence.SQL; each IIS instance has its own actor incarnation in order to avoid mixing events.

I'm using Akka 1.3.2.

To reduce the number of rows in the eventjournal table I'm subscribing to SaveSnapshotSuccess using the following code:

    public class TheSend : AtLeastOnceDeliveryActor
    {
        // Logger
        private readonly ILoggingAdapter _log = Logging.GetLogger(Context);
        // Snapshot
        private readonly int snapshotinterval = Int32.Parse(System.Configuration.ConfigurationManager.AppSettings["AkkaSnapshotInterval"]);
        private int counter = 0;
        private readonly string persistenceActorId = System.Configuration.ConfigurationManager.AppSettings["AkkaPersistenceActorId"];

        // Actor system
        protected readonly IActorRef _backEndActor;
        // Cluster system
        protected Akka.Cluster.Cluster Cluster = Akka.Cluster.Cluster.Get(Context.System);

        public TheSend(IActorRef orderProcesserActor)
        {
            // router for backend actorref.
            this._backEndActor = orderProcesserActor;
        }

        /// <summary>
        /// Need to subscribe to cluster changes
        /// </summary>
        protected override void PreStart()
        {
            Cluster.Subscribe(Self, new[] { typeof(ClusterEvent.MemberUp) });
        }

        /// <summary>
        /// Unsubscribe from cluster events on stop
        /// </summary>
        protected override void PostStop()
        {
            Cluster.Unsubscribe(Self);
        }

        public override string PersistenceId
        {
            get { return String.Format("{0}-{1}-{2}", Context.Parent.Path.Name, Context.Self.Path.Name, persistenceActorId); }
        }        

        /**
         * RECOVER
         */
        protected override bool ReceiveRecover(object message)
        {
            if (message is TheSendMessage)
            {
                var messageData = (TheSendMessage)message;

                _log.Info("recovered {0}", messageData);
                
                Deliver(_backEndActor.Path,
                        id =>
                        {
                            return new ConfirmableTheSend(id);
                        });   
            }
            else if (message is TheSendConfirmation)
            {
                ConfirmDelivery(((TheSendConfirmation)message).DeliveryId);
            }	
            else if (message is CheckMessage)
            {
                var messageData = ((CheckMessage)message);

                _log.Info("recovered {0}", messageData.TrackingNumber);

                Deliver(_backEndActor.Path,
                    id =>
                    {
                        return new ConfirmableCheck(id, messageData.TrackingNumber);
                    });
            }
            else if (message is ConfirmationCheck)
            {
                ConfirmDelivery(((ConfirmationCheck)message).DeliveryId);
            }			
            else if (message is SnapshotOffer)
            {
                var m = (SnapshotOffer)message;
                var s = (AtLeastOnceDeliverySnapshot)m.Snapshot;
                SetDeliverySnapshot(s);
            }
            else
                return false;

            return true;
        }

        /**
         * RECEIVE
         */
        protected override bool ReceiveCommand(object message)
        { 
            if (message is TheSendMessage)
            {
                    Persist(message as TheSendMessage, m =>
                    {
                        // is time for a new snapshot?
                        counter = (counter + 1) % snapshotinterval;
                        if (counter == 0)
                        {
                            var snapshot = GetDeliverySnapshot();
                            SaveSnapshot(snapshot);
                        }

                        // send a confirmation.
                        Deliver(_backEndActor.Path,
                            id =>
                            {
                                return new ConfirmableTheSend(id);
                            });
                    });                
            }
            else if (message is TheSendConfirmation)
            {
                _log.Info($"Confirmation: {message}");

                Persist(message as TheSendConfirmation, m => ConfirmDelivery(m.DeliveryId));

                var mes = (TheSendConfirmation)message;


            }
            else if (message is CheckMessage)
            {
                Persist(message as CheckMessage, m =>
                {
                    // is time for a new snapshot?
                    counter = (counter + 1) % snapshotinterval;
                    if (counter == 0)
                    {
                        var snapshot = GetDeliverySnapshot();
                        SaveSnapshot(snapshot);
                    }

                    Deliver(_backEndActor.Path,
                        id =>
                        {
                            return new ConfirmableCheck(id, m.TrackingNumber);
                        });
                });

            }
            else if (message is ConfirmationCheck)
            {
                _log.Info($"Confirmation Check: {message}");

                var messageData = (ConfirmationCheck)message;

                Persist(message as ConfirmationCheck, m => ConfirmDelivery(m.DeliveryId));

            }
            else if (message is UnconfirmedWarning)
            {
                _log.Info(message.ToString());
            }
            else if (message is SaveSnapshotSuccess)
            {
                var snapshotSeqNr = ((SaveSnapshotSuccess)message).Metadata.SequenceNr;

                /**
                 * Enable if you want to decrease the size of Journal and Snapshot tables.
                 */ 
                 DeleteMessages(snapshotSeqNr);
                 DeleteSnapshots(new SnapshotSelectionCriteria(snapshotSeqNr - 1));
                 

                _log.Info("SaveSnapshotSuccess");
            }
            else if (message is SaveSnapshotFailure)
            {
                _log.Info("SaveSnapshotFailure");
            }
            else if (message is ClusterEvent.MemberUp)
            {
                _log.Info("Frontend [{0}]: Cluster is ready. Able to begin jobs.", Cluster.SelfAddress);
                // ready to receive messages from the backend.
                Become(ReceiveCommand);
                Stash.UnstashAll();
            }
            else if (message is ClusterEvent.CurrentClusterState)
            {
                var clusterState = ((ClusterEvent.CurrentClusterState)message);

                _log.Info($"Store Cluster State: {clusterState}");

                // TODO Put this on Elastic (clusterState) and a timestamp

            }
            else
            {
                Stash.Stash();
                return false;
            }

            return true;
        }
    }

The limit is set to 25000 events. The issue is that in production the snapshot is never triggered, so I have to clean the tables manually. Why? The production system is under heavy usage; could it be a race condition? Can you please confirm that only confirmed messages are snapshotted and deleted? Could it be a SaveSnapshotFailure?
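One possibility that can be checked from the code above, offered as a hypothesis rather than a diagnosis: `counter` is an in-memory field initialised to 0 and is not restored in ReceiveRecover, so every new incarnation of the actor starts counting again from zero. If the actor restarts (for example when the hosting process recycles) before 25000 events have been persisted in one run, the threshold is never reached. The simulation below is a plain C# sketch with hypothetical names (`SnapshotTriggerSketch` and its methods); it does not use Akka.NET at all and only compares an in-memory counter with a trigger based on the persisted sequence number, which a persistent actor exposes as `LastSequenceNr`.

```csharp
using System;

public static class SnapshotTriggerSketch
{
    // Counter kept in a field, as in the actor above: it starts at 0 on every incarnation.
    public static int WithInMemoryCounter(int interval, int eventsPerRun, int runs)
    {
        int snapshots = 0;
        for (int run = 0; run < runs; run++)
        {
            int counter = 0; // the field initializer runs again after each restart
            for (int i = 0; i < eventsPerRun; i++)
            {
                counter = (counter + 1) % interval;
                if (counter == 0) snapshots++;
            }
        }
        return snapshots;
    }

    // Trigger derived from the sequence number, which is recovered from the journal
    // and therefore keeps growing across restarts.
    public static int WithSequenceNr(int interval, int eventsPerRun, int runs)
    {
        int snapshots = 0;
        long sequenceNr = 0;
        for (int run = 0; run < runs; run++)
        {
            for (int i = 0; i < eventsPerRun; i++)
            {
                sequenceNr++;
                if (sequenceNr % interval == 0) snapshots++;
            }
        }
        return snapshots;
    }
}
```

With an interval of 25000 and five runs of 20000 events each, `WithInMemoryCounter` returns 0 while `WithSequenceNr` returns 4. Whether restarts actually happen this often in the production system is an assumption that would need to be confirmed from the logs.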

Incorrect SQL syntax when saving snapshot

Here's the callstack:

Unhandled message from akka://Oddjob/system/akka.persistence.snapshot-store.sql-server : SaveSnapshotFailure<meta: SnapshotMetadata<pid: akamai-volume-usage:22, seqNr: 1053, timestamp: 0001.01.01>, cause: System.Data.SqlClient.SqlException (0x80131904): Incorrect syntax near the keyword 'WHERE'.
Incorrect syntax near the keyword 'SET'.
Incorrect syntax near '('.
at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action1 wrapCloseInAction)
at System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection, Action1 wrapCloseInAction)
at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
at System.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString)
at System.Data.SqlClient.SqlCommand.CompleteAsyncExecuteReader()
at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryInternal(IAsyncResult asyncResult)
at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryAsync(IAsyncResult asyncResult)
at System.Threading.Tasks.TaskFactory1.FromAsyncCoreLogic(IAsyncResult iar, Func2 endFunction, Action1 endAction, Task1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.d__26.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Persistence.Sql.Common.Snapshot.SqlSnapshotStore.d__21.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Util.Internal.AtomicState.d__8.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at Akka.Util.Internal.AtomicState.d__8.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Pattern.HalfOpen.d__4.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Akka.Pattern.CircuitBreaker.d__33.MoveNext()
ClientConnectionId:5a1a9a0c-5099-4e42-942b-cd935519dbfa
Error Number:156,State:1,Class:15>

journaling and sqlexceptions - PK violations

using 1.1.5 pre-release version of the plugin but the issue was seen with earlier versions too

an example -
Rejected to persist event type ["BrokerDealer.Contracts.Messages.MessageHandled"] with sequence number [59] for persistenceId ["/user/CashFundingManagementView/$Gb"] due to ["Violation of PRIMARY KEY constraint 'PK_AKKAJournal'. Cannot insert duplicate key in object 'dbo.AKKAJournal'. The duplicate key value is (/user/CashFundingManagementView/$Gb, 59).

I see higher incidence of this with higher actor instance count

what we do is very simple - every incoming message is persisted into the journal and when the message is handled the corresponding MessageHandled is appended to the journal - this allows us to only replay messages that have not yet been processed

I see sufficient time span between original message under certain persistence id and sequence number and the exception e.g.

this is the exception from the log (note the TS)

2016-09-21 10:34:42.843 +01:00 [Warning] Rejected to persist event type ["BrokerDealer.Contracts.Messages.Feed.CashManager.GrossFlow"] with sequence number [8] for persistenceId ["/user/CashFundingManagementView/$h"] due to ["Violation of PRIMARY KEY constraint 'PK_AKKAJournal'. Cannot insert duplicate key in object 'dbo.AKKAJournal'. The duplicate key value is (/user/CashFundingManagementView/$h, 8).

this is the original journal entry
SELECT [PersistenceID]
,[SequenceNr]
,[Timestamp]
,[IsDeleted]
,[Manifest]
,[Payload]
,[Tags]
FROM [BrokerDealer].[dbo].[AKKAJournal] where PersistenceID='/user/CashFundingManagementView/$h' and SequenceNr=8

I have removed the payload for brevity

     PersistenceID                        SequenceNr  Timestamp           IsDeleted  Manifest
1    /user/CashFundingManagementView/$H   8           636100472666516918  0          BrokerDealer.Contracts.Messages.Feed.CashManager.GrossFlow, BrokerDealer.Contracts

the timestamp converts to 2016-09-21T09:34:26.651Z (vs the exception time of 2016-09-21 10:34:42.843 +01:00)
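
The Timestamp value has the magnitude of a .NET tick count (100-nanosecond intervals since 0001-01-01 UTC) rather than Unix seconds. That reading is an assumption, based on the size of the number and on the conversion matching the date quoted above. A minimal Python sketch of the conversion follows; `ticks_to_utc` is a hypothetical helper name, not part of any Akka.NET API.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the journal Timestamp column holds .NET ticks, i.e.
# 100-nanosecond intervals counted from 0001-01-01T00:00:00 UTC.
DOTNET_EPOCH = datetime(1, 1, 1, tzinfo=timezone.utc)

def ticks_to_utc(ticks: int) -> datetime:
    """Convert a .NET tick count to a timezone-aware UTC datetime."""
    # 10 ticks = 1 microsecond; integer division avoids float rounding.
    return DOTNET_EPOCH + timedelta(microseconds=ticks // 10)

# Timestamp of the existing journal row quoted above.
journal_entry = ticks_to_utc(636100472666516918)

# Exception time from the log: 2016-09-21 10:34:42.843 +01:00
rejected_at = datetime(2016, 9, 21, 10, 34, 42, 843000,
                       tzinfo=timezone(timedelta(hours=1)))

gap = rejected_at - journal_entry
print(journal_entry.isoformat())
print(f"gap: {gap.total_seconds():.3f} s")
```

Under that assumption the existing row was written roughly 16 seconds before the rejected write.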

We are aware that this is a pre-release, so I would also like to ask for suggestions on an alternative approach or other Akka persistence plugin implementations that behave in a stable way

performance of journal write process

using version 1.1.5 pre-release but this was seen with other versions as well

we run our standardised test with 5000 incoming messages and test actor system performance under various settings (journal and snapshot enabled)

in all tests we see a lot of synchronisation and high SQL time spent around the journal write
most tests spend 120 seconds in total just journalling these messages

to be precise, it is journalling 10000 messages, as we also journal a MessageHandled for each incoming message once it has actually been processed
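
For scale, the figures reported above can be turned into a rough per-write cost. This is only arithmetic on the numbers in this report (10000 journal writes, about 120 seconds), not an independent measurement.

```python
# Figures taken from the report above; nothing here is measured independently.
total_writes = 10_000      # 5000 incoming messages + 5000 MessageHandled events
total_seconds = 120        # approximate time spent journalling per test run

writes_per_second = total_writes / total_seconds
ms_per_write = total_seconds * 1000 / total_writes

print(f"{writes_per_second:.1f} writes/s, {ms_per_write:.1f} ms per write")
```

That works out to about 83 writes per second, or 12 ms per journal write on average.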

based on the data, there seems to be a synchronization/async wait bottleneck in the journal write routine while the system is under no particular stress (low CPU etc.)

see the snippet from the ANTS profiles

sql bug 41

also note the depth of the wait call stack (over 30K invocations deep), consisting of System.Threading.ThreadPoolWorkQueue.Dispatch() and System.Threading._ThreadPoolWaitCallback.PerformWaitCallback() nested over and over
see the comparison of wait time against actual execution time for the inserts themselves

sql bug 41_2

the database server is under no stress, and within our actor we do our own upserts of business data derived from incoming messages with much higher throughput

with an increased number of actor instances, inserts into the journal get considerably slower and the journal starts throwing a lot of PK violation exceptions (#50)

the HOCON is here (passwords removed)

      sql-server {
        # qualified type name of the SQL Server persistence journal actor
        class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
        # dispatcher used to drive journal actor
        plugin-dispatcher = "akka.actor.default-dispatcher"
        # connection string used for database access
        connection-string = REMOVED"
        # default SQL commands timeout
        connection-timeout = 30s
        # SQL server schema name to table corresponding with persistent journal
        schema-name = dbo
        # SQL server table corresponding with persistent journal
        table-name = AKKAJournal
        # should corresponding journal table be initialized automatically
        auto-initialize = off
        # timestamp provider used for generation of journal entries timestamps
        timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
        # metadata table
        metadata-table-name = AKKAMetadata
      }
    }
    snapshot-store {
      plugin = "akka.persistence.snapshot-store.sql-server"
      sql-server {
        # qualified type name of the SQL Server persistence journal actor
        class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
        # dispatcher used to drive journal actor
        plugin-dispatcher = "akka.actor.default-dispatcher"
        # connection string used for database access
        connection-string = "REMOVED*"
        # default SQL commands timeout
        connection-timeout = 30s
        # SQL server schema name to table corresponding with persistent journal
        schema-name = dbo
        # SQL server table corresponding with persistent journal
        table-name = AKKASnapshot
        # should corresponding journal table be initialized automatically
        auto-initialize = off
      }
    }
  }
  # this config section will be referenced as akka.actor
  actor {
    serializers {
      wire = "Akka.Serialization.WireSerializer, Akka.Serialization.Wire"
    }
    serialization-bindings {
      "System.Object" = wire
    }
    provider = "Akka.Remote.RemoteActorRefProvider, Akka.Remote"
    debug {
      receive = on
      autoreceive = on
      lifecycle = on
      event-stream = on
      unhandled = on
    }
    deployment {
      /CashFundingManagementView {
        router = round-robin-pool
        nr-of-instances = 100
      }
    }
  }
  # here we're configuring the Akka.Remote module
  remote {
    helios.tcp {
      transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
      #applied-adapters = []
      transport-protocol = tcp
      port = 7095
      hostname = "localhost"
    }
    log-remote-lifecycle-events = INFO
  }
]]> </hocon>

Migration script syntax

The migration script to v1.3.2 should use this syntax:
ALTER TABLE dbo.EventJournal ADD SerializerId INTEGER NULL
ALTER TABLE dbo.SnapshotStore ADD SerializerId INTEGER NULL
instead of ADD COLUMN SerializerId, since SQL Server's ALTER TABLE ... ADD does not accept the COLUMN keyword.

Null Reference Exception at Akka.Persistence.Journal.AsyncWriteJournal Constructor

Akka v1.3.11

Everything was working fine for months. I have a scheduled restart of the Windows service hosting this application.
Today, I got this error.

2019-04-15 04:09:18,798 [1] INFO  Archive.Api.Startup - ActorSystem Started !
2019-04-15 04:09:18,798 [1] INFO  Archive.Api.Startup - ASP.NET application started !
2019-04-15 04:09:18,829 [11] INFO  Archive.Shared.Export.VideoMaster - [PreStart] VideoMaster
2019-04-15 04:09:18,829 [27] INFO  Archive.Shared.Export.ApiMaster - [PreStart] ApiMaster
2019-04-15 04:09:18,954 [30] ERROR Akka.Actor.OneForOneStrategy - Error while creating actor instance of type Akka.Persistence.SqlServer.Journal.SqlServerJournal with 1 args: (  class : "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
  plugin-dispatcher : akka.actor.default-dispatcher
  connection-string : "Server=.;Database=AkkaPersistenceArchiveDb;User ID=sa;Password=xxxxx;Connection Timeout=30;MultipleActiveResultSets=True; Pooling=True; Min Pool Size=2; Connect Timeout=250;"
  connection-timeout : 30s
  schema-name : dbo
  table-name : EventJournal
  auto-initialize : on
  timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
  metadata-table-name : Metadata
)
[akka://ArchiveSystem/system/akka.persistence.journal.sql-server#15922092]: Akka.Actor.ActorInitializationException: Exception during creation ---> System.TypeLoadException: Error while creating actor instance of type Akka.Persistence.SqlServer.Journal.SqlServerJournal with 1 args: (  class : "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
  plugin-dispatcher : akka.actor.default-dispatcher
  connection-string : "Server=.;Database=AkkaPersistenceArchiveDb;User ID=sa;Password=xxxxx;Connection Timeout=30;MultipleActiveResultSets=True; Pooling=True; Min Pool Size=2; Connect Timeout=250;"
  connection-timeout : 30s
  schema-name : dbo
  table-name : EventJournal
  auto-initialize : on
  timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
  metadata-table-name : Metadata
) ---> System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. ---> System.NullReferenceException: Object reference not set to an instance of an object.
   at Akka.Persistence.Journal.AsyncWriteJournal..ctor()
   at Akka.Persistence.Sql.Common.Journal.SqlJournal..ctor(Config journalConfig)
   at Akka.Persistence.SqlServer.Journal.SqlServerJournal..ctor(Config journalConfig)
   --- End of inner exception stack trace ---
   at System.RuntimeMethodHandle.InvokeMethod(Object target, Object[] arguments, Signature sig, Boolean constructor)
   at System.Reflection.RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
   at System.RuntimeType.CreateInstanceImpl(BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes, StackCrawlMark& stackMark)
   at System.Activator.CreateInstance(Type type, BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes)
   at System.Activator.CreateInstance(Type type, Object[] args)
   at Akka.Actor.Props.ActivatorProducer.Produce()
   at Akka.Actor.Props.NewActor()
   --- End of inner exception stack trace ---
   at Akka.Actor.Props.NewActor()
   at Akka.Actor.ActorCell.CreateNewActorInstance()
   at Akka.Actor.ActorCell.<>c__DisplayClass109_0.<NewActor>b__0()
   at Akka.Actor.ActorCell.UseThreadContext(Action action)
   at Akka.Actor.ActorCell.NewActor()
   at Akka.Actor.ActorCell.Create(Exception failure)
   --- End of inner exception stack trace ---
   at Akka.Actor.ActorCell.Create(Exception failure)
   at Akka.Actor.ActorCell.SysMsgInvokeAll(EarliestFirstSystemMessageList messages, Int32 currentState)
2019-04-15 04:09:19,063 [30] WARN  Archive.Shared.Cluster.ClusterStatus - ClusterLeader is null
2019-04-15 04:09:19,720 [11] INFO  Akka.Event.DummyClassForStringSources - Cluster Node [akka.tcp://[email protected]:16666] - Welcome from [akka.tcp://[email protected]:4053]
...

Serialise to string

In an answer to "De-Serialize akka.net persistence message using C#", Horusiath said:

In the future there will be a possibility to change this format from binary into JSON data type if your database of choice will support that format.

Has the future arrived yet? We would like to store the event payload as JSON string in a SQL Server database. Converting the JSON string to a byte array is a waste of CPU cycles. Depending on the size of the payload the overhead is between 50% and 100% for serialising, and between 100% and 200% for deserialising.
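
To make the extra step concrete, here is a minimal Python sketch (not Akka.NET code) of the round trip a JSON payload takes when the payload column is binary. The event shape is hypothetical, and the sketch makes no claim about the overhead percentages quoted above; it only shows where the additional encode and decode calls sit.

```python
import json

# Hypothetical event; the real payload types are not shown in the question.
event = {"orderId": 42, "status": "shipped"}

# Write path with a binary payload column: object -> JSON text -> bytes.
json_text = json.dumps(event)
payload_bytes = json_text.encode("utf-8")     # the extra encoding step

# Read path: bytes -> JSON text -> object.
decoded_text = payload_bytes.decode("utf-8")  # the extra decoding step
restored = json.loads(decoded_text)

# With a text (JSON string) column, json_text could be stored and read directly.
print(restored == event, len(json_text), len(payload_bytes))
```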

Error while creating actor instance of type Akka.Persistence.SqlServer.Journal.SqlServerJournal

I updated the references to 1.3 and the app throws the following exception:

System.MissingMethodException: Method not found: 'Void Akka.Persistence.Sql.Common.Journal.QueryConfiguration..ctor(System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.TimeSpan)

akka packages:

  Akka                        1.3.0
  Akka.DI.AutoFac             1.0.8
  Akka.DI.Core                1.0.8
  Akka.Logger.NLog            1.2.0
  Akka.Persistence            1.3.0
  Akka.Persistence.Sql.Common 1.3.0
  Akka.Persistence.SqlServer  1.1.1.7-beta
  Akka.Remote                 1.3.0
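
A MissingMethodException on a constructor generally means that one assembly was compiled against a different version of a dependency than the one loaded at runtime. As a rough illustration only (plain Python, not a NuGet tool), the package list above can be checked for Akka.Persistence packages whose major.minor version differs from Akka.Persistence.Sql.Common. `find_mismatches` is a hypothetical helper, and treating a matching major.minor as compatible is a simplifying assumption.

```python
# Versions copied from the package list in this report.
packages = {
    "Akka": "1.3.0",
    "Akka.Persistence": "1.3.0",
    "Akka.Persistence.Sql.Common": "1.3.0",
    "Akka.Persistence.SqlServer": "1.1.1.7-beta",
    "Akka.Remote": "1.3.0",
}

def major_minor(version: str) -> tuple:
    """Return (major, minor) from a version such as '1.1.1.7-beta'."""
    core = version.split("-")[0]          # drop any pre-release suffix
    major, minor = core.split(".")[:2]
    return int(major), int(minor)

def find_mismatches(pkgs: dict, reference: str, prefix: str) -> list:
    """List packages under `prefix` whose major.minor differs from `reference`."""
    expected = major_minor(pkgs[reference])
    return sorted(name for name, version in pkgs.items()
                  if name.startswith(prefix) and major_minor(version) != expected)

mismatched = find_mismatches(packages, "Akka.Persistence.Sql.Common", "Akka.Persistence")
print(mismatched)
```

With these versions only Akka.Persistence.SqlServer is flagged, which is consistent with the missing constructor belonging to Akka.Persistence.Sql.Common.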

full log:

2017-08-25 10:52:55.9321 [20] Info local-machine daemon Akka.Actor.Internal.ActorSystemImpl -   akka : {
    persistence : {
      journal : {
        plugin : akka.persistence.journal.sql-server
        sql-server : {
          class : "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
          plugin-dispatcher : akka.actor.default-dispatcher
          connection-string : "server=localhost; database=RapidBilling; user=sa; password=123456; Application Name=AdminBillingApi;"
          connection-timeout : 30s
          schema-name : dbo
          table-name : AkkaEventJournal
          auto-initialize : on
          timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
          metadata-table-name : AkkaMetadata
        }
      }
      snapshot-store : {
        plugin : akka.persistence.snapshot-store.sql-server
        sql-server : {
          class : "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
          plugin-dispatcher : akka.actor.default-dispatcher
          connection-string : "server=localhost; database=RapidBilling; user=sa; password=123456; Application Name=AdminBillingApi;"
          connection-timeout : 30s
          schema-name : dbo
          table-name : AkkaSnapshotStore
          auto-initialize : on
        }
      }
    }
    loggers : ["Akka.Logger.NLog.NLogLogger, Akka.Logger.NLog"]
    actor : {
      debug : {
        unhandled : on
        receive : on
        autoreceive : on
        lifecycle : on
        event-stream : on
      }
    }
    stdout-loglevel : Error
    loglevel : DEBUG
    log-config-on-start : on
  }
2017-08-25 10:52:55.9211 [7] Debug local-machine daemon Akka.Event.EventStream - subscribing [akka://BillingActorSystem/system/log1-NLogLogger#699980222] to channel Akka.Event.Error
2017-08-25 10:52:55.9211 [19] Warn local-machine daemon Akka.Actor.Internal.ActorSystemImpl - NewtonSoftJsonSerializer has been detected as a default serializer. It will be obsoleted in Akka.NET starting from version 1.5 in the favor of Hyperion (for more info visit: http://getakka.net/docs/Serialization#how-to-setup-hyperion-as-default-serializer ). If you want to suppress this message set HOCON `akka.suppress-json-serializer-warning` config flag to on.
2017-08-25 10:52:59.6202 [24] Error local-machine daemon Akka.Actor.OneForOneStrategy - Error while creating actor instance of type Akka.Persistence.SqlServer.Journal.SqlServerJournal with 1 args: (  class : "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
  plugin-dispatcher : akka.actor.default-dispatcher
  connection-string : "server=localhost; database=RapidBilling; user=sa; password=123456; Application Name=AdminBillingApi;"
  connection-timeout : 30s
  schema-name : dbo
  table-name : AkkaEventJournal
  auto-initialize : on
  timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
  metadata-table-name : AkkaMetadata
)[akka://BillingActorSystem/system/akka.persistence.journal.sql-server#796296213]: Akka.Actor.ActorInitializationException: Exception during creation ---> System.TypeLoadException: Error while creating actor instance of type Akka.Persistence.SqlServer.Journal.SqlServerJournal with 1 args: (  class : "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
  plugin-dispatcher : akka.actor.default-dispatcher
  connection-string : "server=localhost; database=RapidBilling; user=sa; password=123456; Application Name=AdminBillingApi;"
  connection-timeout : 30s
  schema-name : dbo
  table-name : AkkaEventJournal
  auto-initialize : on
  timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
  metadata-table-name : AkkaMetadata
) ---> System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. ---> System.MissingMethodException: Method not found: 'Void Akka.Persistence.Sql.Common.Journal.QueryConfiguration..ctor(System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.TimeSpan)'.
   at Akka.Persistence.SqlServer.Journal.SqlServerJournal..ctor(Config journalConfig)
   --- End of inner exception stack trace ---
   at System.RuntimeMethodHandle.InvokeMethod(Object target, Object[] arguments, Signature sig, Boolean constructor)
   at System.Reflection.RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
   at System.RuntimeType.CreateInstanceImpl(BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes, StackCrawlMark& stackMark)
   at System.Activator.CreateInstance(Type type, BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes)
   at System.Activator.CreateInstance(Type type, Object[] args)
   at Akka.Actor.Props.ActivatorProducer.Produce()
   at Akka.Actor.Props.NewActor()
   --- End of inner exception stack trace ---
   at Akka.Actor.Props.NewActor()
   at Akka.Actor.ActorCell.CreateNewActorInstance()
   at Akka.Actor.ActorCell.<>c__DisplayClass109_0.<NewActor>b__0()
   at Akka.Actor.ActorCell.UseThreadContext(Action action)
   at Akka.Actor.ActorCell.NewActor()
   at Akka.Actor.ActorCell.Create(Exception failure)
   --- End of inner exception stack trace ---
   at Akka.Actor.ActorCell.Create(Exception failure)
   at Akka.Actor.ActorCell.SysMsgInvokeAll(EarliestFirstSystemMessageList messages, Int32 currentState)[akka://BillingActorSystem/system/akka.persistence.journal.sql-server#796296213]: Akka.Actor.ActorInitializationException: Exception during creation ---> System.TypeLoadException: Error while creating actor instance of type Akka.Persistence.SqlServer.Journal.SqlServerJournal with 1 args: (  class : "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
  plugin-dispatcher : akka.actor.default-dispatcher
  connection-string : "server=localhost; database=RapidBilling; user=sa; password=123456; Application Name=AdminBillingApi;"
  connection-timeout : 30s
  schema-name : dbo
  table-name : AkkaEventJournal
  auto-initialize : on
  timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
  metadata-table-name : AkkaMetadata
) ---> System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. ---> System.MissingMethodException: Method not found: 'Void Akka.Persistence.Sql.Common.Journal.QueryConfiguration..ctor(System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.String, System.TimeSpan)'.
   at Akka.Persistence.SqlServer.Journal.SqlServerJournal..ctor(Config journalConfig)
   --- End of inner exception stack trace ---
   at System.RuntimeMethodHandle.InvokeMethod(Object target, Object[] arguments, Signature sig, Boolean constructor)
   at System.Reflection.RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
   at System.RuntimeType.CreateInstanceImpl(BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes, StackCrawlMark& stackMark)
   at System.Activator.CreateInstance(Type type, BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes)
   at System.Activator.CreateInstance(Type type, Object[] args)
   at Akka.Actor.Props.ActivatorProducer.Produce()
   at Akka.Actor.Props.NewActor()
   --- End of inner exception stack trace ---
   at Akka.Actor.Props.NewActor()
   at Akka.Actor.ActorCell.CreateNewActorInstance()
   at Akka.Actor.ActorCell.<>c__DisplayClass109_0.<NewActor>b__0()
   at Akka.Actor.ActorCell.UseThreadContext(Action action)
   at Akka.Actor.ActorCell.NewActor()
   at Akka.Actor.ActorCell.Create(Exception failure)
   --- End of inner exception stack trace ---
   at Akka.Actor.ActorCell.Create(Exception failure)
   at Akka.Actor.ActorCell.SysMsgInvokeAll(EarliestFirstSystemMessageList messages, Int32 currentState)

The same source code works perfectly with the following packages:

  Akka                        1.2.0
  Akka.DI.AutoFac             1.0.8
  Akka.DI.Core                1.0.8
  Akka.Persistence            1.1.2.30-beta
  Akka.Persistence.Sql.Common 1.1.2.30-beta
  Akka.Persistence.SqlServer  1.1.1.7-beta
  Akka.Remote                 1.2.0

MSSQLLocalDB supported?

With the following hocon

<hocon>
      <![CDATA[          
          akka.persistence {

              journal {
                plugin = "akka.persistence.journal.sql-server"                
                sql-server {
                      class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
                      plugin-dispatcher = "akka.actor.default-dispatcher"

                      # connection string used for database access                      
                      connection-string = "Data Source=(local)\\MSSQLLocalDB;Initial Catalog=PSAkka;Integrated Security=True"

                      # can alternatively specify: connection-string-name

                      # default SQL timeout
                      connection-timeout = 30s

                      # SQL server schema name
                      schema-name = dbo

                      # persistent journal table name
                      table-name = EventJournal

                      # initialize journal table automatically
                      auto-initialize = on

                      timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
                      metadata-table-name = Metadata
                }
              }

            }
      ]]>
    </hocon>

I get the following error:

image

Is this supported?

Thanks

Racy spec: SqlServerEventsByPersistenceIdSpec.Sql_query_EventsByPersistenceId_should_return_remaining_values_after_partial_journal_cleanup

Got a message of the expected type <Akka.Streams.TestKit.TestSubscriber+OnNext`1[[System.Object, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]]>. Also expected the predicate to return true but the message {TestSubscriber.OnNext(h-1)} of type <Akka.Streams.TestKit.TestSubscriber+OnNext`1[[System.Object, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]]> did not match
Expected: True
Actual:   False
   at Akka.TestKit.Xunit2.XunitAssertions.AssertTrue(Boolean condition, String format, Object[] args)
   at Akka.TestKit.TestKitBase.AssertPredicateIsTrueForMessage[T](Predicate`1 isMessage, T m, String hint)
   at Akka.TestKit.TestKitBase.<>c__DisplayClass91_0`1.<ExpectMsg>b__0(T m, IActorRef sender)
   at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelope[T](Nullable`1 timeout, Action`2 assert, String hint, Boolean shouldLog)
   at Akka.TestKit.TestKitBase.InternalExpectMsg[T](Nullable`1 timeout, Action`2 assert, String hint)
   at Akka.TestKit.TestKitBase.ExpectMsg[T](Predicate`1 isMessage, Nullable`1 timeout, String hint)
   at Akka.Streams.TestKit.TestSubscriber.ManualProbe`1.ExpectNext(T element)
   at Akka.Persistence.Sql.TestKit.EventsByPersistenceIdSpec.Sql_query_EventsByPersistenceId_should_return_remaining_values_after_partial_journal_cleanup()

AllPersistenceIds becomes unresponsive on attempt to fetch more than 16 rows

I created a small project reproducing this issue. It's completely weird, since everything works fine if I limit the result size to a very small number (up to 16 rows). Above that it doesn't respond. Here's the project:
https://github.com/object/Akka.PersistenceQuery.Performance

And here's the code (in F#) to reproduce the error. I included a copy of my SQL db in the repo, but I don't think actual data should matter since it's only IDs that are requested.

// MaxResultCount and reduce are not shown in this snippet
let getAllPersistenceIds system =
    let queries = PersistenceQuery.Get(system).ReadJournalFor<SqlReadJournal>("akka.persistence.query.journal.sql")
    let mat = ActorMaterializer.Create(system)
    let src : Source<string, NotUsed> = queries.AllPersistenceIds()
    src.Take(int64 MaxResultCount).RunSum(System.Func<string,string,string> reduce, mat)
    |> Async.AwaitTask
    |> Async.RunSynchronously
