akka-persistence-mongo's People

Contributors

alari, an-tex, bmontuelle, cchantep, dpfeiffer, ffendt, filosganga, gbrd, gitter-badger, hannesschr, jbellenger, jeanfrancoisguena, krchniam, marcuslinke, marekzebrowski, matheuslima, nicmarti, njlbenn, rawyler, ri-boris, rodrigotn, scala-steward, scullxbones, thjaeckle, twillouer, wellingr, yannic92, yufei-cai

akka-persistence-mongo's Issues

journal index size

Having stored 200 million events in the journal, the unique index has grown to over 150 GB. Could this be optimized somehow, for example by splitting the journal into multiple collections? WDYT?

Exception in MongoPersistenceExtension.createExtension

MongoPersistenceExtension invokes ConfigFactory.load(), which causes a ConfigException$UnresolvedSubstitution to be thrown if there are unresolved substitutions (e.g. ${com.my.key}).

When creating an actor system, we resolve missing substitutions with another configuration:

Config resolvedConfig = akkaConfig.resolveWith(springConfig);
ActorSystem.create(actorSystemName(), resolvedConfig);

This is a rare use case and not an issue for most applications, but the extension should not assume that ConfigFactory.load() has all values resolved. Why not just take the backing config of the ExtendedActorSystem?
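A minimal sketch of the suggested alternative, assuming the extension receives the ExtendedActorSystem (class and val names are illustrative):

import akka.actor.ExtendedActorSystem

class MongoSettingsFromSystem(system: ExtendedActorSystem) {
  // system.settings.config is the fully resolved config the system was
  // created with, so no second ConfigFactory.load() is needed.
  val mongoConfig = system.settings.config.getConfig("akka.contrib.persistence.mongodb.mongo")
}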

Build for ReactiveMongo 0.11

With the new ReactiveMongo 0.11.0-M2 I'm getting the following errors:

java.lang.NoSuchMethodError: reactivemongo.api.MongoDriver$.apply(Lakka/actor/ActorSystem;)Lreactivemongo/api/MongoDriver;
    at akka.contrib.persistence.mongodb.RxMongoPersistenceDriver$class.driver(RxMongoPersistenceExtension.scala:50)

ReactiveMongo now has a stable API, so we should update the driver for it.
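A build.sbt sketch of the requested bump (version taken from the report above; adjust once 0.11 is final):

libraryDependencies += "org.reactivemongo" %% "reactivemongo" % "0.11.0-M2"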

Connection leaks?

While running tests of a prototype application using this journaling implementation, it looks like MongoDB connections are being leaked. I scanned through the source and didn't see any obvious place where connections are closed after being opened.

Authentication

How does one authenticate against a Mongo database? The only way I found that doesn't require extra configuration fields would be something like akka.contrib.persistence.mongodb.mongo.urls = ["user:password@vm:27017"], but it fails with a MatchError because the value is split by colon.
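To illustrate why the parse breaks (plain Scala, no plugin code involved): the credentials introduce extra colons, so a pattern expecting exactly host and port fails.

"user:password@vm:27017".split(":")
// => Array("user", "password@vm", "27017")

val Array(host, port) = "vm:27017".split(":")             // fine
// val Array(h, p) = "user:password@vm:27017".split(":")  // MatchError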

encounter recovery failure when working with akka cluster sharding

I am using akka 2.3.9; the relevant log is pasted below.
It seems that the coordinator is trying to recover from mongo data that is not yet available.

[2015-07-09 23:23:38,347] ERROR[ClusterSystem-akka.actor.default-dispatcher-15] OneForOneStrategy - Processor killed after recovery failure (persistent id = [/user/sharding/ActionWorkerCoordinator/singleton/coordinator]). To avoid killing processors on recovery failure, a processor must handle RecoveryFailure messages. RecoveryFailure was caused by: akka.pattern.CircuitBreakerOpenException: Circuit Breaker is open; calls are failing fast

Akka 2.4

Hi,

thanks a lot for the great work, it works like a charm!

Do you plan to release a new version for akka 2.4?
Some APIs have moved, so akka-persistence-mongo can no longer be used :-(

Research user-defined event streams

Similar to Martin's kafka journal, allow the user to define a mapping of roughly Event --> capped collection.

The user can then set up a tailable cursor on that collection to synthesize streams together; a sketch of consuming such a collection follows.
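A minimal sketch of the consumer side with Casbah (database and collection names are illustrative; the mapping feature itself does not exist yet):

import com.mongodb.Bytes
import com.mongodb.casbah.Imports._

// Create (once) a capped collection that mapped events would land in.
val db = MongoClient()("akka-persistence")
db.createCollection("userEvents", MongoDBObject("capped" -> true, "size" -> 104857600))

// Tailable + await-data behaves like `tail -f` on the collection.
val cursor = db("userEvents").find()
cursor.underlying.addOption(Bytes.QUERYOPTION_TAILABLE)
cursor.underlying.addOption(Bytes.QUERYOPTION_AWAITDATA)
cursor.foreach(println) // blocks, emitting events as they arrive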

CircuitBreakerOpenException at high load persisting / restoring

Hi.

I am currently evaluating this nice akka persistence plugin for mongoDB and it fits our needs really well.
But now I have run into some problems with the included "CircuitBreaker" pattern.

Under high load of journal writes or journal reads (when restoring), a CircuitBreakerOpenException is thrown.
I have no clue what goes wrong (is there an exception in the Casbah driver?) or how to handle this correctly.

I don't say this is wrong - I just don't know what to do with the "CircuitBreakerOpenException" at this point.

Any hints?
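For reference, a tuning sketch for the plugin's circuit breaker; the keys are assumed from the plugin's reference configuration, so please verify them against your version:

# Loosen the breaker under sustained load (values illustrative).
akka.contrib.persistence.mongodb.mongo.breaker {
  maxTries = 10
  timeout.call = 10s
  timeout.reset = 30s
}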

Test case instability in Travis CI

Test cases only sometimes pass when run on Travis CI. The TCK tests in particular seem to be unstable and very timing-dependent. We'll try to address this on our side, but may need to contribute a patch upstream.

rxmongo: Replay journal with max replay events > Int.MaxValue

When setting override val autoUpdateReplayMax = Long.MaxValue in a persistent view actor, only 101 events are replayed per Update. This is due to converting the Long value to an Int via max.toInt here:

https://github.com/scullxbones/akka-persistence-mongo/blob/master/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoPersistenceJournaller.scala#L118

If max is greater than Int.MaxValue, the conversion results in a negative value, so rxmongo falls back to its default batch size of 101 - a problem when processing masses of events.
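The overflow in a nutshell (plain Scala):

val max: Long = Long.MaxValue
val batch: Int = max.toInt // -1: the Long wraps around when truncated
// A safe clamp instead of a raw conversion:
val clamped: Int = math.min(max, Int.MaxValue.toLong).toInt // 2147483647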

Sporadic recovery failure with reactivemongo and authentication

Hi,
I am using akka 2.3.12 with reactivemongo 0.11.6. Sometimes actor recovery fails with the exception reactivemongo.core.errors.DetailedDatabaseException: DatabaseException['not authorized for query on test.akkaPersistenceJournal' (code = 13)]. I suspect this is caused by the RxMongoPersistenceDriver#connection implementation, because the connection is created with authentication still pending. When RxMongoJournaller tries to use this connection before the authentication handshake completes, the query fails on the MongoDB side (sporadically, if the handshake is slow). I can't see any code that waits for the connection to be authenticated, and there is no guarantee that this unauthenticated connection will not be used by RxMongoJournaller or RxMongoSnapshotter.

My current workaround is a custom implementation of MongoPersistenceExtension, based on RxMongoPersistenceExtension, which returns the connection only after it has been successfully authenticated. I can send a pull request with my fix.
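A sketch of the workaround's core idea, i.e. blocking until the handshake completes before handing out the connection (API names per ReactiveMongo 0.11; verify against your version):

import scala.concurrent.Await
import scala.concurrent.duration._
import reactivemongo.api.MongoDriver

val driver = new MongoDriver
val connection = driver.connection(List("vm:27017"))
// Only release the connection once authentication has succeeded.
Await.result(connection.authenticate("test", "user", "password"), 10.seconds)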

Thanks

default collection names

It would be nice if the default collection names were easier to work with in the mongo cli.

db.akka-persistence-journal.count()
Mon Feb 10 11:47:14.109 JavaScript execution failed: ReferenceError: persistence is not defined

Accessing the collection with the default name requires the db.getCollection('akka-persistence-journal') syntax.
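A sketch of overriding the defaults with shell-friendly names; the exact keys are assumed from the plugin's settings, so double-check them:

akka.contrib.persistence.mongodb.mongo {
  journal-collection = "akka_persistence_journal"
  snaps-collection   = "akka_persistence_snaps"
}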

Running into DuplicateKey issues with the Journal Index

Over multiple runs of my application (under stress testing), I occasionally start running into akka PersistenceExceptions caused by a Mongo DuplicateKey error thrown via the index this library sets up for the journal. Reading through some of the related code, the index seems to be based on the persistence id, the deleted flag, and the sequence number.

I haven't ruled out user error on my part yet, as I haven't found an exact way to reproduce the issue. However, since the index is primarily based on values I don't control in my application code, I'm not clear on how I might be causing it. My assumption is that the sequence number duplicates one from a stored journal entry. It becomes problematic because the actor using that persistence-id then always fails unless I clear the persistence data from Mongo.

If it is of any relevance, I delete old snapshots and journal entries upon snapshot save success:

case req: SaveSnapshotSuccess =>
  // prune snapshots and journal entries superseded by the new snapshot
  deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = req.metadata.sequenceNr - 1))
  deleteMessages(lastSequenceNr)

An anecdotal example of the error I can give is that it seems to typically occur during/after recovery. I see logging that my actor has recovered from a snapshot, then it receives its first message, which it tries to persist. The next logging I get is the DuplicateKey exception. Here it actually seems like it no longer knows where the sequence numbers left off, as it is trying to use the value 1.

The trace:

com.mongodb.MongoException$DuplicateKey: { "serverUsed" : "localhost:27017" , "ok" : 1 , "n" : 0 , "err" : "insertDocument :: caused by :: 11000 E11000 duplicate key error index: akka-persistence.akka_persistence_journal.$akka_persistence_journal_index  dup key: { : \"balanceStore-Felicia\", : 1, : false }" , "code" : 11000}
    at com.mongodb.CommandResult.getWriteException(CommandResult.java:88) ~[mongo-java-driver-2.12.3.jar:na]
    at com.mongodb.CommandResult.getException(CommandResult.java:79) ~[mongo-java-driver-2.12.3.jar:na]
    at com.mongodb.DBCollectionImpl.translateBulkWriteException(DBCollectionImpl.java:314) ~[mongo-java-driver-2.12.3.jar:na]
    at com.mongodb.DBCollectionImpl.insert(DBCollectionImpl.java:189) ~[mongo-java-driver-2.12.3.jar:na]
    at com.mongodb.DBCollectionImpl.insert(DBCollectionImpl.java:165) ~[mongo-java-driver-2.12.3.jar:na]
    at com.mongodb.DBCollection.insert(DBCollection.java:93) ~[mongo-java-driver-2.12.3.jar:na]
    at com.mongodb.casbah.MongoCollectionBase$class.insert(MongoCollection.scala:621) ~[casbah-core_2.11-2.7.3.jar:2.7.3]
    at com.mongodb.casbah.MongoCollection.insert(MongoCollection.scala:1109) ~[casbah-core_2.11-2.7.3.jar:2.7.3]
    at akka.contrib.persistence.mongodb.CasbahPersistenceJournaller$$anonfun$appendToJournal$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(CasbahPersistenceJournaller.scala:75) ~[akka-persistence-mongo-casbah_2.11-0.1.2.jar:0.1.2]
    at akka.contrib.persistence.mongodb.CasbahPersistenceJournaller$$anonfun$appendToJournal$1$$anonfun$apply$mcV$sp$1.apply(CasbahPersistenceJournaller.scala:74) ~[akka-persistence-mongo-casbah_2.11-0.1.2.jar:0.1.2]
    at akka.contrib.persistence.mongodb.CasbahPersistenceJournaller$$anonfun$appendToJournal$1$$anonfun$apply$mcV$sp$1.apply(CasbahPersistenceJournaller.scala:74) ~[akka-persistence-mongo-casbah_2.11-0.1.2.jar:0.1.2]
    at akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply(CircuitBreaker.scala:135) ~[akka-actor_2.11-2.3.6.jar:na]
    at akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply(CircuitBreaker.scala:135) ~[akka-actor_2.11-2.3.6.jar:na]
    at akka.pattern.CircuitBreaker$State$class.callThrough(CircuitBreaker.scala:296) ~[akka-actor_2.11-2.3.6.jar:na]
    at akka.pattern.CircuitBreaker$Closed$.callThrough(CircuitBreaker.scala:345) ~[akka-actor_2.11-2.3.6.jar:na]
    at akka.pattern.CircuitBreaker$Closed$.invoke(CircuitBreaker.scala:354) ~[akka-actor_2.11-2.3.6.jar:na]
    at akka.pattern.CircuitBreaker.withCircuitBreaker(CircuitBreaker.scala:113) ~[akka-actor_2.11-2.3.6.jar:na]
    at akka.pattern.CircuitBreaker.withSyncCircuitBreaker(CircuitBreaker.scala:135) ~[akka-actor_2.11-2.3.6.jar:na]
    at akka.contrib.persistence.mongodb.CasbahPersistenceJournaller$$anonfun$appendToJournal$1.apply$mcV$sp(CasbahPersistenceJournaller.scala:74) ~[akka-persistence-mongo-casbah_2.11-0.1.2.jar:0.1.2]
    at akka.contrib.persistence.mongodb.CasbahPersistenceJournaller$$anonfun$appendToJournal$1.apply(CasbahPersistenceJournaller.scala:74) ~[akka-persistence-mongo-casbah_2.11-0.1.2.jar:0.1.2]
    at akka.contrib.persistence.mongodb.CasbahPersistenceJournaller$$anonfun$appendToJournal$1.apply(CasbahPersistenceJournaller.scala:74) ~[akka-persistence-mongo-casbah_2.11-0.1.2.jar:0.1.2]
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) ~[scala-library.jar:na]
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) ~[scala-library.jar:na]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) ~[akka-actor_2.11-2.3.6.jar:na]
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) ~[na:1.6.0_31]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) ~[na:1.6.0_31]
    at java.lang.Thread.run(Thread.java:662) ~[na:1.6.0_31]

Read Preference of Event Consumer

It would be nice to have the choice of MongoDB read preference at the event consumer side to achieve some serious load balancing.

Using the casbah driver we are interested in Secondary and SecondaryPreferred:
http://mongodb.github.io/casbah/api/index.html#com.mongodb.casbah.ReadPreference$

Is there already a way to configure akka-persistence-mongo to achieve this with casbah and ReactiveMongo drivers?

Are there any serious arguments against using a read preference for reads across MongoDB replicas? If one replica already has some events and another does not, this could perhaps disrupt a particular consumer actor in some way. A sketch of the Casbah side follows.
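What we would like to be able to configure, shown here directly against Casbah (database and collection names follow the journal's defaults; purely illustrative):

import com.mongodb.casbah.MongoClient
import com.mongodb.casbah.ReadPreference

val journal = MongoClient()("akka-persistence")("akka_persistence_journal")
// Route reads to secondaries when available, falling back to the primary.
journal.underlying.setReadPreference(ReadPreference.SecondaryPreferred)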

NullpointerException when accessing currentEventsByPersistenceId

With an empty journal, I try to access the new currentEventsByPersistenceId in the following way:

val readJournal = PersistenceQuery(context.system).readJournalFor[ScalaDslMongoReadJournal](MongoReadJournal.Identifier)
val source: Source[EventEnvelope, Unit] = readJournal.currentEventsByPersistenceId(persistenceId, 0, Long.MaxValue)
implicit val mat = ActorMaterializer()
source.runForeach { event =>
  log.error(s"EVENT: $event")
  if (event != null && event.event != null) {
    receive(event.event)
  }
}

When starting the application I receive the following NullPointerException:

akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:172)
    at akka.actor.ActorCell.create(ActorCell.scala:605)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:460)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:482)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.run(Mailbox.scala:223)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
    at reactivemongo.bson.buffer.WritableBuffer$class.writeString(buffer.scala:71)
    at reactivemongo.core.netty.ChannelBufferWritableBuffer.writeString(netty.scala:59)
    at reactivemongo.bson.buffer.DefaultBufferHandler$BSONStringBufferHandler$.write(bufferhandlers.scala:83)
    at reactivemongo.bson.buffer.DefaultBufferHandler$BSONStringBufferHandler$.write(bufferhandlers.scala:82)
    at reactivemongo.bson.buffer.DefaultBufferHandler$.serialize(bufferhandlers.scala:226)
    at reactivemongo.bson.buffer.DefaultBufferHandler$BSONDocumentBufferHandler$$anonfun$write$1.apply(bufferhandlers.scala:93)
    at reactivemongo.bson.buffer.DefaultBufferHandler$BSONDocumentBufferHandler$$anonfun$write$1.apply(bufferhandlers.scala:90)
    at scala.collection.immutable.Stream.foreach(Stream.scala:594)
    at reactivemongo.bson.buffer.DefaultBufferHandler$BSONDocumentBufferHandler$.write(bufferhandlers.scala:90)
    at reactivemongo.bson.buffer.DefaultBufferHandler$BSONDocumentBufferHandler$.write(bufferhandlers.scala:86)
    at reactivemongo.bson.buffer.DefaultBufferHandler$.serialize(bufferhandlers.scala:226)
    at reactivemongo.bson.buffer.DefaultBufferHandler$.writeDocument(bufferhandlers.scala:241)
    at reactivemongo.api.BSONSerializationPack$.writeToBuffer(serializationpack.scala:64)
    at reactivemongo.api.BSONSerializationPack$.writeToBuffer(serializationpack.scala:41)
    at reactivemongo.api.collections.GenericQueryBuilder$class.reactivemongo$api$collections$GenericQueryBuilder$$write(genericquerybuilder.scala:61)
    at reactivemongo.api.collections.GenericQueryBuilder$class.defaultCursor(genericquerybuilder.scala:85)
    at reactivemongo.api.collections.GenericQueryBuilder$class.cursor(genericquerybuilder.scala:81)
    at reactivemongo.api.collections.bson.BSONQueryBuilder.cursor(bsoncollection.scala:78)
    at akka.contrib.persistence.mongodb.EventsByPersistenceId.initial(RxMongoReadJournaller.scala:138)
    at akka.contrib.persistence.mongodb.IterateeActorPublisher$class.preStart(RxMongoReadJournaller.scala:20)
    at akka.contrib.persistence.mongodb.EventsByPersistenceId.preStart(RxMongoReadJournaller.scala:113)
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:485)
    at akka.contrib.persistence.mongodb.EventsByPersistenceId.akka$stream$actor$ActorPublisher$$super$aroundPreStart(RxMongoReadJournaller.scala:113)
    at akka.stream.actor.ActorPublisher$class.aroundPreStart(ActorPublisher.scala:322)
    at akka.contrib.persistence.mongodb.EventsByPersistenceId.aroundPreStart(RxMongoReadJournaller.scala:113)
    at akka.actor.ActorCell.create(ActorCell.scala:589)

What am I missing?

Sporadic write failure when using persistAsync

@scullxbones When using persistAsync to persist events, I sporadically get the following failure message and the actor stops after that. A supervised restart of the actor then leads to a ClassCastException in the receiveCommand logic where persistAsync is called. The strange thing is that it only occurs after processing millions of events of the same type. Any idea what's going on here?

WriteMessageFailure(PersistentImpl(HistStoryViewsEvent(Mon Dec 29 00:00:00 CET 2014,2854247,pm,2),32779138,de.story,false,List(),Actor[akka://rabbit-akka-stream/deadLetters]),akka.pattern.CircuitBreakerOpenException: Circuit Breaker is open; calls are failing fast,3)
WriteMessageFailure(PersistentImpl(HistStoryViewsEvent(Mon Dec 29 00:00:00 CET 2014,2856725,pm,1),32779941,de.story,false,List(),Actor[akka://rabbit-akka-stream/deadLetters]),akka.pattern.CircuitBreakerOpenException: Circuit Breaker is open; calls are failing fast,8)

2015-04-15 09:06:20,601 ERROR - restart Actor[akka://rabbit-akka-stream/user/$b/StoryDeCommandsProcessor#573242443]
java.lang.ClassCastException: de.na.stats.rabbit.flow.RabbitMessageImpl cannot be cast to de.na.stats.domain.Processor$Event
at de.na.stats.domain.story.StoryCommandsProcessor$$anonfun$1$$anonfun$applyOrElse$7.apply(StoryCommandsProcessor.scala:125) ~[stats.stats-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at akka.persistence.Eventsourced$$anon$2.aroundReceive(Eventsourced.scala:72) ~[com.typesafe.akka.akka-persistence-experimental_2.10-2.3.9.jar:na]
at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:369) ~[com.typesafe.akka.akka-persistence-experimental_2.10-2.3.9.jar:na]
at de.na.stats.domain.story.StoryCommandsProcessor.aroundReceive(StoryCommandsProcessor.scala:35) ~[stats.stats-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) ~[com.typesafe.akka.akka-actor_2.10-2.3.9.jar:na]

Support URI style connections to MongoDB

As @ulrichwinter pointed out in #27, it would be great if the plugin supported URI-style connections to MongoDB, at least through casbah.

The best thing is: using a URI, configuration of the authentication method and other options is easy and does not require any additional glue code.

In my project setting it would actually be very helpful to get this feature within the next two weeks or so, i.e. without waiting for reactivemongo to follow up with a new release, since we are using MongoDB 3.0 with your plugin and our schedule is extremely tight towards upcoming production.

casbah has supported URI-style connections since version 2.0, so you would not have to compile against a newer casbah version to introduce this feature.
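A sketch of what the Casbah side could look like (the URI is illustrative); the URI carries host, credentials and options, so no extra glue code is needed:

import com.mongodb.casbah.{MongoClient, MongoClientURI}

val uri = MongoClientURI("mongodb://user:password@vm:27017/akka-persistence")
val client = MongoClient(uri)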

Support JSON serialization of persistent payload

We are using your mongo persistence plugin in a CQRS setup and so far it is working smoothly. Great job!

One "complaint" though is that PersistentRepr and its payload (events) are stored in binary format, e.g.

{
  "_id": ObjectId("54b846f5e50895665d308cc0"),
  "pid": "MyEntity-0815",
  "sn": NumberLong(2),
  "cs": [

  ],
  "dl": false,
  "pr": BinData(0, "CkgIARJErO0ABXNyABpjb20uZ2ltYi5jb21tb24uZXMuQ29uZmlybQlACa6R6c/CAgABSgAKZGVsaXZlcnlJZHhwAAAAAAAAAAEQAhotQ2hlY2tvdXQtZjk3NGU5OGMtMWNhZi00MWRjLWJhZTEtZGNhMmVkZWZhNzU3IAAwAEAAWmBha2thLnRjcDovL0NoZWNrb3V0QWN0b3JTeXN0ZW1AMTI3LjAuMC4xOjI1NTIvdXNlci9zaW5nbGV0b24vY2hlY2tvdXRUb3BpY0FnZ3JlZ2F0b3IjLTE0Nzc3MDU1NjI=")
}

I think it is a fundamental feature, when using MongoDB, to be able to query the event store (during development and production) and have a JSON representation of all events.

Event Store (https://github.com/EventStore/EventStore.JVM) does not seem to be a very mature plugin, but it does provide support for custom JSON serialization. Martin Krasser's Kafka plugin does too.

Is there any way you can provide support for JSON serialization?
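For illustration only, a minimal akka Serializer that emits JSON via Jackson; Jackson and the identifier value are assumptions, and the plugin would additionally need to store the result as a document rather than BinData for it to be queryable:

import akka.serialization.Serializer
import com.fasterxml.jackson.databind.ObjectMapper

class JsonEventSerializer extends Serializer {
  private val mapper = new ObjectMapper()
  override val identifier: Int = 67890 // arbitrary, must be unique
  override val includeManifest: Boolean = true

  override def toBinary(o: AnyRef): Array[Byte] =
    mapper.writeValueAsBytes(o) // UTF-8 JSON instead of java serialization

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    val clazz = manifest.getOrElse(classOf[AnyRef])
    mapper.readValue(bytes, clazz).asInstanceOf[AnyRef]
  }
}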

Finish implementation of rxmongo driver

Currently impeded by the akka version supported by rxmongo.

Fully implement the rxmongo-flavored driver for the persistence engine. A starting point exists - it still needs test case coverage and a passing TCK.

actor name "snapshot-store" is not unique

I tried to create a simple application, but failed :(

I get the following error

Caused by: akka.actor.InvalidActorNameException: actor name [snapshot-store] is not unique!

at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
    at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
    at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
    at akka.actor.dungeon.Children$class.makeChild(Children.scala:202)
    at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
    at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
    at akka.actor.ActorSystemImpl.systemActorOf(ActorSystem.scala:550)
    at akka.persistence.Persistence.createPlugin(Persistence.scala:148)
    at akka.persistence.Persistence.<init>(Persistence.scala:87)
    at akka.persistence.Persistence$.createExtension(Persistence.scala:73)
    at akka.persistence.Persistence$.createExtension(Persistence.scala:67)
    at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:698)
    at akka.actor.ExtensionId$class.apply(Extension.scala:79)
    at akka.persistence.Persistence$.apply(Persistence.scala:67)
    at akka.persistence.snapshot.SnapshotStore$class.$init$(SnapshotStore.scala:21)
    at akka.persistence.snapshot.local.LocalSnapshotStore.<init>(LocalSnapshotStore.scala:26)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:374)
    at akka.util.Reflect$.instantiate(Reflect.scala:45)
    at akka.actor.NoArgsReflectConstructor.produce(Props.scala:358)
    at akka.actor.Props.newActor(Props.scala:249)
    at akka.actor.ActorCell.newActor(ActorCell.scala:552)
    at akka.actor.ActorCell.create(ActorCell.scala:578)

Setup

I created my application with the Typesafe Persistence Activator Template and changed the library dependencies to

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-persistence-experimental" % "2.3.0",
  "com.github.scullxbones" %% "akka-persistence-mongo-casbah" % "0.0.3"
)

My configuration looks like this

akka.persistence.journal.plugin = "akka-contrib-mongodb-persistence-journal"
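I have not configured a snapshot store explicitly; presumably the plugin's snapshot store would be wired in like this (plugin id assumed from the README):

akka.persistence.snapshot-store.plugin = "akka-contrib-mongodb-persistence-snapshot"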

Test Application

package sample.durable

import akka.actor._
import akka.persistence.{ Persistent, PersistenceFailure, Processor }
import ComputeActor._

object DurableActors extends App {

  val system = ActorSystem()
  val node = system.actorOf(Props[ComputeActor], "nodeA")

  for (i <- 1 until 20) {
    if (i % 3 == 0) node ! Persistent(ComputeActor.Task(s"#$i", i * 2))
    else node ! ComputeActor.Task(s"#$i", i * 2)
  }

  system.shutdown()
  system.awaitTermination()
}

class ComputeActor extends Processor with ActorLogging {

  def receive = {
    case Task(name, time) =>
      log info s"Starting task $name. Will need $time s"
      Thread sleep time * 1000
      log info s"Finished task $name"
    case Persistent(Task(name, time), snr) =>
      log info s"Starting persistent ($snr) task $name. Will need $time s"
      Thread sleep time * 1000
      log info s"Finished persistent ($snr) task $name"
  }
}

object ComputeActor {

  case class Task(name: String, time: Int)
}

ClassNotFoundException on journal replay

Hi!

I get

[ERROR] [09/09/2015 14:16:41.784] [application-akka.actor.default-dispatcher-2] [akka://application/user/local-orders/order-id] Persistence failure when replaying events for persistenceId [order-id]. Last known sequence number [0]
java.lang.ClassNotFoundException: actors.OrderIdActor$Incremented$
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at akka.contrib.persistence.mongodb.RxMongoSerializers$JournalDeserializer$.deserializePayload(RxMongoSerializers.scala:105)
        at akka.contrib.persistence.mongodb.RxMongoSerializers$JournalDeserializer$.deserializeVersionOne(RxMongoSerializers.scala:97)
        at akka.contrib.persistence.mongodb.RxMongoSerializers$JournalDeserializer$.deserializeDocument(RxMongoSerializers.scala:87)
        at akka.contrib.persistence.mongodb.RxMongoSerializers$JournalDeserializer$.deserializeDocument(RxMongoSerializers.scala:84)
        at akka.contrib.persistence.mongodb.MongoPersistenceDriver.deserializeJournal(MongoPersistence.scala:107)
        at akka.contrib.persistence.mongodb.RxMongoJournaller$$anonfun$1.applyOrElse(RxMongoJournaller.scala:38)
        at akka.contrib.persistence.mongodb.RxMongoJournaller$$anonfun$1.applyOrElse(RxMongoJournaller.scala:37)
        at scala.PartialFunction$$anonfun$runWith$1.apply(PartialFunction.scala:141)
        at scala.PartialFunction$$anonfun$runWith$1.apply(PartialFunction.scala:140)
        at scala.collection.immutable.Stream.collect(Stream.scala:435)
        at akka.contrib.persistence.mongodb.RxMongoJournaller.akka$contrib$persistence$mongodb$RxMongoJournaller$$unwind(RxMongoJournaller.scala:37)
        at akka.contrib.persistence.mongodb.RxMongoJournaller$$anonfun$3.apply(RxMongoJournaller.scala:34)
        at akka.contrib.persistence.mongodb.RxMongoJournaller$$anonfun$3.apply(RxMongoJournaller.scala:34)
        at reactivemongo.api.DefaultCursor$$anon$2.reactivemongo$api$DefaultCursor$$anon$$go$1(cursor.scala:332)
        at reactivemongo.api.DefaultCursor$$anon$2$$anonfun$foldWhile$2.apply(cursor.scala:335)
        at reactivemongo.api.DefaultCursor$$anon$2$$anonfun$foldWhile$2.apply(cursor.scala:335)
        at reactivemongo.api.DefaultCursor$$anon$2$$anonfun$foldBulks$2.apply(cursor.scala:313)
        at reactivemongo.api.DefaultCursor$$anon$2$$anonfun$foldBulks$2.apply(cursor.scala:311)
        at reactivemongo.api.DefaultCursor$$anon$2$FoldResponses$$anonfun$procResponses$1.apply(cursor.scala:455)
        at reactivemongo.api.DefaultCursor$$anon$2$FoldResponses$$anonfun$procResponses$1.apply(cursor.scala:454)
        at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
        at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:249)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
        at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

with "com.github.scullxbones" % "akka-persistence-mongo-rxmongo_2.11" % "1.0.3"

code for OrderIdActor object:

object OrderIdActor {
  sealed trait Command
  case object Increment extends Command

  sealed trait Event extends Serializable
  case class Incremented(value: Int = 1) extends Event

  def props = Props(new OrderIdActor)
  def name = "order-id"
}

Am I missing something in configuration?

Authentication against MongoDB via SCRAM-SHA-1

Please support SCRAM-SHA-1, the new database user authentication method in MongoDB.

The MongoDB drivers have now been updated to support this authentication method:
http://docs.mongodb.org/manual/release-notes/3.0-scram/#considerations-scram-sha-1-drivers
The relevant lowest casbah Scala Driver version is 2.8.0 according to the listing.

Here is the relevant API documentation of casbah that explains the how-to:
http://mongodb.github.io/casbah/guide/connecting.html#authentication

You just need to add a configuration property and use the SCRAM-SHA-1 factory method createScramSha1Credential; a sketch follows.
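A sketch of that factory method in use (host and database names illustrative, Casbah 2.8+ assumed):

import com.mongodb.casbah.MongoClient
import com.mongodb.{MongoCredential, ServerAddress}

val credential = MongoCredential.createScramSha1Credential(
  "user", "akka-persistence", "password".toCharArray)
val client = MongoClient(List(new ServerAddress("vm", 27017)), List(credential))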

As far as I can see, the small change would have to be made in:
https://github.com/scullxbones/akka-persistence-mongo/blob/ae092d0929fd13e1ab45a043d539b9a572ce782a/casbah/src/main/scala/akka/contrib/persistence/mongodb/CasbahPersistenceExtension.scala

mongo journal/snapshot enjoys higher priority than configuration

I found that if two clusters share one mongodb (same database name), they will eventually merge into one cluster, even with different cluster names.

It's really painful, because our dev cluster keeps trying to connect to nodes on the testbed. I know someone on my team must have forgotten to change the mongo database name when starting the cluster on the testbed, but I don't know who, since the mongo journal/snapshot data is not human-readable.

I am still using the 0.4.0 version of akka-persistence-mongo-rxmongo.

Consumer has to share class paths with producer

Hi scullxbones,

We seem to have a problem with your type hinting during persistence - or, more accurately, during reading of persisted events.

We have a microservice architecture, so producer and consumer keep their events in their own packages via copy & own. The problem arises when a consumer tries to read the journal and fails to load the hinted class, as it is not on the consumer's classpath.

The resulting stacktrace is something like this:

java.lang.ClassNotFoundException: com.enterprise.producer.myconsumerservice.es.event.MyEntityCreatedEvent
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.springframework.boot.loader.LaunchedURLClassLoader.doLoadClass(LaunchedURLClassLoader.java:170)
    at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:136)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at akka.contrib.persistence.mongodb.Payload$.apply(MongoDataModel.scala:114)

(The consumer's package starts with com.enterprise.consumer)

Prior to this upgrade, the serialized payload was handed to our JSerializer as a byte array and we got the (simple) class name from it. The serializer then had a mapping of simple class names to local classes in the consumer package. This also made it possible to ignore events the consumer does not actually need. Furthermore, the events do not have to have the same name on both sides, and multiple producer events can be mapped into a single generic event.

Does your framework support our scenario in the new version, or are we missing something?

Our first idea to tackle this problem was to introduce a generic class in a base package that all events are mapped to prior to persisting, something like

public class PersistedEvent implements Event {
   private byte[] payload;          // the serialized producer-side event
   private String simpleClassName;  // hint for the consumer-side mapping
   ...
}

but I am not entirely sure whether this would be easily auto-upgradable (which we need, as we already have real data that we must not throw away).

Any ideas on this?

Remove possibly buggy bulk inserts from rxm driver

from gitter:
@gbrd

tested: with casbah: create some events => replay OK; switch to rxmongo => replay KO
with rxmongo: create some events => replay KO; switch to casbah => replay OK

it only replays around 100 messages
I also noticed an error log message at startup: reactivemongo.core.actors.MongoDBSystem - The entire node set is unreachable, is there a network problem?

Live versions of queries

  • Support a NoRefresh hint for the existing queries AllPersistenceIds and AllEvents
  • A missing NoRefresh hint should imply live, long-running queries

upgrade to casbah_2.10 version 2.8.1

We need to upgrade to mongo-java-driver version 2.13.x in order to use MongoDB 3.0; this is the minimum mongo-java-driver documented as compatible:
http://docs.mongodb.org/ecosystem/drivers/driver-compatibility-reference/#reference-compatibility-mongodb-java

Currently akka-persistence-mongo depends on casbah_2.10 version 2.7.4, while casbah itself upgraded to mongo-java-driver 2.13.1 as of version 2.8.1:
http://mongodb.github.io/casbah/changelog.html

So, because we use akka-persistence-mongo, I request upgrading it to the current casbah version:
casbah_2.10 - version: 2.8.1
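A build.sbt sketch of the requested bump:

libraryDependencies += "org.mongodb" %% "casbah" % "2.8.1"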

Use MongoDB batch inserts in the Casbah journal

Currently, it seems the Casbah journal implementation does one insert for each message:

https://github.com/scullxbones/akka-persistence-mongo/blob/master/casbah/src/main/scala/akka/contrib/persistence/mongodb/CasbahPersistenceJournaller.scala#L75

Performance could be improved by doing a MongoDB batch insert, i.e. passing all of the documents to the insert method at once (http://docs.mongodb.org/manual/core/bulk-inserts/); see the sketch below.
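A sketch of the batch form directly against Casbah; field names follow the journal layout shown elsewhere in this tracker, and are illustrative:

import com.mongodb.casbah.Imports._

val journal = MongoClient()("akka-persistence")("akka_persistence_journal")
val docs = Seq(
  MongoDBObject("pid" -> "p-1", "sn" -> 1L, "dl" -> false),
  MongoDBObject("pid" -> "p-1", "sn" -> 2L, "dl" -> false)
)
// One round trip to the server instead of one insert per document.
journal.insert(docs: _*)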

As a side note: is the current driver ignoring the journal API requirement that batch inserts be atomic? I don't think that can be achieved with MongoDB unless compensating actions are manually implemented in the extension.

Documentation update (for akka 2.4.0)

Maybe you can add a hint to your documentation that the reactivemongo/casbah dependency must be added to the project as well, because in your build.sbt casbah/reactivemongo is marked as provided.

Simply adding your library does not work otherwise; see the sketch below.
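For example, a build.sbt along these lines (versions illustrative):

libraryDependencies ++= Seq(
  "com.github.scullxbones" %% "akka-persistence-mongo-casbah" % "1.0.3",
  "org.mongodb"            %% "casbah"                        % "2.8.1"
)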

By the way great work!

Wolfgang

Change storage model to proposal by @krasserm

Per akka-dev, a viable approach for atomic batch updates in mongo would club together multiple PersistentImpls within an atomic unit (a single document). This will require restructuring the physical store so that it is no longer document-per-PersistentImpl, but document-per-batch-of-PersistentImpl. There are implications for both deletes and confirms that need to be explored, since both are currently tracked on a per-PersistentImpl basis. Replay will also require some forethought.

A simple first approach could be a master document per batch and a sub-document per PersistentImpl - this would allow for a straightforward transition and continued use of in-line metadata (soft delete, confirmations, pid, sequence number). A sketch follows.
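A purely illustrative shape for such a batch document, reusing the field names from the single-event layout stored today (the "events" array field is made up):

{
  "_id": ObjectId("..."),
  "events": [
    { "pid": "MyEntity-0815", "sn": NumberLong(1), "cs": [], "dl": false, "pr": BinData(0, "...") },
    { "pid": "MyEntity-0815", "sn": NumberLong(2), "cs": [], "dl": false, "pr": BinData(0, "...") }
  ]
}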
