
akka-persistence-cassandra's Introduction

Cassandra Plugins for Akka Persistence

Please note: work in this repository has been discontinued. The official location of the akka-persistence-cassandra project is now here.

Join the chat at https://gitter.im/krasserm/akka-persistence-cassandra

Replicated Akka Persistence journal and snapshot store backed by Apache Cassandra.


Dependencies

Latest release

To include the latest release of the Cassandra plugins for Cassandra 2.1.x or 2.2.x into your sbt project, add the following lines to your build.sbt file:

resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

libraryDependencies += "com.github.krasserm" %% "akka-persistence-cassandra" % "0.7"

This version of akka-persistence-cassandra depends on Akka 2.4 and Scala 2.11.6. It is compatible with Cassandra 2.1.6 or higher (versions < 2.1.6 have a static column bug). Versions of the Cassandra plugins that are compatible with Cassandra 1.2.x are maintained on the cassandra-1.2 branch.

Latest release for Cassandra 3.x

To include the latest release of the Cassandra plugins for Cassandra 3.x into your sbt project, add the following lines to your build.sbt file:

resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

libraryDependencies += "com.github.krasserm" %% "akka-persistence-cassandra-3x" % "0.6"

This version of akka-persistence-cassandra depends on Akka 2.4 and Scala 2.11.6. It is compatible with Cassandra 3.0.0 or higher.

It implements the following Persistence Queries:

  • allPersistenceIds, currentPersistenceIds
  • eventsByPersistenceId, currentEventsByPersistenceId
  • eventsByTag, currentEventsByTag

Schema changes mean that you can't currently upgrade from a Cassandra 2.x version of the plugin to the Cassandra 3.x version and use existing data.

You should be able to export the data and load it to the new table definition.

Development snapshot

To include a current development snapshot of the Cassandra plugins into your sbt project, add the following lines to your build.sbt file:

resolvers += "OJO Snapshots" at "https://oss.jfrog.org/oss-snapshot-local" 

libraryDependencies += "com.github.krasserm" %% "akka-persistence-cassandra" % "0.7-SNAPSHOT"

This version of akka-persistence-cassandra depends on Akka 2.4 and Scala 2.11.6. It is compatible with Cassandra 2.1.6 or higher (versions < 2.1.6 have a static column bug).

Migrating from 0.3.x (Akka 2.3.x)

Schema and property changes mean that you can't currently upgrade from 0.3 to 0.4 SNAPSHOT and use existing data. This will be addressed in Issue 64.

Journal plugin

Features

  • All operations required by the Akka Persistence journal plugin API are fully supported.
  • The plugin uses Cassandra in a pure log-oriented way, i.e. data is only ever inserted, never updated (deletions are made on user request only or by persistent channels; see also Caveats).
  • Writes of messages and confirmations are batched to optimize throughput. See batch writes for details on how to configure batch sizes. The plugin was tested to work properly under high load.
  • Messages written by a single processor are partitioned across the cluster to achieve scalability with data volume by adding nodes.

Configuration

To activate the journal plugin, add the following line to your Akka application.conf:

akka.persistence.journal.plugin = "cassandra-journal"

This will run the journal with its default settings. The default settings can be changed with the following configuration keys:

  • cassandra-journal.contact-points. A comma-separated list of contact points in a Cassandra cluster. Default value is [127.0.0.1]. Host:Port pairs are also supported. In that case the port parameter will be ignored.
  • cassandra-journal.port. Port to use to connect to the Cassandra host. Default value is 9042. Will be ignored if the contact point list is defined by host:port pairs.
  • cassandra-journal.keyspace. Name of the keyspace to be used by the plugin. Default value is akka.
  • cassandra-journal.keyspace-autocreate. Boolean parameter indicating whether the keyspace should be automatically created if it doesn't exist. Default value is true.
  • cassandra-journal.keyspace-autocreate-retries. Int parameter which defines a number of retries before giving up on automatic schema creation. Default value is 1.
  • cassandra-journal.table. Name of the table to be used by the plugin. If the table doesn't exist it is automatically created. Default value is messages.
  • cassandra-journal.table-compaction-strategy. Configurations used to configure the CompactionStrategy for the table. Please refer to the tests for example configurations. Default value is SizeTieredCompactionStrategy. Refer to http://docs.datastax.com/en/cql/3.1/cql/cql_reference/compactSubprop.html for more information regarding the properties.
  • cassandra-journal.replication-strategy. Replication strategy to use: SimpleStrategy or NetworkTopologyStrategy.
  • cassandra-journal.replication-factor. Replication factor to use when a keyspace is created by the plugin. Default value is 1.
  • cassandra-journal.data-center-replication-factors. Replication factor list for data centers, e.g. ["dc1:3", "dc2:2"]. Is only used when replication-strategy is NetworkTopologyStrategy.
  • cassandra-journal.max-message-batch-size. Maximum number of messages that will be batched when using persistAsync. Also used as the max batch size for deletes.
  • cassandra-journal.write-retries. The number of retries when a write request returns a TimeoutException or an UnavailableException. Default value is 3.
  • cassandra-journal.delete-retries. Deletes are achieved using a metadata entry and then the actual messages are deleted asynchronously. Number of retries before giving up. Default value is 3.
  • cassandra-journal.target-partition-size. Target number of messages per cassandra partition. Default value is 500000. Will only go above the target if you use persistAll or persistAllAsync. Do not change this setting after table creation (not checked yet).
  • cassandra-journal.max-result-size. Maximum number of entries returned per query. Queries are executed recursively, if needed, to achieve recovery goals. Default value is 50001.
  • cassandra-journal.write-consistency. Write consistency level. Default value is QUORUM.
  • cassandra-journal.read-consistency. Read consistency level. Default value is QUORUM.

The default read and write consistency levels ensure that processors can read their own writes. During normal operation, processors only write to the journal; reads occur only during recovery.
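As an illustration, several of the settings above can be combined in application.conf like this (the host addresses, keyspace name, and replication factors below are example values, not recommendations):

```hocon
akka.persistence.journal.plugin = "cassandra-journal"

cassandra-journal {
  # example contact points; adjust to your cluster
  contact-points = ["10.0.0.1", "10.0.0.2"]
  port = 9042
  keyspace = "my_app"

  # replicate across two datacenters (only used with NetworkTopologyStrategy)
  replication-strategy = "NetworkTopologyStrategy"
  data-center-replication-factors = ["dc1:3", "dc2:2"]

  # keep the defaults so processors can read their own writes
  write-consistency = "QUORUM"
  read-consistency = "QUORUM"
}
```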

To connect to the Cassandra hosts with credentials, add the following lines:

  • cassandra-journal.authentication.username. The username used to log in to Cassandra hosts. By default, no authentication is used.
  • cassandra-journal.authentication.password. The password corresponding to username. By default, no authentication is used.
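For example (the username shown and the use of an environment variable for the password are illustrative assumptions, not requirements):

```hocon
cassandra-journal {
  authentication {
    username = "journal_user"          # illustrative account name
    password = ${?CASSANDRA_PASSWORD}  # e.g. supplied via an env variable
  }
}
```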

To connect to the Cassandra host with SSL enabled, add the following configuration. For detailed instructions, please refer to the DataStax Cassandra chapter about SSL Encryption.

  • cassandra-journal.ssl.truststore.path. Path to the JKS Truststore file.
  • cassandra-journal.ssl.truststore.password. Password to unlock the JKS Truststore.
  • cassandra-journal.ssl.keystore.path. Path to the JKS Keystore file.
  • cassandra-journal.ssl.keystore.password. Password to unlock JKS Truststore and access the private key (both must use the same password).
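A sketch of what this could look like in application.conf (the file paths and passwords are placeholders):

```hocon
cassandra-journal {
  ssl {
    truststore {
      path = "/etc/ssl/cassandra/truststore.jks"  # placeholder path
      password = "changeit"                       # placeholder password
    }
    keystore {
      # keystore and private key must use the same password
      path = "/etc/ssl/cassandra/keystore.jks"    # placeholder path
      password = "changeit"                       # placeholder password
    }
  }
}
```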

To limit the Cassandra hosts this plugin connects with to a specific datacenter, use the following setting:

  • cassandra-journal.local-datacenter. The id of the local datacenter of the Cassandra hosts that this module should connect to. By default, this property is not set, resulting in DataStax's standard round-robin policy being used.
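For example (the datacenter name dc1 is a placeholder; it must match the datacenter name configured in your cluster topology):

```hocon
cassandra-journal {
  local-datacenter = "dc1"  # placeholder; use your cluster's DC name
}
```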

Caveats

  • Detailed tests under failure conditions are still missing.
  • Range deletion performance (i.e. deleteMessages up to a specified sequence number) depends on the extent of previous deletions
    • it increases linearly with the number of tombstones generated by previous permanent deletions and drops to a minimum after compaction
    • it increases linearly with the number of plugin-level deletion markers generated by previous logical deletions (recommended: always use permanent range deletions)

These issues are likely to be resolved in future versions of the plugin.

Snapshot store plugin

Features

  • Implements its own handler of the (internal) Akka Persistence snapshot protocol, making snapshot IO fully asynchronous (i.e. does not implement the Akka Persistence snapshot store plugin API directly).

Configuration

To activate the snapshot-store plugin, add the following line to your Akka application.conf:

akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"

This will run the snapshot store with its default settings. The default settings can be changed with the following configuration keys:

  • cassandra-snapshot-store.contact-points. A comma-separated list of contact points in a Cassandra cluster. Default value is [127.0.0.1]. Host:Port pairs are also supported. In that case the port parameter will be ignored.
  • cassandra-snapshot-store.port. Port to use to connect to the Cassandra host. Default value is 9042. Will be ignored if the contact point list is defined by host:port pairs.
  • cassandra-snapshot-store.keyspace. Name of the keyspace to be used by the plugin. Default value is akka_snapshot.
  • cassandra-snapshot-store.keyspace-autocreate. Boolean parameter indicating whether the keyspace should be automatically created if it doesn't exist. Default value is true.
  • cassandra-snapshot-store.keyspace-autocreate-retries. Int parameter which defines a number of retries before giving up on automatic schema creation. Default value is 1.
  • cassandra-snapshot-store.table. Name of the table to be used by the plugin. If the table doesn't exist it is automatically created. Default value is snapshots.
  • cassandra-snapshot-store.table-compaction-strategy. Configurations used to configure the CompactionStrategy for the table. Please refer to the tests for example configurations. Default value is SizeTieredCompactionStrategy. Refer to http://docs.datastax.com/en/cql/3.1/cql/cql_reference/compactSubprop.html for more information regarding the properties.
  • cassandra-snapshot-store.replication-strategy. Replication strategy to use: SimpleStrategy or NetworkTopologyStrategy.
  • cassandra-snapshot-store.replication-factor. Replication factor to use when a keyspace is created by the plugin. Default value is 1.
  • cassandra-snapshot-store.data-center-replication-factors. Replication factor list for data centers, e.g. ["dc1:3", "dc2:2"]. Is only used when replication-strategy is NetworkTopologyStrategy.
  • cassandra-snapshot-store.max-metadata-result-size. Maximum number of snapshot metadata entries to load per recursion (when trying to find a snapshot that matches the specified selection criteria). Default value is 10. Only increase this value when selection criteria frequently select snapshots that are much older than the most recent one, i.e. when there are many more than 10 snapshots between the most recent snapshot and the selected one. This setting only affects the load efficiency of snapshots.
  • cassandra-snapshot-store.write-consistency. Write consistency level. Default value is ONE.
  • cassandra-snapshot-store.read-consistency. Read consistency level. Default value is ONE.
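As with the journal, these settings can be combined in application.conf; a sketch with example values (keyspace name and replication factor are illustrative):

```hocon
akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"

cassandra-snapshot-store {
  keyspace = "my_app_snapshot"  # example keyspace name
  replication-factor = 3        # example; default is 1
  write-consistency = "QUORUM"  # default is ONE
  read-consistency = "QUORUM"   # default is ONE
}
```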

To connect to the Cassandra hosts with credentials, add the following lines:

  • cassandra-snapshot-store.authentication.username. The username used to log in to Cassandra hosts. By default, no authentication is used.
  • cassandra-snapshot-store.authentication.password. The password corresponding to username. By default, no authentication is used.

To connect to the Cassandra host with SSL enabled, add the following configuration. For detailed instructions, please refer to the DataStax Cassandra chapter about SSL Encryption.

  • cassandra-snapshot-store.ssl.truststore.path. Path to the JKS Truststore file.
  • cassandra-snapshot-store.ssl.truststore.password. Password to unlock the JKS Truststore.
  • cassandra-snapshot-store.ssl.keystore.path. Path to the JKS Keystore file.
  • cassandra-snapshot-store.ssl.keystore.password. Password to unlock JKS Truststore and access the private key (both must use the same password).

To limit the Cassandra hosts this plugin connects with to a specific datacenter, use the following setting:

  • cassandra-snapshot-store.local-datacenter. The id of the local datacenter of the Cassandra hosts that this module should connect to. By default, this property is not set, resulting in DataStax's standard round-robin policy being used.

akka-persistence-cassandra's People

Contributors

analytically, bartlemstra, bjgbeelen, chbatey, gavares, gitter-badger, gmalouf, hannesstockner, josephkiranbabu, jparkie, krasserm, magnusart, magro, matlockx, michaeldiamant, patriknw, raboof, twisniewski


akka-persistence-cassandra's Issues

WIP 2.4 not working correctly with EventAdapters

Steps to reproduce:

  1. Register an EventAdapter for a persisted event
  2. Override the string manifest in said adapter with a switch based on the event content (e.g. amount less or more than x)

Expected:

  • The correct event adapter step is selected upon replay, based on the manifest that was set

Actual:

  • The adapter step for the empty ("") manifest is selected

Reduce default msgs per partition

Nothing wrong with 5 million for a very large use case, but I am concerned that smaller use cases like 100k transactions/day will spend the first 50 days in a single C* partition.

The benefit we get from larger partitions is sequential disk access on replay, but assuming partition size % number of rows we read = 0, we'll get this anyway.

What do people think?

Improve replay throughput

Considerations:

  • available fetching strategies

Alternatives:

  • parallel queries (inside and/or across partitions)
  • load concurrently up to n messages into memory then send them to processors

Browsable event journal

Browsing events is cumbersome since they are stored in blobs which are not easily human readable during development and maintenance.

Could there be a way to address this problem, perhaps by allowing for a JSON serialization into a text/varchar column instead of blob column or even custom schemas?

StringIndexOutOfBoundsException on recovery

I just started trying to use the Cassandra journal here after having done some initial dev work using the LevelDB journal. During a recovery (after restarting my service) I get the following exception. I noticed the code at the exception point uses a substring index of 2, but in the Cassandra data the marker field seems to have a single character...

I'm using Scala-2.10.4 and Akka-2.3.3 (But I tried reverting to 2.3.2 as the README mentions and had the same issue).

Thanks,
-Jeff

alert-core 16:13:52,047 ERROR  OneForOneStrategy              - Recovery failure by journal (processor id = [/user/alert-management-cluster/victorops-channel-proxy]) - akka://platformcluster/user/alert-management-cluster/victorops-channel-proxy/$b 
akka.persistence.RecoveryException: Recovery failure by journal (processor id = [/user/alert-management-cluster/victorops-channel-proxy])
    at akka.persistence.Processor$class.akka$persistence$Processor$$onRecoveryFailure(Processor.scala:157) ~[akka-persistence-experimental_2.10-2.3.3.jar:na]
    at akka.persistence.Processor$$anon$1.aroundReceive(Processor.scala:70) ~[akka-persistence-experimental_2.10-2.3.3.jar:na]
    at akka.persistence.Recovery$class.aroundReceive(Recovery.scala:242) ~[akka-persistence-experimental_2.10-2.3.3.jar:na]
    at akka.persistence.RequestWriter.aroundReceive(PersistentChannel.scala:266) ~[akka-persistence-experimental_2.10-2.3.3.jar:na]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [akka-actor_2.10-2.3.3.jar:na]
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) [akka-actor_2.10-2.3.3.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) [akka-actor_2.10-2.3.3.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) [akka-actor_2.10-2.3.3.jar:na]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.3.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library.jar:na]
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -1
    at java.lang.String.substring(String.java:1875) ~[na:1.7.0_60]
    at akka.persistence.cassandra.journal.CassandraRecovery$MessageIterator.fetch(CassandraRecovery.scala:71) ~[akka-persistence-cassandra_2.10-0.3.jar:0.3]
    at akka.persistence.cassandra.journal.CassandraRecovery$MessageIterator.<init>(CassandraRecovery.scala:41) ~[akka-persistence-cassandra_2.10-0.3.jar:0.3]
    at akka.persistence.cassandra.journal.CassandraRecovery$class.readHighestSequenceNr(CassandraRecovery.scala:21) ~[akka-persistence-cassandra_2.10-0.3.jar:0.3]
    at akka.persistence.cassandra.journal.CassandraJournal.readHighestSequenceNr(CassandraJournal.scala:18) ~[akka-persistence-cassandra_2.10-0.3.jar:0.3]
    at akka.persistence.cassandra.journal.CassandraRecovery$$anonfun$asyncReadHighestSequenceNr$1.apply$mcJ$sp(CassandraRecovery.scala:18) ~[akka-persistence-cassandra_2.10-0.3.jar:0.3]
    at akka.persistence.cassandra.journal.CassandraRecovery$$anonfun$asyncReadHighestSequenceNr$1.apply(CassandraRecovery.scala:18) ~[akka-persistence-cassandra_2.10-0.3.jar:0.3]
    at akka.persistence.cassandra.journal.CassandraRecovery$$anonfun$asyncReadHighestSequenceNr$1.apply(CassandraRecovery.scala:18) ~[akka-persistence-cassandra_2.10-0.3.jar:0.3]
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) ~[scala-library.jar:na]
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) ~[scala-library.jar:na]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) ~[akka-actor_2.10-2.3.3.jar:na]
    ... 5 common frames omitted

Make contactPointList more robust

Hi,

this is more a question than a direct issue, but what I saw at work while using the Cassandra driver is that addContactPoint actually throws an exception if the hostname cannot be resolved, which breaks the actor during initialisation.
I would propose something like

contactPoints.foreach { contactPoint =>
  Try(builder.addContactPoint(contactPoint))
}
if (builder.getContactPoints.isEmpty) throw new IllegalArgumentException("....")

and only fail when there are no contact points at all in the builder, because the driver should work with only one accessible contact point. WDYT? I can immediately create the PR.

Best

Martin

PersistentActor not receiving events

I have tried to use this plugin, but I am experiencing a problem. When I started my application I could see that the akka.messages table was created in Cassandra. The problem is that the PersistentActor is not receiving any event at all. Even with a simple PersistentActor like this:

class SimplePersistentActor extends PersistentActor {
  override def persistenceId = "simple-id-one"

  var state: Int = 0

  def updateState(inc: Int): Unit =
    state += inc

  val receiveRecover: Receive = {
    case inc: Int =>
      updateState(inc)
  }

  val receiveCommand: Receive = {
    case ("add", inc: Int) =>
      persist(inc)(updateState)
    case ("dec", inc: Int) =>
      persist(-inc)(updateState)
    case "print" =>
      println(s"Value: ${state}")
    case "snap" =>
      saveSnapshot(state)
  }
}

If I disable the plugin (from the configuration file) then the actor behaves normally again.
A "normal" actor, on the other hand, doesn't suffer any problem in any case: it always receives every message.

Any hint?

Journal write consistency level being ignored

Problem Overview

Not sure what the problem is, but it feels like the cassandra-journal write-consistency configuration option specified in application.conf is being ignored and a default consistency level of ONE is being used instead. No matter what cassandra-journal write-consistency setting is specified, if just one Cassandra node is available then journal writes will succeed. However, journal reads and snapshot read/writes fail as expected with respect to the consistency level specified.

Testing which found the problem

Tested using:

  1. akka-persistence-cassandra v-0.3.2
  2. Cassandra V2.0.8 installed on CentOS 6.5 using RPM package

Setup:

  1. Created a Cassandra cluster with three nodes
  2. Copied the following into application.conf
    https://github.com/krasserm/akka-persistence-cassandra/blob/fb3ed6614c2db6e8fbe7b4fe793466b9f9d5871e/src/main/resources/reference.conf
  3. Then made the following changes for both cassandra-journal and cassandra-snapshot-store:
    • replication-factor = 3
    • write-consistency = "QUORUM"
    • read-consistency = "QUORUM"

According to http://www.ecyrd.com/cassandracalculator/ the cluster's resilience should be:

  1. Your reads are consistent
  2. You can survive the loss of 1 node
  3. You are really reading from 2 nodes every time
  4. You are really writing to 2 nodes every time
  5. Each node holds 100% of your data

Goal of testing was to verify the above resilience:

  • Test 1 - with all three nodes active, test that journal read/writes and snapshot read/writes work correctly
  • Test 2 - turn off 1 Cassandra node and test that journal read/writes and snapshot read/writes work correctly
  • Test 3 - with two Cassandra nodes turned off, test that journal read/writes and snapshot read/writes fail correctly. It was during this testing the possible bug was noticed

Test 3 results:

  1. journal write succeeded - INCORRECT behaviour, should not have worked
  2. journal read failed - CORRECT behaviour
  3. snapshot write failed - CORRECT behaviour
  4. snapshot read failed - CORRECT behaviour

Further testing:
In an attempt to get journal writes to fail, only one Cassandra node was active and all consistency levels (journal read & snapshot read/write) were set to ONE, except for journal write, which was, in separate tests, set to:

  1. TWO
  2. ALL

Again, the journal writes succeeded and the data really was written to the message table (verified using cqlsh 4.1.1 and DevCenter 1.1.1). The only way to get journal writes to fail is to turn off all Cassandra nodes, in which case the journal write fails as intended with the message:

PersistenceFailure(Add(ledger1,Some(1)),6,java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried))

Multi-Env support

Hi, Martin, great stuff here!

In my current project we've stumbled across a problem when it comes to support different configurations based on the environment. For example, we'd want to have "localhost" as the only contact point when we're working in dev/local but a list of contact points, according to some environment variable, in pre and production environments.

We've tried to employ the optional overriding technique [1] having something like this:

cassandra-journal {
  keyspace = "foo"
  table = "journal"
  contact-points = ["localhost"]
  contact-points = [${?CASSANDRA_HOST}]
}

But it doesn't work properly when that optional env variables are arrays. In dev, when CASSANDRA_HOST is not set, the contact-points are not evaluated as "localhost". How could we deal with this "limitation"?.
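One possible workaround (a sketch, not taken from the original discussion) is to keep environment-specific values in an optional include file, which Typesafe Config silently skips when absent, rather than relying on array-valued env variables:

```hocon
# application.conf
cassandra-journal {
  keyspace = "foo"
  table = "journal"
  contact-points = ["localhost"]  # dev/local default
}

# Silently ignored if the file is absent (dev/local); in pre and
# production, deploy an environment-overrides.conf next to the app
# that sets the real contact-points list.
include "environment-overrides.conf"
```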

Thanks in advance,

Juanjo

[1] https://github.com/typesafehub/config#optional-system-or-env-variable-overrides

Providing support with lower Cassandra versions 1.2.x

Hi Martin,

We intend to use this plugin for our newly built akka-esper CEP component for our product, which uses the lower Cassandra version 1.2.x. At the moment we cannot switch to a higher version as the product is already live and deployed across multiple retailers. To use your plugin with the lower Cassandra version we downloaded the akka-persistence-cassandra code and made the changes listed below:

  1. Datastax cassandra driver downgraded from 2.0.3 to 2.0.1
  2. Certain CQL statements in "CassandraStatements.scala" in both journal and snapshot packages changed:
    a. Create keyspace uses the IF NOT EXISTS construct, which is available only with CQL 3.0.1 and not CQL 3 -> removed (need to think about how to do this with CQL 3)
    b. Create table uses the IF NOT EXISTS construct, which is available only with CQL 3.0.1 and not CQL 3 -> removed (need to think about how to do this with CQL 3)
  3. Keyspaces were configured to be created with SimpleStrategy as strategy class, changed it to NetworkTopologyStrategy
  4. Both journal and snapshot code use BatchStatement, which is not available with Cassandra protocol version 1 -> replaced with BoundStatement in CassandraJournal.scala and CassandraSnapshotStore.scala

With the above changes, after adding the updated jar to our component we could configure journal and snapshot without any errors on startup, and we saw the tables and keyspaces (used defaults) being created in our environment. Please suggest whether the above changes are on the correct lines and whether any further changes would be required. Also, what should our next steps be to make this plugin usable with the lower version of Cassandra? Thanks for your help.

0.4-SNAPSHOT: Unknown identifier persistence_id

I tried 0.4-SNAPSHOT for Akka 2.4.0-RC2, and I had an old cassandra instance that was populated with things from some Akka 2.3.x version. Startup (recover) fails with the following:

[ERROR] [09/11/2015 12:36:41.026] [ClusterSystem-akka.actor.default-dispatcher-19] [akka://ClusterSystem/system/cassandra-snapshot-store] Unknown identifier persistence_id
akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:172)
    at akka.actor.ActorCell.create(ActorCell.scala:605)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:460)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:482)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.run(Mailbox.scala:223)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Unknown identifier persistence_id
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:79)
    at akka.persistence.cassandra.snapshot.CassandraSnapshotStore.<init>(CassandraSnapshotStore.scala:37)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at java.lang.Class.newInstance(Class.java:442)
    at akka.util.Reflect$.instantiate(Reflect.scala:44)
    at akka.actor.NoArgsReflectConstructor.produce(Props.scala:357)
    at akka.actor.Props.newActor(Props.scala:259)
    at akka.actor.ActorCell.newActor(ActorCell.scala:561)
    at akka.actor.ActorCell.create(ActorCell.scala:587)
    ... 9 more
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Unknown identifier persistence_id
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:103)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:185)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:160)
    at com.google.common.util.concurrent.Futures$1.apply(Futures.java:713)
    at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:861)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

[INFO] [09/11/2015 12:36:41.026] [ClusterSystem-akka.actor.default-dispatcher-19] [akka://ClusterSystem/system/cassandra-snapshot-store] Message [akka.persistence.SnapshotProtocol$LoadSnapshot] from Actor[akka://ClusterSystem/system/sharding/PostCoordinator/singleton/coordinator#184144080] to Actor[akka://ClusterSystem/system/cassandra-snapshot-store#454194045] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/11/2015 12:36:41.027] [ClusterSystem-akka.actor.default-dispatcher-19] [akka://ClusterSystem/system/cassandra-snapshot-store] Message [akka.persistence.SnapshotProtocol$LoadSnapshot] from Actor[akka://ClusterSystem/system/sharding/AuthorListingCoordinator/singleton/coordinator#454485575] to Actor[akka://ClusterSystem/system/cassandra-snapshot-store#454194045] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[ERROR] [09/11/2015 12:36:41.033] [ClusterSystem-akka.actor.default-dispatcher-19] [akka://ClusterSystem/system/cassandra-journal] Unknown identifier persistence_id
akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:172)
    at akka.actor.ActorCell.create(ActorCell.scala:605)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:460)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:482)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.run(Mailbox.scala:223)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Unknown identifier persistence_id
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:79)
    at akka.persistence.cassandra.journal.CassandraJournal.<init>(CassandraJournal.scala:52)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at java.lang.Class.newInstance(Class.java:442)
    at akka.util.Reflect$.instantiate(Reflect.scala:44)
    at akka.actor.NoArgsReflectConstructor.produce(Props.scala:357)
    at akka.actor.Props.newActor(Props.scala:259)
    at akka.actor.ActorCell.newActor(ActorCell.scala:561)
    at akka.actor.ActorCell.create(ActorCell.scala:587)
    ... 9 more
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Unknown identifier persistence_id
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:103)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:185)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:160)
    at com.google.common.util.concurrent.Futures$1.apply(Futures.java:713)
    at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:861)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Then I disabled the snapshot plugin, and then it fails with:

[ERROR] [09/11/2015 12:39:53.595] [ClusterSystem-akka.actor.default-dispatcher-15] [akka://ClusterSystem/system/cassandra-journal] Unknown identifier persistence_id
akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:172)
    at akka.actor.ActorCell.create(ActorCell.scala:605)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:460)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:482)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.run(Mailbox.scala:223)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Unknown identifier persistence_id
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:79)
    at akka.persistence.cassandra.journal.CassandraJournal.<init>(CassandraJournal.scala:52)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at java.lang.Class.newInstance(Class.java:442)
    at akka.util.Reflect$.instantiate(Reflect.scala:44)
    at akka.actor.NoArgsReflectConstructor.produce(Props.scala:357)
    at akka.actor.Props.newActor(Props.scala:259)
    at akka.actor.ActorCell.newActor(ActorCell.scala:561)
    at akka.actor.ActorCell.create(ActorCell.scala:587)
    ... 9 more
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Unknown identifier persistence_id
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:103)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:185)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:160)
    at com.google.common.util.concurrent.Futures$1.apply(Futures.java:713)
    at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:861)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

[INFO] [09/11/2015 12:39:53.595] [ClusterSystem-akka.actor.default-dispatcher-15] [akka://ClusterSystem/system/cassandra-journal] Message [akka.persistence.JournalProtocol$ReplayMessages] from Actor[akka://ClusterSystem/system/sharding/PostCoordinator/singleton/coordinator#-2081023810] to Actor[akka://ClusterSystem/system/cassandra-journal#1713522111] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/11/2015 12:39:53.595] [ClusterSystem-akka.actor.default-dispatcher-15] [akka://ClusterSystem/system/cassandra-journal] Message [akka.persistence.JournalProtocol$ReplayMessages] from Actor[akka://ClusterSystem/system/sharding/AuthorListingCoordinator/singleton/coordinator#-2079841851] to Actor[akka://ClusterSystem/system/cassandra-journal#1713522111] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

After removing the old data everything is fine, but I wanted to let you know in case there is some regression/migration issue.

Future returned by asyncReplayMessages fails to complete

This issue stems from a post on the akka mailing list where it appears that the akka-persistence-cassandra journal fails to return a response to a View when reading messages from Cassandra. Unfortunately, I am unable (so far) to reproduce the issue. The pertinent details about my environment/setup are in the linked mailing list post.

'2.4-snapshot' incompatible with 'master' schema

I tried using the akka-2.4 branch and it appears that akka/persistence/cassandra/journal/CassandraStatements.scala uses "persistence_id" instead of "processor_id", which causes the library not to work if existing ('master' schema) tables are already present.
I had to drop the corresponding keyspaces to make the 'akka-2.4' branch work.
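If dropping the keyspaces is not an option, one possible (untested) workaround is to export the old table with cqlsh's COPY and re-import it under the new column name. The keyspace, table, and column names below are assumptions based on the 0.3.x schema:

```sql
-- export using the old column name (column list is an assumption)
COPY akka.messages (processor_id, partition_nr, sequence_nr, marker, message)
  TO 'messages.csv';

-- after recreating the table with the new schema:
COPY akka.messages (persistence_id, partition_nr, sequence_nr, marker, message)
  FROM 'messages.csv';
```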

Persist immutable configuration

At the moment, changing the max partition size after start-up will break the plugin's replay. We should persist these properties and validate that they haven't changed. In the future we could add a migration tool to move existing data to a new partition size, perhaps Spark based.
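A minimal sketch of the proposed validation; the names here (`PersistedProps`, `validatePartitionSize`) are hypothetical and only illustrate the idea of recording the setting on first start and refusing to run if it later diverges:

```scala
// Sketch (names hypothetical): record the partition size on first start
// and refuse to start if the configured value later diverges.
case class PersistedProps(maxPartitionSize: Long)

def validatePartitionSize(configured: Long, persisted: Option[PersistedProps]): PersistedProps =
  persisted match {
    case None                                        => PersistedProps(configured) // first start
    case Some(p) if p.maxPartitionSize == configured => p
    case Some(p) =>
      throw new IllegalStateException(
        s"max partition size changed from ${p.maxPartitionSize} to $configured; this would break replay")
  }
```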

Serialization binding issue

The plugin always serializes the payload using the serializer bound to PersistentRepr.

So in case the application defines a custom serializer for its objects, e.g.

akka.actor {
  serializers {
    my-payload = "docs.persistence.MyPayloadSerializer"
    my-snapshot = "docs.persistence.MySnapshotSerializer"
  }
  serialization-bindings {
    "docs.persistence.MyPayload" = my-payload
    "docs.persistence.MySnapshot" = my-snapshot
  }
}

the plugin will still serialize the payload using the PersistentRepr serializer, as in:

def persistentToByteBuffer(p: PersistentRepr): ByteBuffer =
  ByteBuffer.wrap(serialization.serialize(p).get)

i.e. serialize is called on the whole PersistentRepr, not on the payload.
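A self-contained sketch of the suggested fix: resolve the serializer from the payload's own class (as Akka's Serialization.findSerializerFor does) rather than serializing the whole PersistentRepr. The trait below is a stand-in for Akka's Serializer, not the real API:

```scala
// Stand-in for akka.serialization.Serializer (illustration only)
trait Serializer { def toBinary(o: AnyRef): Array[Byte] }

object Utf8StringSerializer extends Serializer {
  def toBinary(o: AnyRef): Array[Byte] = o.toString.getBytes("UTF-8")
}

// The fix would look up the serializer bound to the payload's class,
// not the one bound to PersistentRepr.
def payloadToBytes(findSerializerFor: AnyRef => Serializer, payload: AnyRef): Array[Byte] =
  findSerializerFor(payload).toBinary(payload)
```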

Enhancement to support snapshot-store using cassandra

Hi Martin,

I'm currently using akka-persistence-cassandra for one of my projects and so far it works really great, but I was wondering if it would be possible to add snapshot-store functionality to the Cassandra plugin so that we can rely on just one plugin.

Right now, when using Cassandra as the journal plugin, I still have to use another plugin to have a distributed snapshot store.

Do you think it would be possible to add such functionality?

Cheers,
/Benoit

Migration strategy?

Document possible migration strategies from plain file-based akka-persistence journals/snapshots.

Recovery optimization for persistent channels

Durable queues on top of Cassandra are a known anti-pattern. The issues related to reading a large number of tombstones can be addressed by introducing optimizations at the persistent channel level and at the journal level.

Possible optimizations at the persistent channel level are (copied from a code comment in PersistentChannel.scala):

// TODO: avoid scanning over large number of tombstones during recovery
//
// Introduce an optimization to address issues mentioned in
// http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets
// (which is also relevant when using a local LevelDB).
//
// This requires that recovery should start from the first
// non-deleted message rather than from sequence number 1.
// This can be achieved by taking empty snapshots (savepoints) 
// to set a recovery starting point. During recovery
//
// - when the first replayed message is received, take its
//   sequence number n and write a savepoint at n - 1.
//   This ensures that the next recovery skips a possible
//   expensive scan over n - 1 messages with a tombstone.
// - when the savepoint has been successfully written, delete
//   all savepoints that are older than n - 1.
//
// Writing a savepoint at lastSequenceNr - 1 requires a direct
// interaction with the snapshot store actor.
//
// Alternative/addition:
//
// The RequestReader actor of a persistent channel could
// also process confirmation messages and compute starting
// points for recovery. At periodic intervals, these starting
// points are then persisted as savepoints.

One optimization at the journal level is to delete entire rows if all columns have been marked as deleted (i.e. have a tombstone). A prerequisite is to have row splitting, which is already implemented by #1.

Furthermore, reads from the journal with no lower bound (i.e. starting from sequence number 1) are only done during recovery. All other reads (done by a View or PersistentChannel) have a lower bound and are fast. Assuming infrequent persistent channel recoveries, Cassandra can be configured so that tombstone garbage collection is likely to occur between recoveries.

Atomic deletes

Deleting a range of sequence numbers will normally require deletions in multiple partitions, so the whole operation is not atomic. As far as I am aware this isn't an explicit requirement, but it could lead to partially successful deletions if some deletes fail and some succeed. Consider adding a per-partition (static) column recording deletedUpTo, which any message scans would use as their starting point, and then executing the deletions asynchronously with retries.

This requires a schema change so we should do it before 2.4.
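A hedged sketch of the schema change described above; the column name deleted_to is hypothetical, and the keyspace/table names assume the plugin defaults:

```sql
-- static column: one value shared by all rows in the partition
ALTER TABLE akka.messages ADD deleted_to bigint static;

-- scans then start after the recorded watermark instead of sequence number 1
SELECT * FROM akka.messages
  WHERE persistence_id = ? AND partition_nr = ? AND sequence_nr > ?;
```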

ConfigurationException: Column family ID mismatch

Hi,

when I start two services with akka persistence and both services try to create the schema at the same time, I get a ConfigurationException: Column family ID mismatch.
It seems that it's not really easy to solve:

http://mail-archives.apache.org/mod_mbox/cassandra-user/201411.mbox/%[email protected]%3E

https://issues.apache.org/jira/browse/CASSANDRA-8387

My proposal would be something like:

retry(3) {
  if (config.keyspaceAutoCreate) session.execute(createKeyspace)
  session.execute(createTable)
}

@annotation.tailrec
private def retry[T](n: Int)(fn: => T): T =
  util.Try { fn } match {
    case util.Success(x) => x
    case _ if n > 1 => retry(n - 1)(fn)
    case util.Failure(e) => throw e
  }

in CassandraJournal and CassandraSnapshotStore. WDYT?
It's a workaround, I know, but it has worked well so far in our project. I can create a PR if you want.

Schema migration tool

With the new plugin API in akka-persistence 2.4, the implementation of CassandraJournal and the underlying schema can be strongly simplified (for example, we can remove headers and markers, ...).

Although it would be possible to stay backwards compatible with the existing schema, it would make working on #48 and the maintenance of the plugin unnecessarily complex. Hence, a migration tool seems to be a better solution than backwards compatibility.

  • depends on #48
  • supersedes #54.

Persistence Query for Cassandra

Akka persistence query complements Persistence by providing a universal asynchronous stream-based query interface that various journal plugins can implement in order to expose their query capabilities. The API is documented at http://doc.akka.io/docs/akka/snapshot/scala/persistence-query.html#persistence-query-scala. An example implementation using LevelDB is described at http://doc.akka.io/docs/akka/snapshot/scala/persistence-query-leveldb.html#persistence-query-leveldb-scala.

akka-persistence-cassandra should support this new query-side API. The API is available in Akka 2.4, so this work will need to be done against the akka-persistence-cassandra version supporting 2.4. It will also require introducing a dependency on Akka Streams.

The work is tracked in the following tickets

  • Master Data Management Write Side #96
  • Stream Merging Component #97
  • Index Management #98
  • Stream Production #99
  • Replay #100
  • Migration #101

Port configuration only honored for first host in contact-points

Relevant configuration:

cassandra-journal {
  contact-points: [172.31.10.125, 172.31.17.250, ...... ]
  port = 10150
}

During application start, the first host is connected to successfully on the correct port. All connection attempts to the remaining hosts in the list fail as they are attempting to connect on the default port 9042:

[debug] c.d.d.c.Connection - Connection[/172.31.17.250:9042-14, inFlight=0, closed=false] connection error
java.net.ConnectException: Connection refused: /172.31.17.250:9042
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0]
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:712) ~[na:1.8.0]
        at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) [netty-3.10.3.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) [netty-3.10.3.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) [netty-3.10.3.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [netty-3.10.3.Final.jar:na]

I've tried using host:port pairs instead and get the same result.
I also tried registering just a single host and got the same result, as the remaining nodes are apparently discovered after connecting to the single host.

I can successfully telnet to any of the listed ip addrs on port 10150 so I'm fairly certain that the cassandra nodes are up and running properly on port 10150.

Is there something misconfigured on my end?

Deleting messages quickly creates Cassandra tombstone issues

We ran into an issue with an actor that stores many events per minute and periodically cleans up the created events.

At a given point the queries to Cassandra start falling over, because every delete creates a tombstone, and once Cassandra reads 100,000 tombstones for a query it fails the query. One solution would be to define a gc_grace_seconds shorter than the 10-day default on the messages table, but the plugin doesn't expose the possibility to do so.

It would be nice if this were possible.
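Until the plugin exposes this, the setting can be changed manually; a sketch assuming the default keyspace/table names:

```sql
-- lower the tombstone GC grace period on the journal table
-- (the value should comfortably exceed your repair interval)
ALTER TABLE akka.messages WITH gc_grace_seconds = 3600;
```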

Make retry policy configurable

Persistent actors are stopped if a persist fails, which can be expensive because a full replay is required when the actor is restarted.

Could start with a fixed retry policy + offer the built in ones from the datastax driver.
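A sketch of what a configurable policy could look like, using retry policies that ship with the DataStax 2.x driver; the config-string-to-policy mapping and the value names are assumptions:

```scala
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.policies.{DefaultRetryPolicy, DowngradingConsistencyRetryPolicy, RetryPolicy}

// Map a (hypothetical) config string to one of the driver's built-in policies.
def retryPolicyFor(name: String): RetryPolicy = name match {
  case "downgrading-consistency" => DowngradingConsistencyRetryPolicy.INSTANCE
  case _                         => DefaultRetryPolicy.INSTANCE
}

def configure(builder: Cluster.Builder, policyName: String): Cluster.Builder =
  builder.withRetryPolicy(retryPolicyFor(policyName))
```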

Warning: Batch of prepared statements exceeding threshold

Hello Martin,

I see these warnings in cassandra logs when trying this out-
Batch of prepared statements for [akka.messages] is of size 5331, exceeding specified threshold of 5120 by 211.

This looks like more of a concern on the Cassandra side than with this plugin itself, but I wanted to check anyway.

Thanks in advance.

Implement cassandra-journal.max-result-size via Paging

At the moment a LIMIT query is used. We could instead use Cassandra's paging (fetch size), introduced in C* 2.0:

http://datastax.github.io/java-driver/2.1.7/features/paging/

This is used in the C* Spark Connector when bringing a large number of rows into Spark from C*.

It would remove one of the cases in the RowIterator; we're already doing synchronous queries, so this would be almost transparent (a call to next() would just block). You can also query the paging state to see how many rows can be taken without blocking, or get a future for when more rows are available.

WDYT?
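A sketch of the driver call involved (DataStax driver 2.x); the fetch size value is arbitrary:

```scala
import com.datastax.driver.core.{Session, SimpleStatement}

// Instead of appending LIMIT to the CQL, set a fetch size and let the
// driver page transparently while the ResultSet is iterated; iteration
// blocks when a new page has to be fetched.
def selectMessages(session: Session, cql: String, fetchSize: Int = 1000) =
  session.execute(new SimpleStatement(cql).setFetchSize(fetchSize))
```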

Possible to read data with partially applied or unapplied batches during replay?

First, great work on this project! I was looking at it for inspiration in adding similar Cassandra support to Akka.NET. Here's the scenario I'm imagining might be an issue:

  1. Actor is writing a batch of events (could be one or more events) to the Journal.
  2. The Journal looks like it's using a Logged Batch to send the writes to Cassandra.
  3. The Batch is accepted by Cassandra and the Journal receives confirmation so as far as it's concerned, the writes were successful.
  4. Actor crashes and so it goes into recovery. Sends read requests to the Journal (highest sequence number, replay of events) to rebuild state.
  5. Journal reads data from Cassandra and doesn't read some or all of the previously successful batch because A) It hasn't been applied yet or B) It's only been partially applied.
  6. Thus, the Actor ends up in a bad state where it either has the wrong sequence number (and thus overwrites the previous batch on subsequent writes) or makes bad decisions because it's in an incorrect/incomplete state.

The reason I think this might be a problem is that Logged Batches in Cassandra, while offering atomicity in the sense that all writes will be applied if the batch is accepted and no writes will be applied if it is rejected, only guarantee that the writes will be applied eventually. They also don't have any isolation guarantees, so it's possible to read a partially committed batch (i.e. 2 of the 3 statements in the batch may have been applied, but the third hasn't).

So, my question: does this scenario seem possible?

If so, it seems like maybe an edge failure case, but it could definitely cause some strange or unexpected behavior. I could also be totally wrong about the possibility. 😄

Not possible to use 'NetworkTopologyStrategy' as replication strategy for created keyspaces

When the keyspaces for the akka journal and akka snapshots are created, they are created with the SimpleStrategy replication strategy as a hard-coded setting. It would be great to also have the possibility to let the plug-in create the keyspaces with the NetworkTopologyStrategy and specify the number of replicas per data center. Example CQL (as taken from the Cassandra documentation):

CREATE KEYSPACE "Excalibur"
WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'dc1' : 3, 'dc2' : 2 };
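If the plugin made the strategy configurable, the settings might look like this; the key names are assumptions for illustration and are not currently supported:

```
cassandra-journal {
  # hypothetical keys — not currently supported by the plugin
  replication-strategy = "NetworkTopologyStrategy"
  data-center-replication-factors = ["dc1:3", "dc2:2"]
}
```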

Update Cassandra driver to version 2.1

Hi,

is it possible to create a new 0.3.4 release with the updated Cassandra driver dependency? With the current version I get lots of 'c.d.d.c.TableMetadata - Error parsing schema options for table' errors with Cassandra 2.1.0, which go away when using driver 2.1.1 instead of the 2.0.5 used here. I can override the driver dependency in my project, but having it here would be better, also for other people using it for the first time with the newest Cassandra version.

Thx

Consider a more efficient implementation of highest sequence number

While thinking about how we can avoid redundant deletes, it occurred to me that we could change reading the highest sequence number to issue a single query per partition rather than using the iterator, e.g.:

select inuse, sequence_nr from messages where persistence_id = ? and partition_nr = ? order by sequence_nr desc limit 1

As soon as inuse is null/false we can use the sequence_nr from the previous query.

C* keeps a partition index, which means that going to the end of a partition is efficient (though not as efficient as getting the first row in a partition).
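A pure sketch of the proposed scan; `lastInPartition` stands in for the single per-partition query above and returns the (inuse, sequence_nr) pair of the last row, or None for an empty partition:

```scala
// Walk partitions until one is empty or not in use; the highest sequence
// number seen in the previous partition is the answer.
def highestSequenceNr(lastInPartition: Long => Option[(Boolean, Long)]): Long = {
  @annotation.tailrec
  def loop(partitionNr: Long, highest: Long): Long =
    lastInPartition(partitionNr) match {
      case Some((true, seqNr)) => loop(partitionNr + 1, seqNr) // in use, keep going
      case _                   => highest                      // empty or not in use: stop
    }
  loop(0L, 0L)
}
```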

Provide the 0.4-SNAPSHOT in maven

Hello,
You have already updated README.md on master with the new snapshot version:

resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

libraryDependencies += "com.github.krasserm" %% "akka-persistence-cassandra" % "0.4-SNAPSHOT"

I have tested it and 0.3.9 does not work with Akka 2.4-RC2. Is it possible to build 0.4-SNAPSHOT and publish it to Maven?

CassandraJournal constructor fails on creating keyspace

We have a keyspace created in Cassandra by admins, and our Cassandra user does not have permission to create new keyspaces. Therefore the session.execute(createKeyspace) line fails with an UnauthorizedException.

A possible solution is to wrap session.execute(createKeyspace) with a check whether the keyspace already exists on the cluster.

Should we provide PR?
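A sketch of such a check using the driver's cluster metadata; the function name is hypothetical and this assumes the createKeyspace CQL is available as a string:

```scala
import com.datastax.driver.core.Session

// Only attempt creation when the keyspace is really missing, so users
// without CREATE permission can still start against a pre-created keyspace.
def createKeyspaceIfMissing(session: Session, keyspace: String, createKeyspaceCql: String): Unit =
  if (session.getCluster.getMetadata.getKeyspace(keyspace) == null)
    session.execute(createKeyspaceCql)
```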

Datastax Java Driver TLS Support

Hi

I poked around in the code and could not find any support for TLS (SSL). Is this planned or excluded on purpose? When running Cassandra in the cloud, it feels like a good idea to have traffic encrypted.

It looks as if the DataStax driver already has support for this, so adding the configuration setting clusterBuilder.withSSL() in CassandraPluginConfig would work.

This enables the user to supply a keystore and truststore with JSSE. Of course, full configurability is even nicer.
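For reference, the driver's default withSSL() picks up the standard JSSE system properties, so a first version might only need JVM flags like these (paths and passwords are placeholders):

```
# JVM flags consumed by JSSE when Cluster.builder().withSSL() is used
-Djavax.net.ssl.trustStore=/path/to/client.truststore
-Djavax.net.ssl.trustStorePassword=changeit
-Djavax.net.ssl.keyStore=/path/to/client.keystore
-Djavax.net.ssl.keyStorePassword=changeit
```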

How to set this up with the DataStax driver is described in a blogpost:
http://www.datastax.com/dev/blog/accessing-secure-dse-clusters-with-cql-native-protocol

I can give this a try and contribute on this if you wish to have some help.

Contact Points with ports

Currently it is only possible to use the same port for all contact points. For cluster testing (for example with Docker) it would be great to use a different port for each contact point. What do you think? Should I provide a PR?
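The proposal could take the shape of optional host:port entries, falling back to the global port setting; this syntax is a suggestion, not currently supported:

```
cassandra-journal {
  # hypothetical: per-contact-point ports
  contact-points = ["127.0.0.1:9042", "127.0.0.1:9043", "127.0.0.1:9044"]
}
```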
