Comments (16)
Additional detail would be great. Is the Spark version the only difference between your environment and the Vagrant environment included here? Are there any other changes to the source code?
from spark-drools-example.
These are my changes so far:
mautematico@5a3b201
I was trying on my local machine, with my own Spark 1.6.1 installation and so on. Right now I'm waiting for mvn package to finish in the Vagrant environment. I'll report back ASAP. :D
Okay, so it works fine in the Vagrant environment.
There must be something odd in my Spark configuration or my Debian installation that is interfering with your code.
@cpitman, I'm wondering how your loadRules() and applyRules() work (I'm unfamiliar with anything other than KieSession.fireAllRules()), and I can't figure out where the logic that loads your .drl files is.
Is it okay with you if I open a new issue (or reach you some other way) to ask you a few things about building a KieContainer from a given HashMap<Integer,String>, where each String is a .drl rule? :)
This is a pretty simplistic example. The line that loads the DRL is:
KieContainer kieContainer = kieServices.getKieClasspathContainer();
Because we request the classpath container, any Drools files on the classpath (i.e. in the resources folder, alongside the META-INF/kmodule.xml that declares the KieBase) are loaded.
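If the rules load in one environment but not in another (as in the local versus Vagrant difference above), one JDK-only check is to list the META-INF/kmodule.xml descriptors the class loader can actually see, since the classpath container discovers KIE modules through those descriptors. This is a minimal sketch: the names KModuleScan and findKModules are hypothetical, and it assumes the relevant loader is the thread context class loader (falling back to the class's own loader).

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper, not part of Drools or spark-drools-example.
final class KModuleScan {
    // Descriptor that KIE modules carry and that the classpath container looks for.
    static final String KMODULE_PATH = "META-INF/kmodule.xml";

    // Returns every META-INF/kmodule.xml visible through the given class loader.
    static List<URL> findKModules(ClassLoader loader) throws IOException {
        return Collections.list(loader.getResources(KMODULE_PATH));
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the context class loader is the one the rules are loaded through.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = KModuleScan.class.getClassLoader();
        }
        List<URL> found = findKModules(loader);
        if (found.isEmpty()) {
            System.out.println("No " + KMODULE_PATH + " visible to " + loader);
        }
        for (URL url : found) {
            System.out.println("Found: " + url);
        }
    }
}
```

Run it with the same classpath as the failing job; an empty result suggests the classpath container will probably find no KieBase to build.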
fireAllRules is used with a stateful session, but in the case of Spark we really want to avoid state, since we're actually running these rules across many processes, each with its own independent session. With a stateless session, you create a Command object that inserts every fact needed for a single run, then call session.execute(command). This creates a new session, inserts the requested facts, runs the rules to completion, and then destroys the session, so every run starts from a clean state. You could do this manually, but stateless sessions make it easy.
Some samples of the different ways to load rules can be found in the drools-examples-api module of the droolsjbpm/drools repository. The closest example is this one: https://github.com/droolsjbpm/drools/blob/master/drools-examples-api/kie-module-from-multiple-files/src/main/java/org/drools/example/api/kiemodulefrommultiplefiles/KieModuleFromMultipleFilesExample.java
A lot of that code is probably more complicated than you need. Basically, create File objects that point at your DRL files and build a module from them.
I ran into this issue as well. It turned out that the default Java serializer can handle the KieBase object, but the Kryo serializer cannot.
I'm running into this issue too.
Have you found a way to serialize the KieBase with Kryo?
@freedev I did not find a way to serialize the KieBase using the Kryo serializer. My workaround is to instantiate the KieBase on each executor; each executor then downloads the rules from a Nexus repository. Probably not ideal, but it works for now, until I get time to force the use of the Java serializer for just the KieBase.
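The per-executor workaround above can be sketched with only the JDK: keep the expensive object behind a lazily initialized holder in a static field, so each executor JVM builds it once on first use and nothing has to pass through Spark's serializer. PerJvmLazy is a hypothetical helper, and the factory you pass to it (for example, something that downloads the rules and builds a KieBase) is an assumption you supply.

```java
import java.util.function.Supplier;

// Hypothetical helper: builds a value at most once per holder instance. Kept in a
// static field, that means once per JVM, i.e. once per Spark executor.
final class PerJvmLazy<T> {
    private final Supplier<T> factory;
    // volatile makes the double-checked locking below safe under the Java memory model
    private volatile T value;

    PerJvmLazy(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T result = value;
        if (result == null) {
            synchronized (this) {
                result = value;
                if (result == null) {
                    // Expensive step, e.g. fetching rules and building a KieBase (assumption).
                    // The factory must not return null, or it would run again on the next call.
                    result = factory.get();
                    value = result;
                }
            }
        }
        return result;
    }
}
```

For example, a field like static final PerJvmLazy<KieBase> RULES = new PerJvmLazy<>(MyRules::build) (MyRules::build is hypothetical) can be read inside mapPartitions; because the field is static, the closure refers to the class rather than capturing the holder, so the holder is never serialized.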
@terminatur, I've configured Kryo for everything except the KieBase object, which still uses the JavaSerializer. This is my current configuration:
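import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.spark.serializer.KryoRegistrator
import org.drools.core.impl.KnowledgeBaseImpl
import org.kie.api.KieBase

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // -- here there were other classes registered
    // Kryo matches registrations by concrete runtime class, so this entry covers the KnowledgeBaseImpl behind a KieBase
    kryo.register(classOf[KnowledgeBaseImpl], new JavaSerializer())
    // Registering the interface on its own probably has no effect
    kryo.register(classOf[KieBase], new JavaSerializer())
  }
}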
And I've added the following options to my SparkConf instance:
.set("spark.kryo.registrator", "it.my.package.MyKryoRegistrator")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryoserializer.buffer.mb", "24") // deprecated since Spark 1.4 in favor of spark.kryoserializer.buffer
@freedev I have done exactly what you showed, but I run into:
java.io.IOException: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1283)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at rules.SparkRulesEngine$$anonfun$1.apply(SparkRulesEngine.scala:125)
at rules.SparkRulesEngine$$anonfun$1.apply(SparkRulesEngine.scala:123)
at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:178)
at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:175)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:65)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$10.apply(TorrentBroadcast.scala:254)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1310)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:255)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:189)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
... 44 more
Caused by: java.lang.ClassNotFoundException: org.drools.core.impl.KnowledgeBaseImpl
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:63)
Did you run into this at all?
I have posted this at: https://stackoverflow.com/questions/44960492/user-kryoserializer-alongside-javaserializer-in-spark-2
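In the trace above, the JavaSerializer registered for KnowledgeBaseImpl deserializes through the default ObjectInputStream.resolveClass, which ends up in sun.misc.Launcher$AppClassLoader; that loader likely cannot see classes from the application jar, which Spark executors typically load through their own class loader (set as the task thread's context loader). A minimal JDK-only sketch of the usual remedy is an ObjectInputStream that resolves classes through an explicit loader; Spark's own JavaSerializer does something similar (its JavaDeserializationStream overrides resolveClass). Wiring this into Kryo, for example through a hypothetical custom Serializer registered for KnowledgeBaseImpl in place of Kryo's JavaSerializer, is an assumption and is not shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// Resolves classes through an explicit ClassLoader (e.g. the executor's context loader)
// instead of the default lookup, which in the trace lands on the AppClassLoader.
final class LoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        try {
            // initialize = false: load the class without running static initializers
            return Class.forName(desc.getName(), false, loader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default behavior (also covers primitive types such as "int")
            return super.resolveClass(desc);
        }
    }
}

// Hypothetical helpers for a Java-serialization round trip through a chosen loader.
final class JavaRoundTrip {
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new LoaderAwareObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }
}
```

On an executor, the loader to pass would be Thread.currentThread().getContextClassLoader() at deserialization time, which is the assumption this sketch rests on.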
Hello,
I am trying to run this code on a Spark 1.6.0 cluster in YARN mode with JDK 1.8. The code runs fine in Spark local mode but fails with the exception below when the job is submitted in YARN mode.
spark-submit --class com.awesome.App --master yarn ./SparkDroolsExample-1.0-SNAPSHOT.jar
17/08/17 19:07:44 INFO scheduler.DAGScheduler: ResultStage 0 (count at App.java:38) failed in 12.653 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, ip-172-31-11-240.ec2.internal, executor 4): java.lang.UnsupportedClassVersionError: com/awesome/Applicant : Unsupported major.minor version 52.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:222)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Is anyone else facing a similar issue?
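"Unsupported major.minor version 52.0" means com/awesome/Applicant was compiled for Java 8 (class-file major version 52), while the JVM that tried to load it on the executor is older; having JDK 1.8 on the submitting machine does not guarantee Java 8 on the YARN nodes. A small JDK-only sketch to confirm which version a class inside the jar was compiled for; ClassFileVersion is a hypothetical name, and the release mapping (major minus 44) holds for Java 5 and later.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper for checking which Java release a class file targets.
final class ClassFileVersion {
    // Reads the class-file header: u4 magic, u2 minor_version, u2 major_version.
    static int majorVersion(InputStream classFile) throws IOException {
        DataInputStream in = new DataInputStream(classFile);
        if (in.readInt() != 0xCAFEBABE) {
            throw new IOException("Not a class file (bad magic number)");
        }
        in.readUnsignedShort(); // skip minor_version
        return in.readUnsignedShort(); // major_version
    }

    // Maps a major version to a Java release for Java 5+ (49 -> 5, 51 -> 7, 52 -> 8).
    static int javaRelease(int major) {
        return major - 44;
    }

    // Usage: java ClassFileVersion SparkDroolsExample-1.0-SNAPSHOT.jar com/awesome/Applicant.class
    public static void main(String[] args) throws IOException {
        try (JarFile jar = new JarFile(args[0])) {
            JarEntry entry = jar.getJarEntry(args[1]);
            if (entry == null) {
                throw new IOException("No entry " + args[1] + " in " + args[0]);
            }
            try (InputStream in = jar.getInputStream(entry)) {
                int major = majorVersion(in);
                System.out.println(args[1] + ": major version " + major + " (Java " + javaRelease(major) + ")");
            }
        }
    }
}
```

If it reports Java 8, one option is to point the YARN containers at a Java 8 installation (spark.executorEnv.JAVA_HOME and spark.yarn.appMasterEnv.JAVA_HOME take the path); another is to compile for the Java version installed on the nodes, assuming the code and its dependencies support that version.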
Hello,
I am facing a similar issue. I am running the code on a Spark 2.0.0 cluster in YARN mode with Java SE 1.8.
If anyone has been able to resolve this issue, please reply with the solution.
Error log:
Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
cacheByClassLoader (org.drools.core.base.ClassFieldAccessorCache)
classFieldAccessorCache (org.drools.core.impl.KnowledgeBaseImpl)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:606)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:87)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:106)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:195)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:236)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:236)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1310)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:237)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:86)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1393)
at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:646)
at org.sssihl.license.SparkDroolLicenseExample.main(SparkDroolLicenseExample.java:46)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.ConcurrentModificationException
Any solutions with the Kryo serializer? I'm facing the same issue.
I have solved this issue. I also use spark-submit to run my Maven jar.
This is my config:
.config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
I am still facing the issue. I tried with
.config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
and still get:
Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
cacheByClassLoader (org.drools.core.base.ClassFieldAccessorCache)
classFieldAccessorCache (org.drools.core.impl.KnowledgeBaseImpl)
Hi @rahulchanda123,
Please set the config before or while creating the Spark session. It will not work if you set it after the session has been created.
Please check the Stack Overflow link below for a solution: