Parquet4S

Simple I/O for Parquet. Allows you to easily read and write Parquet files in Scala.

Use just a Scala case class to define the schema of your data. There is no need to use Avro, Protobuf, Thrift or other data serialisation systems. You can also use generic records if you don't want to use a case class.

Compatible with files generated with Apache Spark. However, unlike in Spark, you do not have to start a cluster to perform I/O operations.

Based on the official Parquet library, Hadoop Client and Shapeless.

Integration for Akka Streams.

Released for Scala 2.11.x, 2.12.x and 2.13.x.

Tutorial

  1. Quick Start
  2. AWS S3
  3. Akka Streams
  4. Before-read filtering or filter pushdown
  5. Supported storage types
  6. Supported types
  7. Generic Records
  8. Customisation and Extensibility
  9. More Examples
  10. Contributing

Quick Start

SBT

libraryDependencies ++= Seq(
  "com.github.mjakubowski84" %% "parquet4s-core" % "1.4.0",
  "org.apache.hadoop" % "hadoop-client" % yourHadoopVersion
)

Mill

def ivyDeps = Agg(
  ivy"com.github.mjakubowski84::parquet4s-core:1.4.0",
  ivy"org.apache.hadoop:hadoop-client:$yourHadoopVersion"
)

import com.github.mjakubowski84.parquet4s.{ ParquetReader, ParquetWriter }

case class User(userId: String, name: String, created: java.sql.Timestamp)

val users: Iterable[User] = Seq(
  User("1", "parquet", new java.sql.Timestamp(1L))
)
val path = "path/to/local/parquet"

// writing
ParquetWriter.writeAndClose(path, users)

// reading
val parquetIterable = ParquetReader.read[User](path)
try {
  parquetIterable.foreach(println)
} finally parquetIterable.close()

AWS S3

In order to connect to AWS S3 you need to define one more dependency:

"org.apache.hadoop" % "hadoop-aws" % yourHadoopVersion

Next, the most common approach is to define the following environment variables:

export AWS_ACCESS_KEY_ID=my.aws.key
export AWS_SECRET_ACCESS_KEY=my.secret.key

Please follow the documentation of Hadoop AWS for more details and troubleshooting.

Passing Hadoop Configs Programmatically

File system configs for S3, GCS or Hadoop can also be set programmatically for ParquetReader and ParquetWriter by passing a Configuration object to the ParquetReader.Options and ParquetWriter.Options case classes.
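For example, here is a minimal sketch of configuring S3A credentials programmatically. The bucket name and credentials are placeholders, and the fs.s3a.* keys assume the hadoop-aws S3A connector:

import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter}
import org.apache.hadoop.conf.Configuration

case class User(userId: String, name: String)

val hadoopConf = new Configuration()
hadoopConf.set("fs.s3a.access.key", "my.aws.key")    // placeholder credentials
hadoopConf.set("fs.s3a.secret.key", "my.secret.key")

// writing with the programmatic configuration
ParquetWriter.writeAndClose(
  "s3a://my-bucket/users",                           // placeholder bucket
  Seq(User("1", "parquet")),
  ParquetWriter.Options(hadoopConf = hadoopConf)
)

// reading with the same configuration
val users = ParquetReader.read[User](
  "s3a://my-bucket/users",
  ParquetReader.Options(hadoopConf = hadoopConf)
)
try users.foreach(println)
finally users.close()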

Akka Streams

Parquet4S has an integration module that allows you to read and write Parquet files using Akka Streams. Just add it to your dependencies:

"com.github.mjakubowski84" %% "parquet4s-akka" % "1.4.0"
"org.apache.hadoop" % "hadoop-client" % yourHadoopVersion

So far Parquet4S provides a single Source for reading a single file or a directory, and four Sinks for writing. Choose the one that suits you best.

import com.github.mjakubowski84.parquet4s.{ParquetStreams, ParquetWriter}
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import org.apache.hadoop.conf.Configuration
import scala.concurrent.duration._

case class User(userId: String, name: String, created: java.sql.Timestamp)

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()

val users: Iterable[User] = ???

val conf: Configuration = ??? // Set Hadoop configuration programmatically

// Please check all the available configuration options!
val writeOptions = ParquetWriter.Options(
  writeMode = ParquetFileWriter.Mode.OVERWRITE,
  compressionCodecName = CompressionCodecName.SNAPPY,
  hadoopConf = conf // optional hadoopConf
)

// Writes a single file.
Source(users).runWith(ParquetStreams.toParquetSingleFile(
  path = "file:///data/users/user-303.parquet",
  options = writeOptions
))

// Sequentially splits data into files of 'maxRecordsPerFile'.
// Recommended for environments with limited available resources.
Source(users).runWith(ParquetStreams.toParquetSequentialWithFileSplit(
  path = "file:///data/users",
  // will create files consisting of max 2 row groups
  maxRecordsPerFile = 2 * writeOptions.rowGroupSize,
  options = writeOptions
))

// Writes files in parallel, with the number of files equal to 'parallelism'.
// Recommended for better performance, provided that file order
// does not have to be preserved.
Source(users).runWith(ParquetStreams.toParquetParallelUnordered(
  path = "file:///data/users",
  parallelism = 4,
  options = writeOptions
))

// Tailored for writing indefinite streams.
// Writes a file when a chunk reaches the size limit or when the defined time period elapses.
// Can also partition files!
// Check all other parameters and example usage in project sources.
Source(users).via(
  ParquetStreams
    .viaParquet[User]("file:///data/users")
    .withMaxCount(writeOptions.rowGroupSize)
    .withMaxDuration(30.seconds)
    .withWriteOptions(writeOptions)
    .build()
).runForeach(user => println(s"Just wrote user ${user.userId}..."))
  
// Reads a file or files from the path. Please also have a look at optional parameters.
ParquetStreams.fromParquet[User](
  path = "file:///data/users",
  options = ParquetReader.Options(hadoopConf = conf)
).runForeach(println)

Before-read filtering or filter pushdown

One of the best features of Parquet is its efficient way of filtering. Parquet files contain additional metadata that can be leveraged to drop chunks of data without scanning them. Since version 0.10.0 Parquet4S allows you to define filter predicates in both the core and Akka modules in order to push filtering out of Scala collections or Akka Streams down to the point before the file content is even read.

You define your filters using a simple algebra as follows.

In core library:

ParquetReader.read[User](path = "file://my/path", filter = Col("email") === "[email protected]")

In Akka, the filter applies both to the content of files and to partitions:

ParquetStreams.fromParquet[Stats](
  path = "file://my/path",
  filter = Col("stats.score") > 0.9 && Col("stats.score") <= 1.0
)

TAKE NOTE!

Take note that due to an issue with implicit resolution in Scala 2.11 you may need to define all parameters of ParquetStreams.fromParquet even if some have default values. Parameters must be specified in the default order even when you use named arguments. This specifically refers to the case where you would like to omit options but define filter. The issue does not appear in Scala 2.12 and 2.13.

You can construct filter predicates using the ===, !==, >, >=, <, <= and in operators on columns containing primitive values. You can combine and modify predicates using the &&, || and ! operators. in looks for values in a list of keys, similar to SQL's in operator. Mind that operations on java.sql.Timestamp and java.time.LocalDateTime are not supported, as Parquet still does not allow filtering by Int96 out of the box.
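For example, here is a sketch of a combined predicate. The column and field names below are made up for illustration, and the exact argument form accepted by in may differ slightly between versions:

import com.github.mjakubowski84.parquet4s.{Col, ParquetReader}

case class User(userId: String, name: String, country: String, age: Int)

val filter =
  (Col("country") in ("PL", "DE", "FR")) &&   // SQL-like membership test
    Col("age") >= 18 &&
    !(Col("name") === "anonymous")

val filtered = ParquetReader.read[User](path = "file://my/path", filter = filter)
try filtered.foreach(println)
finally filtered.close()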

Check ScalaDoc and code for more!

Supported storage types

As it is based on Hadoop Client, Parquet4S can read from and write to a variety of storage types:

  • Local files
  • HDFS
  • Amazon S3
  • Google Storage
  • Azure
  • OpenStack

Please refer to the Hadoop Client documentation or to your storage provider to check how to connect to your storage.
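As a rough sketch, the storage is selected by the URI scheme of the path, provided the matching Hadoop connector is on the classpath. The bucket and host names below are placeholders:

// Example paths for different storage types; the URI scheme selects the Hadoop file system.
val localPath = "file:///data/users"            // local file system
val hdfsPath  = "hdfs://namenode/data/users"    // HDFS
val s3Path    = "s3a://my-bucket/users"         // Amazon S3 (requires hadoop-aws)
val gcsPath   = "gs://my-bucket/users"          // Google Cloud Storage (requires the GCS connector)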

Supported types

Primitive types

Type                       Reading and Writing   Filtering
Int                        ✓                     ✓
Long                       ✓                     ✓
Byte                       ✓                     ✓
Short                      ✓                     ✓
Boolean                    ✓                     ✓
Char                       ✓                     ✓
Float                      ✓                     ✓
Double                     ✓                     ✓
BigDecimal                 ✓                     ✓
java.time.LocalDateTime    ✓                     ✗
java.time.LocalDate        ✓                     ✓
java.sql.Timestamp         ✓                     ✗
java.sql.Date              ✓                     ✓
Array[Byte]                ✓                     ✓

Complex Types

Complex types can be arbitrarily nested.

  • Option
  • List
  • Seq
  • Vector
  • Set
  • Array - Array of bytes is treated as primitive binary
  • Map - the key must be of a primitive type; only the immutable version is supported.
  • Since 1.2.0: any Scala collection that has a Scala 2.13 collection Factory (in 2.11 and 2.12 it is derived from CanBuildFrom). This covers both mutable and immutable collections. The collection must be parameterised by a single element type - because of that, Map is supported only in its immutable version (for now).
  • Any case class
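For instance, here is a minimal sketch of a case class combining several of the complex types above. The field names are made up for illustration:

import com.github.mjakubowski84.parquet4s.ParquetWriter

case class Address(street: String, city: String)

case class UserProfile(
  userId: String,
  tags: List[String],              // collection of a primitive type
  scores: Map[String, Double],     // immutable Map with a primitive key
  address: Option[Address]         // optional, nested case class
)

ParquetWriter.writeAndClose(
  "path/to/local/parquet",
  Seq(UserProfile("1", List("admin"), Map("quiz" -> 0.9), Some(Address("Main St", "Warsaw"))))
)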

Generic Records

You may not want to use a strict schema and instead process your data in a generic way. Since version 1.2.0 Parquet4S has a rich API that allows you to build, transform, write and read Parquet records in an easy way. Each implementation of ParquetRecord is a Scala Iterable and a mutable collection. You can execute operations on RowParquetRecord and ListParquetRecord as on a mutable Seq, and you can treat MapParquetRecord as a mutable Map. Moreover, records provide additional functions like get and add (and more) that take an implicit ValueCodec and allow you to read and modify records using regular Scala types. There are default ParquetRecordEncoder, ParquetRecordDecoder and ParquetSchemaResolver instances for RowParquetRecord, so reading Parquet in a generic way works out of the box! In order to write you still need to provide a schema in the form of Parquet's MessageType.
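For instance, here is a minimal sketch of generic reading, reusing the path from the Quick Start:

import com.github.mjakubowski84.parquet4s.{ParquetReader, RowParquetRecord}

// The default ParquetRecordDecoder for RowParquetRecord is resolved implicitly,
// so no case class or schema is needed for reading.
val records = ParquetReader.read[RowParquetRecord]("path/to/local/parquet")
try records.foreach(println)
finally records.close()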

The functionality is available in both the core and Akka modules. See the examples.

Customisation and Extensibility

Parquet4S is built using Scala's type class system. That allows you to extend Parquet4S by defining your own implementations of its type classes.

For example, you may define codecs for a type of your own so that it can be read from or written to Parquet. Assume that you have your own type:

case class CustomType(i: Int)

You want to save it as an optional Int. In order to achieve that you have to define your own codec:

import com.github.mjakubowski84.parquet4s.{OptionalValueCodec, IntValue, Value, ValueCodecConfiguration}

implicit val customTypeCodec: OptionalValueCodec[CustomType] = 
  new OptionalValueCodec[CustomType] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): CustomType = value match {
      case IntValue(i) => CustomType(i)
    }
    override protected def encodeNonNull(data: CustomType, configuration: ValueCodecConfiguration): Value =
      IntValue(data.i)
  }

Additionally, if you want to write your custom type, you have to define the schema for it:

import org.apache.parquet.schema.{OriginalType, PrimitiveType}
import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver._

implicit val customTypeSchema: TypedSchemaDef[CustomType] =
  typedSchemaDef[CustomType](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.INT32,
      required = false,
      originalType = Some(OriginalType.INT_32)
    )
  )
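With the codec and the schema definition in implicit scope, CustomType can then be used like any built-in type. Here is a minimal sketch; MyData and the path are made up:

import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter}

case class MyData(id: Int, custom: CustomType)

// CustomType is written as an optional INT32, as declared by the codec and schema above.
ParquetWriter.writeAndClose("path/to/custom.parquet", Seq(MyData(1, CustomType(42))))

val readBack = ParquetReader.read[MyData]("path/to/custom.parquet")
try readBack.foreach(println)
finally readBack.close()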

More Examples

Please check the examples, where you can find simple code covering the basics of both the core and Akka modules.

Moreover, the examples contain a simple application of the library that combines Akka Streams and Kafka. It shows how you can write partitioned Parquet files with data coming from an indefinite stream.

Contributing

Do you want to contribute? Please read the contribution guidelines.
