Comments (16)

cpitman commented on July 17, 2024

Additional detail would be great. Is the only difference between your environment and the vagrant environment included here the version of spark? Are there any other changes to the source code?

from spark-drools-example.

mautematico commented on July 17, 2024

These are my changes so far:
mautematico@5a3b201

I was trying on my local machine, with my own Spark 1.6.1 installation and so on. Right now I'm waiting for mvn package to finish in the Vagrant environment. I'll report back ASAP. :D

mautematico commented on July 17, 2024

Okay, so it is working just fine in the Vagrant environment.
There must be something odd in my Spark configuration or my Debian installation that is interfering with your code.

@cpitman, I'm wondering how your loadRules() and applyRules() work. I'm unfamiliar with anything other than KieSession.fireAllRules(), and I can't figure out where the logic to load your .drl files is.

Is it okay with you if I open a new issue (or reach you some other way) to ask a few things about building a KieContainer from a given HashMap<Integer,String>, where each String is a .drl rule? :)

cpitman commented on July 17, 2024

This is a pretty simplistic example. The line that loads the drl is KieContainer kieContainer = kieServices.getKieClasspathContainer();. Because we request the ClasspathContainer, any Drools files on the classpath (i.e. in the resource folder) are loaded.

fireAllRules is used with a stateful session, but in the case of spark we really want to avoid state since we're actually running these rules across many processes, each with independent sessions. When you use a stateless session, you create a Command object that will insert every fact needed for a single run then call session.execute(command). This will create a new session, insert the requested facts, run the rules to completion, and then destroy the session. Every time we run it is against a clean state. You could do this manually, but stateless sessions make it easy.

Some samples of the different ways to load rules can be found in the drools-examples-api module of the droolsjbpm/drools repository. The closest example is this one: https://github.com/droolsjbpm/drools/blob/master/drools-examples-api/kie-module-from-multiple-files/src/main/java/org/drools/example/api/kiemodulefrommultiplefiles/KieModuleFromMultipleFilesExample.java

A lot of that code is probably more complicated than you need. Basically, create File objects that point at your .drl files and build a module from them.

terminatur commented on July 17, 2024

I ran into this issue as well. It turned out the default java serializer could handle the KieBase object, but the Kryo serializer cannot.

freedev commented on July 17, 2024

I'm running into this issue too.
Have you found a way to serialize KieBase with Kryo?

terminatur commented on July 17, 2024

@freedev I did not find a way to serialize the KieBase using the Kryo serializer. My workaround is to instantiate the KieBase on each executor; each executor downloads the rules from a Nexus repository. Probably not ideal, but it works for now, until I get time to force use of the Java serializer for just the KieBase.
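
A minimal sketch of the "build it on each executor" idea, using only the Java standard library. It assumes the rule sources are available as plain strings (rather than downloaded from Nexus), and the nested Engine class is a hypothetical stand-in for a compiled KieBase; none of these names come from Drools or from the repository. The holder serializes only the rule text and rebuilds the non-serializable engine lazily after deserialization:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Ships only the rule sources; the heavy engine is rebuilt lazily in each JVM. */
final class RuleEngineHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Hypothetical stand-in for a compiled KieBase: deliberately NOT Serializable. */
    static final class Engine {
        final int ruleCount;

        Engine(List<String> drlSources) {
            // Real code would compile the DRL text here.
            this.ruleCount = drlSources.size();
        }
    }

    private final ArrayList<String> drlSources; // small, serializable payload
    private transient Engine engine;            // skipped on write, null after read

    RuleEngineHolder(List<String> drlSources) {
        this.drlSources = new ArrayList<>(drlSources);
    }

    /** Builds the engine on first use in this JVM, then reuses it. */
    synchronized Engine engine() {
        if (engine == null) {
            engine = new Engine(drlSources);
        }
        return engine;
    }

    /** Round-trips the holder through Java serialization, as shipping it to an executor would. */
    static RuleEngineHolder roundTrip(RuleEngineHolder holder)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(holder);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (RuleEngineHolder) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        RuleEngineHolder driverSide = new RuleEngineHolder(Arrays.asList("rule one", "rule two"));
        driverSide.engine(); // built on the "driver"; the transient field is still not serialized
        RuleEngineHolder executorSide = roundTrip(driverSide);
        System.out.println(executorSide.engine().ruleCount); // prints 2
    }
}
```

Because the engine field is transient, the serializer in use never sees the knowledge base at all, which sidesteps the Kryo problem at the cost of one rebuild per JVM.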

freedev commented on July 17, 2024

@terminatur, I've configured Kryo for everything except the KieBase object, which still uses the JavaSerializer. This is my current configuration:

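import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer // Kryo's JavaSerializer, not Spark's
import org.apache.spark.serializer.KryoRegistrator
import org.drools.core.impl.KnowledgeBaseImpl
import org.kie.api.KieBase

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // -- here there were other classes registered
    // Fall back to Java serialization for the Drools knowledge base only
    kryo.register(classOf[KnowledgeBaseImpl], new JavaSerializer())
    kryo.register(classOf[KieBase], new JavaSerializer())
  }
}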

And I've added the following options to my SparkConf instance:

  .set("spark.kryo.registrator", "it.my.package.MyKryoRegistrator")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "24")

terminatur commented on July 17, 2024

@freedev I have done exactly what you have shown, but run into:

java.io.IOException: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1283)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at rules.SparkRulesEngine$$anonfun$1.apply(SparkRulesEngine.scala:125)
    at rules.SparkRulesEngine$$anonfun$1.apply(SparkRulesEngine.scala:123)
    at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:178)
    at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:175)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
    at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:65)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$10.apply(TorrentBroadcast.scala:254)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1310)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:255)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:189)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
    ... 44 more
Caused by: java.lang.ClassNotFoundException: org.drools.core.impl.KnowledgeBaseImpl
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:63)

Did you run into this at all?
I have posted this at: https://stackoverflow.com/questions/44960492/user-kryoserializer-alongside-javaserializer-in-spark-2
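
One plausible reading of the trace above: Kryo's JavaSerializer.read uses a plain ObjectInputStream, whose default resolveClass ended up in sun.misc.Launcher$AppClassLoader, and that loader could not see org.drools.core.impl.KnowledgeBaseImpl. Spark's own Java deserialization stream overrides resolveClass to use a chosen class loader instead. The sketch below shows that override with the standard library only. The class name LoaderAwareObjectInputStream and its helpers are hypothetical, and wiring it into a custom Kryo serializer is not shown and is an assumption about how it would be used:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.ArrayList;
import java.util.Arrays;

/** ObjectInputStream that resolves classes through an explicitly supplied ClassLoader. */
final class LoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            // Look the class up in the supplied loader first.
            return Class.forName(desc.getName(), false, loader);
        } catch (ClassNotFoundException e) {
            // Primitive types, and anything the loader cannot see, use the default lookup.
            return super.resolveClass(desc);
        }
    }

    /** Serializes a value with plain Java serialization. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Deserializes a value, resolving its classes through the given loader. */
    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new LoaderAwareObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new ArrayList<>(Arrays.asList("rule-1", "rule-2")));
        // On an executor, the thread context class loader is the usual candidate.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        System.out.println(deserialize(data, loader)); // prints [rule-1, rule-2]
    }
}
```

Whether this resolves the ClassNotFoundException depends on the chosen loader actually containing the Drools jars on the executor, which the trace alone does not confirm.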

darshanpandya-kogentix commented on July 17, 2024

Hello,
I am trying to run this code on a Spark 1.6.0 cluster in YARN mode with JDK 1.8, and I am facing the exception below. The code runs fine in Spark local mode but fails with this exception when the job is submitted in YARN mode.

spark-submit --class com.awesome.App --master yarn ./SparkDroolsExample-1.0-SNAPSHOT.jar

17/08/17 19:07:44 INFO scheduler.DAGScheduler: ResultStage 0 (count at App.java:38) failed in 12.653 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, ip-172-31-11-240.ec2.internal, executor 4): java.lang.UnsupportedClassVersionError: com/awesome/Applicant : Unsupported major.minor version 52.0
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:222)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Is anyone facing a similar issue?
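
"Unsupported major.minor version 52.0" means the class file was compiled for Java 8 (class-file major version 52) and is being loaded by an older JVM, so this looks like a JDK mismatch between the build machine and the YARN executors rather than a serialization problem. The sketch below, with the hypothetical class name ClassFileVersion, reads the major version from a class-file header and prints the version the running JVM supports, which may help when comparing the jar against the cluster. It assumes the class file is reachable as a classpath resource:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Reads the class-file major version, e.g. 52 for Java 8 and 51 for Java 7. */
final class ClassFileVersion {

    /** Parses the header: magic (4 bytes), minor version (2 bytes), major version (2 bytes). */
    static int majorVersion(InputStream classBytes) throws IOException {
        DataInputStream in = new DataInputStream(classBytes);
        if (in.readInt() != 0xCAFEBABE) {
            throw new IOException("not a class file");
        }
        in.readUnsignedShort();        // minor version, ignored here
        return in.readUnsignedShort(); // major version
    }

    /** Looks up the compiled bytes of a loaded class and returns its major version. */
    static int majorVersionOf(Class<?> type) throws IOException {
        String resource = "/" + type.getName().replace('.', '/') + ".class";
        try (InputStream in = type.getResourceAsStream(resource)) {
            if (in == null) {
                throw new IOException("class file not found: " + resource);
            }
            return majorVersion(in);
        }
    }

    public static void main(String[] args) throws IOException {
        // Version this class was compiled for.
        System.out.println("compiled for major version " + majorVersionOf(ClassFileVersion.class));
        // Highest class-file version the running JVM accepts, e.g. "52.0" on Java 8.
        System.out.println("JVM supports up to " + System.getProperty("java.class.version"));
    }
}
```

If the executors report a lower supported version than the jar's classes were compiled for, either the executors need a newer JVM or the jar needs to be built for the older target.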

melletdhillon commented on July 17, 2024

Hello,
I am facing a similar issue. I am executing the code on a Spark 2.0.0 cluster in YARN mode with Java SE 1.8.

If anyone has been able to resolve this issue, please reply with the solution.

Error log:

Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
cacheByClassLoader (org.drools.core.base.ClassFieldAccessorCache)
classFieldAccessorCache (org.drools.core.impl.KnowledgeBaseImpl)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:606)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:87)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:106)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:195)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:236)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:236)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1310)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:237)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:86)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1393)
at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:646)
at org.sssihl.license.SparkDroolLicenseExample.main(SparkDroolLicenseExample.java:46)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.ConcurrentModificationException

rmangesh1 commented on July 17, 2024

Any solutions with the Kryo serializer? I'm facing the same issue.

annisun commented on July 17, 2024

I have solved this issue. I also use spark-submit to run my Maven jar.
This is my config:
.config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

rahulchanda123 commented on July 17, 2024

I am still facing the issue. I tried with:
.config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

Exception - Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
cacheByClassLoader (org.drools.core.base.ClassFieldAccessorCache)
classFieldAccessorCache (org.drools.core.impl.KnowledgeBaseImpl)

ganesh-shinde123 commented on July 17, 2024

Hi @rahulchanda123,
Please set the config before or while creating the Spark session. It will not work if you set the config after the session has been created.

ganesh-shinde123 commented on July 17, 2024

Please check the Stack Overflow link below for a solution:

https://stackoverflow.com/questions/44635932/spark-drools-how-to-serialize-kiebase-with-kryo/68256676#68256676
