Giter VIP home page Giter VIP logo

spark-mdlp-discretization's Introduction

Minimum Description Length Discretizer

This method implements Fayyad's discretizer [1] based on Minimum Description Length Principle (MDLP) in order to treat non discrete datasets from a distributed perspective. We have developed a distributed version from the original one performing some important changes.

Spark package: http://spark-packages.org/package/sramirez/spark-MDLP-discretization

Please cite as:

Ramírez‐Gallego Sergio, García Salvador, Mouriño‐Talín Héctor, Martínez‐Rego David, Bolón‐Canedo Verónica, Alonso‐Betanzos Amparo, Benítez José Manuel, Herrera Francisco. "Data discretization: taxonomy and big data challenge". WIREs Data Mining Knowl Discov 2016, 6: 5-21. doi: 10.1002/widm.1173

Improvements:

  • Version for ml library.
  • Support for sparse data.
  • Multi-attribute processing. The whole process is carried out in a single step when the number of boundary points per attribute fits well in one partition (<= 100K boundary points per attribute).
  • Support for attributes with a huge number of boundary points (> 100K boundary points per attribute). Rare case.

This software has been proved with two large real-world datasets such as:

Design doc: https://docs.google.com/document/d/1HOaPL_HJzTbL2tVdzbTjhr5wxVvPe9e-23S7rc2VcsY/edit?usp=sharing

Example (ml):

import org.apache.spark.ml.feature._

val discretizer = new MDLPDiscretizer()
    .setMaxBins(10)
    .setMaxByPart(10000)
    .setInputCol("features")
    .setLabelCol("class")
    .setOutputCol("buckedFeatures")
  
val result = discretizer.fit(df).transform(df)

Example (MLLIB):

import org.apache.spark.mllib.feature.MDLPDiscretizer

val categoricalFeat: Option[Seq[Int]] = None
val nBins = 25
val maxByPart = 10000

println("*** Discretization method: Fayyad discretizer (MDLP)")
println("*** Number of bins: " + nBins)

// Data must be cached in order to improve the performance

val discretizer = MDLPDiscretizer.train(data, // RDD[LabeledPoint]
    categoricalFeat, // continuous features
    nBins, // max number of thresholds by feature
    maxByPart) // max elements per partition
discretizer

val discrete = data.map(i => LabeledPoint(i.label, discretizer.transform(i.features)))
discrete.first()

Important notes:

MDLP uses maxByPart parameter to group boundary points by feature in order to perform an independent computation of entropy per attribute. In most of cases, a default value of 10K is enough to compute the entropy in a parallel way, thus removing iterativity implicit when we manage features with many boundary points. Log messages inform when there is a "big" feature (| boundary | > maxByPart) in our algorithm, which can deteriorate the performance of the algorithm. To solve this problem, it is recommended to increment the maxByPart's value to 100K, or to reduce the precision of data in problems with floating-point values.

##References

[1] Fayyad, U., & Irani, K. (1993). "Multi-interval discretization of continuous-valued attributes for classification learning."

Please, for any comment, contribution or question refer to: https://issues.apache.org/jira/browse/SPARK-6509.

spark-mdlp-discretization's People

Contributors

barrybecker4 avatar hbghhy avatar marckaminski avatar sramirez avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

spark-mdlp-discretization's Issues

Null values in the label (target) cause NPE

I noticed that if I specified a label column that has null values, then a NPE is thrown.
Possible options for resolving:

  • treat nulls as a separate class value
  • drop instances with null label values
  • give an exception with a better message about nulls not being allowed.

Splits sometimes have excessive precision

This may be minor but I sometimes noticed that the bins have many decimals of precision like
5.550000190734863
when the data itself has very limited precision.
For example, the above split was determined for "sepal length" in the traditional iris dataset. The values of sepal length in the data are things like 4.9 and 5.1 - none of the values have more than one digit, so why does the split need to have 15 decimal places?

Publish on Spark Packages Main Repository

Hi @sramirez,

Would you like to make a release of this package on the Spark Packages Maven Repo? There is an sbt-plugin called sbt-spark-package that would help you make the release straight from your sbt console. All you need to do is set a couple configurations.

Publishing on the Spark Packages Repo will bump your ranking on the website, and will fill in the How To section, which users can use to include your package in their work.

Please let me know if you have any comments/questions/suggestions!

Best,
Burak

Include cache call in unit tests

Currently there is a warning shown repeatedly when running the unit tests:

@sramires wrote: The client is supposed to persist the data as he/she wants to. Most of methods have reviewed in MLlib use this message to warn about data persistence. I think it's good to follow this solution. Therefore, it'd be nice to include a .cache() call in the unit tests because the algorithm is designed to be iterative in some parts.

Including a cache() call should eliminate the warnings.

Why are the column(s) to bin required to be a vector?

Hi Sergio,
I wanted to better understand the motivation for making the inputColumn a vector. Is it because it is much more efficient to process all columns to be binned in one pass rather than apply the Discretizer separately to each column? A related question I have is - could you get different column splits if you apply it all at once compared to one at a time? IOW, is there interaction between features that are being binned? The QuantileDiscretize (equal-weight discretizer) only requires a single inputColumn. I was hoping that the two discretizers would operate similarly. I thought about adding a fitToBucketizers method on MDLPDistretizer so that I could easily get the list of Bucketizers corresponding to the input feature column but that did not work well as I did not have the names of the columns in the feature vector. I will do it outside in client code instead.

Need to handle nulls as a separate bin

What do we do if the column to be binned contains null (i.e. NaN)?
The resulting binning currently does not say anything about nulls, but I think it needs to.
There should probably be a separate null bin. Not sure exactly how to do this.
From my experience with QuantileDiscretizer, it seems to be adding NaN splits to the end of the list of splits, but I'm not sure that is right either. Maybe the first (or last) split could be NaN if any of the values in the column are NaN. Or maybe there should always be a NaN split at beginning (or end) because future data may have NaN even if the training/fitting data did not.

Generated splits are missing leading -Infinity

I hope that @sramirez or someone else familiar spark discretizers can tell me if this is a bug.
Other discretizers produce splits that have an initial cutpoint of -Infinity, and a final cutpoint of Infinity in order to catch data outside the regular bins. The MDLPDiscretzer produces a model with splits like this for the sample data I have tried
{code}
16.1, 21.05, 30.95, Infinity
5.5, Infinity
97.5, 169.5, Infinity
78.5, 134.0, Infinity
2379.5, 2959.5, Infinity
13.5, 19.5, Infinity
1980.5, Infinity
{code}
It looks to me like the initial -Infinity split. I think this is a bug.
See the unit tests I have added on branch #5 for more detail.

few values threshold method does not seem to be deterministic

This is a strange one, and not that easy to reproduce.
If I run the tests a bunch of times, I have noticed that on occasion, I get an extra split for the fare attribute in the "Run MDLPD on all columns in titanic data (label = embarked)" test.

Sometimes the splits are

-Infinity, 7.1833496, 7.2396, 7.6875, 7.7625, 13.20835, 15.3729, 15.74585, 56.7125, Infinity

but usually they are

-Infinity, 6.6229, 7.1833496, 7.2396, 7.6875, 7.7625, 13.20835, 15.3729, 15.74585, 56.7125, Infinity

I can't think why this might be unless something is non-deterministic or maintaining state between runs. I just ran a bunch of times and always get the second result, but know I have sometimes seen the first.

Possible performance issue on 40k row dataset

I've started doing some testing on larger datasets, and it seems to be taking more time to do the discretization - although I really do not know what to expect. I've added test cases for a 40,000 row dataset with several numeric columns to bin. I ran the tests on a quadcore laptop with 32G of memory and spark in standalone mode, but I saw similar results running on a 24 core server (which is what prompted me to add the tests). In the following results, 6 numeric columns are discretized at the same time (in batch).

srvRequest40000 data (label = churned, maxBins = 100, maxByPart = 10000) --- 98 seconds
srvRequest40000 data (label = churned, maxBins = 10, maxByPart = 10000) --- 46 seconds
srvRequest40000 data (label = churned, maxBins = 10, maxByPart = 1000000) --- 36 seconds

I noticed that performance improves when I reduce maxBins and increase maxByPart, but overall it seems to be taking too long. I will profile a little and see if I can find any bottlenecks. I would like to know what sort of performance numbers were attained for the 64 million row protein dataset using what hardware.

Sometimes buckets with too few instances are generated

I think we should consider adding a param to limit the minimum number of instances in a bucket. I have seen cases where there is one huge bucket with most of the data - like shown in this spinogram
too_many_bins100
.
Here I have set the maxBins to 100. The problem is not so much that there are too many bins, as that there were many bins generated that had just a few instances in them.
A reasonable default for such a new "minBinPercentage" might be like 0.1% of the total number of records. In other words, we would never split bins that have less than "minBinPercentage" of the instances of the whole dataset in them.

Upgrade to spark 2.x

Currently the MDLP code runs fine with spark 1.6.x, but changes are needed in order for it to work using 2.x spark. Probably there needs to be a seprate versioned release of MDLP for spark 2.x.

Error when label is date

I noticed that we see the following exception when trying to run MDLP with a date label specified.
I will add a unit test and try to debug.

java.util.NoSuchElementException: nullscala.collection.LinearSeqOptimized$class.last(LinearSeqOptimized.scala:148) scala.collection.immutable.List.last(List.scala:84) org.apache.spark.mllib.feature.InitialThresholdsFinder.findInitialThresholds(InitialThresholdsFinder.scala:58) org.apache.spark.mllib.feature.MDLPDiscretizer.initialThresholds(MDLPDiscretizer.scala:58) org.apache.spark.mllib.feature.MDLPDiscretizer.runAll(MDLPDiscretizer.scala:120) org.apache.spark.mllib.feature.MDLPDiscretizer$.train(MDLPDiscretizer.scala:312) org.apache.spark.ml.feature.MDLPDiscretizer.fit(MDLPDiscretizer.scala:131) com.mineset.spark.ml.evidence.EvidenceInducer.createDiscretizerModel(EvidenceInducer.scala:256) com.mineset.spark.ml.evidence.EvidenceInducer.createBucketizers(EvidenceInducer.scala:227) com.mineset.spark.ml.evidence.EvidenceInducer.createPipeline(EvidenceInducer.scala:138) com.mineset.spark.ml.evidence.EvidenceInducer.execute(EvidenceInducer.scala:92) com.mineset.spark.ml.evidence.EvidenceInducer.execute(EvidenceInducer.scala:75)

Handling metadata

It would be good if the ml version could consider the DataFrame metadata to prevent discretizing nominal attributes. Also , it would be good if the produced DataFrame could contain metadata for the discretized attributes, mainly the values property of them.

Add some unit tests

It would be nice to add some unit tests that would prove that it works as expected and act as documentation by way of examples. I will try to add some and do a PR.

Match error for some datasets

This is another strange bug. It may be related to #14.
I have two different versions of the titanic dataset that both contain an integer valued "parch" column with values like 0, 1, 2, or 5, but one dataset has many other columns removed.
I find that the version that just has a few columns works when binning all continuous columns, but the one with all the columns, has a problem binning the "parch" column. The error is

ERROR Executor: Exception in task 1.0 in stage 339.0 (TID 518)
scala.MatchError: (0.0,(5,[0,2],[NaN,2.0])) (of class org.apache.spark.mllib.regression.LabeledPoint)
at org.apache.spark.mllib.feature.MDLPDiscretizer$$anonfun$7.apply(MDLPDiscretizer.scala:144)
at org.apache.spark.mllib.feature.MDLPDiscretizer$$anonfun$7.apply(MDLPDiscretizer.scala:144)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

I have a reproducible test case and will look at finding a fix.
Its odd because it demonstrates that there must be some interaction between features in the feature vector as they get discretized. Some state must be getting carried over from one to the next as processing progresses.

DiscretizerModelReader::load fails with Spark 2.1.1

Steps to reproduce with this dataset

spark-shell --jars "/path/to/spark-MDLP-discretization-1.3.jar"

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature._

val carsPath = "/path/to/cars.data"
val mdlPath = "/path/to/save/mdl"

val df = spark.read.option("header", true)
  .option("inferSchema", true)
  .csv(carsPath)

val strId = new StringIndexer().setInputCol("v7").setOutputCol("v7_IDX")

val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(Array("v0", "v1", "v2", "v3", "v4", "v5", "v6"))
  .setOutputCol("featuresRaw")

val dis: MDLPDiscretizer = new MDLPDiscretizer()
  .setInputCol("featuresRaw")
  .setOutputCol("featuresBucket")
  .setLabelCol("v7_IDX")
  .setMaxBins(10)
  .setMaxByPart(10000)
  .setMinBinPercentage(0.01)

val plm = new Pipeline()
  .setStages(Array(strId, assmbleFeatures, dis))
  .fit(df)

plm.write.overwrite.save(mdlPath)

PipelineModel.load(mdlPath)

Gives:

scala.MatchError: [WrappedArray(WrappedArray(-Infinity, 21.05, Infinity), WrappedArray(-Infinity, 5.5, Infinity), WrappedArray(-Infinity, 120.5, 134.5, Infinity), WrappedArray(-Infinity, 78.5, Infinity), WrappedArray(-Infinity, 2550.5, Infinity), WrappedArray(-Infinity, Infinity), WrappedArray(-Infinity, Infinity))] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
  at org.apache.spark.ml.feature.DiscretizerModel$DiscretizerModelReader.load(MDLPDiscretizer.scala:249)
  at org.apache.spark.ml.feature.DiscretizerModel$DiscretizerModelReader.load(MDLPDiscretizer.scala:232)
  at org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstance(ReadWrite.scala:435)
  at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$4.apply(Pipeline.scala:273)
  at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$4.apply(Pipeline.scala:271)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:271)
  at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:347)
  at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:341)
  at org.apache.spark.ml.util.MLReadable$class.load(ReadWrite.scala:215)
  at org.apache.spark.ml.PipelineModel$.load(Pipeline.scala:331)
  ... 50 elided

Fix in file ml.feature.MDLPDiscretizer.scala:
(line 258ff)

val splits = sqlContext.read.parquet(dataPath)
  .select("splits")
  .head().getAs[Seq[Seq[Float]]](0).map(arr => arr.toArray).toArray

Add jenkinsfile to enable CI building in jenkins

I would like to investigate creating a PR to add a jenkins file to allow for automated building if there is a jenkins server. The jenkins file will have no effect if there is no jenkins server.

Update to scala 11 and spark 1.6.2

I forked and built locally.
I made the changes to the sbt and maven files to use the more recent versions of scala, spark, and sbt.
Will create a pull request shortly.

AbstractMethodError in Spark 2.3.0

I would like to discretize the Epsilon dataset that it has 2k features and 400k records. To achieve this, I utilized the Spark 2.3.0. Once I execute the code, I deal with below error. It is noteworthy this error had not occurred when I used Spark 2.2.0, but It seems that the Spark 2.2.0 is not suitable for a high dimensional dataset.
If is it possible, please fix this problem.

#########################################################
Exception in thread "main" java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.mllib.feature.MDLPDiscretizer.initializeLogIfNecessary(MDLPDiscretizer.scala:48)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.mllib.feature.MDLPDiscretizer.log(MDLPDiscretizer.scala:48)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at org.apache.spark.mllib.feature.MDLPDiscretizer.logInfo(MDLPDiscretizer.scala:48)
at org.apache.spark.mllib.feature.MDLPDiscretizer.runAll(MDLPDiscretizer.scala:106)
at org.apache.spark.mllib.feature.MDLPDiscretizer$.train(MDLPDiscretizer.scala:335)
at org.apache.spark.ml.feature.MDLPDiscretizer.fit(MDLPDiscretizer.scala:149)
#########################################################

maxBins not honored

I set the maxBins to 2 (1 gives an error) like this:

val discretizer = new MDLPDiscretizer()
      .setMaxBins(2)
      .setMaxByPart(10000)
      .setInputCol("features")  // this must be a feature vector
      .setLabelCol(labelColumn )
      .setOutputCol("bucketFeatures")

and there were 3 bins produced with these splits:
"16.1, 21.05, 30.95, Infinity"
Shouldn't there be at most 2 bins? This is the same result I get if I specify maxBins as 1000.

Need way to control split stopping criteria

Currently the stopping criteria appears to be quite conservative.
For example, if I set maxBins to 1000, it will usually just generate 2 - 5 bins. Why doesn't it generate 8 or 10 instead? I think there should be another parameter that allows the user to specify the stopping criteria. I realize that there will be diminishing returns for generating more bins, but it seems that in many cases it would be better to go further than its currently doing. The combination of both maxBins and a parameter to control when to stop splitting could give the user a lot of control over the number of bins that get as a result.

Something wrong with vector?

I just test a toy code in spark 2.1.1. Then it report:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(vFeatures)' due to data type mismatch: argument 1 requires vector type, however, 'vFeatures' is of vector type.;;
'Project [id#9, features#10, vFeatures#11, clicked#12, UDF(vFeatures#11) AS buckedFeatures#87]
+- Project [_1#0 AS id#9, _2#1 AS features#10, _3#2 AS vFeatures#11, _4#3 AS clicked#12]
+- LocalRelation [_1#0, _2#1, _3#2, _4#3]

I have seen the sourse code , is it because the transform in the ml version of MDLP call this issue:

val discModel = new feature.mdlp_discretization.DiscretizerModel(splits)
val discOp = udf { discModel.transform _ }
dataset.withColumn($(outputCol), discOp(col($(inputCol))).as($(outputCol), metadata))

And in the sprak2 the vector in ml version should be org.apache.spark.ml.linalg.Vector, but the discModel.transform need org.apache.spark.mllib.linalg.Vector. So make the above error?

Here is the toy code

import org.apache.spark.ml.feature.MDLPDiscretizer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession


object test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder().master("local[3]")
      .appName("GroupStringIndexer module test")
      .getOrCreate()

    val data = Seq(
      (7, 1,Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
      (8, 1,Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
      (9, 0,Vectors.dense(1.0, 0.0, 15.0, 0.0), 0.0)
    )

    val df = spark.sqlContext.createDataFrame(data).toDF("id", "features","vFeatures", "clicked")

    df.show()


    val discretizer = new MDLPDiscretizer()
      .setMaxBins(10)
      .setMaxByPart(10000)
      .setInputCol("vFeatures")
      .setLabelCol("clicked")
      .setOutputCol("buckedFeatures")

    val result = discretizer.fit(df).transform(df)

    result.show()
   }
}

I just try to fix that by #34
@sramirez
THX

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.