
Deequ's Introduction

Deequ - Unit Tests for Data


Deequ is a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets. We are happy to receive feedback and contributions.

Python users may also be interested in PyDeequ, a Python interface for Deequ. You can find PyDeequ on GitHub, readthedocs, and PyPI.

Requirements and Installation

Deequ depends on Java 8. Deequ version 2.x only runs with Spark 3.1, and vice versa. If you rely on a previous Spark version, please use a Deequ 1.x version (the legacy version is maintained in the legacy-spark-3.0 branch). We provide legacy releases compatible with Apache Spark versions 2.2.x to 3.0.x. The Spark 2.2.x and 2.3.x releases depend on Scala 2.11, and the Spark 2.4.x, 3.0.x, and 3.1.x releases depend on Scala 2.12.

Deequ is available via Maven Central.

Choose the latest release that matches your Spark version from the available versions. Add the release as a dependency to your project. For example, for Spark 3.1.x:

Maven

<dependency>
  <groupId>com.amazon.deequ</groupId>
  <artifactId>deequ</artifactId>
  <version>2.0.0-spark-3.1</version>
</dependency>

sbt

libraryDependencies += "com.amazon.deequ" % "deequ" % "2.0.0-spark-3.1"

Example

Deequ's purpose is to "unit-test" data to find errors early, before the data gets fed to consuming systems or machine learning algorithms. In the following, we will walk you through a toy example to showcase the most basic usage of our library. An executable version of the example is available here.

Deequ works on tabular data, e.g., CSV files, database tables, logs, and flattened JSON files: basically anything that you can fit into a Spark DataFrame. For this example, we assume that we work on some kind of Item data, where every item has an id, a productName, a description, a priority, and a count of how often it has been viewed.

case class Item(
  id: Long,
  productName: String,
  description: String,
  priority: String,
  numViews: Long
)

Our library is built on Apache Spark and is designed to work with very large datasets (think billions of rows) that typically live in a distributed filesystem or a data warehouse. For the sake of simplicity in this example, we just generate a few toy records though.

val rdd = spark.sparkContext.parallelize(Seq(
  Item(1, "Thingy A", "awesome thing.", "high", 0),
  Item(2, "Thingy B", "available at http://thingb.com", null, 0),
  Item(3, null, null, "low", 5),
  Item(4, "Thingy D", "checkout https://thingd.ca", "low", 10),
  Item(5, "Thingy E", null, "high", 12)))

val data = spark.createDataFrame(rdd)

Most applications that work with data have implicit assumptions about that data, e.g., that attributes have certain types, do not contain NULL values, and so on. If these assumptions are violated, your application might crash or produce wrong outputs. The idea behind deequ is to explicitly state these assumptions in the form of a "unit-test" for data, which can be verified on a piece of data at hand. If the data has errors, we can "quarantine" and fix it, before we feed it to an application.

The main entry point for defining how you expect your data to look is the VerificationSuite from which you can add Checks that define constraints on attributes of the data. In this example, we test for the following properties of our data:

  • there are 5 rows in total
  • values of the id attribute are never NULL and unique
  • values of the productName attribute are never NULL
  • the priority attribute can only contain "high" or "low" as value
  • numViews should not contain negative values
  • at least half of the values in description should contain a URL
  • the median of numViews should be less than or equal to 10

In code this looks as follows:

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}


val verificationResult = VerificationSuite()
  .onData(data)
  .addCheck(
    Check(CheckLevel.Error, "unit testing my data")
      .hasSize(_ == 5) // we expect 5 rows
      .isComplete("id") // should never be NULL
      .isUnique("id") // should not contain duplicates
      .isComplete("productName") // should never be NULL
      // should only contain the values "high" and "low"
      .isContainedIn("priority", Array("high", "low"))
      .isNonNegative("numViews") // should not contain negative values
      // at least half of the descriptions should contain a url
      .containsURL("description", _ >= 0.5)
      // half of the items should have less than 10 views
      .hasApproxQuantile("numViews", 0.5, _ <= 10))
  .run()

After calling run, deequ translates your test to a series of Spark jobs, which it executes to compute metrics on the data. Afterwards it invokes your assertion functions (e.g., _ == 5 for the size check) on these metrics to see if the constraints hold on the data. We can inspect the VerificationResult to see if the test found errors:

import com.amazon.deequ.constraints.ConstraintStatus


if (verificationResult.status == CheckStatus.Success) {
  println("The data passed the test, everything is fine!")
} else {
  println("We found errors in the data:\n")

  val resultsForAllConstraints = verificationResult.checkResults
    .flatMap { case (_, checkResult) => checkResult.constraintResults }

  resultsForAllConstraints
    .filter { _.status != ConstraintStatus.Success }
    .foreach { result => println(s"${result.constraint}: ${result.message.get}") }
}

If we run the example, we get the following output:

We found errors in the data:

CompletenessConstraint(Completeness(productName)): Value: 0.8 does not meet the requirement!
PatternConstraint(containsURL(description)): Value: 0.4 does not meet the requirement!

The test found that our assumptions are violated! Only 4 out of 5 (80%) of the values of the productName attribute are non-null, and only 2 out of 5 (40%) of the values of the description attribute contained a URL. Fortunately, we ran a test and found the errors; somebody should immediately fix the data :)

More examples

Our library contains much more functionality than what we showed in the basic example, and we are in the process of adding more examples for its advanced features. So far, we showcase functionality such as data profiling of large datasets, automatic constraint suggestion, anomaly detection, and storing and querying computed metrics with a MetricsRepository.

Citation

If you would like to reference this package in a research paper, please cite:

Sebastian Schelter, Dustin Lange, Philipp Schmidt, Meltem Celikel, Felix Biessmann, and Andreas Grafberger. 2018. Automating large-scale data quality verification. Proc. VLDB Endow. 11, 12 (August 2018), 1781-1794.

License

This library is licensed under the Apache 2.0 License.


Deequ's Issues

Logging instead of Printing

The constraint suggestion and column profiling API, which is still experimental, offers the option to print status updates after each pass over the data. We should replace this with proper logging and see if we can use that in other places in the library as well.
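
A minimal sketch of what such a change could look like, assuming SLF4J as the logging facade (the object, method, and message below are illustrative, not Deequ's actual code):

import org.slf4j.{Logger, LoggerFactory}

object ProfilingStatus {
  // a logger scoped to the component, instead of ad-hoc println calls
  private val logger: Logger = LoggerFactory.getLogger(getClass)

  def report(message: String): Unit = {
    // before: println(message)
    logger.info(message)
  }
}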

Has anyone successfully used this in combination with AWS Glue?

Hoping to use this in conjunction with Glue to automate testing in our data pipeline. Has anyone done this successfully?

I'm attempting to run the BasicExample.scala code using a SageMaker notebook attached to a Glue Development Endpoint. I'm working in the SparkMagic kernel.

Histogram analysis for categorical data with numerical values

It is possible to have categorical data with numerical values, for example 1 indicating male and 2 indicating female.

It would be nice to have histogram analysis available for those columns. Currently, it is limited to boolean and string columns.
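
For illustration, this is roughly what requesting a histogram for such a column could look like; per this issue it is not supported yet, and the DataFrame df and the numerically encoded gender column are hypothetical:

import com.amazon.deequ.analyzers.Histogram
import com.amazon.deequ.analyzers.runners.AnalysisRunner

// "gender" is a hypothetical numeric column encoding categories (1, 2)
val analysisResult = AnalysisRunner
  .onData(df)
  .addAnalyzer(Histogram("gender"))
  .run()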

Suggested hasDataType constraint fails when data is not complete

Constraints that are automatically suggested must be able to apply to the dataset that they were suggested from.

When using a ConstraintSuggestionRunner [1] to automatically suggest Check constraints on a DataFrame, the suggested .hasDataType(...) constraints do not include any information about completeness. Thus, if the column that the .hasDataType constraint was suggested for has any null values (i.e. is not a complete column), then the suggested constraint will fail on the dataset that it was suggested from.

[1] Specifically, this expression is what is meant by "automatically suggested constraints":

ConstraintSuggestionRunner()
  .onData(data)
  .addConstraintRules(Rules.DEFAULT)
  .run()

Where data is a org.apache.spark.sql.DataFrame instance.
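
One possible mitigation, sketched here rather than implemented in the suggestion runner, is to relax the assertion of the suggested constraint so that only the non-null fraction of values has to match the inferred type (the column and the 0.9 threshold are illustrative):

import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.ConstrainableDataTypes

// hypothetical: "priority" is 90% complete, so only require 90% of the
// values to match the suggested type instead of all of them
val relaxedCheck = Check(CheckLevel.Warning, "suggested data types")
  .hasDataType("priority", ConstrainableDataTypes.String, _ >= 0.9)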

Add guide for contributing

Write a markdown doc detailing how to contribute (including style guide, expected test coverage, etc)

Add markdown documentation for an AnalysisRunner example

The AnalysisRunner is our main entry point for calculating metrics in a scenario where you don't want to write unit tests but want to do something different with the metrics. We need to make the distinction between the AnalysisRunner and the ColumnProfilerRunner clear.
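
Until that documentation exists, here is a small sketch of the AnalysisRunner on the Item data from the introduction (the choice of analyzers is illustrative):

import com.amazon.deequ.analyzers.{ApproxCountDistinct, Completeness, Size}
import com.amazon.deequ.analyzers.runners.AnalysisRunner
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame

// compute a few metrics without attaching any checks to them
val analysis = AnalysisRunner
  .onData(data)
  .addAnalyzer(Size())
  .addAnalyzer(Completeness("productName"))
  .addAnalyzer(ApproxCountDistinct("id"))
  .run()

// turn the computed metrics into a DataFrame for further processing
successMetricsAsDataFrame(spark, analysis).show()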

Using Deequ with zeppelin

Not sure if this is a Zeppelin-related issue or not, but I get the following error. I'm using Zeppelin 0.8.1 with Spark 2.4 and Scala 2.11.12, running on AWS EMR.

java.lang.NoSuchMethodError: scala.reflect.internal.Symbols$Symbol.originalEnclosingMethod()Lscala/reflect/internal/Symbols$Symbol; 
	at scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.getEnclosingMethodAttribute(GenASM.scala:1306) 
	at scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.genClass(GenASM.scala:1222) 
	at scala.tools.nsc.backend.jvm.GenASM$AsmPhase.emitFor$1(GenASM.scala:135) 
	at scala.tools.nsc.backend.jvm.GenASM$AsmPhase.run(GenASM.scala:141) 
	at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1625) 
	at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1610) 
	at scala.tools.nsc.Global$Run.compileSources(Global.scala:1605) 
	at scala.tools.nsc.interpreter.IMain.compileSourcesKeepingRun(IMain.scala:388) 
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.compileAndSaveRun(IMain.scala:804) 
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.compile(IMain.scala:763) 
	at scala.tools.nsc.interpreter.IMain.bind(IMain.scala:627) 
	at scala.tools.nsc.interpreter.IMain.bind(IMain.scala:664) 
	at scala.tools.nsc.interpreter.IMain$$anonfun$quietBind$1.apply(IMain.scala:663) 
	at scala.tools.nsc.interpreter.IMain$$anonfun$quietBind$1.apply(IMain.scala:663) 
	at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:202) 
	at scala.tools.nsc.interpreter.IMain.quietBind(IMain.scala:663) 
	at org.apache.zeppelin.spark.SparkScala211Interpreter$.loopPostInit$1(SparkScala211Interpreter.scala:179) 
	at org.apache.zeppelin.spark.SparkScala211Interpreter$.org$apache$zeppelin$spark$SparkScala211Interpreter$$loopPostInit(SparkScala211Interpreter.scala:214) 
	at org.apache.zeppelin.spark.SparkScala211Interpreter.open(SparkScala211Interpreter.scala:86) 
	at org.apache.zeppelin.spark.NewSparkInterpreter.open(NewSparkInterpreter.java:102) 
	at org.apache.zeppelin.spark.SparkInterpreter.open(SparkInterpreter.java:62) 
	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:69) 
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:617) 
	at org.apache.zeppelin.scheduler.Job.run(Job.java:188) 
	at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:140) 
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
	at java.lang.Thread.run(Thread.java:748)

Create constraint suggestion rules for continuous columns

For columns with continuous values (int, double, ...), we want to suggest appropriate constraints. We need to determine when we want to use static rules (e.g., max/min; this could also replace the isPositive rule) and when to use anomaly detection. This may depend on whether there is a trend change in the data distribution.

Add markdown documentation on how to reuse existing results in a MetricsRepository

Show how to use the method reuseExistingResultsForKey and maybe show how this can be used in some example use cases:

  • Experiment with checks and constraint suggestion rules
  • Run different checks on some data without duplicating work: for example, a cheap first check using basic analyzers, followed by a second check that reuses the cheap constraints from before and adds some of the more expensive ones (see the sketch below)
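
As a starting point, here is a sketch of the second use case; the repository, result key, and checks are illustrative:

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.repository.ResultKey
import com.amazon.deequ.repository.memory.InMemoryMetricsRepository

val repository = new InMemoryMetricsRepository()
val resultKey = ResultKey(System.currentTimeMillis(), Map("dataset" -> "items"))

// first run: a cheap check whose metrics get stored in the repository
VerificationSuite()
  .onData(data)
  .useRepository(repository)
  .saveOrAppendResult(resultKey)
  .addCheck(Check(CheckLevel.Error, "cheap").isComplete("id"))
  .run()

// second run: reuses the stored metrics, so the completeness of "id"
// is not recomputed; only the additional constraint needs a new pass
VerificationSuite()
  .onData(data)
  .useRepository(repository)
  .reuseExistingResultsForKey(resultKey)
  .addCheck(Check(CheckLevel.Error, "cheap and expensive")
    .isComplete("id")
    .isUnique("id"))
  .run()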

Suggested .isNonNegative check fails on String columns with non-negative integers

Constraints that are automatically suggested must be able to apply to the dataset that they were suggested from.

When using a ConstraintSuggestionRunner [1] to automatically suggest Check constraints on a DataFrame, the suggested .isNonNegative(...) constraints assume that the column they are applied to is of a numeric type. However, the constraint suggestion runner will produce this constraint on String-typed columns that contain numeric values. Therefore, this suggested .isNonNegative check will fail on the data that was used to generate it.
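
Until the suggestion rule accounts for this, one possible workaround is to cast such columns before applying the suggested check; the DataFrame df and the "amount" column here are hypothetical:

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}
import org.apache.spark.sql.functions.col

// "amount" is a hypothetical String column that holds numeric values
val casted = df.withColumn("amount", col("amount").cast("double"))

// the suggested check can then run against the casted frame
VerificationSuite()
  .onData(casted)
  .addCheck(Check(CheckLevel.Error, "suggested").isNonNegative("amount"))
  .run()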

Emphasize our four main API entry points and when to use them

We have 4 main entry points in our API at the moment:

  • VerificationSuite
  • AnalysisRunner
  • ColumnProfilerRunner
  • ConstraintSuggestionRunner

Should we list these 4 in our main README in a separate paragraph and write one sentence for each of them to highlight the main use cases?
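
Of the four, only VerificationSuite and ConstraintSuggestionRunner appear in examples above. As a placeholder, here is a minimal sketch of the profiling entry point on the Item data from the basic example:

import com.amazon.deequ.profiles.ColumnProfilerRunner

// profile every column of the data without defining any checks
val profiles = ColumnProfilerRunner()
  .onData(data)
  .run()

profiles.profiles.foreach { case (column, profile) =>
  println(s"$column: completeness ${profile.completeness}, " +
    s"approx. ${profile.approximateNumDistinctValues} distinct values")
}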

isContainedIn Check breaks for columns with special characters

The isContainedIn Checks will fail whenever there exists at least one column with a special character. Here, a special character means something that is reserved in the Spark SQL language: e.g. a [ or ]. isContainedIn generates SQL, but the column name is not properly escaped. This means that, at execution time, the generated SQL will fail with a syntax error. As a consequence, this syntax error will cascade through all other Checks when associated with a VerificationRunBuilder. Thus, even valid Checks will fail due to this SQL syntax error.
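
A sketch of a reproduction, where the renamed column is illustrative:

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}

// hypothetical reproduction: rename a column so that its name contains
// characters reserved in Spark SQL
val brokenNames = data.withColumnRenamed("priority", "priority[level]")

VerificationSuite()
  .onData(brokenNames)
  .addCheck(
    Check(CheckLevel.Error, "escaping")
      .isContainedIn("priority[level]", Array("high", "low")))
  .run() // fails with a SQL syntax error instead of evaluating the check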

Fluent API entry points that don't require Data as Input if the Metrics are already available in a MetricsRepository

Using the MetricsRepository, you can reuse metrics calculated at some earlier point. If you do that, you don't always need the data again, as long as everything that's needed was already calculated before. At the moment, it is possible to work around that issue in most cases by inputting an empty DataFrame and then using the method .reuseExistingResultsForKey(resultKey, failIfResultsForReusingMissing=true). However, we should properly support this use case in our API, for example by adding a method .withoutData in addition to the current .onData methods. In case we need some information about the schema, providing that instead of the data itself should be enough.
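
A sketch of the current workaround, reusing the repository and resultKey names from the MetricsRepository sketch earlier (the check is illustrative):

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}
import org.apache.spark.sql.Row

// an empty frame with the right schema stands in for the actual data
val emptyData = spark.createDataFrame(
  spark.sparkContext.emptyRDD[Row], data.schema)

VerificationSuite()
  .onData(emptyData)
  .useRepository(repository)
  // second argument: fail instead of recomputing if a metric is missing
  .reuseExistingResultsForKey(resultKey, true)
  .addCheck(Check(CheckLevel.Error, "cheap").isComplete("id"))
  .run()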

Documentation for core concepts in our library

Do we need a separate markdown file where we explain all core concepts like analyzers, checks, constraints, metrics, column profiles, metric repositories, check levels etc.?

We should also have a list of all analyzers, constraints, anomaly detection methods and constraint suggestion rules we offer in one place.

Maybe even a readthedocs-documentation?

Support for state aggregation and persistence for VerificationSuite fluent API

The fluent API currently does not offer setting the two parameters for state aggregation and state saving.

A workaround for this is to use the deprecated run method:

val stateProvider = InMemoryStateProvider()
VerificationSuite().run(df, checks, saveStatesWith = Some(stateProvider))
val state = stateProvider.load(analyzer)

Implementing the two additional parameters should be straightforward: add a method for state aggregation and one for state saving to VerificationRunBuilder, possibly with these signatures:

class VerificationRunBuilder {
  // ...
  def aggregateWith(stateLoader: StateLoader): this.type
  def saveStatesWith(statePersister: StatePersister): this.type
  // ...
}

and passing them to VerificationSuite().doVerificationRun().
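
For illustration, usage of the proposed methods might then look like this (hypothetical, since the methods do not exist yet; check is a placeholder):

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers.InMemoryStateProvider

val stateProvider = InMemoryStateProvider()

VerificationSuite()
  .onData(data)
  .addCheck(check)                // hypothetical check defined elsewhere
  .aggregateWith(stateProvider)   // load and merge previously saved states
  .saveStatesWith(stateProvider)  // persist states computed in this run
  .run()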

No histogram for boolean columns?

Example:

"column_hasVisit": {
      "column": "column_hasVisit",
      "completeness": 1,
      "approximateNumDistinctValues": 2,
      "dataType": {
        "enumClass": "com.amazon.deequ.analyzers.DataTypeInstances",
        "value": "Boolean"
      },
      "isDataTypeInferred": false,
      "typeCounts": {},
      "histogram": null
    }

Improve CategoricalRangeRule in constraint suggestion

When running constraint suggestion for CategoricalRangeRule for columns with cardinality of a few dozens, the suggested constraint is failing by a very small margin (Value: 0.99999907 does not meet the constraint requirement!).

Support Spark 2.3

Hi Deequ team,

This is a pretty useful tool. However, it only supports Spark 2.2. Is there a plan to support Spark 2.3?

Release RC1

We should release a new RC for deequ today, as many of the examples depend on Stefan's latest changes, which will not be in the RC0 release.
