isarn-sketches-spark's People

Contributors

erikerlandson, jonathantaws


isarn-sketches-spark's Issues

java.lang.NoClassDefFoundError

Hi,
I'm trying out this package, but I consistently get this NoClassDefFoundError.
I'm using Spark 2.3, Scala 2.11, and tested in Spark shell. My Python version is 3.7.
spark-shell --jars /tmp/isarn-sketches-spark_2.11-0.3.1-sp2.3-py3.6.jar

scala> import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
import org.isarnproject.sketches.udaf._
import org.apache.spark.isarnproject.sketches.udt._

scala> val udaf = tdigestUDAF[Double]
java.lang.NoClassDefFoundError: org/isarnproject/sketches/TDigest$
at org.isarnproject.sketches.udaf.package$.tdigestUDAF(package.scala:43)
... 53 elided
Caused by: java.lang.ClassNotFoundException: org.isarnproject.sketches.TDigest$
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 54 more

Would anyone be able to help?
Thanks very much in advance!

Richard

TDigest __repr__ not in line with constructor

I am trying to save the TDigest object (in Python) to a format that I can use to recreate it. In the past (version isarn-sketches-spark_2.11:0.3.1-sp2.2-py2.7), I was able to read the parameters listed below, save them, and then recreate a TDigest by calling the constructor with those parameters.
https://github.com/isarn/isarn-sketches-spark/blob/v0.3.1/python/isarnproject/sketches/udt/tdigest.py#L115

With the latest version, aside from the renaming of some of the parameters, the constructor for TDigest no longer accepts the same parameters:

def __init__(self, compression, maxDiscrete, cent, mass):

The __repr__ representation includes the nclusters parameter, which is not in the constructor signature (rightfully), meaning I can't use the __repr__ string to construct a new object (e.g. by using eval(repr(tdigest))) without some hacking around.

def __repr__(self):
    return "TDigest(%s, %s, %s, %s, %s)" % \
        (repr(self.compression), repr(self.maxDiscrete), repr(self.nclusters), repr(self._cent), repr(self._mass))
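
In the meantime, one way to round-trip a digest is to save the constructor arguments directly instead of relying on __repr__. A minimal sketch (hypothetical helper code, not part of the library), assuming the import path used in recent releases and that _cent and _mass are plain sequences of floats:

from isarnproject.sketches.spark.tdigest import TDigest

# td: a TDigest previously obtained from a tdigest UDF aggregation.
# Keep only the fields the constructor accepts (i.e. drop nclusters).
params = (td.compression, td.maxDiscrete, list(td._cent), list(td._mass))

# ...later, rebuild an equivalent digest from the saved tuple.
td2 = TDigest(*params)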

TDigestSQL results from the TDigestUDAF cannot themselves be aggregated in Spark SQL

I had second-level data aggregated to minute level using a TDigest, and wanted to further aggregate the persisted data frame to hour level. I could not do so, because there is no way to aggregate the TDigestSQL objects themselves with an aggregate function in Spark SQL.

To work around this, I copied and tweaked the TDigestUDAF so that it aggregates the TDigestSQL instances produced by the first TDigestUDAF. Using this, you can re-aggregate the TDigest results to as high a level as you need.


import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.isarnproject.sketches.udt.{TDigestSQL, TDigestUDT}
import org.isarnproject.sketches.TDigest

// Reconstructed header: the original snippet omitted the imports and class
// declaration, so the class name and constructor parameters here are assumptions.
class TDigestReaggUDAF(deltaV: Double, maxDiscreteV: Int) extends UserDefinedAggregateFunction {

  // A t-digest is deterministic, but it is only statistically associative or commutative,
  // and Spark will merge partition results in nondeterministic order. That makes
  // the result of the aggregation statistically "deterministic" but not strictly so.
  def deterministic: Boolean = false

  def inputSchema: StructType = StructType(StructField("tdigest", TDigestUDT) :: Nil)

  def bufferSchema: StructType = StructType(StructField("tdigest", TDigestUDT) :: Nil)

  def dataType: DataType = TDigestUDT

  def initialize(buf: MutableAggregationBuffer): Unit = {
    buf(0) = TDigestSQL(TDigest.empty(deltaV, maxDiscreteV))
  }

  // Since every input value is already a TDigestSQL, update acts just like merge and
  // combines TDigestSQL objects by combining their inner TDigests.
  def update(buf: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buf(0) = TDigestSQL(buf.getAs[TDigestSQL](0).tdigest ++ input.getAs[TDigestSQL](0).tdigest)
    }
  }

  def merge(buf1: MutableAggregationBuffer, buf2: Row): Unit = {
    buf1(0) = TDigestSQL(buf1.getAs[TDigestSQL](0).tdigest ++ buf2.getAs[TDigestSQL](0).tdigest)
  }

  def evaluate(buf: Row): Any = buf.getAs[TDigestSQL](0)
}

Might be a useful addition to the library; though you may have a better way of achieving the same :).

Python: can't serialize/pickle TDigest due to error in __reduce__

If you have a TDigest, for example:

from isarnproject.sketches.udaf.tdigest import *
from random import gauss
from pyspark.sql.types import *
data = sc.parallelize([[gauss(0,1)] for x in xrange(1000)]).toDF(StructType([StructField("x", DoubleType())]))
agg = data.agg(tdigestDoubleUDAF("x"))
td = agg.first()[0]

And you go ahead and broadcast it to the executors:
sc.broadcast(td)

The following error appears:

Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/broadcast.py", line 83, in dump
    pickle.dump(value, f, 2)
  File "python/isarnproject/sketches/udt/tdigest.py", line 144, in __reduce__
AttributeError: 'int' object has no attribute 'self'
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-631244049223455329.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-631244049223455329.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/context.py", line 802, in broadcast
    return Broadcast(self, value, self._pickled_broadcast_vars)
  File "/usr/lib/spark/python/pyspark/broadcast.py", line 74, in __init__
    self._path = self.dump(value, f)
  File "/usr/lib/spark/python/pyspark/broadcast.py", line 90, in dump
    raise pickle.PicklingError(msg)
PicklingError: Could not serialize broadcast: AttributeError: 'int' object has no attribute 'self'

This looks like it is due to a bug in the TDigest class's __reduce__ method, which is used for serialization: there is a missing comma after maxDiscrete: https://github.com/isarn/isarn-sketches-spark/blob/develop/python/isarnproject/sketches/udt/tdigest.py#L144
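
For reference, a corrected __reduce__ would simply include that comma so pickle receives a proper argument tuple. A minimal sketch, assuming for illustration a constructor of the form TDigest(compression, maxDiscrete, cent, mass); the exact field names in the affected tdigest.py may differ:

def __reduce__(self):
    # Return a (callable, args) pair that pickle uses to rebuild the object.
    # The comma after self.maxDiscrete is the one reported missing above.
    return (self.__class__, (self.compression, self.maxDiscrete,
                             self._cent, self._mass, ))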

Python: cdfInverse results in wrong order of values on monotonic distribution with large ranges

When using cdfInverse on a T-Digest created from a dataset, I get the following:
[screenshot: cdfInverse output plotted against probability]

On this graph, the y axis is the probability and the x axis is the actual value. I generate the values using cdfInverse, as follows:

xs = [td.cdfInverse(i/1000.) for i in range(1001)]
ys = [i/1000. for i in range(1001)]

When I dive deeper into the distribution, I can see that, even though my distribution should be monotonically increasing, I get some values in xs that are unordered, and thus I get the following result (look at 1k, just before 4k, and after 8k):
[screenshot: zoomed-in view showing unordered x values around 1k, just before 4k, and after 8k]

My assumption was that I would get only increasing values in my xs when generating them from the cdfInverse method, as I am increasing the value of the probability/percentile rank when looping.
A workaround for now is to generate the values, order them, and then call cdf on the ordered values, but it adds extra steps and I'm not sure this is the right method.
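
A minimal sketch of that workaround (purely illustrative): sort the inverse-CDF output and re-derive the probabilities with cdf:

xs = sorted(td.cdfInverse(i / 1000.) for i in range(1001))
ys = [td.cdf(x) for x in xs]  # re-evaluate the CDF on the now-ordered values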

To give another example, here are the results of the following calls:

print(td.cdf(8517.442))
>> 0.6443371631522132
print(td.cdfInverse(0.629))
>> 8517.442135224697
print(td.cdfInverse(0.644))
>> 8509.811889971521

I would expect td.cdfInverse(0.629) to give a smaller value than td.cdfInverse(0.644) (as the probability of the former is smaller than that of the latter).

Potential security vulnerability in the shared library zstd. Can you help upgrade to patched versions?

Hi, @erikerlandson , @JonathanTaws , I'd like to report a vulnerable dependency in org.isarnproject:isarn-sketches-spark_2.12:0.5.2-sp3.0.

Issue Description

I noticed that org.isarnproject:isarn-sketches-spark_2.12:0.5.2-sp3.0 directly depends on org.apache.spark:spark-core_2.12:3.0.1 in the pom. However, as shown in the following dependency graph, org.apache.spark:spark-core_2.12:3.0.1 suffers from a vulnerability exposed by the C library zstd (version 1.4.4): CVE-2021-24032.

Dependency Graph between Java and Shared Libraries

[image: dependency graph]

Suggested Vulnerability Patch Versions

org.apache.spark:spark-core_2.12:3.2.0 (>= 3.2.0) has upgraded the vulnerable C library zstd to the patched version 1.5.0.

Java build tools cannot report vulnerable C libraries, which may introduce potential security issues into many downstream Java projects. Could you please upgrade this vulnerable dependency?

Thanks for your help~
Best regards,
Helen Parr

Unsafe Symbol TDigestSQL

Hi

I was able to run my script in spark-shell, but when I submitted the job to run, it failed. The error message contains the following information:

Exception in thread "main" java.lang.AssertionError: assertion failed: unsafe symbol TDigestSQL (child of <none>) in runtime reflection universe at scala.reflect.internal.Symbols$Symbol.<init>(Symbols.scala:205) at scala.reflect.internal.Symbols$TypeSymbol.<init>(Symbols.scala:3030) at scala.reflect.internal.Symbols$ClassSymbol.<init>(Symbols.scala:3222) at scala.reflect.internal.Symbols$StubClassSymbol.<init>(Symbols.scala:3522) at scala.reflect.internal.Symbols$class.newStubSymbol(Symbols.scala:191) at scala.reflect.internal.SymbolTable.newStubSymbol(SymbolTable.scala:16) at scala.reflect.internal.Symbols$Symbol.newStubSymbol(Symbols.scala:521) at scala.reflect.internal.pickling.UnPickler$Scan.readExtSymbol$1(UnPickler.scala:258) at scala.reflect.internal.pickling.UnPickler$Scan.readSymbol(UnPickler.scala:286) at scala.reflect.internal.pickling.UnPickler$Scan.readSymbolRef(UnPickler.scala:651) at scala.reflect.internal.pickling.UnPickler$Scan.readType(UnPickler.scala:419) at scala.reflect.internal.pickling.UnPickler$Scan$LazyTypeRef$$anonfun$7.apply(UnPickler.scala:734) at scala.reflect.internal.pickling.UnPickler$Scan$LazyTypeRef$$anonfun$7.apply(UnPickler.scala:734) at scala.reflect.internal.pickling.UnPickler$Scan.at(UnPickler.scala:179) at scala.reflect.internal.pickling.UnPickler$Scan$LazyTypeRef.completeInternal(UnPickler.scala:734) at scala.reflect.internal.pickling.UnPickler$Scan$LazyTypeRef.complete(UnPickler.scala:761) at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1535) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:195) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127) at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19) at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:195) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.info(SynchronizedSymbols.scala:195) at scala.reflect.internal.Symbols$SymbolContextApiImpl.typeSignature(Symbols.scala:160) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$typeSignature(SynchronizedSymbols.scala:195) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeSignature$1.apply(SynchronizedSymbols.scala:129) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeSignature$1.apply(SynchronizedSymbols.scala:129) at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19) at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:195) at 
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.typeSignature(SynchronizedSymbols.scala:129) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.typeSignature(SynchronizedSymbols.scala:195) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$14.typeSignature(SynchronizedSymbols.scala:195) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$getConstructorParameters$2.apply(ScalaReflection.scala:974) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$getConstructorParameters$2.apply(ScalaReflection.scala:973) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:296) at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:973) at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:46) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:631) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452) at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56) at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906) at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46) at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452) at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:441) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71) at org.apache.spark.sql.Encoders$.product(Encoders.scala:275) at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248) at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34) at MultiTdigestToPostgres$.main(multi_feature_Tdigest.scala.scala:414) at MultiTdigestToPostgres.main(multi_feature_Tdigest.scala.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 2018-12-13 18:22:46 INFO SparkContext:54 - Invoking stop() from shutdown hook
Any clue as to what caused this? Any help would be greatly appreciated.

Richard

Python: cdfInverse does not accept a value of 1 as q

The cdfInverse method is supposed to accept qq values in the range [0,1], as explained in the docstring (note that the docstring mentions q, while qq is the actual parameter name):

def cdfInverse(self, qq):
    """
    Given a value q on [0,1], return the value x such that CDF(x) = q.
    Returns NaN for any q > 1 or < 0, or if this TDigest is empty.
    """

However, if I give the value 1, I get an IndexError: list index out of range. The error is raised inside cdfInverse (tdigest.py line 318, per the traceback below).

How to reproduce (using the example on the github page https://github.com/isarn/isarn-sketches-spark#sketch-a-numeric-column-python):

from random import gauss, randint
from isarnproject.sketches.spark.tdigest import *
data = spark.createDataFrame([[randint(1,10),gauss(0,1)] for x in range(1000)])
udf1 = tdigestIntUDF("_1", maxDiscrete = 25)
udf2 = tdigestDoubleUDF("_2", compression = 0.5)
agg = data.agg(udf1, udf2).first()

td = agg[0]
td.cdfInverse(1)

Result:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/mnt/tmp/spark-2bfe3869-9322-41af-b0e2-5a289d198492/userFiles-ebc89fbe-3b18-4851-9923-cc04eca13d6d/org.isarnproject_isarn-sketches-spark_2.12-0.5.0-sp3.0.jar/isarnproject/sketches/spark/tdigest.py", line 318, in cdfInverse
IndexError: list index out of range

This code runs against isarn-sketches-spark_2.12:0.5.0-sp3.0. In version isarn-sketches-spark_2.11:0.3.1-sp2.2-py2.7, however, cdfInverse used to accept a value of 1 and run with it.
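
Until this is fixed, a hypothetical workaround (not part of the library) is to clamp the argument just below 1 before calling cdfInverse:

q = 1.0
x = td.cdfInverse(min(q, 1.0 - 1e-12))  # stay strictly below 1 to avoid the IndexError above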

Can't use TDigestUDT in spark UDF to get cdf

I am currently using TDigests to obtain CDFs and inverse CDFs. I have a DataFrame that now contains TDigests via TDigestSQL. However, I want to be able to get back the CDF etc. via the Spark SQL DSL API.

So to do that I am using this:

  val tDigestCdf: UserDefinedFunction =
    UserDefinedFunction(getCDFFromTDigest _, DoubleType, Some(Seq(TDigestUDT, DoubleType)))
      .withName("getCDFFromTDigest")

  def getCDFFromTDigest(tdigestsql: TDigestSQL, percent: Double): Double = {
    tdigestsql.tdigest.cdf(percent)
  }

Then to use this I do:

dataFrame.select(tDigestCdf(col("mydigest")))

But unfortunately I am then getting this error:

Wrapped by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.ClassCastException: MySparkFunctions$$anonfun$3 cannot be cast to scala.Function1
org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:98)
org.apache.spark.sql.expressions.UserDefinedFunction.apply(UserDefinedFunction.scala:71)

Should it be possible to use the provided UDT in this fashion? I read something about UDTs not being exposed any more, but that seems a bit odd to me.
