spark-rapids-benchmarks's People

Contributors

abellina, andygrove, dependabot[bot], garyshen2008, gerashegalov, nvtimliu, pxli, sameerz, wjxiz1992, yanxuanliu, zhanga5


spark-rapids-benchmarks's Issues

unrecognized arguments: --input-suffix

I am trying to convert CSV to another file format. My input data has paths ending in .dat. According to the documentation, I can use the --input-suffix argument to handle this:

  --input_suffix INPUT_SUFFIX
                        text to append to every input filename (e.g., ".dat"; the default is empty)

However, this does not seem to be implemented.

I am running this:

```
$ ./spark-submit-template convert_submit_gpu.template nds_transcode.py --input-suffix .dat --output_format json /mnt/bigdata/tpcds/sf100-tbl/ /mnt/bigdata/tpcds/sf100-jso00-json/ report.txt
```

It fails with:

```
nds_transcode.py: error: unrecognized arguments: --input-suffix report.txt
```
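
For reference, here is a sketch of how an --input_suffix option could be applied when building per-table input paths once it is implemented; the helper and variable names below are hypothetical, not the script's actual code.

import os

def table_input_path(input_prefix, table_name, input_suffix=''):
    """Append the optional suffix (e.g. '.dat') to a table's input filename."""
    return os.path.join(input_prefix, table_name + input_suffix)

# table_input_path('/mnt/bigdata/tpcds/sf100-tbl', 'store_sales', '.dat')
#   -> '/mnt/bigdata/tpcds/sf100-tbl/store_sales.dat'

Note also that the help text documents the flag with an underscore (--input_suffix), so even once it is implemented, the hyphenated spelling --input-suffix would likely still be rejected by argparse.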

misleading in document about 'convert_submit_cpu_iceberg.template'

In the documentation, the nds_transcode.py and nds_maintenance.py steps both use the same submit template, convert_submit_cpu_iceberg, in their examples. In practice, the 'spark_catalog' warehouse configuration is needed for the nds_maintenance step but should be removed for the nds_transcode step, so the two steps should use different submit templates.
Could you clarify this in the documentation? Otherwise it could be misleading.
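
For context, below is a minimal sketch of the kind of 'spark_catalog' warehouse settings this refers to, expressed as PySpark session configs; the catalog type and warehouse path are illustrative and the exact keys/values should match the repo's Iceberg templates.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.extensions",
                 "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.iceberg.spark.SparkSessionCatalog")
         .config("spark.sql.catalog.spark_catalog.type", "hadoop")  # or "hive", depending on the environment
         .config("spark.sql.catalog.spark_catalog.warehouse", "/path/to/warehouse")  # illustrative path
         .getOrCreate())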

lack of test code

The repository contains multiple functional parts but lacks test code to detect regressions.
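
As a starting point, a minimal pytest-style sketch of the kind of regression test that could be added is shown below; the module path and the positional use_decimal argument are assumptions based on how get_schemas is called elsewhere in this repo.

from nds_transcode import get_schemas  # assumed import path

def test_schemas_cover_expected_tables():
    # get_schemas takes a use_decimal flag, passed positionally elsewhere in the code base
    schemas = get_schemas(True)
    assert 'store_sales' in schemas
    assert 'dbgen_version' in schemas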

Data Maintenance error on Dataproc

====== Run LF_CR ======
...
...
22/08/17 17:12:04 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.3 in stage 23.0 (TID 26) (csputil-icmo-w-1.c.rapids-spark.internal executor 5): java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
partition 'cr_returned_date_sk=2451972' in spec [
  1000: cr_returned_date_sk: identity(1)
]
        at org.apache.iceberg.io.ClusteredWriter.write(ClusteredWriter.java:95)
        at org.apache.iceberg.io.ClusteredDataWriter.write(ClusteredDataWriter.java:34)
        at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:641)
        at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:616)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

22/08/17 17:12:04 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 23.0 failed 4 times; aborting job
22/08/17 17:12:04 ERROR org.apache.spark.sql.execution.datasources.v2.AppendDataExec: Data source write support IcebergBatchWrite(table=spark_catalog.default.catalog_returns, format=PARQUET) is aborting.
22/08/17 17:12:04 ERROR org.apache.spark.sql.execution.datasources.v2.AppendDataExec: Data source write support IcebergBatchWrite(table=spark_catalog.default.catalog_returns, format=PARQUET) aborted.
An error occurred while calling o60.sql.
: org.apache.spark.SparkException: Writing job aborted.
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:388)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:218)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:46)
        at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 4 times, most recent failure: Lost task 0.3 in stage 23.0 (TID 26) (csputil-icmo-w-1.c.rapids-spark.internal executor 5): java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
partition 'cr_returned_date_sk=2451972' in spec [
  1000: cr_returned_date_sk: identity(1)
]
        at org.apache.iceberg.io.ClusteredWriter.write(ClusteredWriter.java:95)
        at org.apache.iceberg.io.ClusteredDataWriter.write(ClusteredDataWriter.java:34)
        at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:641)
        at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:616)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
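
As the error message itself suggests, the incoming records either need to be clustered by the target partition or the writer needs to switch to fanout mode. A minimal sketch of the clustering approach is below, assuming df is the refresh DataFrame being appended; the table name is taken from the log above.

(df.repartition("cr_returned_date_sk")            # co-locate each partition's rows in one task
   .sortWithinPartitions("cr_returned_date_sk")   # cluster records the way Iceberg's ClusteredWriter expects
   .writeTo("spark_catalog.default.catalog_returns")
   .append())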


Failed to load delete and inventory_delete table data

After #77, refresh data should be read as independent files, but our code that loads the delete and inventory_delete table data is still based on Iceberg. We need to change that code to handle the new read manner.

error messages:

Traceback (most recent call last):
  File "/usr/workspace/deploy/temp/spark-rapids-benchmarks/nds/nds_maintenance.py", line 207, in <module>
    query_dict = get_maintenance_queries(args.maintenance_queries_folder,
  File "/usr/workspace/deploy/temp/spark-rapids-benchmarks/nds/nds_maintenance.py", line 98, in get_maintenance_queries
    delete_date_dict = get_delete_date(spark)
  File "/usr/workspace/deploy/temp/spark-rapids-benchmarks/nds/nds_maintenance.py", line 65, in get_delete_date
    delete_dates = spark_session.sql("select * from delete").collect()
  File "/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, in sql
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.AnalysisException: Table or view not found: delete; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [delete], [], false
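
A minimal sketch of the intended new read manner, assuming the refresh data has been transcoded to standalone Parquet files and refresh_data_path is a hypothetical argument pointing at that directory:

delete_dates = spark_session.read.parquet(refresh_data_path + "/delete").collect()
inventory_delete_dates = spark_session.read.parquet(refresh_data_path + "/inventory_delete").collect()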

Use independent source files for Data Maintenance

Data Maintenance applies N refresh runs, each with different refresh data.
The current implementation requires the user to transcode the refresh data from raw CSV into Iceberg.
It would be more convenient to let users transcode the refresh data into independent Parquet/ORC files instead of Iceberg tables, so that a user can simply point at the refresh data path to do a refresh run.

Iceberg datasets maintenance failed with UnicodeDecodeError: ordinal not in range(128)

CLI to generate iceberg refreshed datasets:

./spark-submit-template convert_submit_gpu.template $EXTRA_CONFS \
    nds_transcode.py \
    $PWD/raw_refresh_sf1  \
    $PWD/parquet_refresh_sf1  \
    report.txt \
    --output_format parquet \
    --output_mode overwrite \
    --update

CLI to run Iceberg datasets maintenance:

./spark-submit-template convert_submit_cpu_iceberg.template nds_maintenance.py $PWD/parquet_refresh_sf1 ./data_maintenance time.csv

Error Log:

took 0.033919 s
Traceback (most recent call last):
  File "/spark-rapids-benchmarks/nds/nds_maintenance.py", line 234, in <module>
    valid_queries)
  File "/spark-rapids-benchmarks/nds/nds_maintenance.py", line 125, in get_maintenance_queries
    q_content = [ c.decode('utf-8').strip() + ';' for c in f.read().split(';')[1:-1]]
  File "/usr/lib/python3.6/encodings/ascii.py", line 26, in decode
    return codecs.ascii_decode(input, self.errors)[0]
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe2 in position 985: ordinal not in range(128)
22/08/17 07:07:19 INFO SparkContext: Invoking stop() from shutdown hook
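
A likely fix (sketch only) is to open the maintenance query file explicitly as UTF-8 text instead of relying on the locale default (ASCII here); q_file is a hypothetical name for the path being read.

with open(q_file, 'r', encoding='utf-8') as f:
    # f.read() now returns str already decoded as UTF-8, so the extra .decode('utf-8') is unnecessary
    q_content = [c.strip() + ';' for c in f.read().split(';')[1:-1]]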

Let the conversion script handle the creation of the Iceberg table

The nds_transcode.py script is currently only responsible for converting raw CSV data to other data source types such as Parquet or ORC. To implement the Data Maintenance (#4) test, we need to set up Iceberg base tables before we do "maintenance", i.e. INSERT, UPDATE and DELETE operations.

According to the Iceberg documentation, we can do something like:

df = ...  # raw data frame
df.writeTo("local.db.sample_table").create()

Note: this requires DataSourceV2 API.
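
A slightly fuller sketch of the same idea, reusing the repo's get_schemas helper; the import path, table name, and raw data location below are illustrative assumptions, not the script's actual code.

from pyspark.sql import SparkSession
from nds_transcode import get_schemas  # assumed import path

spark = SparkSession.builder.appName("create_iceberg_base_table").getOrCreate()

df = (spark.read
      .option("delimiter", "|")
      .schema(get_schemas(True)["store_sales"])   # True -> use_decimal
      .csv("hdfs:///path/to/raw/store_sales"))    # illustrative raw data location

df.writeTo("spark_catalog.default.store_sales").using("iceberg").create()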

refresh data generation on hdfs on Dataproc failed with specific scale factor and parallel args

command:

python3 nds_gen_data.py hdfs 10 10 /data/full_bench_raw_sf10_1 --overwrite_output --update 1

error:

2022-08-15 07:29:39,299 INFO mapreduce.Job: Task Id : attempt_1660543957812_0005_m_000009_2, Status : FAILED
Error: java.lang.InterruptedException: Process failed with status code 139

        at org.notmysock.tpcds.GenTable$DSDGen.map(GenTable.java:256)
        at org.notmysock.tpcds.GenTable$DSDGen.map(GenTable.java:225)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

I cannot reproduce this error on our internal yarn cluster.

But the following commands work well:

python3 nds_gen_data.py hdfs 10 5 /data/full_bench_raw_sf10_1 --overwrite_output --update 1
python3 nds_gen_data.py hdfs 20 5 /data/full_bench_raw_sf10_1 --overwrite_output --update 1
python3 nds_gen_data.py hdfs 10 9 /data/full_bench_raw_sf10_1 --overwrite_output --update 1

Generating queries fails with an unhelpful error when no template or stream is specified

The instructions for generating queries state that the --template and --streams arguments are optional, but if both are missing then the command fails with an unhelpful stack trace, e.g.:

$ python -i nds_gen_query_stream.py $TPCDS_HOME/query_templates 100 ./queries
Traceback (most recent call last):
  File "nds_gen_query_stream.py", line 122, in <module>
    generate_query_streams(args, tool_path) 
  File "nds_gen_query_stream.py", line 67, in generate_query_streams
    subprocess.run(cmd, check=True, cwd=str(work_dir))
  File "/home/jlowe/miniconda3/envs/cudf_dev/lib/python3.8/subprocess.py", line 493, in run
    with Popen(*popenargs, **kwargs) as process:
  File "/home/jlowe/miniconda3/envs/cudf_dev/lib/python3.8/subprocess.py", line 858, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/home/jlowe/miniconda3/envs/cudf_dev/lib/python3.8/subprocess.py", line 1639, in _execute_child
    self.pid = _posixsubprocess.fork_exec(
TypeError: expected str, bytes or os.PathLike object, not NoneType

Arguably this mode should work and be treated as if the user had specified --streams 1. Otherwise the usage statement needs to be updated to show that these are not truly optional and that one or the other must be specified, e.g.: {--template <template_file> | --streams <stream_count>}
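
For illustration, a minimal argparse sketch of that usage constraint (assuming the script keeps using argparse, as the stack trace indicates):

import argparse

parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--template', help='generate a query stream from a single template file')
group.add_argument('--streams', type=int, help='number of query streams to generate')
args = parser.parse_args()

Alternatively, defaulting to --streams 1 when neither option is given would make the bare invocation above just work.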

Split some big files into small ones for better reading

Problem:
Some files, like nds_transcode.py and nds_bench.py, are too big to read comfortably.

Expectation:
We should move some parts of these big files out into new files.
For example, get_schemas in nds_transcode.py could live in a new file that only defines schema-related code, since it is also referenced by multiple Python scripts.
In nds_bench.py, we could move the functions that analyze the report file into a utils file, so that only the step functions and the main process remain; that would be much clearer to read.

Add more query status flags in json summary report

With data validation, we can check whether the query output produced by the GPU run is the same as the output produced by the CPU run. Thus we want to add more status flags to show whether a query "finished successfully and its output was validated", "finished successfully but its output did not validate", etc.

Some configs in GPU template don't exist anymore

In nds/convert_submit_gpu.template, the configs below no longer exist in the plugin:
"--conf" "spark.rapids.sql.csv.read.date.enabled=true"
"--conf" "spark.rapids.sql.csvTimestamps.enabled=false"
"--conf" "spark.rapids.sql.csv.read.integer.enabled=true"

Let's clean up the configs used in template files.

Support other Datasources

In nds_power.py, setup_tables uses get_schemas to provide all schemas. In case the TPC-DS data was not generated with this tool, the dbgen_version folder may not be present.

This causes an error unless the following code block is commented out:

SCHEMAS["dbgen_version"] = StructType([
    StructField("dv_version", VarcharType(16)),
    StructField("dv_create_date", DateType()),
    StructField("dv_create_time", TimestampType()),
    StructField("dv_cmdline_args", VarcharType(200))
])

It would be nice to avoid this constraint so that other datasources can be used as well.
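
One possible way to relax the constraint, sketched as the table-loading loop inside setup_tables; the spark_session, input_format, input_prefix, and use_decimal names reuse what the traceback shows or are assumptions, and the path construction is illustrative.

from pyspark.sql.utils import AnalysisException

for table_name, schema in get_schemas(use_decimal).items():
    try:
        spark_session.read.format(input_format) \
            .schema(schema) \
            .load(input_prefix + '/' + table_name) \
            .createOrReplaceTempView(table_name)
    except AnalysisException:
        # e.g. dbgen_version is absent when the data was not generated by this tool
        print(f"Skipping missing table: {table_name}")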

[FEA] Use milliseconds in result.csv

Currently, result.csv uses "application_id,query,time/s" as its header.

However, the query times are actually in milliseconds, while the last two lines, "Power Test Time" and "Total Time", are in seconds.
I wish we used milliseconds for all rows in result.csv so that there is no confusion.

KeyError: 'table_name' when executing Power Run script

When running power run with nds_power.py, the following error is thrown:



Traceback (most recent call last):
  File "/opt/sparkRapidsPlugin/spark-raplab-cicd/jenkins-nds2-test-raplab-70/nds_power.py", line 288, in <module>
    args.output_format)
  File "/opt/sparkRapidsPlugin/spark-raplab-cicd/jenkins-nds2-test-raplab-70/nds_power.py", line 201, in run_query_stream
    execution_time_list)
  File "/opt/sparkRapidsPlugin/spark-raplab-cicd/jenkins-nds2-test-raplab-70/nds_power.py", line 99, in setup_tables
    spark_session.read.format(input_format).schema(get_schemas(use_decimal)['table_name']).load(
KeyError: 'table_name'
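
The failing line suggests the literal string 'table_name' is being used as the dictionary key instead of a table-name variable. A sketch of the suspected fix, reusing the names from the traceback (input_prefix and table_name are illustrative):

spark_session.read.format(input_format) \
    .schema(get_schemas(use_decimal)[table_name]) \
    .load(input_prefix + '/' + table_name)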

Would be nice to be able to specify different input formats for each input to nds_validate.py

I would like to be able to use nds_validate.py to compare two runs that used different output formats (e.g., parquet vs orc).
Currently there is only one --input_format parameter. Ideally, I would not have to specify this at all and the tool would figure out what format the data files are in. But in the absence of that, maybe provide an optional way to specify different formats for input1 and input2.

unmatched in NDS query78

Parameters:
scale_factor=5
parallel=10
transcode with Iceberg (Parquet)
power run with Iceberg

Detailed message:
=== Comparing Query: query78 ===
Collected 100 rows in 0.19057893753051758 seconds
Row 7:
[255, 2.03, 81, Decimal('58.95'), Decimal('4.66'), 40, Decimal('63.34'), Decimal('1.36')]
[255, 2.02, 81, Decimal('58.95'), Decimal('4.66'), 40, Decimal('63.34'), Decimal('1.36')]
Processed 100 rows
There were 1 errors
Collected 100 rows in 0.1670541763305664 seconds
Processed 100 rows
Results match
=== Unmatch Queries: ['query78'] ===

Query 78 in stream:
-- start query 26 in stream 1 using template query78.tpl
with ws as
(select d_year AS ws_sold_year, ws_item_sk,
ws_bill_customer_sk ws_customer_sk,
sum(ws_quantity) ws_qty,
sum(ws_wholesale_cost) ws_wc,
sum(ws_sales_price) ws_sp
from web_sales
left join web_returns on wr_order_number=ws_order_number and ws_item_sk=wr_item_sk
join date_dim on ws_sold_date_sk = d_date_sk
where wr_order_number is null
group by d_year, ws_item_sk, ws_bill_customer_sk
),
cs as
(select d_year AS cs_sold_year, cs_item_sk,
cs_bill_customer_sk cs_customer_sk,
sum(cs_quantity) cs_qty,
sum(cs_wholesale_cost) cs_wc,
sum(cs_sales_price) cs_sp
from catalog_sales
left join catalog_returns on cr_order_number=cs_order_number and cs_item_sk=cr_item_sk
join date_dim on cs_sold_date_sk = d_date_sk
where cr_order_number is null
group by d_year, cs_item_sk, cs_bill_customer_sk
),
ss as
(select d_year AS ss_sold_year, ss_item_sk,
ss_customer_sk,
sum(ss_quantity) ss_qty,
sum(ss_wholesale_cost) ss_wc,
sum(ss_sales_price) ss_sp
from store_sales
left join store_returns on sr_ticket_number=ss_ticket_number and ss_item_sk=sr_item_sk
join date_dim on ss_sold_date_sk = d_date_sk
where sr_ticket_number is null
group by d_year, ss_item_sk, ss_customer_sk
)
select
ss_item_sk,
round(ss_qty/(coalesce(ws_qty,0)+coalesce(cs_qty,0)),2) ratio,
ss_qty store_qty, ss_wc store_wholesale_cost, ss_sp store_sales_price,
coalesce(ws_qty,0)+coalesce(cs_qty,0) other_chan_qty,
coalesce(ws_wc,0)+coalesce(cs_wc,0) other_chan_wholesale_cost,
coalesce(ws_sp,0)+coalesce(cs_sp,0) other_chan_sales_price
from ss
left join ws on (ws_sold_year=ss_sold_year and ws_item_sk=ss_item_sk and ws_customer_sk=ss_customer_sk)
left join cs on (cs_sold_year=ss_sold_year and cs_item_sk=ss_item_sk and cs_customer_sk=ss_customer_sk)
where (coalesce(ws_qty,0)>0 or coalesce(cs_qty, 0)>0) and ss_sold_year=2001
order by
ss_item_sk,
ss_qty desc, ss_wc desc, ss_sp desc,
other_chan_qty,
other_chan_wholesale_cost,
other_chan_sales_price,
ratio
LIMIT 100;

-- end query 26 in stream 1 using template query78.tpl

Error when running nds_gen_data.py

When running python3 nds_gen_data.py hdfs 10 100 /user/sraheja/raw_sf1 --overwrite_output on an HDFS cluster, I got the following error message at the end.

mv: `/user/sraheja/raw_sf1/delete_1.dat-m-00000': No such file or directory
Traceback (most recent call last):
  File "/home/sraheja/dev/spark-rapids-benchmarks/nds/nds_gen_data.py", line 282, in <module>
    generate_data(args)
  File "/home/sraheja/dev/spark-rapids-benchmarks/nds/nds_gen_data.py", line 246, in generate_data
    generate_data_hdfs(args, jar_path)
  File "/home/sraheja/dev/spark-rapids-benchmarks/nds/nds_gen_data.py", line 172, in generate_data_hdfs
    move_delete_date_tables(args.data_dir)
  File "/home/sraheja/dev/spark-rapids-benchmarks/nds/nds_gen_data.py", line 124, in move_delete_date_tables
    subprocess.run(move, check=True)
  File "/home/sraheja/.pyenv/versions/3.10.4/lib/python3.10/subprocess.py", line 524, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['hadoop', 'fs', '-mv', '/user/sraheja/raw_sf1/delete_1.dat-m-00000', '/user/sraheja/raw_sf1/delete/']' returned non-zero exit status 1.

The following files were written to HDFS, and the subdirectories appear to have data:
$ hdfs dfs -ls /user/sraheja/raw_sf1
Found 27 items
-rw-r--r-- 3 sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/_SUCCESS
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/call_center
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/catalog_page
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/catalog_returns
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/catalog_sales
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/customer
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/customer_address
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/customer_demographics
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/date_dim
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/dbgen_version
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/delete
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/household_demographics
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/income_band
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/inventory
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/item
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/promotion
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/reason
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/ship_mode
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/store
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/store_returns
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/store_sales
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/time_dim
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/warehouse
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/web_page
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/web_returns
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/web_sales
drwxr-xr-x - sraheja sraheja 0 2022-07-26 16:04 /user/sraheja/raw_sf1/web_site

cc: @anchorbob

Update README.md

The README.md in the top-level directory refers to the 22.02 jar; it should be updated to the latest version of the jar.

The "usage", "positional arguments" and "optional arguments" sections for nds_validate.py also look like they need to be reformatted.

Add RNGSEED option when generating Query Stream

As per Specification 4.3:

4.3.1 Each query has one or more substitution parameters. Dsqgen must be used to generate executable query texts for the query streams. In order to generate the required number of query streams, dsqgen must be used with the RNGSEED, INPUT and STREAMS options. The value for the RNGSEED option, <SEED>, is selected as the timestamp of the end of the database load time (Load End Time) expressed in the format mmddhhmmsss as defined in Clause 7.4.3.8. The value for the STREAMS option, <S>, is two times the number of streams, Sq, to be executed during each Throughput Test (S = 2 * Sq). The value of the INPUT option, <input.txt>, is a file containing the location of all 99 query templates in numerical order.

Comment: RNGSEED guarantees that the query substitution parameter values are not known prior to running the power and throughput tests. Called with a value of <S> for the STREAMS parameter, dsqgen generates S+1 files, named query_0.sql through query_[S].sql. Each file contains a different permutation of the 99 queries.

4.3.2 Query_0.sql is the sequence of queries to be executed during the Power Test; files query_1.sql through query_[Sq].sql are the sequences of queries to be executed during the first Throughput Test; and files query_[Sq+1].sql through query_[2*Sq].sql are the sequences of queries to be executed during the second Throughput Test.

We need to add this seed option to our gen script.
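
A sketch of wiring such a seed through to dsqgen is below; the --rngseed argument is new, and the '-RNGSEED' flag spelling is an assumption that should be checked against the dsqgen help text.

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--rngseed', type=int,
                    help='RNGSEED per Clause 4.3.1, i.e. the Load End Time as mmddhhmmsss')

def append_rngseed(cmd, rngseed):
    """Append the dsqgen seed option to an existing dsqgen command list, if a seed was given."""
    if rngseed is not None:
        cmd += ['-RNGSEED', str(rngseed)]  # flag spelling assumed
    return cmd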

Data gen on Amazon EMR fails

When following the steps to generate data on HDFS in EMR, the data gen script fails with this exception:

[hadoop@ip-172-31-17-43 nds]$ python3 nds_gen_data.py hdfs 3000 128 vr/raw_sf3000 --overwrite_output
Exception in thread "main" java.lang.IllegalAccessError: class org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3278)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3323)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3362)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3413)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3381)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:486)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:234)
        at org.notmysock.tpcds.GenTable.genInput(GenTable.java:192)
        at org.notmysock.tpcds.GenTable.run(GenTable.java:118)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
        at org.notmysock.tpcds.GenTable.main(GenTable.java:45)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:236)

Steps to reproduce:

  1. Create an EMR cluster
  2. cd spark-rapids-benchmarks/nds/tpcds-gen
  3. export TPCDS_HOME=/home/hadoop/DSGen-software-code-3.2.0rc1
  4. make
  5. cd ~/spark-rapids-benchmarks/nds
  6. python3 nds_gen_data.py hdfs 3000 128 /user/test/sf3000 --overwrite_output

Residual json files in NDS directory

Executing the power run leaves residual json files in the nds directory. Is this desired or would it be better to dump them in a separate directory?

[screenshot: residual JSON files in the nds directory]

Support Data Maintenance Benchmark

Data Maintenance is explained in detail in Section 5 of the TPC-DS Specification. To simplify, the following three steps are needed:

  1. Use TPC-DS Tool to regenerate data
    • add "update" option for data generation script
  2. Use the SQL files in the tests folder of DSGen-software-code-3.2.0rc1 to update the table data.
    • add a script (currently we have nds_power.py to read SQL stream files) to read these new SQL files and update the existing table data.
    • the SQL files may contain multiple SQL statements; we need to recognize them and execute them one by one (see the sketch below).
    • Data Maintenance Functions are at section 5.3.2 in Specification.
    • add new data source schema described under $TPCDS_HOME/tools/tpcds_source.sql
  3. Do Throughput Run again.

Step 2 is called "Data Maintenance"; the SQL files are "lf_*.sql" and "dm_*.sql". Note that we need to make them Spark-compatible.

Steps 1+2+3 together are called a Refresh Run.
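
A minimal sketch of the "execute them one by one" part of step 2, assuming the maintenance SQL files have been made Spark-compatible and their statements are ';'-separated (function and path names are illustrative):

def run_maintenance_file(spark, sql_path):
    with open(sql_path, encoding='utf-8') as f:
        statements = [s.strip() for s in f.read().split(';') if s.strip()]
    for stmt in statements:
        spark.sql(stmt)  # INSERT / UPDATE / DELETE, per the Data Maintenance functions in Section 5.3.2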

Use main and dev branches for development

Use

  1. dev branch for active development
  2. main branch for stable commit and potential release (tagging)
  3. disable/remove CI like auto-merge

and clean up current branch-X.Y after all new branches and CI work fine

Do not save JSON summaries by default

If the --json_summary_folder arg is not specified, the JSON summary files are saved to the default folder "json-summary".
When we run the test a second time, we check whether the json_summary folder has any files inside; if so, the code raises an exception to protect the existing summaries.
Some users complain this is really annoying and that they don't need those summary files.
Also, when running the Throughput Test, if this json_summary argument is not given a different value per stream, the run fails due to the duplicated summary folder name.
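
A minimal sketch of the proposed default behavior (the argument name is from the issue; the rest is illustrative):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--json_summary_folder', default=None,
                    help='folder for JSON query summaries; omit to skip writing summaries')
args = parser.parse_args()

if args.json_summary_folder is not None:
    print(f'JSON summaries will be written to {args.json_summary_folder}')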

Support data validation

It is necessary to compare the query output between GPU and CPU runs to make sure the GPU plugin produces the correct results.

Implement TaskFailure listener at JVM side

The current TaskFailureListener is implemented entirely on the Python side. This has caused a number of unexpected issues like #37, and errors like:

22/06/28 08:09:58 ERROR org.apache.spark.scheduler.AsyncEventQueue: Dropping event from queue shared. This likely means one of the listeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.

We need to implement this listener on the JVM side and let it listen only to the specific "TaskEnd" event, to reduce the traffic between the JVM and Python when transferring data via the py4j gateway.

Errors when shutting down the executors on Dataproc

When executing PowerRun on Dataproc, the following error is thrown:

22/06/28 08:48:35 INFO org.sparkproject.jetty.server.AbstractConnector: Stopped Spark@4d149fe9{HTTP/1.1, (http/1.1)}{0.0.0.0:0}
22/06/28 08:49:05 ERROR org.apache.spark.rpc.netty.Inbox: Ignoring error
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:176)
        at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:150)
        at org.apache.spark.rpc.netty.NettyRpcEnv.send(NettyRpcEnv.scala:193)
        at org.apache.spark.rpc.netty.NettyRpcEndpointRef.send(NettyRpcEnv.scala:564)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.removeExecutor(CoarseGrainedSchedulerBackend.scala:600)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.$anonfun$reset$1(CoarseGrainedSchedulerBackend.scala:578)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.$anonfun$reset$1$adapted(CoarseGrainedSchedulerBackend.scala:576)
        at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:335)
        at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:1111)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.reset(CoarseGrainedSchedulerBackend.scala:576)
        at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.reset(YarnSchedulerBackend.scala:254)
        at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint$$anonfun$receive$1.applyOrElse(YarnSchedulerBackend.scala:326)
        at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
        at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
        at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 153876 ms exceeds timeout 120000 ms
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 5 with no recent heartbeats: 157542 ms exceeds timeout 120000 ms
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 7 with no recent heartbeats: 158587 ms exceeds timeout 120000 ms
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 1 with no recent heartbeats: 158708 ms exceeds timeout 120000 ms
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 4 with no recent heartbeats: 161529 ms exceeds timeout 120000 ms
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 6 with no recent heartbeats: 159709 ms exceeds timeout 120000 ms
22/06/28 08:51:09 WARN org.apache.spark.HeartbeatReceiver: Removing executor 3 with no recent heartbeats: 157897 ms exceeds timeout 120000 ms
22/06/28 08:51:09 ERROR org.apache.spark.network.client.TransportClient: Failed to send RPC RPC 4870712985705646405 to /10.138.0.56:39672: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
        at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:750)
22/06/28 08:51:09 ERROR org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Sending KillExecutors(List(2)) to AM was unsuccessful
java.io.IOException: Failed to send RPC RPC 4870712985705646405 to /10.138.0.56:39672: java.nio.channels.ClosedChannelException
        at org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:363)
        at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:340)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
        at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
        at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
        at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
        ... 12 more

This error doesn't impact the performance numbers for the Power Run, as all queries finished successfully before the shutdown process.

====== Power Test Time: 2905709 milliseconds ======
====== Total Time: 2972227 milliseconds ======

Command to reproduce this error:

sudo /usr/lib/spark/bin/spark-submit \
--master yarn \
--conf spark.rapids.sql.batchSizeBytes=1GB \
--conf spark.driver.maxResultSize=2GB \
--conf spark.executor.cores=16 \
--conf spark.locality.wait=0 \
--conf spark.rapids.sql.concurrentGpuTasks=2 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.0625 \
--conf spark.executor.memory=24G \
--conf spark.driver.memory=100G \
--conf spark.sql.files.maxPartitionBytes=256mb \
--conf spark.rapids.memory.host.spillStorageSize=32G \
--conf spark.sql.adaptive.enabled=true \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.rapids.memory.pinnedPool.size=8g \
--conf spark.rapids.sql.incompatibleOps.enabled=true \
--conf spark.executor.instances=8 \
--conf spark.executor.memoryOverhead=24G \
--conf spark.scheduler.minRegisteredResourcesRatio=1.0 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.eventLog.compress=true \
--conf spark.sql.shuffle.partitions=200 \
--conf spark.extraListeners="" \
--files /usr/lib/spark/scripts/gpu/getGpusResources.sh \
--jars jars/rapids-4-spark_2.12-22.08.0-20220626.145727-18-cuda11.jar \
nds_power.py \
gs://nds2/parquet_sf3k_decimal \
query_0.sql \
time-1656322357.csv
