Giter VIP home page Giter VIP logo

photon's Introduction

Photon Build Status

An event sourcing framework built on top of Apache Cassandra allowing horizontally scalable throughput and native cross data-center replication.

Benefits of Photon

  1. Exactly one delivery.
  2. Horizontal Scalability.
  3. Backed by Apache Cassandra.
  4. Native cross data center replication.

Getting Started

Once you have setup a Cassandra cluster then deploy the necessary schema located here link.

Make sure you include the Photon dependency below in your code

<dependencies>
    <dependency>
        <groupId>com.homeaway.datatools.photon</groupId>
        <artifactId>photon</artifactId>
        <version>0.1.1</version>
    </dependency>
</dependencies>

In order to create a producer or consumer, you will first need to define the properties that will be used to build the producer or consumer. Below an example of properties using the default Cassandra driver and the default Json serializer.

Producer

Properties properties = new Properties();
properties.put(PHOTON_DRIVER_CLASS, "com.homeaway.datatools.photon.driver.CassandraPhotonDriver");
properties.put(PHOTON_SERIALIZER_CLASS, "com.homeaway.datatools.photon.serialization.JsonPhotonSerializer");
properties.put(SESSION_CONTACT_POINTS, "XX.XXX.X.XX,XX.XXX.XX.XXX,XX.XXX.XX.XXX");
properties.put(SESSION_USER_NAME,"{username}");
properties.put(SESSION_PASSWORD,"{password}");
properties.put(SESSION_KEYSPACE,"{keyspace}");

BeamProducer producer = Producers.newProducer(properties);

Consumer

Properties properties = new Properties();
properties.put(CONSUMER_TYPE, "SINGLE_REGION");
properties.put(PHOTON_DRIVER_CLASS, "com.homeaway.datatools.photon.driver.CassandraPhotonDriver");
properties.put(PHOTON_DESERIALIZER_CLASS, "com.homeaway.datatools.photon.serialization.JsonPhotonDeserializer");
properties.put(SESSION_CONTACT_POINTS, "XX.XXX.X.XX,XX.XXX.XX.XXX,XX.XXX.XX.XXX");
properties.put(SESSION_USER_NAME,"{username}");
properties.put(SESSION_PASSWORD,"{password}");
properties.put(SESSION_KEYSPACE,"{keyspace}");

ConsumerFactory factory = Consumers.newConsumerFactory(properties);

Features

Producer

Synchronous and Asynchronous Writes using NIO channels (no local threads)
Synchronous
PayloadObject obj = new PayloadObject();
producer.writeMessageToBeam("stream.name.here", "message.key.here", obj);
Asynchronous
PayloadObject obj = new PayloadObject();
BeamFuture future = producer.writeMessageToBeamAsync("stream.name.here", "message.key.here", obj);
if (future.get())
{
	//Successful write
}
Configurable Time To Live per Event

These examples will write events that will be automatically deleted in 30 days.

Synchronous
Duration ttl = Duration.ofDays(30);
PayloadObject obj = new PayloadObject();
producer.writeMessageToBeam("stream.name.here", "message.key.here", obj, ttl);
Asynchronous
Duration ttl = Duration.ofDays(30);
PayloadObject obj = new PayloadObject();
BeamFuture future = producer.writeMessageToBeamAsync("stream.name.here", "message.key.here", obj, ttl);
if (future.get())
{
	//Successful write
}
Future event creation.

These examples will write an event that will be received 5 days from now.

Synchronous
Instant writeTime = Instant.now().plus(5, ChronoUnit.DAYS);
PayloadObject obj = new PayloadObject();
producer.writeMessageToBeam("stream.name.here", "message.key.here", obj, writeTime);
Asynchronous
Instant writeTime = Instant.now().plus(5, ChronoUnit.DAYS);
PayloadObject obj = new PayloadObject();
BeamFuture future = producer.writeMessageToBeamAsync("stream.name.here", "message.key.here", obj, writeTime);
if (future.get())
{
	//Successful write
}

Consumer

Standard consumer handles events as they are received with user defined event handler.

Below is an example of how to setup and standard consumer that will beginning reading from the last point where this client read to.

PhotonConsumer consumer = consumerFactory.getPhotonConsumer();

consumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
new PhotonMessageHandler() {

  @Override
  public void handleMessage(PhotonMessage message) {
  	//Perform some action base on the message received
  }

  @Override
  public void handleException(BeamException beamException) {
  	//Handle exception thrown by the message
  }

  @Override
  public void handleStaleMessage(PhotonMessage message) {
  	//If a message arrives here then it is because the it arrived out of order
  }
}, PhotonBeamReaderOffsetType.FROM_CURRENT);

try {
	consumer.start()
} catch (Exception e)
{

}
Events are processed in the order they are written and provides mechanism for detecting events that arrive late.
Configurable polling interval to suit different workloads and SLA demands

This example sets the polling interval to 500 milliseconds.

PhotonConsumer consumer = consumerFactory.getPhotonConsumer();
consumer.setPollingInterval(500L);

try {
	consumer.start()
} catch (Exception e)
{

}
Configurable start offset so that you can begin consuming from an exact point in time

There are 3 offset options:

The example below would read from the beginning of a stream:

PhotonConsumer consumer = consumerFactory.getPhotonConsumer();

consumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
new PhotonMessageHandler() ..., PhotonBeamReaderOffsetType.FROM_BEGINNING);

try {
	consumer.start()
} catch (Exception e)
{

}

The example below will read from the last point where this client left off

PhotonConsumer consumer = consumerFactory.getPhotonConsumer();

consumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
new PhotonMessageHandler() ..., PhotonBeamReaderOffsetType.FROM_CURRENT);

try {
	consumer.start()
} catch (Exception e)
{

}

The example below will read all messages starting from 3 hours ago

PhotonConsumer consumer = consumerFactory.getPhotonConsumer();

consumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
new PhotonMessageHandler() ..., PhotonBeamReaderOffsetType.FROM_OFFSET,
Instant.now().minus(3, ChronoUnit.HOURS));

try {
	consumer.start()
} catch (Exception e)
{

}

The example below will read all messages starting from the Instant returned by the provided BiFunction

PhotonConsumer consumer = consumerFactory.getPhotonConsumer();

consumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
new PhotonMessageHandler() ..., PhotonBeamReaderOffsetType.FROM_OFFSET,
(clientName, beamName) -> {
	//Some logic here to determine determine where to beginning reading from and returns    	//and Instant
});

try {
	consumer.start()
} catch (Exception e)
{

}
Async consumer for processing events on multiple threads while still ensuring order with guaranteed delivery

The following example will create and Async processor that will map PhotonMessage to Widget and then execute the provided Consumer<Widget> on separate threads. The events will be processed in order by stream name and message key and there will be no limit to the number of events that can be container in memory at any given time (This can be dangerous in high throughput situations where there are memory constraints).

AsyncPhotonConsumer<Widget> asyncConsumer = consumerFactory.getAsyncPhotonConsumer();

asyncConsumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
	(photonMessage) -> {
    	Widget widget = new Widget();
        //Some logic to mape the PhotonMessage to a Widget
        return widget
    },
    (widget) -> {
    	//Some logic that is performed on the provided widget
    },
    PhotonBeamReaderOffsetType.FROM_CURRENT);
try {
	asyncConsumer.start()
} catch (Exception e)
{

}
Configurable memory utilization for Async consumer (limit the number of events that are kept in memory at any give time).

The following example is the same as about but it will limit the number of events that can be held in memory to 500.

AsyncPhotonConsumer<Widget> asyncConsumer = consumerFactory.getAsyncPhotonConsumer(500);

asyncConsumer.putBeamForProcessing("ClientNameHere", "stream.name.here",
	(photonMessage) -> {
    	Widget widget = new Widget();
        //Some logic to mape the PhotonMessage to a Widget
        return widget
    },
    (widget) -> {
    	//Some logic that is performed on the provided widget
    },
    PhotonBeamReaderOffsetType.FROM_CURRENT);
try {
	asyncConsumer.start()
} catch (Exception e)
{

}

photon's People

Contributors

eleduardo avatar

Watchers

 avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.