
spark-jms's Introduction

JMS Compatible DataSource for Apache Spark (beta)

spark-jms is a Spark DataFrame reader and writer for JMS (Java Message Service) compatible sources. The connector works in both streaming and batch mode. It works, directly or indirectly, with most sources that have a JMS provider implemented for them.

Usage

spark-jms is not yet available in Maven Central or any other public repository; publishing it is still a TODO.

spark-jms is used like other Spark connectors, such as the Kafka connector.

To use it, clone this repository. Make sure Maven is installed on your system so that you can build it.

git clone https://github.com/jksinghpro/spark-jms.git

Go to the root directory of the project and run:

mvn clean install

Once the project has built successfully, add it as a dependency of your Spark application and add its jars to the Spark classpath. Alternatively, pass the jar to spark-submit with the --jars option:

spark-submit --jars <jar_path>

Extensibility

This connector directly supports ActiveMQ, RabbitMQ, and Kafka (direct support for more sources will be added). Indirectly, it supports any messaging queue or source that has a JMS client. The only requirement is that the JMS connection factory implementation for that source is on the Spark classpath, along with an implementation of the following trait:

jk.bigdata.tech.jms.ConnectionFactoryProvider

This trait has an abstract method

def createConnection(options: Map[String, String]): ConnectionFactory

which returns an instance of ConnectionFactory for the source under consideration. For examples of implementing this trait, refer here.
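As a rough illustration of the shape such an implementation could take, here is a minimal sketch. To keep it compilable without any dependencies, it declares local stand-ins for `ConnectionFactory` (in practice `javax.jms.ConnectionFactory`) and for the `ConnectionFactoryProvider` trait. `ExampleConnectionFactory`, `ExampleConnectionFactoryProvider`, the `tcp://` URL scheme, and the default host and port are all hypothetical and not part of spark-jms.

```scala
// Stand-ins so this sketch compiles on its own. In a real project,
// ConnectionFactory is javax.jms.ConnectionFactory and ConnectionFactoryProvider
// is jk.bigdata.tech.jms.ConnectionFactoryProvider from spark-jms.
trait ConnectionFactory
trait ConnectionFactoryProvider {
  def createConnection(options: Map[String, String]): ConnectionFactory
}

// Hypothetical factory for an imaginary broker; a real one would come from
// the broker's JMS client library.
final case class ExampleConnectionFactory(brokerUrl: String, username: Option[String])
    extends ConnectionFactory

// Hypothetical provider: builds the factory from the options that were set
// on the reader or writer via .option(...).
class ExampleConnectionFactoryProvider extends ConnectionFactoryProvider {
  override def createConnection(options: Map[String, String]): ConnectionFactory = {
    val host = options.getOrElse("host", "localhost") // assumed default
    val port = options.getOrElse("port", "61616")     // assumed default
    ExampleConnectionFactory(s"tcp://$host:$port", options.get("username"))
  }
}

object ProviderDemo {
  def main(args: Array[String]): Unit = {
    val factory = new ExampleConnectionFactoryProvider()
      .createConnection(Map("host" -> "broker.example.com", "port" -> "61617"))
    println(factory)
  }
}
```

With the real trait on the classpath, the provider would presumably be selected by passing its fully qualified class name as the `connection` option, in the same way the batch write sample passes `jk.bigdata.tech.jms.AMQConnectionFactoryProvider`.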

Sample Usage

Sample code for using the spark-jms connector:

Batch

import org.apache.spark.sql.SaveMode

// Read from ActiveMQ
val dataframeAmq = spark.read
  .format("jk.bigdata.tech.jms")
  .option("connection", "amq")
  .load

// Read from RabbitMQ
val dataframeRmq = spark.read
  .format("jk.bigdata.tech.jms")
  .option("connection", "rmq")
  .option("queue", "<<queue_name>>")
  .option("username", "<<username>>")
  .option("password", "<<password>>")
  .option("virtualhost", "<<virtual_host>>")
  .option("host", "<<host>>")
  .option("port", "<<port>>")
  .load

// Read from Kafka
val dataframeKafka = spark.read
  .format("jk.bigdata.tech.jms")
  .option("connection", "kafka")
  .option("bootstrap.servers", "<<bootstrap-servers>>")
  .option("zookeeper.connect", "<<zookeeper-host>>")
  .option("client.id", "<<client id>>")
  .option("queue", "<<queue_name>>")
  .load

// In the write examples, `dataframe` stands for any existing DataFrame.

// Write to ActiveMQ, naming the provider by its fully qualified class name
dataframe.write
  .mode(SaveMode.Append)
  .format("jk.bigdata.tech.jms")
  .option("connection", "jk.bigdata.tech.jms.AMQConnectionFactoryProvider")
  .option("queue", "<<queue name>>")
  .save

// Write to RabbitMQ
dataframe.write
  .mode(SaveMode.Append)
  .format("jk.bigdata.tech.jms")
  .option("connection", "rmq")
  .option("queue", "<<queue name>>")
  .save

// Write to Kafka
dataframe.write
  .mode(SaveMode.Append)
  .format("jk.bigdata.tech.jms")
  .option("connection", "kafka")
  .option("queue", "<<queue name>>")
  .save

Streaming

// Read a stream from ActiveMQ
val dataframeAmq = spark.readStream
  .format("jk.bigdata.tech.jms")
  .option("connection", "amq")
  .load

// Read a stream from RabbitMQ
val dataframeRmq = spark.readStream
  .format("jk.bigdata.tech.jms")
  .option("connection", "rmq")
  .option("queue", "<<queue_name>>")
  .option("username", "<<username>>")
  .option("password", "<<password>>")
  .option("virtualhost", "<<virtual_host>>")
  .option("host", "<<host>>")
  .option("port", "<<port>>")
  .load

// Read a stream from Kafka
val dataframeKafka = spark.readStream
  .format("jk.bigdata.tech.jms")
  .option("connection", "kafka")
  .option("bootstrap.servers", "<<bootstrap-servers>>")
  .option("zookeeper.connect", "<<zookeeper-host>>")
  .option("client.id", "<<client id>>")
  .option("queue", "<<queue_name>>")
  .load

Configuration Parameters (Common):

| Option      | Required | Description                                                                                             | Default Value |
|-------------|----------|---------------------------------------------------------------------------------------------------------|---------------|
| queue       | true     | Name of the topic or messaging queue                                                                    | None          |
| acknowledge | false    | Whether to acknowledge the message                                                                      | false         |
| connection  | true     | Fully qualified name of a ConnectionFactoryProvider implementation, or the alias of a directly supported queue | None          |

Note: The configurations above are common across JMS clients. Beyond these, some options are specific to the JMS client in use.
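The common options could be checked before a read or write is configured. The sketch below is only an illustration in plain Scala: `JmsOptions`, `missing`, and `withDefaults` are hypothetical helpers, not part of spark-jms, and the case-insensitive comparison is a choice made for this sketch rather than documented connector behavior. The required names and the `acknowledge` default are taken from the common configuration list.

```scala
object JmsOptions {
  // Option names marked as required in the common configuration list.
  val Required: Set[String] = Set("queue", "connection")

  // Names of required options that are absent, compared case-insensitively
  // and sorted so the result is stable.
  def missing(options: Map[String, String]): List[String] = {
    val present = options.keySet.map(_.toLowerCase)
    Required.filterNot(present.contains).toList.sorted
  }

  // Adds the documented default for "acknowledge" unless that key is already set.
  def withDefaults(options: Map[String, String]): Map[String, String] =
    Map("acknowledge" -> "false") ++ options

  def main(args: Array[String]): Unit = {
    val opts = Map("connection" -> "rmq", "host" -> "localhost")
    println(missing(opts))      // lists the required options that are not set
    println(withDefaults(opts)) // same options plus the acknowledge default
  }
}
```

A map that passes the check can then be handed to Spark in one call with `.options(...)` on the reader or writer instead of chaining individual `.option(...)` calls.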

Issues

License

The project is licensed under the Apache 2 license.

spark-jms's People

Contributors

jksinghpro


spark-jms's Issues

invalid stream error

I am getting the following error while reading a stream:
Exception in thread "main" com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 48656C6C
at com.rabbitmq.jms.client.RMQMessage.fromMessage(RMQMessage.java:1128)
at com.rabbitmq.jms.client.RMQMessage.convertJmsMessage(RMQMessage.java:901)
at com.rabbitmq.jms.client.RMQMessage.convertMessage(RMQMessage.java:895)
at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:354)
at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:268)
at jk.bigdata.tech.jms.JmsDatasourceRelation.buildScan(JmsDataSourceRelation.scala:39)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:308)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
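One way to read the hex value in the exception message is to decode it as ASCII. The sketch below does that and also checks the magic number that Java serialization streams begin with (`0xACED`). `StreamHeaderDemo` and `hexToAscii` are hypothetical names used only for this illustration. A possible interpretation, not confirmed anywhere in this issue, is that the message body was plain text published by a non-JMS client, while the RabbitMQ JMS client tried to deserialize it as a Java object.

```scala
import java.io.ObjectStreamConstants

object StreamHeaderDemo {
  // Decode a hex string such as "48656C6C" into its ASCII characters,
  // two hex digits per character.
  def hexToAscii(hex: String): String =
    hex.grouped(2).map(pair => Integer.parseInt(pair, 16).toChar).mkString

  def main(args: Array[String]): Unit = {
    // The four header bytes reported in the exception message.
    println(hexToAscii("48656C6C"))

    // Java object serialization streams start with the magic number 0xACED,
    // so a header made of text bytes is rejected as invalid.
    println(ObjectStreamConstants.STREAM_MAGIC == 0xACED.toShort)
  }
}
```

The header decodes to the characters `Hell`, which would be consistent with a text payload rather than a serialized Java object.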
