Giter VIP home page Giter VIP logo

delta-live-tables-notebooks's Introduction

Delta Live Tables Example Notebooks


Delta Live Tables is a new framework designed to enable customers to successfully declaratively define, deploy, test & upgrade data pipelines and eliminate operational burdens associated with the management of such pipelines.

This repo contains Delta Live Table examples designed to get customers started with building, deploying and running pipelines.

Getting Started

  • Connect your Databricks workspace using the feature to this repo

  • Choose one of the examples and create your pipeline!

Examples

Wikipedia

The Wikipedia clickstream sample is a great way to jump start using Delta Live Tables (DLT). It is a simple bificating pipeline that creates a table on your JSON data, cleanses the data, and then creates two tables.

This sample is available for both SQL and Python.

Running your pipeline

1. Create your pipeline using the following parameters

  • From your Databricks workspace, click Jobs, then Delta Live Tables and click on Create Pipeline

  • Fill in the Pipeline Name, e.g. Wikipedia

  • For the Notebook Libraries, fill in the path of the notebook such as /Repos/[email protected]/delta-live-tables-notebooks/SQL/Wikipedia

  • To publish your tables, add the target parameter to specify which database you want to persist your tables, e.g. wiki_demo.

2. Edit your pipeline JSON

  • Once you have setup your pipeline, click Edit Settings near the top, the JSON will look similar to below

3. Click Start

  • To view the progress of your pipeline, refer to the progress flow near the bottom of the pipeline details UI as noted in the following image.

4. Reviewing the results

  • Once your pipeline has completed processing, you can review the data by opening up a new Databricks notebook and running the following SQL statements:

    %sql
    -- Review the top referrers to Wikipedia's Apache Spark articles
    SELECT * FROM wiki_demo.top_spark_referers
    
  • Unsurprisingly, the top referrer is "Google" which you can see graphically when you convert your table into an area chart.

delta-live-tables-notebooks's People

Contributors

alexott avatar bharathperiy-db avatar chrishfish avatar dennyglee avatar fmunz avatar fuselessmatt avatar ganeshchand avatar jodb avatar karlwang-db avatar marmbrus avatar michael-denyer avatar morganmazouchi avatar neil90 avatar ravi-databricks avatar ravi-db avatar rportilla-databricks avatar tj-cycyota avatar vinijaiswal avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

delta-live-tables-notebooks's Issues

Why do you not name the dlt tables?

Hi, I am also learning DLT at the moment and something about the way you wrote the dlt code seems different than most guides is that you do not give them a name in the table properties. Is there a specific why don't you do that? Other than making it work with the dlt-with-debug tool

Parameterize NYC Taxi sample

  • Parameterize the notebook for DB and user (or storage folder)
  • Make the misc queries separate from the setup notebook
  • Just have pre-setup notebook copy your H3 data directory?

via Scott Crawford

Please provide sample sample exaple for DLT notebook deployment with Azure devops (CICD)

Hello,

We could do deployment using databricks UI easily.
But there is no sample example of how to DLT notebook deployment using CICD in Azure devops.

General notebook deployment is easy with CICD but in case od delta table , we need to first register the create delta live table workflow and make it as part of Job.

Could you please provide example for Azure CICD.

Thanks
Mahesh

Is it possible to get a max value out of delta live table's table to be used in another table?

Hi

I want to join two tables using date (DATE), doing the following

@dlt.table
def table_A():
    # partitioned on date
    A_df = spark.readStream.format("cloudFiles").option("cloudFiles.format", "parquet").load("location_A")
    return A_df

@dlt.table
def table_B():
    # partitioned on date
    B_df = spark.read.format("delta").load("location_B")
    return B_df

@dlt.table
def joined_table():
    A_df = dlt.read_stream("table_A")
    B_df = dlt.read("table_B")

    conds = [A_df.date >= B_df.date, A_df.some_key == B_df.some_key]
    joined_table = A_df.join(B_df, conds, "inner")
    return joined_table 

However, since table_B would contains a lot of dates that don't matter to the join so I want to use the maximum value in the date column max_date_from_table_A of table_A to filter out table_B before joining. i.e.

@dlt.table
def joined_table():
    A_df = dlt.read_stream("table_A")
    B_df = dlt.read("table_B").filter(B_df.date >= max_date_from_table_A)

    conds = [A_df.date >= B_df.date, A_df.some_key == B_df.some_key]
    joined_table = A_df.join(B_df, conds, "inner")
    return joined_table 

How can I do so?

I have tried doing

@dlt.table
def joined_table():
    A_df = dlt.read_stream("table_A")

    max_date_from_table_A = A_df.select(F.max('date')).first()['max(date)']
    B_df = dlt.read("table_B").filter(B_df.date >= max_date_from_table_A)

    conds = [A_df.date >= B_df.date, A_df.some_key == B_df.some_key]
    joined_table = A_df.join(B_df, conds, "inner")
    return joined_table 

but it would say the following error

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

Or does the filter before join to get the smaller table table_B NOT matter?

Thank you

Updates to wikipedia DLT

  • Remove references to #dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
  • Remove comment repeats code # Import comments

Include Expectation call outs to:

  • one of each type of expectation
  • a comment
  • partition at least one table by date
  • maybe use table properties to tag the "quality" of each table.

CSV Source and Load to Snowflake ETL

Hello Databricks team,

I've been looking through Databricks DLT documentation and I'm having a hard time to find a solution.
Is it possible to use a CSV file as the data source for DLT or do I have to convert my CSV to JSON or Parquet as these are the only file formats that are currently in your demo and documentation.

I am aiming to create an ETL which ingest data from CSV/Excel files and Load the data to our datawarehouse in Snowflake.

I am looking to use Databricks DLT to do the transformations such as data cleaning and data validations.
Is this a possible thing to do using Databricks DLT?

Hoping for a response from the team.

Thanks!

Unable to import dlt method

I am getting error saying dlt method not found.
Second question is do not find pipeline feature under jobs. Can you please help me how to resolve the issues.

Could you please explain what is the fundemental difference between the usage of dlt.read and dlt.readStream?

So our team at the moment would like to set up a DLT pipeline to achieve real time performance in data collection and analysis. We have a kafka connection set up as you did with readStream

The first DLT that we have returns the above mentioned kafka port. (Let us call it the "bronze_table")

Now further downstream we have other tables that need to read from that bronze_table.

  1. If I use dlt.readStream("bronze_table")
  • Does my 2nd table who reads from bronze_table only ingest the newly added data and does not consider the old data that was already here defined by the .option("startingOffsets", "earliest")? (i.e will I have the earliest offset with this read mode)
  1. If I use dlt.read("bronze_table")
  • If I run with this command instead, will my second DLT read the entirety of of the bronze_table once from the earliest offset to the most recent one when I ran the pipeline and then cease to update? (I know there is the CDC but honestly I am not sure when or when not to use that; the same thing applies for the SCD that is involved. If anyone can give an explanation on that too that would be perfect)
  • When the pipeline is running in "continous" mode instead of triggered. Does that mean the 2nd delta table that uses dlt.read (in scenario 2) will keep on updating to the newest data from the bronze_table?

please there is really not enough documentation about all this in general. I would appreciate any kind of feedback on this matter.
If you need more info I can provide that too

Update README for Wikipedia Pipeline

In Wikipedia DLT Pipeline Step 4

%sql
-- Review the top referrers to Wikipedia's Apache Spark articles
SELECT * FROM wiki_demo.wiki_spark

the name of the table wiki_spark should be changed totop_spark_referers instead.

Issue with read_kafka within a DLT using SQL

Hi @fmunz ,

I am tying to use the read_kafka function and populate my arguments, yet somehow, whenever I try to run the code it doesn't recognize the function.

Code is written as below:

CREATE  OR REFRESH STREAMING LIVE TABLE kafka_events_sql
  COMMENT 'The data ingested from kafka topic'
  AS SELECT
    *
  FROM STREAM read_kafka(
    bootstrapServers => 'xxx.xxx.azure.confluent.cloud:9092', 
    subscribe => 'fitness-tracker',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";'
  );

The output that I get when running is below:

image

I am trying really hard to understand what is the issue and why this function wouldn't work no matter what i did.

Issue with approxQuantile() or usage of List within a DLT

HI,

I am tying to use the approxQuantile() function and populate a list that I made, yet somehow, whenever I try to run the code it's as if the list is empty and there are no values in it.

Code is written as below:

    @dlt.table(name = "customer_order_silver_v2")
    def capping_unitPrice_Qt():
        df = dlt.read("customer_order_silver")
        boundary_unit = [0,0]
        boundary_qty = [0,0]
        boundary_unit = df.select(col("UnitPrice")).approxQuantile('UnitPrice',[0.05,0.95], 0.25)
     
        boundary_qty = df.select(col("Quantity")).approxQuantile('Quantity',[0.05,0.95], 0.25)
     
     
        df = df.withColumn('UnitPrice', F.when(col('UnitPrice') > boundary_unit[1], boundary_unit[1])
                                           .when(col('UnitPrice') < boundary_unit[0], boundary_unit[0])
                                           .otherwise(col('UnitPrice')))
        
        df = df.withColumn('Quantity', F.when(col('Quantity') > boundary_qty[1], boundary_qty[1])
                                           .when(col('Quantity') < boundary_qty[0], boundary_qty[0])
                                           .otherwise(col('Quantity')))
                                              
        return df

The output that I get when running is below:

Screenshot_20230130_053953

I am trying really hard to understand what is the issue and why whatever I do the list is just not getting populated. approxQuantile should be returning a list so it does not make sense that it does not work.

Module not found error

I am attempting to utilize Databricks Delta Live Tables from a Python notebook. The initial step is to import the 'dlt' module, however, it fails and produces an error message that the module could not be found. Despite having a premium Databricks workspace and access to the new DLT Pipelines feature,

I am still trying to determine if the 'import dlt' command will work in the notebook without having to run it as part of a pipeline, as this would complicate the process significantly.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.