
connectors's People

Contributors

6a0juu, aamend, ahirreddy, allisonport-db, areese, cabbaggecollector, cheleb, dennyglee, fx196, gbrueckl, gopik, grzegorz8, handreassa, kination, kristoffsc, noelo, pablofloreshdz, pichlerpa, pkubit-g, rajagurunath, scottsand-db, shtusha, sonhmai, tdas, vkorukanti, windpiger, wwang-talend, yannbyron, zsxwing


connectors's Issues

Delta Lake Atlas Connector

Hi everyone,

This Databricks article

https://databricks.com/discover/data-lakes/best-practices

discusses the possibility of using Apache Atlas for Data Cataloguing.

There are a number of software offerings that can make data cataloguing easier. The major cloud providers offer their own proprietary data catalog software offerings, namely Azure Data Catalog and AWS Glue. Outside of those, Apache Atlas is available as open source software, and other options include offerings from Alation, Collibra, and Informatica, to name a few.

I'm working on this idea. The problem is that there is no connector between Delta Lake and Atlas. I am trying to use the Spark connector that ships with Apache Atlas (built by Hortonworks), but I have compatibility problems: the connector Apache Atlas uses is built for Scala 2.11, while Delta v0.7+ requires Spark 3.0 with Scala 2.12, so I can't connect them.

With Spark 2.4 and Delta v0.6, I have been able to create an external table from a file written in "delta" format. I have tried to make a fork of the Spark-Atlas-Connector project, but it has dependencies built for Scala 2.11 that Hortonworks has not released.


Any idea how I can move this forward?

Thanks in advance.
Best,

Generic connector support

I would like to propose an enhancement: add a generic, open-standard JDBC connector for Delta Lake tables so that any JDBC SQL client or BI tool can connect and explore the data. The connector should also provide a plugin-like interface that others can extend to add new connectors easily. The current Hive connector has limited support; since the Delta Lake connector lives outside of Spark, it should support all Hive engines.
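To make the plugin idea concrete, here is a minimal, hypothetical sketch; the trait, its methods, and the JDBC wiring are invented for illustration, and only DeltaLog/Snapshot come from the existing Delta Standalone API:

import io.delta.standalone.{DeltaLog, Snapshot}
import org.apache.hadoop.conf.Configuration

// Hypothetical service-provider interface: each engine integration (JDBC, Hive,
// Presto, ...) would implement this trait and could be discovered at runtime,
// e.g. via java.util.ServiceLoader.
trait DeltaConnectorPlugin {
  /** Short name used to select the connector, e.g. "jdbc" or "hive". */
  def shortName: String

  /** Open the Delta table and hand its latest snapshot to the engine. */
  def open(tablePath: String, hadoopConf: Configuration): Snapshot =
    DeltaLog.forTable(hadoopConf, tablePath).snapshot()
}

// Sketch of one such plugin for the proposed JDBC connector.
class JdbcDeltaConnectorPlugin extends DeltaConnectorPlugin {
  override def shortName: String = "jdbc"
}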

When will Spark 3.0 be supported?

spark 3.0.0-rc1
delta 0.5.0

com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: org.apache.spark.util.Utils$.classForName(Ljava/lang/String;)Ljava/lang/Class;
  at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2261)
  at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
  at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
  at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:740)
  at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:712)
  at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:169)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:339)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:214)
  ... 47 elided
Caused by: java.lang.NoSuchMethodError: org.apache.spark.util.Utils$.classForName(Ljava/lang/String;)Ljava/lang/Class;
  at org.apache.spark.sql.delta.storage.LogStoreProvider.createLogStore(LogStore.scala:122)
  at org.apache.spark.sql.delta.storage.LogStoreProvider.createLogStore$(LogStore.scala:120)
  at org.apache.spark.sql.delta.DeltaLog.createLogStore(DeltaLog.scala:58)
  at org.apache.spark.sql.delta.storage.LogStoreProvider.createLogStore(LogStore.scala:117)
  at org.apache.spark.sql.delta.storage.LogStoreProvider.createLogStore$(LogStore.scala:115)
  at org.apache.spark.sql.delta.DeltaLog.createLogStore(DeltaLog.scala:58)
  at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:79)
  at org.apache.spark.sql.delta.DeltaLog$$anon$3.$anonfun$call$2(DeltaLog.scala:744)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
  at org.apache.spark.sql.delta.DeltaLog$$anon$3.$anonfun$call$1(DeltaLog.scala:744)
  at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
  at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
  at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:671)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
  at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:671)
  at org.apache.spark.sql.delta.DeltaLog$$anon$3.call(DeltaLog.scala:743)
  at org.apache.spark.sql.delta.DeltaLog$$anon$3.call(DeltaLog.scala:740)
  at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
  at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
  at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
  at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
  at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
  ... 58 more

Delta Source for Apache Flink

Build a Flink/Delta source (i.e., Flink reads from Delta Lake), potentially leveraging the Delta Standalone Reader. Expansion of issue #74. A usage sketch of the source builder appears after the task lists below.

Design doc: https://docs.google.com/document/d/1v8QEEEaJoOIWKklYSmn3sC2egKSo3bRK_gZhqGUozxc/edit#heading=h.gjdgxs

PR Plan:

  • PR 5 - Option support in Bounded Mode - DONE
  • PR 6 - Continuous mode without monitoring for changes - DONE
  • PR 6.1 - Package Refactoring - DONE
  • PR 7 - Monitoring for changes in Continuous mode - DONE
  • PR 7.1 - More tests for PR 7 including IT case tests for Continuous mode. - DONE
  • PR 9 - Builder - DONE
  • PR 10 - Builder refactoring to extract Schema from Delta Log. - DONE
  • PR 10.1 - Tests for SourceUtils::buildSourceSchema, DeltaSourceBuilderBase::getSourceSchema and SnapshotSupplier implementations. - DONE
  • PR 11 - Partition Support using Delta log to identify partition columns. - DONE
  • PR 12 - Get BATCH_SIZE for Format builder from Source options - this value is currently hardcoded. Add validation for option names and values including using not applicable options for given mode via generic option method. - DONE
  • PR 12.1 - More tests for option type conversion + handling option(...) validation errors. - DONE
  • PR 13 - Use new DeltaLog API getVersionAtOrAfterTimestamp for continuous getChanges call when using starting timestamp. This uses the "streaming" semantics of timestamp -> version conversion. - DONE
  • @scottsand-db -> review Java docs.
  • PR 14 - Additional IT tests including all options and end to end tests using Source and Sink. - DONE
  • After PR 14 run Sample job using Delta Sink/Source on a real cluster. - DONE
  • PR 15 - simple test case for remote file table path, similar to #340 and #341 - DONE
  • PR 16 - update README - DONE
  • PR 17 - update examples - DONE
  • PR 18 - add "columnNames" key support to .option(....)
  • PR 19 - Flink Sink integration test that creates a Delta Log checkpoint and asserts its correctness - DONE
  • PR 20 - update javadocs - DONE

-- All functionalities are in place up to this point. The following PRs are extra tests, eventual bug fixes and code/javadoc adjustments. --

P1 Functionalities

  • Aggregate and report as one all option validation errors from Source Builder.
  • continuous from version 0, handling Metadata & Protocol actions.
  • source metrics. also update readme.

PR 11 - Annotate with @deprecated the Sink's builder methods whose names start with "with". Add new ones without the "with" prefix, and overload the partitions method with a List argument for the Sink.

TODOs

  • Use index for keeping track of already processed paths in SnapshotProcessor - needs new Delta API.
  • Re-visit the Delta Standalone APIs for when the source sees Metadata/Protocol actions. Update the ActionProcessor class to handle Metadata and Protocol actions.
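For orientation, a rough usage sketch of the source builder tracked above, assuming the io.delta.flink.source.DeltaSource entry points for bounded and continuous modes; option and package names may differ from the final API, and the table path is a placeholder:

import io.delta.flink.source.DeltaSource
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.data.RowData
import org.apache.hadoop.conf.Configuration

object DeltaSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val hadoopConf = new Configuration()
    val tablePath = new Path("s3a://bucket/delta-table") // placeholder path

    // Bounded mode: read a fixed snapshot of the table and finish.
    val boundedSource = DeltaSource
      .forBoundedRowData(tablePath, hadoopConf)
      .build()

    // Continuous mode: read the current snapshot, then keep monitoring the
    // Delta log for new versions (the "monitoring for changes" PRs above).
    val continuousSource = DeltaSource
      .forContinuousRowData(tablePath, hadoopConf)
      .build()

    env
      .fromSource(continuousSource, WatermarkStrategy.noWatermarks[RowData](), "delta-source")
      .print()
    env.execute("delta-source-sketch")
  }
}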

When will Hive on Spark be supported?

private def checkHiveConf(job: JobConf): Unit = {
    val engine = HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)
    val deltaFormat = classOf[HiveInputFormat].getName
    engine match {
      case "mr" =>
        if (HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT) != deltaFormat) {
          throw deltaFormatError(engine, HiveConf.ConfVars.HIVEINPUTFORMAT.varname, deltaFormat)
        }
      case "tez" =>
        if (HiveConf.getVar(job, HiveConf.ConfVars.HIVETEZINPUTFORMAT) != deltaFormat) {
          throw deltaFormatError(engine, HiveConf.ConfVars.HIVETEZINPUTFORMAT.varname, deltaFormat)
        }
      case other =>
        throw new UnsupportedOperationException(s"The execution engine '$other' is not supported." +
          s" Please set '${HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname}' to 'mr' or 'tez'")
    }
  }

When using Hive on Spark, the full stack trace is:

Job failed with java.lang.UnsupportedOperationException: The execution engine 'spark' is not supported. Please set 'hive.execution.engine' to 'mr' or 'tez'
FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. java.util.concurrent.ExecutionException: Exception thrown by job
        at org.apache.spark.JavaFutureActionWrapper.getImpl(FutureAction.scala:337)
        at org.apache.spark.JavaFutureActionWrapper.get(FutureAction.scala:342)
        at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:362)
        at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:323)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: The execution engine 'spark' is not supported. Please set 'hive.execution.engine' to 'mr' or 'tez'
        at io.delta.hive.DeltaInputFormat.checkHiveConf(DeltaInputFormat.scala:121)
        at io.delta.hive.DeltaInputFormat.listStatus(DeltaInputFormat.scala:91)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
        at io.delta.hive.DeltaInputFormat.getSplits(DeltaInputFormat.scala:176)
        at org.apache.hadoop.hive.ql.io.HiveInputFormat.addSplitsForGroup(HiveInputFormat.java:442)
        at org.apache.hadoop.hive.ql.io.HiveInputFormat.getSplits(HiveInputFormat.java:561)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
        at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:240)
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:238)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.dependencies(RDD.scala:238)
        at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:512)
        at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:461)
        at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:448)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:962)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2067)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

The Delta table schema is not the same as the Hive schema.

Why is it inconsistent? I checked that the schemas of the Delta table and the Hive table are correct and that the number of columns is the same.

at org.apache.hadoop.util.RunJar.main(RunJar.java:227)
Caused by: MetaException(message:The Delta table schema is not the same as the Hive schema. Please update your Hive
table's schema to match the Delta table schema.
Delta table schema: StructType(StructField(types,StringType,true), StructField(tableName,StringType,true), StructField(primaryKey,StringType,true), StructField(timesmap,StringType,true), StructField(id,StringType,true), StructField(task_id,StringType,true), StructField(order_id,StringType,true), StructField(task_name,StringType,true), StructField(display_name,StringType,true), StructField(task_state,StringType,true), StructField(create_time,StringType,true), StructField(update_time,StringType,true), StructField(end_time,StringType,true), StructField(parent_task_id,StringType,true), StructField(variable,StringType,true), StructField(trade_id,StringType,true), StructField(partition,StringType,true))
Hive schema: struct<types:string,tablename:string,primarykey:string,timesmap:string,id:string,task_id:string,order_id:string,task_name:string,display_name:string,task_state:string,create_time:string,update_time:string,end_time:string,parent_task_id:string,variable:string,trade_id:string,partition:string>
at org.apache.spark.sql.delta.DeltaHelper$.metaInconsistencyException(DeltaHelper.scala:252)
at org.apache.spark.sql.delta.DeltaHelper$.checkTableSchema(DeltaHelper.scala:176)
at io.delta.hive.DeltaStorageHandler.preCreateTable(DeltaStorageHandler.scala:195)

Failed to create external table from Athena

I am running the following steps to integrate Athena with Delta Lake

from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "s3://test-data/input/")
deltaTable.generate("symlink_format_manifest")

Created the delta_db database from Glue.

CREATE EXTERNAL TABLE delta_db.mytable ( name string, hll binary )
PARTITIONED BY ( date string, partition_key string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://test-data/input/_symlink_format_manifest/';

Your query has the following error(s):
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.UnsupportedOperationException: Unknown field type: binary

Would need help to resolve this issue to query delta table data from Athena.

ERROR : Output Format must implement HiveOutputFormat, otherwise it should be either IgnoreKeyTextOutputFormat

CREATE EXTERNAL TABLE student(id string, name string, age string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'io.delta.hive.DeltaInputFormat'
OUTPUTFORMAT 'io.delta.hive.DeltaOutputFormat'
LOCATION '/tmp/student';

20/04/03 09:59:16 ERROR ql.Driver: FAILED: SemanticException [Error 10055]: Output Format must implement HiveOutputFormat, otherwise it should be either IgnoreKeyTextOutputFormat or SequenceFileOutputFormat
org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must implement HiveOutputFormat, otherwise it should be either IgnoreKeyTextOutputFormat or SequenceFileOutputFormat
at org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:497)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:12063)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genResolvedParseTree(SemanticAnalyzer.java:11020)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:11133)
at org.apache.hadoop.hive.ql.parse.CalcitePlanner.analyzeInternal(CalcitePlanner.java:286)
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:258)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:512)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1317)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1457)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1237)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1227)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:184)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:403)
at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:821)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:759)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:686)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:226)
at org.apache.hadoop.util.RunJar.main(RunJar.java:141)

I need help...

Hive CREATE EXTERNAL TABLE giving an error

After creating the Delta table, with some Delta data at the location, I attempted to create an external table by following the steps for the Hive connector at https://github.com/delta-io/connectors, and got the error below.
I would appreciate your advice on how to address it.

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org/apache/hadoop/hive/metastore
/MetaStoreUtils

Delta Standalone Reader doesn't work with jackson-module-scala 2.10.0

Due to a binary-incompatible change in jackson-module-scala 2.10.0, the following line in JsonUtils is compiled against jackson-module-scala 2.6.7.1 and doesn't work when jackson-module-scala 2.10.0 is on the classpath.

val mapper = new ObjectMapper with ScalaObjectMapper
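Not from the issue itself, but one possible workaround is to drop the ScalaObjectMapper mixin (the part whose binary layout changed in 2.10.0) and register DefaultScalaModule explicitly, passing target classes to readValue instead of relying on the mixin's ClassTag-based helpers. A minimal sketch, which should behave the same on both 2.6.x and 2.10.x:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object JsonUtilsSketch {
  // Plain ObjectMapper with the Scala module registered explicitly; no mixin involved.
  val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

  final case class Example(name: String, size: Int)

  def main(args: Array[String]): Unit = {
    // Target class is passed explicitly instead of using readValue[Example](...).
    val parsed = mapper.readValue("""{"name":"a","size":1}""", classOf[Example])
    println(parsed)
  }
}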

Verify the pushed partition filters are still valid

It's possible that the pushed partition filters were generated from a different snapshot than the one we use when listing files. We should verify that the partition filters are still valid; in other words, if the partition columns have changed and the filters contain non-partition columns, we should fail the query.
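A rough sketch of the check being described, with hypothetical types standing in for whatever filter representation the connector actually pushes down (this is not the connector's code):

// Hypothetical shape standing in for the connector's pushed-down filters.
final case class PartitionFilter(column: String, predicate: String => Boolean)

/**
 * Re-check pushed partition filters against the snapshot actually used for
 * file listing. If the table's partition columns changed since the filters
 * were generated, some filters may now reference non-partition columns;
 * evaluating them against partition values would silently drop rows, so fail instead.
 */
def verifyPartitionFilters(
    pushedFilters: Seq[PartitionFilter],
    currentPartitionColumns: Seq[String]): Unit = {
  val partitionCols = currentPartitionColumns.map(_.toLowerCase).toSet
  val invalid = pushedFilters.filterNot(f => partitionCols.contains(f.column.toLowerCase))
  if (invalid.nonEmpty) {
    throw new IllegalStateException(
      s"Pushed partition filters reference non-partition columns " +
        s"${invalid.map(_.column).mkString(", ")}; the table's partition schema " +
        "may have changed since planning. Please re-run the query.")
  }
}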

Delta Sink for Apache Flink Streams API

Build a Flink/Delta sink (i.e., Flink writes to Delta Lake) leveraging the Delta Standalone Writer per #85. This issue expands on #74.

Design doc of the Flink sink
This work is currently in progress. Here is a list of tasks that need to be completed (a usage sketch of the sink builder follows the list):

  • Basic sink and writer framework - #196 and #213
  • Committable generation with ids needed for txn guarantees - #220
  • Committing to the Delta table with txn guarantees - #224
  • Schema validation and migration - #230
  • Partitioning support - #233
  • Package name fixes - #239
  • Detailed schema support tests with actual file writes and reads - #244
  • Documentation + Example project (maven, pom) - #236
  • Add more logging for easier debugging
  • Fix Maven artifact generation
  • Finalize sink public API
  • Update examples with latest .withPartitionKeys builder API
  • Verify Java docs

Follow-up work to extend this sink to support the Table API is tracked by #238.
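A rough sketch of wiring an upstream RowData stream into the sink tracked above, assuming the io.delta.flink.sink.DeltaSink.forRowData entry point from the design doc; the schema and table path are placeholders, and builder method names may differ from the final API:

import java.util.Arrays

import io.delta.flink.sink.DeltaSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.data.RowData
import org.apache.flink.table.types.logical.{IntType, RowType, VarCharType}
import org.apache.hadoop.conf.Configuration

object DeltaSinkSketch {

  // Logical row type of the records being written; it has to line up with the
  // Delta table's schema (schema validation is one of the tasks listed above).
  val rowType: RowType = new RowType(Arrays.asList(
    new RowType.RowField("id", new IntType()),
    new RowType.RowField("name", new VarCharType(VarCharType.MAX_LENGTH))))

  /** Wire an existing RowData stream into a Delta table (sketch, not the final API). */
  def writeToDelta(events: DataStream[RowData], tablePath: String): Unit = {
    val deltaSink = DeltaSink
      .forRowData(new Path(tablePath), new Configuration(), rowType)
      .build()
    events.sinkTo(deltaSink)
  }
}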

Error: Failed to load class io.delta.connectors.spark.JDBC.ImportRunner.

Hi,

I have just tried to run it in order to import some data from my SQL database to Azure Data Lake Storage. I am opening this issue since I am not sure whether it's an actual bug or I am doing something wrong.

Steps I have done:

  1. Upload sql_delta_import_2_12_0_2_1_SNAPSHOT.jar to my DBFS system.
  2. Create job with the following parameters:
["--class","io.delta.connectors.spark.JDBC.ImportRunner",
 "/jars/sql_delta_import_2_12_0_2_1_SNAPSHOT.jar",
"--jdbc", "jdbc:sqlserver:/myserver.database.windows.net:1433;database=mydatabase;user=myuser;password=mypass;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30",
"--source","sourcedb.sourcetable",
"--destination", "targetdb.targettable",
"--split-by", "PersonID"]

The job fails with the following message.

OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
Warning: Ignoring non-Spark config property: libraryDownload.sleepIntervalSeconds
Warning: Ignoring non-Spark config property: libraryDownload.timeoutSeconds
Warning: Ignoring non-Spark config property: eventLog.rolloverIntervalSeconds
Error: Failed to load class io.delta.connectors.spark.JDBC.ImportRunner.

I was not able to reproduce it with other Scala jars.

Cluster specs:

{
    "num_workers": 1,
    "cluster_name": "",
    "spark_version": "8.1.x-scala2.12",
    "spark_conf": {
        "spark.databricks.delta.preview.enabled": "true"
    },
    "azure_attributes": {
        "first_on_demand": 1,
        "availability": "SPOT_WITH_FALLBACK_AZURE",
        "spot_bid_max_price": -1
    },
    "node_type_id": "Standard_DS3_v2",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": {
        "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
    },
    "enable_elastic_disk": true,
    "cluster_source": "JOB",
    "init_scripts": []
}

Runtime 8.1

Can you use this hive connector with pyspark?

I see the configuration and usage provided for Scala and Java. Is there a way to read Delta Lake tables through Hive by creating the external tables using PySpark?
Or do I need to create the external tables in Presto and access the tables from there?

Pulsar + Delta Integration

Motivation

Pulsar is a cloud-native messaging and streaming platform that has been widely adopted by companies across different regions. The Pulsar community has requested a native integration between Pulsar and Delta Lake.

There can be many different integrations between Pulsar and Delta Lake. Here are some ideas:

  1. A Pulsar source connector to capture the data changes from a Delta Lake table.
  2. A Pulsar sink connector to write the Pulsar streams to Delta Lake.

This is a master issue for tracking the progress on integrating Pulsar and Delta Lake.

PowerBI dataset doesn't refresh with Delta connector and Data Lake Gen2

I'm using the Delta connector with an Azure Data Lake Storage Gen2 source. In Power BI Desktop everything works: I can preview the tables and refresh the content, and the experience is flawless.

= fn_ReadDeltaTable(AzureStorage.DataLake("https://Data_Lake_URL_to_Delta_table"), [])

However, after publishing the dashboard, it's no longer possible to refresh the dataset.
The error relates to the dataset containing a dynamic data source.

Something went wrong
This dataset includes a dynamic data source. Since dynamic data sources aren't refreshed in the Power BI service, this dataset won't be refreshed. Learn more: https://aka.ms/dynamic-data-sources.
Please try again later or contact support. If you contact support, please provide these details.

Hive SQL count queries are not working on Hive tables created on Delta format

Hive external tables created on Delta-format paths work for simple SELECT queries but not for aggregate COUNT queries.

Steps to reproduce

  1. Added the delta-hive assembly jar to the classpath using the following approach:
<property>
          <name>hive.aux.jars.path</name>
          <value>s3://helpshift-test-bucket/emrpoc/workspace/abhishekgupta/delta-hive-assembly_2.11-0.2.0.jar</value>
  </property>
  2. Restarted the Hive server:
    sudo systemctl stop hive-server2.service
    sudo systemctl start hive-server2.service

On Hive CLI

  1. Launch hive and set the input formats and execution engine:
SET hive.input.format=io.delta.hive.HiveInputFormat;
SET hive.tez.input.format=io.delta.hive.HiveInputFormat;
SET hive.execution.engine=tez;
  2. Create the external table and query it:
CREATE EXTERNAL TABLE deltaTable(domain string, app_id string, app_name string, platform_id string, event_date date)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION 's3://helpshift-test-bucket/emrpoc/sinks/sdk_events_test_delta1';
select * from deltatable;
>> results show up
select count(*) from deltatable;
>> Error - java.lang.ClassNotFoundException: Class io.delta.hive.HiveInputFormat not found

Stack trace

DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:1
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, vertexName=Map 1, vertexId=vertex_1617974054397_0025_1_00, diagnostics=[Vertex vertex_1617974054397_0025_1_00 [Map 1] killed/failed due to:ROOT_INPUT_INIT_FAILURE, Vertex Input: deltatable initializer failed, vertex=vertex_1617974054397_0025_1_00 [Map 1], org.apache.tez.dag.api.TezUncheckedException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.delta.hive.HiveInputFormat not found
        at org.apache.tez.mapreduce.hadoop.MRInputHelpers.generateOldSplits(MRInputHelpers.java:472)
        at org.apache.tez.mapreduce.hadoop.MRInputHelpers.generateInputSplitsToMem(MRInputHelpers.java:337)
        at org.apache.tez.mapreduce.common.MRInputAMSplitGenerator.initialize(MRInputAMSplitGenerator.java:122)
        at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:278)
        at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:269)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1926)
        at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:269)
        at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:253)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.delta.hive.HiveInputFormat not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2460)
        at org.apache.hadoop.mapred.JobConf.getInputFormat(JobConf.java:707)
        at org.apache.tez.mapreduce.hadoop.MRInputHelpers.generateOldSplits(MRInputHelpers.java:470)
        ... 13 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.delta.hive.HiveInputFormat not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2428)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2452)
        ... 15 more
Caused by: java.lang.ClassNotFoundException: Class io.delta.hive.HiveInputFormat not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2332)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2426)
        ... 16 more

Get error with Spark and Hive when using this connector

Hi, I want to use Delta Lake together with Hive in Spark, but I got an "Operation not allowed: STORED BY" error. I looked into this error but found nothing helpful.

I wonder whether it is because this Hive connector is not supported when using Hive from Spark?

By the way, I tried running the CREATE delta table SQL directly using the hive CLI on the Hive machine, and it worked.

Below is how I start the spark session:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local')\
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false") \
    .config('spark.delta.logStore.class', 'org.apache.spark.sql.delta.storage.S3SingleDriverLogStore') \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .enableHiveSupport() \
    .getOrCreate()

I got an error when I ran the code below:

spark.sql("""
CREATE EXTERNAL TABLE deltaHiveTable (id BIGINT) 
STORED BY 'io.delta.hive.DeltaStorageHandler' 
LOCATION 's3a://my_bucket/my-delta-table/'
""")
Py4JJavaError: An error occurred while calling o69.sql.
: org.apache.spark.sql.catalyst.parser.ParseException: 
Operation not allowed: STORED BY(line 3, pos 0)

== SQL ==

CREATE EXTERNAL TABLE deltaHiveTable (id BIGINT) 
STORED BY 'io.delta.hive.DeltaStorageHandler' 
^^^
LOCATION 's3a://my_bucket/my-delta-table/'

	at org.apache.spark.sql.catalyst.parser.ParserUtils$.operationNotAllowed(ParserUtils.scala:41)
	at org.apache.spark.sql.execution.SparkSqlAstBuilder$$anonfun$visitCreateFileFormat$1.apply(SparkSqlParser.scala:1251)
	at org.apache.spark.sql.execution.SparkSqlAstBuilder$$anonfun$visitCreateFileFormat$1.apply(SparkSqlParser.scala:1243)
	at org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:108)
	at org.apache.spark.sql.execution.SparkSqlAstBuilder.visitCreateFileFormat(SparkSqlParser.scala:1242)
	at org.apache.spark.sql.execution.SparkSqlAstBuilder$$anonfun$visitCreateHiveTable$1$$anonfun$34.apply(SparkSqlParser.scala:1146)
	at org.apache.spark.sql.execution.SparkSqlAstBuilder$$anonfun$visitCreateHiveTable$1$$anonfun$34.apply(SparkSqlParser.scala:1146)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.sql.execution.SparkSqlAstBuilder$$anonfun$visitCreateHiveTable$1.apply(SparkSqlParser.scala:1146)
	at org.apache.spark.sql.execution.SparkSqlAstBuilder$$anonfun$visitCreateHiveTable$1.apply(SparkSqlParser.scala:1113)
	at org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:108)
	at org.apache.spark.sql.execution.SparkSqlAstBuilder.visitCreateHiveTable(SparkSqlParser.scala:1113)
	at org.apache.spark.sql.execution.SparkSqlAstBuilder.visitCreateHiveTable(SparkSqlParser.scala:55)
	at org.apache.spark.sql.catalyst.parser.SqlBaseParser$CreateHiveTableContext.accept(SqlBaseParser.java:1215)
	at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visit(AbstractParseTreeVisitor.java:18)
	at org.apache.spark.sql.catalyst.parser.AstBuilder$$anonfun$visitSingleStatement$1.apply(AstBuilder.scala:72)
	at org.apache.spark.sql.catalyst.parser.AstBuilder$$anonfun$visitSingleStatement$1.apply(AstBuilder.scala:72)
	at org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:108)
	at org.apache.spark.sql.catalyst.parser.AstBuilder.visitSingleStatement(AstBuilder.scala:71)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser$$anonfun$parsePlan$1.apply(ParseDriver.scala:70)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser$$anonfun$parsePlan$1.apply(ParseDriver.scala:69)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:100)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

Count(column_name) fails in Hive

Hi All,

I have set up the Delta Hive connector jars in the Hive CLI correctly, and I am able to execute the query below successfully.
Select * from <tableName>

But the count(*) query is failing in Hive. Any idea why this is failing?

select count(*) from <tableName>

The exception I see in debug mode is below.

select count(cluster_id) from t1;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID = hadoop_20210520122641_aaf06a11-3754-4a3f-847e-359b1c0f60fc
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
java.lang.NullPointerException
  at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
  at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
  at io.delta.hive.PartitionColumnInfo.write(PartitionColumnInfo.scala:41)
  at io.delta.hive.DeltaInputSplit.write(DeltaInputSplit.java:58)
  at org.apache.hadoop.hive.ql.io.HiveInputFormat$HiveInputSplit.write(HiveInputFormat.java:188)
  at org.apache.hadoop.mapreduce.split.JobSplitWriter.writeOldSplits(JobSplitWriter.java:164)
  at org.apache.hadoop.mapreduce.split.JobSplitWriter.createSplitFiles(JobSplitWriter.java:92)
  at org.apache.hadoop.mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:352)
  at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:322)
  at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:198)
  at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1341)
  at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1338)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
  at org.apache.hadoop.mapreduce.Job.submit(Job.java:1338)
  at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:575)
  at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:570)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
  at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:570)
  at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:561)
  at org.apache.hadoop.hive.ql.exec.mr.ExecDriver.execute(ExecDriver.java:414)
  at org.apache.hadoop.hive.ql.exec.mr.MapRedTask.execute(MapRedTask.java:151)
  at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:199)
  at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:100)
  at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:2183)
  at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1839)
  at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1526)
  at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1237)
  at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1227)
  at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
  at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:184)
  at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:403)
  at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:821)
  at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:759)
  at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:686)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.hadoop.util.RunJar.run(RunJar.java:239)
  at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
Job Submission failed with exception 'java.lang.NullPointerException(null)'

Cannot view columns from Delta Hive tables in DBeaver or when connecting to PowerBI

I'm using PySpark to save Delta-Hive tables for Spark Thrift Server to access, so the data can be queried externally or connected to a BI tool such as PowerBI. When trying to load the table into PowerBI directly, I get the error "This query does not have any columns with the supported data types. It will be disabled from being loaded to the model." However, when I do a custom SQL query import using SELECT * FROM delta_table; it loads just fine.

When viewing the table in DBeaver, I'm able to query the tables but cannot see any information or metadata about the data. I'm guessing this is because the Delta connector only gives Hive enough information to create the tables. Are there ways I can access the metadata via a Hive connection, or are there other ways I should be approaching this?

Run unit tests in Tez mode

Tez is the default execution engine on Amazon EMR. It would be great if we could run our unit tests in Tez mode so that we can make sure the connector works out of the box.

PowerBI: Add support for TIMESTAMP AS OF

Besides the already existing support for VERSION AS OF, it should also be possible to query a Delta Lake table using TIMESTAMP AS OF in a similar way, e.g. as part of the optional second parameter.

CREATE EXTERNAL TABLE fails with java.lang.IllegalArgumentException: System memory 242745344 must be at least 471859200

I am trying to create an external table using sample Delta data (converted from the regular Parquet files here: https://github.com/Teradata/kylo/tree/master/samples/sample-data/parquet):

CREATE EXTERNAL TABLE `default.userdata_delta` (
    `registration_dttm` timestamp,
    `id` int,
    `first_name` string,
    `last_name` string,
    `email` string,
    `gender` string,
    `ip_address` string,
    `cc` string,
    `country` string,
    `birthdate` string,
    `salary` double,
    `title` string,
    `comments` string
)
STORED BY 'io.delta.hive.DeltaStorageHandler'  
LOCATION
  's3a://...'

I receive the following error:

java.lang.IllegalArgumentException: System memory 242745344 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.

Traceback:

2020-08-28 12:57:13,548 INFO  [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] ql.Driver: Starting task [Stage-0:DDL] in serial mode
2020-08-28 12:57:13,550 INFO  [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] plan.CreateTableDesc: Use StorageHandler-supplied org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe for table userdata_delta
2020-08-28 12:57:13,553 INFO  [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] exec.DDLTask: creating table default.userdata_delta on s3a://dataeng-data-test/dev/coredata/eval/kylin/userdata_delta
2020-08-28 12:57:14,024 INFO  [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] spark.SparkContext: Running Spark version 2.4.3
2020-08-28 12:57:14,054 INFO  [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] spark.SparkContext: Submitted application: Delta Connector
2020-08-28 12:57:14,119 INFO  [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] spark.SecurityManager: Changing view acls to: root
2020-08-28 12:57:14,120 INFO  [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] spark.SecurityManager: Changing modify acls to: root
2020-08-28 12:57:14,120 INFO  [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] spark.SecurityManager: Changing view acls groups to: 
2020-08-28 12:57:14,121 INFO  [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] spark.SecurityManager: Changing modify acls groups to: 
2020-08-28 12:57:14,122 INFO  [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
2020-08-28 12:57:14,413 INFO  [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] util.Utils: Successfully started service 'sparkDriver' on port 36643.
2020-08-28 12:57:14,439 INFO  [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] spark.SparkEnv: Registering MapOutputTracker
2020-08-28 12:57:14,451 ERROR [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] spark.SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: System memory 242745344 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.
	at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:219)
	at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:199)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:330)
	at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:185)
	at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:424)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
	at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$5(SparkSession.scala:935)
	at scala.Option.getOrElse(Option.scala:138)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
	at org.apache.spark.sql.delta.DeltaHelper$.spark(DeltaHelper.scala:348)
	at org.apache.spark.sql.delta.DeltaHelper$.loadDeltaLatestSnapshot(DeltaHelper.scala:178)
	at io.delta.hive.DeltaStorageHandler.preCreateTable(DeltaStorageHandler.scala:189)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:747)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:740)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:173)
	at com.sun.proxy.$Proxy36.createTable(Unknown Source)
	at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:852)
	at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:867)
	at org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:4356)
	at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:354)
	at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:199)
	at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:100)
	at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:2183)
	at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1839)
	at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1526)
	at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1237)
	at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1227)
	at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
	at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:184)
	at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:403)
	at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:821)
	at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:759)
	at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:686)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:239)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
2020-08-28 12:57:14,465 INFO  [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] spark.SparkContext: Successfully stopped SparkContext
2020-08-28 12:57:14,475 ERROR [fcf5ca99-91dd-47a8-b2e7-c37fb5e239b4 main] exec.DDLTask: SparkSession.scala:926)
	at org.apache.spark.sql.delta.DeltaHelper$.spark(DeltaHelper.scala:348)
	at org.apache.spark.sql.delta.DeltaHelper$.loadDeltaLatestSnapshot(DeltaHelper.scala:178)
	at io.delta.hive.DeltaStorageHandler.preCreateTable(DeltaStorageHandler.scala:189)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:747)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:740)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:173)
	at com.sun.proxy.$Proxy36.createTable(Unknown Source)
	at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:852)
	... 22 more

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.IllegalArgumentException: System memory 242745344 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.

Since the Hive connector uses its own local Spark, I cannot configure this setting through the normal means (spark-defaults.conf). It also turns out that the error message is misleading: the value that needs to be adjusted is not spark.driver.memory, it is spark.testing.memory.

I added .config("spark.testing.memory", 536870912) after line 348 here: https://github.com/delta-io/connectors/blob/master/hive/src/main/scala/org/apache/spark/sql/delta/DeltaHelper.scala#L348

Then I rebuilt the JARs, and now I am able to create the external table as expected.

This link was helpful: https://www.waitingforcode.com/apache-spark/troubleshooting-system-memory-must-be-at-least-error/read

And here is the relevant code in org.apache.spark.memory.UnifiedMemoryManager.getMaxMemory: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala#L214
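For readers hitting the same error, this is roughly the shape of the change described above. The builder below only approximates what DeltaHelper.spark does internally (the app name is taken from the log above, the local master is an assumption); the added line is the spark.testing.memory setting, with the 512 MB value matching the workaround in this report:

import org.apache.spark.sql.SparkSession

object DeltaHelperSketch {
  // Approximation of DeltaHelper.spark with the workaround applied: the embedded
  // local SparkSession ignores spark-defaults.conf, so the memory floor has to be
  // lifted via spark.testing.memory rather than spark.driver.memory.
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[*]") // assumed; the connector runs an embedded local Spark
    .appName("Delta Connector")
    .config("spark.testing.memory", 536870912L) // 512 MB, as in the workaround above
    .getOrCreate()
}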

Counts are not working with hive table created for Delta format

Hi - I am able to query the Delta-format data by creating a Hive table. But on count(*) of the Hive table I get a "Class io.delta.hive.HiveInputFormat not found" error.

org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, vertexName=Map 1, vertexId=vertex_1601279013282_0002_3_00, diagnostics=[Vertex vertex_1601279013282_0002_3_00 [Map 1] killed/failed due to:ROOT_INPUT_INIT_FAILURE, Vertex Input: caccounts initializer failed, vertex=vertex_1601279013282_0002_3_00 [Map 1], org.apache.tez.dag.api.TezUncheckedException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.delta.hive.HiveInputFormat not found at org.apache.tez.mapreduce.hadoop.MRInputHelpers.generateOldSplits(MRInputHelpers.java:472) at org.apache.tez.mapreduce.hadoop.MRInputHelpers.generateInputSplitsToMem(MRInputHelpers.java:337) at org.apache.tez.mapreduce.common.MRInputAMSplitGenerator.initialize(MRInputAMSplitGenerator.java:122) at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:278) at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:269) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844) at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:269) at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:253) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.delta.hive.HiveInputFormat not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2401) at org.apache.hadoop.mapred.JobConf.getInputFormat(JobConf.java:689) at org.apache.tez.mapreduce.hadoop.MRInputHelpers.generateOldSplits(MRInputHelpers.java:470) ... 13 more Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.delta.hive.HiveInputFormat not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2369) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2393) ... 15 more Caused by: java.lang.ClassNotFoundException: Class io.delta.hive.HiveInputFormat not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2273) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367) ... 16 more ]Vertex killed, vertexName=Reducer 2, vertexId=vertex_1601279013282_0002_3_01, diagnostics=[Vertex received Kill in INITED state., Vertex vertex_1601279013282_0002_3_01 [Reducer 2] killed/failed due to:OTHER_VERTEX_FAILURE]DAG did not succeed due to VERTEX_FAILURE. 
failedVertices:1 killedVertices:1 at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:380) at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:257) at org.apache.hive.service.cli.operation.SQLOperation.access$800(SQLOperation.java:91) at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork$1.run(SQLOperation.java:348) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844) at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork.run(SQLOperation.java:363) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Vertex failed, vertexName=Map 1, vertexId=vertex_1601279013282_0002_3_00, diagnostics=[Vertex vertex_1601279013282_0002_3_00 [Map 1] killed/failed due to:ROOT_INPUT_INIT_FAILURE, Vertex Input: caccounts initializer failed, vertex=vertex_1601279013282_0002_3_00 [Map 1], org.apache.tez.dag.api.TezUncheckedException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.delta.hive.HiveInputFormat not found at org.apache.tez.mapreduce.hadoop.MRInputHelpers.generateOldSplits(MRInputHelpers.java:472) at org.apache.tez.mapreduce.hadoop.MRInputHelpers.generateInputSplitsToMem(MRInputHelpers.java:337) at org.apache.tez.mapreduce.common.MRInputAMSplitGenerator.initialize(MRInputAMSplitGenerator.java:122) at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:278) at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:269) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844) at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:269) at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:253) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.delta.hive.HiveInputFormat not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2401) at org.apache.hadoop.mapred.JobConf.getInputFormat(JobConf.java:689) at org.apache.tez.mapreduce.hadoop.MRInputHelpers.generateOldSplits(MRInputHelpers.java:470) ... 13 more Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.delta.hive.HiveInputFormat not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2369) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2393) ... 
15 more Caused by: java.lang.ClassNotFoundException: Class io.delta.hive.HiveInputFormat not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2273) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367) ... 16 more ]Vertex killed, vertexName=Reducer 2, vertexId=vertex_1601279013282_0002_3_01, diagnostics=[Vertex received Kill in INITED state., Vertex vertex_1601279013282_0002_3_01 [Reducer 2] killed/failed due to:OTHER_VERTEX_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:1 at org.apache.hadoop.hive.ql.exec.tez.TezTask.execute(TezTask.java:196) at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:199) at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:100) at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:2183) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1839) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1526) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1237) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1232) at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:255) ... 11 more

(Question) Can Delta-Hive connector auto-detect new data?

Hello,

I am using the manifest-generation solution to connect Delta Lake with Presto. I am seeing noticeable latency between Delta Lake and Presto because of manifest regeneration. This Delta-Hive connector seems like a nice alternative, so I would like to know more about it.

  • The initial solution to support Presto was to generate a manifest every time there was a change in the Delta lake. Would we still need to do that with this connector?

  • Can this connector auto-detect new data for both partitioned and non-partitioned Delta tables as the Delta tables are updated?

  • Since this connector can detect partitions, I want to clarify whether there will be a need to separately run MSCK REPAIR TABLE.

Thanks,
Amar

Convert build to GitHub Actions

Right now the project uses CircleCI, which doesn't allow anonymous viewing of the build logs (I had to create an account).

The build looks pretty simple to convert to GitHub Actions; it would probably be a fairly easy "good first issue".

Support for Hive 3

Hi
I am following this connector's GitHub releases link: https://github.com/delta-io/connectors/releases/v0.1.0

Hive version: 3.1.0

Added the below jars to the Hive CLI:
delta-core-shaded-assembly_2.12-0.1.0.jar
hive-delta_2.12-0.1.0.jar

Executed the below statements in the Hive CLI:
SET hive.input.format=io.delta.hive.HiveInputFormat;
SET hive.tez.input.format=io.delta.hive.HiveInputFormat;

Trying to create a Hive external table, but facing the following error:
CREATE EXTERNAL TABLE deltaTable(col1 INT, col2 STRING)
. . . . . . . . . . . . . . . . . . . . . . .> STORED BY 'io.delta.hive.DeltaStorageHandler';
INFO : Compiling command(queryId=hive_20200407061535_6cf6b42b-ef87-4af9-b2fa-3161b15840bb): CREATE EXTERNAL TABLE deltaTable(col1 INT, col2 STRING)
STORED BY 'io.delta.hive.DeltaStorageHandler'
INFO : Semantic Analysis Completed (retrial = false)
INFO : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO : Completed compiling command(queryId=hive_20200407061535_6cf6b42b-ef87-4af9-b2fa-3161b15840bb); Time taken: 0.028 seconds
INFO : Executing command(queryId=hive_20200407061535_6cf6b42b-ef87-4af9-b2fa-3161b15840bb): CREATE EXTERNAL TABLE deltaTable(col1 INT, col2 STRING)
STORED BY 'io.delta.hive.DeltaStorageHandler'
INFO : Starting task [Stage-0:DDL] in serial mode
ERROR : FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org/apache/hadoop/hive/metastore/MetaStoreUtils
INFO : Completed executing command(queryId=hive_20200407061535_6cf6b42b-ef87-4af9-b2fa-3161b15840bb); Time taken: 0.002 seconds
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org/apache/hadoop/hive/metastore/MetaStoreUtils (state=08S01,code=1)

Create Hive table from Delta Lake using HiveQL

Hi everyone,

I am trying to create a table in Hive from Delta Lake, but the Hive shell returns the following message:

Error: org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.IllegalArgumentException: a StructType must have at least one field

I have the following Delta table:


> scala> spark.read.format("delta").load(pathToEventsTable).show
> +-------+-----+--------+------+-----------+
> |   date|delay|distance|origin|destination|
> +-------+-----+--------+------+-----------+
> |1011245|    6|     602|   ABE|        ATL|
> |1020600|   -8|     369|   ABE|        DTW|
> |1021245|   -2|     602|   ABE|        ATL|
> |1020605|   -4|     602|   ABE|        ATL|
> |1031245|   -4|     602|   ABE|        ATL|
> |1030605|    0|     602|   ABE|        ATL|
> |1041243|   10|     602|   ABE|        ATL|
> |1040605|   28|     602|   ABE|        ATL|
> |1051245|   88|     602|   ABE|        ATL|
> |1050605|    9|     602|   ABE|        ATL|
> |1061215|   -6|     602|   ABE|        ATL|
> |1061725|   69|     602|   ABE|        ATL|
> |1061230|    0|     369|   ABE|        DTW|
> |1060625|   -3|     602|   ABE|        ATL|
> |1070600|    0|     369|   ABE|        DTW|
> |1071725|    0|     602|   ABE|        ATL|
> |1071230|    0|     369|   ABE|        DTW|
> |1070625|    0|     602|   ABE|        ATL|
> |1071219|    0|     569|   ABE|        ORD|
> |1080600|    0|     369|   ABE|        DTW|
> +-------+-----+--------+------+-----------+
> only showing top 20 rows

> scala> spark.read.format("delta").load(pathToEventsTable).schema
> res22: org.apache.spark.sql.types.StructType = StructType(StructField(date,IntegerType,true), StructField(delay,IntegerType,true), StructField(distance,IntegerType,true), StructField(origin,StringType,true), StructField(destination,StringType,true))

> scala> print(pathToEventsTable)
> /tmp/departureDelays.delta`

I run CREATE TABLE in the Hive shell:

CREATE EXTERNAL TABLE delta_tbl(`date` bigint, delay bigint, distance bigint, origin string, destination string)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION '/tmp/departureDelays.delta';

or alternatively:

CREATE EXTERNAL TABLE delta_tbl(`date` bigint, delay bigint, distance bigint, origin string, destination string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES("path" = "/tmp/departureDelays.delta'")
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION 'file:///tmp/departureDelays.delta';
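
One way to sanity-check whether the LOCATION actually resolves to a Delta table with a non-empty schema is a quick probe from the Spark shell - a minimal sketch assuming the Delta Lake Scala API (io.delta.tables.DeltaTable) is on the classpath, and that both the local and HDFS variants of the path are worth testing:

import io.delta.tables.DeltaTable

// The Spark shell may have written to the local filesystem while Hive
// resolves the LOCATION against HDFS, so probe both URI schemes.
Seq("file:///tmp/departureDelays.delta", "hdfs:///tmp/departureDelays.delta").foreach { path =>
  // isDeltaTable is true only if a valid _delta_log exists at that path
  println(s"$path -> ${DeltaTable.isDeltaTable(spark, path)}")
}

If the path that Hive resolves has no (or an empty) _delta_log, the snapshot schema can come back empty, which may be what triggers the "a StructType must have at least one field" error.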

Does anyone know what might be happening?

Thanks in advance.
Best regards,

Unable to compile sql-delta-import

Trying to compile sql-delta-import, but facing the following error. I face the same error with hive-tez too.

<my_server>:~/connectors-master$ build/sbt sql-delta-import/compile
Using /usr/lib/jvm/java-8-openjdk-amd64 as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from /connectors-master/project
[info] Set current project to connectors-master (in build file:/connectors-master/)
[error] Expected ID character
[error] Not a valid command: sql-delta-import
[error] Expected project ID
[error] Expected configuration
[error] Expected ':' (if selecting a configuration)
[error] Expected key
[error] Not a valid key: sql-delta-import
[error] sql-delta-import/compile
[error] ^

Unable to create delta table in Hive 2.3.8 - Getting Execution Error from org.apache.hadoop.hive.ql.exec.DDLTask. org.json4s.jackson.JsonMethods$.parse$...

My environment:
HDP: 3.1.4
Ambari: Version 2.7.4
Hive: 2.3.8 (Installed as a standalone one)
Spark: 3.0.1 (Pre-built for Hadoop 2.7)

Step 1:
In my Hive 2 setup, I have added the following properties in hive-site.xml:

<property>
  <name>hive.input.format</name>
  <value>io.delta.hive.HiveInputFormat</value>
</property>
<property>
  <name>hive.tez.input.format</name>
  <value>io.delta.hive.HiveInputFormat</value>
</property>

Step 2:
Created a simple Delta table using Apache Spark (pyspark) as shown below:

data = spark.range(0, 5)
data.write.format("delta").save("/delta/table/path")

Step 3:
In my Hive 2 CLI, I have added the Hive connector uber jar using the below statement:

ADD JAR <path-to-jar>;

Step 4:
In Hive 2 CLI, I tried creating an external delta table as shown below:

CREATE EXTERNAL TABLE deltaTable(col1 INT, col2 STRING)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION '/delta/table/path'

Note: The location is an HDFS path where I store the Delta data; hence, I used hdfs:// as my scheme.

I am not able to create a delta table in Hive and I am getting the below error:

**FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.json4s.jackson.JsonMethods$.parse$default$3()Z**

I would greatly appreciate it if anyone could help in this regard. Thanks!

Flink Connector

It would be amazing if it were possible to read Delta-formatted data in Apache Flink.

Error while reading Delta table from S3 using standalone reader

I am trying to read a Delta table from S3 using delta-standalone_2.12 version 0.2.0. I provided the access key and the secret key in org.apache.hadoop.conf.Configuration (hadoop-client version 2.10.1) and I get the error java.lang.ArrayIndexOutOfBoundsException: 12111; with version 3.3.0 I get the error java.lang.ClassNotFoundException: com.fasterxml.jackson.annotation.JsonMerge.
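
A minimal sketch of the read path in question (bucket name and credential handling are placeholders, and hadoop-aws plus a matching aws-java-sdk are assumed to be on the classpath):

import org.apache.hadoop.conf.Configuration
import io.delta.standalone.DeltaLog
import scala.collection.JavaConverters._

// Hadoop configuration for s3a access; keys are read from the environment here
val conf = new Configuration()
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
conf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

// Load the latest snapshot of the table and list a few of its data files
val log = DeltaLog.forTable(conf, "s3a://my-bucket/path/to/delta-table")
val snapshot = log.snapshot()
println(snapshot.getMetadata.getSchema)
snapshot.getAllFiles.asScala.take(5).foreach(f => println(f.getPath))

The Jackson ClassNotFoundException with Hadoop 3.3.0 may simply indicate missing or conflicting Jackson jars on the classpath rather than a problem in the standalone reader itself, but that is only a guess.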
I would greatly appreciate it if anyone could help in this regard. Thanks!

Support both Scala 2.11 and 2.12

We run Zeppelin 0.8.2 in our infrastructure, and the Spark interpreter in that version doesn't support Scala 2.12; there is a fix in place that won't be released until Zeppelin 0.9.0 (https://jira.apache.org/jira/browse/ZEPPELIN-3552).

To support Zeppelin, we build everything from source, including Spark, to use Scala 2.11.

Currently there is no documented way to build the Delta connectors using Scala 2.11 outside of potentially modifying the variable set here: https://github.com/delta-io/connectors/blob/master/build.sbt#L31
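
A rough sketch of what a cross-build could look like in build.sbt (the exact versions, sbt syntax, and whether every module actually compiles against 2.11 are assumptions, not verified against the current build):

// Hypothetical build.sbt fragment (sbt 1.x slash syntax): build against both
// Scala 2.11 and 2.12 instead of a single hard-coded version.
lazy val scala211 = "2.11.12"
lazy val scala212 = "2.12.8"

ThisBuild / scalaVersion := scala212
ThisBuild / crossScalaVersions := Seq(scala211, scala212)

With something like this in place, build/sbt "++2.11.12" compile (or build/sbt +compile for all declared versions) should produce 2.11 artifacts, assuming all transitive dependencies publish 2.11 builds.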

DeltaLog java.io.IOException: Filesystem closed

When using Hive JDBC, the Hive metastore operation fails with an error. The full stack trace:

Caused by: java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:817)
        at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2114)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
        at org.apache.spark.sql.delta.DeltaLogFileIndex$$anonfun$apply$1.apply(DeltaLogFileIndex.scala:56)
        at org.apache.spark.sql.delta.DeltaLogFileIndex$$anonfun$apply$1.apply(DeltaLogFileIndex.scala:56)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:296)
        at org.apache.spark.sql.delta.DeltaLogFileIndex$.apply(DeltaLogFileIndex.scala:56)
        at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$7.apply(DeltaLog.scala:313)
        at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$7.apply(DeltaLog.scala:275)
        at org.apache.spark.sql.delta.util.DeltaProgressReporter$class.withStatusCode(DeltaProgressReporter.scala:30)
        at org.apache.spark.sql.delta.DeltaLog.withStatusCode(DeltaLog.scala:58)
        at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:275)
        at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:275)
        at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:77)
        at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:58)
        at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:103)
        at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:58)
        at org.apache.spark.sql.delta.DeltaLog.org$apache$spark$sql$delta$DeltaLog$$updateInternal(DeltaLog.scala:274)
        at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:235)
        at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:235)
        at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:207)
        at org.apache.spark.sql.delta.DeltaLog.update(DeltaLog.scala:234)
        at org.apache.spark.sql.delta.DeltaHelper$.loadDeltaLatestSnapshot(DeltaHelper.scala:178)
        at io.delta.hive.DeltaStorageHandler.preCreateTable(DeltaStorageHandler.scala:189)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:747)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:740)
        at sun.reflect.GeneratedMethodAccessor290.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:173)
        ... 29 more

Illegal Access Error while querying the created hive table through deltalake connector

Hive version : 2.6
delta lake connector version : delta-hive-assembly_2.11-0.2.0.jar
Spark version : 3.1.1
Scala version :2.12
delta-core : 1.0.0

I have created a Delta Lake table using Spark, and I am able to create a Hive table from it using the Delta Lake connector.

When we try to access the created table in Hive, it throws the below error.

The Delta Lake table works fine when I query it through Spark.

Error Trace

Exception in thread "main" java.lang.IllegalAccessError: tried to access method org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport.getColumnNames(Ljava/lang/String;)Ljava/util/List; from class io.delta.hive.DeltaStorageHandler
at io.delta.hive.DeltaStorageHandler.configureInputJobProperties(DeltaStorageHandler.scala:71)
at org.apache.hadoop.hive.ql.plan.PlanUtils.configureJobPropertiesForStorageHandler(PlanUtils.java:820)
at org.apache.hadoop.hive.ql.plan.PlanUtils.configureInputJobPropertiesForStorageHandler(PlanUtils.java:790)
at org.apache.hadoop.hive.ql.optimizer.SimpleFetchOptimizer$FetchData.convertToWork(SimpleFetchOptimizer.java:385)
at org.apache.hadoop.hive.ql.optimizer.SimpleFetchOptimizer$FetchData.access$000(SimpleFetchOptimizer.java:323)
at org.apache.hadoop.hive.ql.optimizer.SimpleFetchOptimizer.optimize(SimpleFetchOptimizer.java:134)
at org.apache.hadoop.hive.ql.optimizer.SimpleFetchOptimizer.transform(SimpleFetchOptimizer.java:105)
at org.apache.hadoop.hive.ql.optimizer.Optimizer.optimize(Optimizer.java:215)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:10572)
at org.apache.hadoop.hive.ql.parse.CalcitePlanner.analyzeInternal(CalcitePlanner.java:219)
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:238)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:474)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:330)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1233)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1274)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1170)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1160)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:217)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:169)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:380)
at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:740)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:685)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:625)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.util.RunJar.run(RunJar.java:233)
at org.apache.hadoop.util.RunJar.main(RunJar.java:148)

Hive 3+ support

The latest Hive connector release is compatible with Hive 2+ releases. Any idea when we can expect Hive 3+ support?

Trying with Hive 3.0 ends up with the below exception, as MetaStoreUtils was moved to the utils package in Hive 3:

java.lang.NoClassDefFoundError: org/apache/hadoop/hive/metastore/MetaStoreUtils at io.delta.hive.DeltaStorageHandler.preCreateTable(DeltaStorageHandler.scala:173) ~[delta-hive-assembly_2.12-0.2.0.jar:0.2.0] at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:832) ~[hive-exec-3.1.2.jar:3.1.2] at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:822) ~[hive-exec-3.1.2.jar:3.1.2] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_282]

delta lake 1.0.0 support

Hey,

Does 0.2.0 of the connector support Delta Lake 1.0.0? If not, are there any plans to support it?

Thanks,
Jon

Check all Hive DDL commands

Right now we have tested CREATE TABLE and DROP TABLE, but ALTER TABLE is missing. We should take a look at it and see what we can support and what we cannot.
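
A rough checklist of ALTER TABLE variants worth exercising (a sketch only; table and column names are placeholders, and some of these are expected to fail because Delta owns the table schema):

// Hypothetical list of Hive DDL statements to run against a Delta-backed table
val alterTableChecks = Seq(
  "ALTER TABLE delta_tbl SET TBLPROPERTIES ('comment' = 'delta-backed table')",
  "ALTER TABLE delta_tbl RENAME TO delta_tbl_renamed",
  "ALTER TABLE delta_tbl SET LOCATION '/new/delta/path'",
  "ALTER TABLE delta_tbl ADD COLUMNS (new_col STRING)",   // schema is managed by Delta; likely should be rejected
  "ALTER TABLE delta_tbl CHANGE COLUMN col1 col1 BIGINT"  // same concern as ADD COLUMNS
)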

Presto doesn't recognise Hive-External table for Delta: com.facebook.presto.spi.PrestoException: InputFormat is not present in StorageFormat

Hello,

I am seeing the following exception when I use this connector with Presto. Can you please help me with this issue?

A) My server setup:

  1. Spark 2.4.4
  2. Hadoop 3.2.1
  3. Hive 2.3.6
  4. Scala 2.11.12
  5. Presto 0.228

B) Build changes:

I had to change some build options in order to create connector jars for my cluster:

-val sparkVersion = "2.4.3"
-val hadoopVersion = "2.7.2"
-val hiveVersion = "2.3.3"
+val sparkVersion = "2.4.4"
+val hadoopVersion = "3.2.1"
+val hiveVersion = "2.3.6"
 val deltaVersion = "0.5.0"
 
 lazy val commonSettings = Seq(
@@ -53,8 +53,8 @@ lazy val core = (project in file("core"))
     name := "delta-core-shaded",
     libraryDependencies ++= Seq(
       "io.delta" %% "delta-core" % deltaVersion excludeAll ExclusionRule("org.apache.hadoop"),
-      "org.apache.spark" %% "spark-sql" % sparkVersion excludeAll ExclusionRule("org.apache.hadoop"),
-      "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided"
+      "org.apache.spark" %% "spark-sql" % sparkVersion excludeAll ExclusionRule("org.apache.hadoop") exclude("org.glassfish.hk2", "hk2-utils") exclude("org.glassfish.hk2", "hk2-locator") exclude("javax.validation", "validation-api"),
+      "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" excludeAll ExclusionRule("org.slf4j", "slf4j-log4j12")
     ),

C) Exception:

com.facebook.presto.spi.PrestoException: InputFormat is not present in StorageFormat
	at com.facebook.presto.hive.metastore.StorageFormat.getInputFormat(StorageFormat.java:57)
	at com.facebook.presto.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:291)
	at com.facebook.presto.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:264)
	at com.facebook.presto.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:96)
	at com.facebook.presto.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:193)
	at com.facebook.presto.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)
	at com.facebook.presto.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)
	at com.facebook.presto.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)
	at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Thanks,
Amar
