spark-daria

Spark helper methods to maximize developer productivity.

Setup

Fetch the JAR file from Maven.

// Spark 3
libraryDependencies += "com.github.mrpowers" %% "spark-daria" % "1.2.3"

// Spark 2
libraryDependencies += "com.github.mrpowers" %% "spark-daria" % "0.39.0"

You can find spark-daria releases for different Scala versions on Maven Central.

Writing Beautiful Spark Code

Reading Beautiful Spark Code is the best way to learn how to build Spark projects and leverage spark-daria.

spark-daria will make you a more productive Spark programmer. Studying the spark-daria codebase will help you understand how to organize Spark codebases.

PySpark

Use quinn to access similar functions in PySpark.

Usage

spark-daria provides different types of functions that will make your life as a Spark developer easier:

  1. Core extensions
  2. Column functions / UDFs
  3. Custom transformations
  4. Helper methods
  5. DataFrame validators

The following overview will give you an idea of the types of functions that are provided by spark-daria, but you'll need to dig into the docs to learn about all the methods.

Core extensions

The core extensions add methods to existing Spark classes that will help you write beautiful code.

The native Spark API forces you to write code like this.

col("is_nice_person").isNull && col("likes_peanut_butter") === false

When you import the spark-daria ColumnExt class, you can write idiomatic Scala code like this:

import com.github.mrpowers.spark.daria.sql.ColumnExt._

col("is_nice_person").isNull && col("likes_peanut_butter").isFalse

This blog post describes how to use the spark-daria createDF() method that's much better than the toDF() and createDataFrame() methods provided by Spark.
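
For reference, here's a minimal sketch of createDF (mirroring the usage that appears in the issues below; it assumes an existing SparkSession named spark, and the exact overloads may vary by version):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.github.mrpowers.spark.daria.sql.SparkSessionExt._

// Build a small DataFrame without a case class: row data plus StructFields in one call.
val numbersDF = spark.createDF(
  List(
    Row(8, 2.0),
    Row(64, 4.0)
  ), List(
    StructField("num1", IntegerType, false),
    StructField("cube_root", DoubleType, true)
  )
)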

See the ColumnExt, DataFrameExt, and SparkSessionExt objects for all the core extensions offered by spark-daria.

Column functions

Column functions can be used in addition to the functions in org.apache.spark.sql.functions.

Here is how to remove all whitespace from a string with the native Spark API:

import org.apache.spark.sql.functions._

regexp_replace(col("first_name"), "\\s+", "")

The spark-daria removeAllWhitespace() function lets you express this logic with code that's more readable.

import com.github.mrpowers.spark.daria.sql.functions._

removeAllWhitespace(col("first_name"))

Datetime functions

  • beginningOfWeek
  • endOfWeek
  • beginningOfMonth
  • endOfMonth
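
Here's a rough sketch of how these are typically used (endOfMonth is assumed to take a date/timestamp Column, and eventsDF / event_date are made-up names; check the docs for the exact signatures):

import org.apache.spark.sql.functions.col
import com.github.mrpowers.spark.daria.sql.functions._

// Assumes endOfMonth accepts a date/timestamp Column and eventsDF is an existing DataFrame.
val withMonthEnd = eventsDF.withColumn("month_end", endOfMonth(col("event_date")))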

Custom transformations

Custom transformations have the following method signature so they can be passed as arguments to the Spark DataFrame#transform() method.

def someCustomTransformation(arg1: String)(df: DataFrame): DataFrame = {
  // code that returns a DataFrame
}
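
For instance, here's a sketch of a complete custom transformation; withGreeting and the greeting column are made-up names for illustration:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// The first parameter list configures the transformation; the second takes the DataFrame
// so the function can be chained with DataFrame#transform().
def withGreeting(greeting: String)(df: DataFrame): DataFrame = {
  df.withColumn("greeting", lit(greeting))
}

val greetedDF = df.transform(withGreeting("hello"))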

The spark-daria snakeCaseColumns() custom transformation snake_cases all of the column names in a DataFrame.

import com.github.mrpowers.spark.daria.sql.transformations._

val betterDF = df.transform(snakeCaseColumns())

Protip: You'll always want to deal with snake_case column names in Spark - use this function if your column names contain spaces or uppercase letters.

Helper methods

The DataFrame helper methods make it easy to convert DataFrame columns into Arrays or Maps. Here's how to convert a column to an Array.

import com.github.mrpowers.spark.daria.sql.DataFrameHelpers._

val arr = columnToArray[Int](sourceDF, "num")
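
The Map helpers follow the same spirit; conceptually they wrap something like this plain-Spark sketch (the column names are illustrative; see DataFrameHelpers for the actual method names and signatures):

// Plain-Spark sketch: collect two columns and build a Map on the driver.
val numByName: Map[String, Int] = sourceDF
  .select("name", "num")
  .collect()
  .map(row => row.getString(0) -> row.getInt(1))
  .toMap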

DataFrame validators

DataFrame validators check that DataFrames contain certain columns or a specific schema. They throw descriptive error messages if the DataFrame schema is not as expected. DataFrame validators are a great way to make sure your application gives descriptive error messages.

Let's look at a method that makes sure a DataFrame contains the expected columns.

val sourceDF = Seq(
  ("jets", "football"),
  ("nacional", "soccer")
).toDF("team", "sport")

val requiredColNames = Seq("team", "sport", "country", "city")

validatePresenceOfColumns(sourceDF, requiredColNames)

// throws this error message: com.github.mrpowers.spark.daria.sql.MissingDataFrameColumnsException: The [country, city] columns are not included in the DataFrame with the following columns [team, sport]
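
A common pattern is to call a validator at the top of a custom transformation so callers fail fast with a clear message. A sketch, assuming DataFrameValidator can be mixed in as a trait (the issues below also mention a DariaValidator object) and with made-up column names:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws}
import com.github.mrpowers.spark.daria.sql.DataFrameValidator

object ExampleTransformations extends DataFrameValidator {

  // Throws a descriptive MissingDataFrameColumnsException if required columns are missing.
  def withFullName()(df: DataFrame): DataFrame = {
    validatePresenceOfColumns(df, Seq("first_name", "last_name"))
    df.withColumn("full_name", concat_ws(" ", col("first_name"), col("last_name")))
  }
}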

Documentation

Here is the latest spark-daria documentation.

Studying these docs will make you a better Spark developer!

👭 👬 👫 Contribution Criteria

We are actively looking for contributors to add functionality that fills in the gaps of the Spark source code.

To get started, fork the project and submit a pull request. Please write tests!

After submitting a couple of good pull requests, you'll be added as a contributor to the project.

Publishing

Sonatype passwords can go stale and need to be reset periodically. Go to the Sonatype website and log in to make sure your password is working to avoid errors that are difficult to understand and debug.

You need GPG installed on your machine as well. You can install it with brew install gnupg.

You also need your GPG keys properly set up on every machine; you can follow these instructions to set up your GPG key on each machine.

  1. Version bump commit and create GitHub tag

  2. Publish documentation with sbt ghpagesPushSite

  3. Publish JAR

Run sbt to open the SBT console.

IMPORTANT Run sbt clean before running the publish commands! Otherwise you may run into this error.

At the sbt prompt, run ; + publishSigned; sonatypeBundleRelease to create the JAR files and release them to Maven. These commands are made available by the sbt-sonatype plugin.

When the release command is run, you'll be prompted to enter your GPG passphrase.

The Sonatype credentials should be stored in the ~/.sbt/sonatype_credentials file in this format:

realm=Sonatype Nexus Repository Manager
host=oss.sonatype.org
user=$USERNAME
password=$PASSWORD

New Sonatype accounts need a different host, as described here. My Sonatype account was created before February 2021, so this does not apply to me.

spark-daria's Issues

DataFrame schema comparison fails with assertSmallDataFrameEquality even though the schemas are the same

I am trying to write some test cases to validate the data between a .parquet file in S3 and the target (a Hive table). I have loaded the .parquet data into one DataFrame and the Hive table data into another DataFrame. When I try to compare the schemas of the two DataFrames using assertSmallDataFrameEquality, it fails even though the schemas are the same. Not sure why it is failing. Any suggestions would be helpful.
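
A quick way to see exactly where the two schemas diverge, using only core Spark APIs (parquetDF and hiveDF stand in for the two DataFrames described above); subtle nullability or metadata differences are a common cause of equality checks failing on schemas that print identically:

// Compare the schemas field by field and print any mismatches.
parquetDF.schema.fields.zip(hiveDF.schema.fields).foreach { case (p, h) =>
  if (p != h) println(s"mismatch: $p vs $h")
}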

Add junit4git

@afranzi - If you have a sec, can you please create a PR to add junit4git to this project? That's the library you mentioned in your talk, right?

Some guys from Blizzard were asking me about how to use junit4git after your talk - I told them you were the expert. We can hopefully show them how to use this workflow here. I need to learn it too!

Functions as column extensions

@snithish @gorros @kirubakarrs @oscarvarto - I’ve always thought it’s a bit random how Spark defines some functionality as Column functions and other functionality as SQL functions. Here’s an example of the inconsistency:

lower(col("blah").substr(0, 2))

Having two SQL functions would look like this:

lower(substr(col("blah"), 0, 2))

Having two Column functions would look like this:

col("blah").substr(0, 2).lower()

I like the Column functions syntax, so I started monkey patching the SQL functions to the Column class: https://github.com/MrPowers/spark-daria/blob/master/src/main/scala/com/github/mrpowers/spark/daria/sql/FunctionsAsColumnExt.scala Let me know your thoughts.

camelCaseToSnakeCaseColumns doesn't seem to work

I have tried using the camelCaseToSnakeCaseColumns transform but it doesn't seem to be working when combined with other transformations.

Here is the production code:

def standardRefinedColumnCleaning()(df: Dataset[Row]): Dataset[Row] = {
    df.transform(snakeCaseColumns())
      .transform(camelCaseToSnakeCaseColumns())
      .transform(sortColumns())
  }

And the test case

it("should snake case columns for columns with spaces or camel cased") {
      val someDF = Seq(
        ("foo", "bar")
      ).toDF("SomeColumn", "A b C")

      val refinedDataSet = RefinedTransforms.standardRefinedColumnCleaning()(someDF)

      assert(refinedDataSet.columns == Array("a_b_c", "some_column"))
    }

The result is

Expected :Array("a_b_c", "some_column")
Actual   :Array("a_b_c", "somecolumn")

If I only run camelCaseToSnakeCaseColumns, then it works.

Saner setter and getter methods for schema metadata

Spark allows you to use primitive objects to store metadata about the columns in a DataFrame. This is useful in ML and analytical tasks where you'd like to avoid recomputing basic statistics on the data. The current way to do that is very messy. The feature is not well known or widely used, so cleaner setters and getters may help increase its adoption.

For further reference: https://stackoverflow.com/questions/32628845/is-there-a-way-to-add-extra-metadata-for-spark-dataframes
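
For context, attaching metadata with the core Spark API today looks roughly like this (a sketch; df, the score column, and the stats are made up):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder

// Build a Metadata object by hand and re-alias the column to attach it.
val statsMetadata = new MetadataBuilder()
  .putDouble("min", 0.0)
  .putDouble("max", 100.0)
  .build()

val withMetadataDF = df.withColumn("score", col("score").as("score", statsMetadata))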

Strange FileNotFoundException when running printAthenaCreateTable

I'm getting a strange error. I'm not a regular Scala user, so I may be doing something silly.

First, I start a Spark shell as follows:

spark-shell --packages "org.apache.hadoop:hadoop-aws:2.7.6,mrpowers:spark-daria:0.32.0-s_2.11"

Then I run this code:

scala> val df = spark.read.parquet("s3a://...")
df: org.apache.spark.sql.DataFrame = [... 96 more fields]

scala> import com.github.mrpowers.spark.daria.sql.DataFrameHelpers
import com.github.mrpowers.spark.daria.sql.DataFrameHelpers

scala> DataFrameHelpers.printAthenaCreateTable(
     |     df,
     |     "my.table",
     |     "s3a://..."
     | )
java.io.FileNotFoundException: /Users/powers/Documents/code/my_apps/spark-daria/target/scala-2.11/scoverage-data/scoverage.measurements.1 (No such file or directory)
  at java.io.FileOutputStream.open0(Native Method)
  at java.io.FileOutputStream.open(FileOutputStream.java:270)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
  at java.io.FileWriter.<init>(FileWriter.java:107)
  at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
  at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
  at scala.collection.concurrent.TrieMap.getOrElseUpdate(TrieMap.scala:901)
  at scoverage.Invoker$.invoked(Invoker.scala:42)
  at com.github.mrpowers.spark.daria.sql.DataFrameHelpers$.printAthenaCreateTable(DataFrameHelpers.scala:194)
  ... 53 elided

The reference to /Users/powers/ seems strange, and suggests some path from the project author's workstation got mistakenly baked into the package somehow.

Make the createDF method even more concise

@snithish @lizparody - I'd like to make the createDF method even better.

The createDF method currently works like this:

val expectedDF = spark.createDF(
  List(
    Row(8, 2.0),
    Row(64, 4.0),
    Row(-27, -3.0)
  ), List(
    StructField("num1", IntegerType, false),
    StructField("cube_root", DoubleType, true)
  )
)

I'd like to make it even more concise by allowing this syntax:

val expectedDF = spark.createDF(
  List(
   (8, 2.0),
   (64, 4.0),
   (-27, -3.0)
  ), List(
   ("num1", IntegerType, false),
   ("cube_root", DoubleType, true)
  )
)

Let me know if you know how to make both syntaxes work. I tried and failed.

Deeply nested package structure

spark-daria follows the standard Scala / Java deep nesting package convention that's annoying when importing code.

Users currently need to import code like this: import com.github.mrpowers.spark.daria.sql.ColumnExt._

I noticed that some libraries are deviating from these Scala conventions and offering imports like this: import utest._.

Maybe we can change the package structure so users can import code like import mrpowers.daria.sql.ColumnExt._? Thoughts @nvander1 / @manuzhang...?

Add a SparkSessionExt `createDS` method

The createDF method is very useful.

I think a createDS method is needed as well, so we don't have to do this:

val sourceDS = spark.createDataset[Person](
  Seq(
    Person("Alice", 12),
    Person("Bob", 17)
  )
)

I'd rather do this:

val sourceDS = spark.createDS[Person](
  List(
    ("Alice", 12),
    ("Bob", 17)
  )
)

Enhancing query pushdown in Postgres...

A lot of queries are pushed down to the database level when Snowflake is used as described in this blog post.

Joins, aggregations, and SQL functions are all pushed down and performed in the Snowflake database before data is sent to Spark.

I know some stuff gets pushed down to Postgres (column pruning), but are joins and aggregations being pushed down? @nvander1 - do you know what gets pushed down to Postgres? Is this something we could improve?

Some analyses could do a lot of stuff at the database level, only send a fraction of the data to the Spark cluster, and then probably perform a lot faster. Spark isn't the best at joins, so pushing those down to the database level would probably help a lot...

Managing JAR files

@snithish - We need to figure out a good way to manage the spark-daria JAR files, so they're easily accessible for different versions of Scala and Spark.

I'm currently uploading the spark-daria JAR files to Spark Packages. Spark Packages seems to upload these to Maven somehow. Do you think we should continue using Spark Packages or migrate to Maven?

I like how spark-testing-base manages the JAR files in Maven. They make it easy to access different versions of spark-testing-base for different Spark, Scala, and spark-testing-base versions. Should we try to replicate the spark-testing-base approach?

Thanks!

asSchema should have Product as upper type bound

A careless user like me may leave the () out when defining the schema:

val df = spark.createDF(List("foo", "bar"),  List("data", StringType, true))

Currently, a MatchError is thrown at runtime. Would it be better to report the error at compile time?
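
For reference, the intended call presumably wraps each schema entry in a tuple, which is easy to miss:

// Intended: each schema entry is a (name, dataType, nullable) tuple.
val correctedDF = spark.createDF(List("foo", "bar"), List(("data", StringType, true)))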

@MrPowers

Java, JDK and Hadoop versions for CI

Apache Spark itself is using these settings:

matrix:
  java: [ '1.8', '11' ]
  hadoop: [ 'hadoop-2.7', 'hadoop-3.2' ]
  exclude:
  - java: '11'
    hadoop: 'hadoop-2.7'

I think spark-daria can simply be tested with Java 1.8 and without any Hadoop specified, correct? I don't think we need to be testing multiple different Java / Hadoop versions.

I just learned that Java 8 and Java 1.8 are the same thing... what?!

I'm not even going to ask why Java 9 and Java 10 aren't included in this discussion. So confusing!!

Three level nested structure flattening fails

When I add one more level of nesting, it fails to flatten.

"uses the StackOverflow answer format" - {

        val data = Seq(
          Row(
            Row(
              "this",
              "is"
            ),
            "something",
            "cool",
            ";)"
          )
        )

        val schema = StructType(
          Seq(
            StructField(
              "foo",
              StructType(
                Seq(
                  StructField(
                    "bar",
                    StructType(
                      Seq(
                        StructField(
                          "zoo",
                          StringType,
                          true
                        )
                      )
                    )
                  ),
                  StructField(
                    "baz",
                    StringType,
                    true
                  )
                )
              ),
              true
            ),
            StructField(
              "x",
              StringType,
              true
            ),
            StructField(
              "y",
              StringType,
              true
            ),
            StructField(
              "z",
              StringType,
              true
            )
          )
        )

        val df = spark
          .createDataFrame(
            spark.sparkContext.parallelize(data),
            StructType(schema)
          )
          .flattenSchema("_")

        val expectedDF = spark.createDF(
          List(("this", "is", "something", "cool", ";)")),
          List(
            ("foo_bar_zoo", StringType, true),
            ("foo_baz", StringType, true),
            ("x", StringType, true),
            ("y", StringType, true),
            ("z", StringType, true)
          )
        )

        assertSmallDataFrameEquality(
          df,
          expectedDF
        )

      }

ParquetCompactor is not deleting old files and the input_file_name_parts directory on S3

ParquetCompactor is not deleting old files and the input_file_name_parts directory on S3.

We are using the Databricks Spark platform, Spark 6.2, PySpark, and mrpowers:spark-daria:0.36.0-s_2.11. After running ParquetCompactor we have a new big Parquet file, but the old files and the input_file_name_parts directory still exist.

Is it not possible to use the ParquetCompactor on S3?

Should we add more validators?

I'm watching this talk and they're discussing how they run validations that are specified in YAML files.

Would it be useful to add more validations to spark-daria? Should we add DariaValidator.validatesLengthIsLessThan("my_cool_column", 5) and DariaValidator.validatesBetween("my_integers", 3, 10) methods?

BTW, all of the DataFrameValidator methods were copied over to the DariaValidator object due to a weird bug I ran into when using SBT shading with traits. Let me know if you're interested in understanding more about the SBT shading weirdness I uncovered.

Function to traverse dataframe schema.

It would be nice to have a function that traverses the schema and applies some function to each column. For example, we could use it to make columns nullable or add metadata.
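
A rough sketch of what such a helper could look like (mapFields is a hypothetical name, not an existing spark-daria API; df is an existing DataFrame):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

// Hypothetical helper: rebuild the DataFrame with a function applied to every top-level StructField.
def mapFields(df: DataFrame)(f: StructField => StructField): DataFrame = {
  val newSchema = StructType(df.schema.fields.map(f))
  df.sparkSession.createDataFrame(df.rdd, newSchema)
}

// Example: make every column nullable.
val nullableDF = mapFields(df)(_.copy(nullable = true))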

1.0 major release

We should make a major release to provide users with a stable public interface.

Here's how we can get this project ready for a major release:

  • Go through the entire codebase and see if any methods should be deleted
  • Make sure all issues are incorporated in the project or closed
  • Improve the code quality when necessary
  • Make sure all private methods are flagged accordingly and the documentation is up-to-date
  • Create a more official release process going forward

Feature request - range validator

Validates that the field values are within a given range (supporting geq / gt / leq / lt comparisons). It is easy to implement, but it would be nice to wrap it in a function.
Something like:
validateRangeFields(Seq[Tuple(String, Numeric, Numeric, Boolean, Boolean)])

(sorry for the poor scala syntax, I'm a scala newbie)

Where the tuple fields stand for:

  • column name
  • range start
  • range end
  • start inclusive (false implies > check, true implies >= check)
  • end inclusive (false implies < check, true implies <= check)
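
A hedged sketch of what this could look like (validateRangeFields is hypothetical, not an existing spark-daria method):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical validator: each tuple is (column name, range start, range end, start inclusive, end inclusive).
def validateRangeFields(df: DataFrame, ranges: Seq[(String, Double, Double, Boolean, Boolean)]): Unit = {
  ranges.foreach { case (colName, start, end, startInclusive, endInclusive) =>
    val lowerOk = if (startInclusive) col(colName) >= start else col(colName) > start
    val upperOk = if (endInclusive) col(colName) <= end else col(colName) < end
    val violations = df.filter(!(lowerOk && upperOk)).count()
    require(violations == 0, s"column $colName has $violations values outside the allowed range")
  }
}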

Thanks!

Publish to Maven Central

dl.bintray.com is unfortunately not accessible from many of the clients I work with. Is it possible to publish spark-daria to Maven Central?

Get feedback on DataFrame validation code

@snithish - The DataFrame validation code is a very important portion of the spark-daria project: https://github.com/MrPowers/spark-daria/blob/master/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameValidator.scala

It's important to validate DataFrame dependencies for public DataFrame transformations, so users get descriptive error messages.

Can you please take a look at the code and let me know if you have any questions / comments? I'd like to get the Spark community to start using this DataFrame validation code. Thanks!

duplicate repo entries in pom file

When using the latest version:
Errors occurred while build effective model from C:\Users\igreenfield.gradle\caches\modules-2\files-2.1\mrpowers\spark-daria\0.31.0-s_2.11\c12b7d34b6165a7ad5de127edb9259fc094f3ae2\spark-daria-0.31.0-s_2.11.pom:
'repositories.repository.id' must be unique: SparkPackagesRepo -> https://dl.bintray.com/spark-packages/maven/ vs http://dl.bintray.com/spark-packages/maven/ in mrpowers:spark-daria:0.31.0-s_2.11

and in the pom file I see:

<repositories>
        <repository>
            <id>SparkPackagesRepo</id>
            <name>Spark Packages Repo</name>
            <url>https://dl.bintray.com/spark-packages/maven/</url>
            <layout>default</layout>
        </repository>
        <repository>
            <id>SparkPackagesRepo</id>
            <name>Spark Packages Repo</name>
            <url>http://dl.bintray.com/spark-packages/maven/</url>
            <layout>default</layout>
        </repository>
    </repositories>
