Comments (8)
Thanks for the details. I can reproduce it. I also tried using Pandas and Fugue:
Pandas
import datacompy
import pandas as pd

df_1 = pd.DataFrame([{"a": 1}])
df_2 = pd.DataFrame([{"a": 1}])
compare = datacompy.Compare(df_1, df_2, join_columns=["a"])
print(compare.report())
Results in:
DataComPy Comparison
--------------------
DataFrame Summary
-----------------
DataFrame Columns Rows
0 df1 1 1
1 df2 1 1
Column Summary
--------------
Number of columns in common: 1
Number of columns in df1 but not in df2: 0
Number of columns in df2 but not in df1: 0
Row Summary
-----------
Matched on: a
Any duplicates on match values: No
Absolute Tolerance: 0
Relative Tolerance: 0
Number of rows in common: 1
Number of rows in df1 but not in df2: 0
Number of rows in df2 but not in df1: 0
Number of rows with some compared columns unequal: 0
Number of rows with all compared columns equal: 1
Column Comparison
-----------------
Number of columns compared with some values unequal: 0
Number of columns compared with all values equal: 1
Total number of values which compare unequal: 0
Fugue
import datacompy

# spark is an active SparkSession
df_1 = spark.createDataFrame([{"a": 1}])
df_2 = spark.createDataFrame([{"a": 1}])
print(datacompy.report(df_1, df_2, join_columns=["a"]))
Results in:
DataComPy Comparison
--------------------
DataFrame Summary
-----------------
DataFrame Columns Rows
0 df1 1 1
1 df2 1 1
Column Summary
--------------
Number of columns in common: 1
Number of columns in df1 but not in df2: 0
Number of columns in df2 but not in df1: 0
Row Summary
-----------
Matched on: a
Any duplicates on match values: No
Absolute Tolerance: 0
Relative Tolerance: 0
Number of rows in common: 1
Number of rows in df1 but not in df2: 0
Number of rows in df2 but not in df1: 0
Number of rows with some compared columns unequal: 0
Number of rows with all compared columns equal: 1
Column Comparison
-----------------
Number of columns compared with some values unequal: 0
Number of columns compared with all values equal: 1
Total number of values which compare unequal: 0
We should align the native Spark implementation with Pandas and Fugue.
@rupertbarton Would you be open to using Fugue for your Spark compare for now? You should be able to run it successfully (see the sketch below). I'll need to debug the native Spark compare; I have been debating whether we should just remove it in favor of Fugue moving forward.
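Concretely, your failing SparkCompare call would map onto the Fugue API roughly like this (a sketch, assuming spark is your active SparkSession):

import datacompy

df_1 = spark.createDataFrame([{"a": 1}])
df_2 = spark.createDataFrame([{"a": 1}])

# The Fugue-based report takes the Spark DataFrames directly; there is
# no SparkSession argument or cache_intermediates option to pass.
print(datacompy.report(df_1, df_2, join_columns=["a"]))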
I'll create a ticket in our backlog to investigate switching over, thanks!
@rupertbarton more for my understanding but could you articulate what sort of use case you have where you are just joining on a single column with nothing else to compare?
Hi! Our use case is that we are running assertions on a large number of tables, and all of them work fine apart from one particular table. That table has multiple columns, but all of them except one are encrypted, so we exclude those columns from the comparison, as working out what the encrypted values will be is awkward; hence the DataFrame only has a single column.
We still want to check that all the values in the expected DF and the actual DF match up, and we're using the same code for every table, roughly as sketched below.
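For this table, that works out to something like the following (a sketch; the table and column names are invented for illustration, and a stands in for the one unencrypted key column):

import datacompy

# Hypothetical names -- the encrypted columns are dropped, leaving
# only the unencrypted key column to join and compare on.
encrypted_cols = ["enc_col_1", "enc_col_2"]

expected = spark.table("expected_db.some_table").drop(*encrypted_cols)
actual = spark.table("actual_db.some_table").drop(*encrypted_cols)

compare = datacompy.SparkCompare(
    spark,
    expected,
    actual,
    join_columns=["a"],
    cache_intermediates=True,
)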
Thanks for the report. Would you be able to provide a minimal example so we can reproduce the issue? That would be really helpful to debug here.
Thanks for your reply. Here is a sample of code that throws an exception for us:
df_1 = spark.createDataFrame([{"a": 1}])
df_2 = spark.createDataFrame([{"a": 1}])

compare = datacompy.SparkCompare(
    spark,
    df_1,
    df_2,
    join_columns=["a"],
    cache_intermediates=True,
)

compare.rows_both_mismatch.count()
The error message is:
---------------------------------------------------------------------------
ParseException Traceback (most recent call last)
Cell In[7], line 12
2 df_2 = spark.createDataFrame([{"a": 1}])
4 compare = datacompy.SparkCompare(
5 spark,
6 df_1,
(...)
9 cache_intermediates=True,
10 )
---> 12 compare.rows_both_mismatch.count()
File /opt/venv/lib/python3.8/site-packages/datacompy/spark.py:356, in SparkCompare.rows_both_mismatch(self)
354 """pyspark.sql.DataFrame: Returns all rows in both dataframes that have mismatches"""
355 if self._all_rows_mismatched is None:
--> 356 self._merge_dataframes()
358 return self._all_rows_mismatched
File /opt/venv/lib/python3.8/site-packages/datacompy/spark.py:462, in SparkCompare._merge_dataframes(self)
458 where_cond = " OR ".join(
459 ["A." + name + "_match= False" for name in self.columns_compared]
460 )
461 mismatch_query = """SELECT * FROM matched_table A WHERE {}""".format(where_cond)
--> 462 self._all_rows_mismatched = self.spark.sql(mismatch_query).orderBy(
463 self._join_column_names
464 )
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py:1440, in SparkSession.sql(self, sqlQuery, args, **kwargs)
1438 try:
1439 litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
-> 1440 return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
1441 finally:
1442 if len(kwargs) > 0:
File /opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception.<locals>.deco(*a, **kw)
171 converted = convert_exception(e.java_exception)
172 if not isinstance(converted, UnknownException):
173 # Hide where the exception came from that shows a non-Pythonic
174 # JVM exception message.
--> 175 raise converted from None
176 else:
177 raise
ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 36)
== SQL ==
SELECT * FROM matched_table A WHERE
------------------------------------^^^
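For what it's worth, the traceback suggests that when every common column is a join key, columns_compared is empty, so the " OR ".join(...) in _merge_dataframes produces an empty string and the generated SQL ends right after WHERE. A minimal sketch of the failure mode, plus one possible guard (my assumption, not the project's actual fix):

# Simplified from SparkCompare._merge_dataframes (datacompy/spark.py):
columns_compared = []  # empty: the only common column is a join key

where_cond = " OR ".join(
    ["A." + name + "_match= False" for name in columns_compared]
)
mismatch_query = """SELECT * FROM matched_table A WHERE {}""".format(where_cond)
print(repr(mismatch_query))
# 'SELECT * FROM matched_table A WHERE ' -- nothing follows WHERE,
# which is exactly the [PARSE_SYNTAX_ERROR] above

# One possible guard: with no compared columns, no row can mismatch.
if not columns_compared:
    where_cond = "FALSE"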
@jdawang @rupertbarton I have a WIP fix here
Getting the following back:
In [2]: print(compare.report())
****** Column Summary ******
Number of columns in common with matching schemas: 1
Number of columns in common with schema differences: 0
Number of columns in base but not compare: 0
Number of columns in compare but not base: 0
****** Row Summary ******
Number of rows in common: 2
Number of rows in base but not compare: 0
Number of rows in compare but not base: 0
Number of duplicate rows found in base: 0
Number of duplicate rows found in compare: 0
****** Row Comparison ******
Number of rows with some columns unequal: 0
Number of rows with all columns equal: 2
****** Column Comparison ******
Number of columns compared with some values unequal: 0
Number of columns compared with all values equal: 0
None
Seems like the Column Comparison section is different from the Pandas version. I think this is mostly due to a difference in the underlying logic: Pandas counts the join column as compared, so it would say "Number of columns compared with all values equal: 1".
I can kind of see this both ways. This corner case is just a bit odd because you aren't really comparing anything, just joining on the key (a).
@jdawang Another reason why I'm thinking maybe we should just drop the native Spark implementation. The differences are annoying.