Comments (32)

martint commented on May 11, 2024

I've started collecting my thoughts on how we should approach this in this document: https://github.com/prestosql/presto/wiki/Pushdown-of-complex-operations. It's still incomplete and lacking many details, but the overall direction shouldn't have to change much.

from trino.

mbasmanova commented on May 11, 2024

@RugratsJ We have implemented compute plan pushdown into a connector and published a blog post explaining how it works: https://prestodb.io/blog/2019/12/23/improve-presto-planner

CC: @highker

martint commented on May 11, 2024

@josecanciani, absolutely! Once we add support for pushing down ORDER BY into connectors, that should work out of the box. The transformation you suggested happens entirely within the engine:

E.g., for:

SELECT orderkey 
FROM (
    SELECT * FROM orders 
    UNION ALL 
    SELECT * FROM orders) 
ORDER BY orderkey 
LIMIT 10;

you get the following plan:

 Output[orderkey]
 │   Layout: [expr_45:bigint]
 │   orderkey := expr_45
 └─ TopN[10 by (expr_45 ASC_NULLS_LAST)]
    │   Layout: [expr_45:bigint]
    └─ LocalExchange[SINGLE] ()
       │   Layout: [expr_45:bigint]
       └─ RemoteExchange[GATHER]
          │   Layout: [expr_45:bigint]
          ├─ TopNPartial[10 by (orderkey ASC_NULLS_LAST)]       <<<<<<<<<< 
          │  │   Layout: [orderkey:bigint]
          │  └─ TableScan[tpch:orders:sf0.01]
          │         Layout: [orderkey:bigint]
          │         orderkey := tpch:orderkey
          │         tpch:orderstatus
          │             :: [[F], [O], [P]]
          └─ TopNPartial[10 by (orderkey_17 ASC_NULLS_LAST)]    <<<<<<<<<<
             │   Layout: [orderkey_17:bigint]
             └─ TableScan[tpch:orders:sf0.01]
                    Layout: [orderkey_17:bigint]
                    orderkey_17 := tpch:orderkey
                    tpch:orderstatus
                        :: [[F], [O], [P]]
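
To illustrate why the two TopNPartial nodes in the plan above are safe, here is a minimal sketch that models each UNION ALL branch as an in-memory list and compares a single sort-and-limit against per-branch partial top-N followed by a final top-N. The class and method names are hypothetical and are not engine code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Toy model of the plan above: one partial TopN per UNION ALL branch,
// followed by a final TopN over the gathered partial results.
final class TopNThroughUnion {
    private TopNThroughUnion() {}

    // Smallest n values in ascending order (stand-in for TopNPartial / TopN).
    static List<Long> topN(List<Long> rows, int n) {
        return rows.stream().sorted().limit(n).collect(Collectors.toList());
    }

    // Baseline: concatenate both branches, then sort and limit once.
    static List<Long> naive(List<Long> left, List<Long> right, int n) {
        List<Long> all = new ArrayList<>(left);
        all.addAll(right);
        return topN(all, n);
    }

    // Rewritten: each branch keeps at most n rows before the final TopN.
    static List<Long> pushedDown(List<Long> left, List<Long> right, int n) {
        List<Long> gathered = Stream.concat(topN(left, n).stream(), topN(right, n).stream())
                .collect(Collectors.toList());
        return topN(gathered, n);
    }
}
```

Both methods return the same rows, but in the rewritten form each branch hands at most n rows to the final step, which is what makes a later ORDER BY/LIMIT pushdown into the connector worthwhile.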

sopel39 commented on May 11, 2024

revamp the IR and optimizer to support a fully exploratory optimizer

Related to the exploratory optimizer is the concept of traits. I'm appending some old notes:
Traits roadmap:

sopel39 commented on May 11, 2024

In addition, some worthy refactoring proposals that came from discussions around prestodb/presto#11591:
prestodb/presto#11591 (comment)
prestodb/presto#11689
FYI: @findepi

pgagnon commented on May 11, 2024

related to:
prestodb/presto#11777
prestodb/presto#4839

martint commented on May 11, 2024

Also related: #187

RugratsJ commented on May 11, 2024

Joins between different data sources, as a means of federation, cannot be pushed down. May I suggest adding a user-controllable session switch to push down the whole query to a single data source? That way, users with a lot of SQL experience could use this feature in the near future. If we wait until all the features listed here are finished, I don't know when this can be done. Take a look at issue #3203: the same regular count(*) query ran for 8.5 minutes in Presto, while it took only 5 seconds in Postgresql Citus to return results. Can we control, per session, whether or not to push down the whole query to the data source, especially for data sources such as a Postgresql Citus cluster or Netezza NPP? A lot of the time, it will be much faster to run the whole query in them instead of in Presto.

Many of us look into Presto because of its ability to federate across databases and files in parallel. However, some systems can't be migrated in a short period of time to Hive, Glue, Redshift, or another data source that supports massively parallel pulls in Presto, yet they have the power to run complex queries (Postgresql Citus, Netezza NPP). For those, adding this intermediate step would help a lot of customers move to Presto and ease the pain of handling existing systems that reside on legacy platforms such as a Postgresql Citus cluster, Netezza NPP, and Oracle.

RugratsJ commented on May 11, 2024

@mbasmanova, improving the Presto planner is a different scenario from the one I described in #3203. We are not talking about implementing a compute plan to push down into a connector. The example in #3203 is a very ordinary aggregation that any ad-hoc data scientist would run: the orders table contains 150,000,000 records, and I want to find out how many customers placed more than 4 orders in March 1997. When Presto runs it, it does a table scan, returns all 150M records, and only then does the filter, group by, and count. Transferring 150M records over the network is very slow. We are not talking about a filter clause such as "o_custkey like 'ree%'"; we are talking about Presto returning all 150M records through a table scan. It took Presto 8 minutes 30 seconds to return the final results.

presto> select o_custkey, count(*) from pgtpch100g.public.orders where to_char(o_orderdate, 'mm/yyyy') = '03/1997' group by o_custkey having count(*) > 4;
 o_custkey | _col1
-----------+-------
  10843207 |     5
  13299838 |     5
   1809103 |     5
  12675628 |     5
  12038404 |     5
(5 rows)

Query 20200322_190724_00019_njknr, FINISHED, 5 nodes
Splits: 177 total, 177 done (100.00%)
8:30 [150M rows, 0B] [294K rows/s, 0B/s]

For the exact same query, when run in the data source itself (a 5-node Postgresql Citus cluster), the whole query returns in 5 seconds.

Can we just push down the whole query, without any execution plan analysis at the Presto level, and let the underlying data source do the whole thing and return only the final results? Many systems, such as a Postgresql Citus cluster or Netezza, can do parallel processing very efficiently.

This applies to a data scientist creating a dashboard in a Zeppelin/Jupyter notebook, running ad-hoc queries against Presto to retrieve results from different sub-systems stored in different data sources, for example SQL Server, MySQL, Postgres, Oracle, and Netezza.

RugratsJ commented on May 11, 2024

@tooptoop4, thank you for your suggestions. This is for ad-hoc data scientist notebooks (Zeppelin/Jupyter). We won't know the conditions, joins, or where filters beforehand, so how can we create the views? If thousands of data scientists run ad-hoc queries with all kinds of different conditions, filters, and joins, can we create hundreds of thousands of views beforehand without knowing what those conditions, filters, and joins will be?

Giving users the ability to enable or disable pushdown of the whole query to the sub-system, session by session, would resolve this issue perfectly.

electrum commented on May 11, 2024

Pushing down the entire query is not really feasible. Consider that the underlying connector will have different functions, SQL functionality, semantics, etc., compared to Presto. There would be no way for Presto to parse and analyze a query that it doesn't understand. Also consider how it would work for queries that access tables from different connectors.

One thing that could be done is to provide the raw SQL to the connector. For example, the Elasticsearch connector allows providing a search query as part of the table name:

SELECT * FROM "tweets: +presto SQL^2"

This would be better implemented as a polymorphic table function (after we support those).

mbasmanova commented on May 11, 2024

Can we just push down the whole query, without any execution plan analysis at the Presto level, and let the underlying data source do the whole thing and return only the final results?

That's exactly what we have implemented. Presto parses the query, creates a query plan, then asks the connector which subtree of the plan, if any, it would like to execute. This allows capable connectors to take over scan + filter + agg + join + whatever else they can handle. For example, Uber is using this capability to push down the whole query plan into Pinot.
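
The "ask the connector which subtree it would like to execute" step can be pictured with a minimal sketch. It assumes a toy plan in which every node has at most one child, and a connector described only by the set of operators it supports. The Node class, the optimize method, and the Pushed[...] label are hypothetical and do not reflect the real planner types.

```java
import java.util.Set;

final class SubtreePushdownSketch {
    private SubtreePushdownSketch() {}

    // Hypothetical, minimal plan node: an operator name and at most one child.
    static final class Node {
        final String op;   // e.g. "Output", "Aggregate", "Filter", "TableScan"
        final Node child;  // null for a leaf

        Node(String op, Node child) {
            this.op = op;
            this.child = child;
        }
    }

    // True if every operator from node down to the leaf is supported by the connector.
    static boolean fullySupported(Node node, Set<String> supportedOps) {
        for (Node n = node; n != null; n = n.child) {
            if (!supportedOps.contains(n.op)) {
                return false;
            }
        }
        return true;
    }

    // Walk down from the root and replace the largest fully supported subtree
    // with a single leaf that stands for "executed inside the connector".
    static Node optimize(Node root, Set<String> supportedOps) {
        if (root == null) {
            return null;
        }
        if (fullySupported(root, supportedOps)) {
            return new Node("Pushed[" + describe(root) + "]", null);
        }
        return new Node(root.op, optimize(root.child, supportedOps));
    }

    // Render a chain as "Parent <- Child <- ..." for inspection.
    static String describe(Node node) {
        StringBuilder sb = new StringBuilder();
        for (Node n = node; n != null; n = n.child) {
            if (sb.length() > 0) {
                sb.append(" <- ");
            }
            sb.append(n.op);
        }
        return sb.toString();
    }
}
```

With a connector that supports Aggregate, Filter, and TableScan, everything below Output collapses into one pushed node. With a connector that supports only TableScan, just the scan is taken over and the engine keeps the rest.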

RugratsJ commented on May 11, 2024

@electrum, as I mentioned before, pushing down a whole query that joins different data sources, or that federates across different data sources at the same time, is not feasible. What I'm proposing is to submit (push down) the whole ANSI SQL query to a single data source and return only the aggregated results.

@mbasmanova, thank you for your reply. In which version can the whole query be pushed down to Pinot? My test is based on version 331 against a five-node Postgresql Citus cluster, which doesn't seem to have this feature. Also, when the SQL is analyzed at the Presto level, it sometimes won't produce the best query plan. For example, in the Postgresql Citus case, the table is distributed across the cluster's worker nodes; Presto won't know how the data is distributed or how to optimize the query plan for it. That's why pushing down (submitting) the "raw" query and letting the Postgresql cluster run it would be the way to go.

And if the setting could be turned on or off per user session, that would be best.

RugratsJ commented on May 11, 2024

@highker, oh, you are saying it's only for Pinot connector pushdown support. I was looking for a solution for all JDBC-based connectors, such as MySQL, Oracle, Postgresql, etc. I tested with both prestosql 331 and prestodb 0.233 and don't see any capability to push down the whole query to Oracle, MySQL, or Postgresql. In the future, we may need to push down to Netezza through generic JDBC.

mbasmanova commented on May 11, 2024

@RugratsJ This functionality requires two pieces: (1) the Presto planner offering the connector the chance to take over a subtree of the query plan and (2) a connector capable of doing so. At this point, only the Pinot connector implements the APIs needed to take over a subtree (or the whole tree) of the query plan. To enable this functionality for other data sources, the corresponding connectors need to be updated to implement the com.facebook.presto.spi.ConnectorPlanOptimizer API. We can provide the necessary guidance if you decide to do that for one or more connectors.

martint commented on May 11, 2024

@mbasmanova, @highker. For a number of reasons (e.g., future-proofing for a fully exploratory optimizer) that's not the architectural approach we're taking here, so let's please keep that discussion out of this issue to avoid confusion.

@RugratsJ, if you'd like to learn the direction we're going with the design and implementation, please take a look at the document linked above: https://github.com/prestosql/presto/wiki/Pushdown-of-complex-operations. The implementation currently supports pushing down limit, tablesample, and simple filters and projections. There are people in the community working on aggregation pushdown and more complex filter/projection pushdown. Joins and other more complex operations will come after that.
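
As an illustration of what pushing a limit into a connector can look like, here is a minimal sketch in which the engine offers a limit and the connector either returns an updated table handle or declines. The TableHandle class, the applyLimit method shown here, and the generated SQL are hypothetical simplifications and are not the actual SPI.

```java
import java.util.Optional;

final class LimitPushdownSketch {
    private LimitPushdownSketch() {}

    // Hypothetical table handle: a table name plus the limit the connector has absorbed, if any.
    static final class TableHandle {
        final String table;
        final Optional<Long> limit;

        TableHandle(String table, Optional<Long> limit) {
            this.table = table;
            this.limit = limit;
        }
    }

    // Connector side: return a new handle if the limit changes anything,
    // or empty to tell the engine that nothing was pushed down.
    static Optional<TableHandle> applyLimit(TableHandle handle, long limit) {
        if (handle.limit.isPresent() && handle.limit.get() <= limit) {
            return Optional.empty(); // the existing limit is already at least as strict
        }
        return Optional.of(new TableHandle(handle.table, Optional.of(limit)));
    }

    // What a JDBC-style connector might send to the remote database for this handle.
    static String toSql(TableHandle handle) {
        String sql = "SELECT * FROM " + handle.table;
        return handle.limit.map(n -> sql + " LIMIT " + n).orElse(sql);
    }
}
```

Returning an empty result when nothing changes lets an optimizer apply such a rule repeatedly without looping forever.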

RugratsJ commented on May 11, 2024

@mbasmanova, @martint, thank you very much for your explanations. I fully understand the direction, the design and implementation, and where it is going. If we can achieve everything listed here, it will be great, and I'm all for it. At the same time, I expect it will take a long time to finish.

What I'm looking for is something like a traditional JDBC connector: submit the query to the backend database and get the final results back. Since systems like Netezza and a Postgres Citus cluster have their own distribution, optimization, and parallel processing to handle the data, it will be much faster to submit the whole query to those systems than to optimize the query plan at the Presto level. The standard ANSI query that I tested could be run directly in both Oracle and the Postgres Citus cluster, and both returned the final results much faster than Presto.

If users could decide whether they want Presto to do query analysis or to push the query down, it would relieve Presto of analyzing all types of SQL against different databases: submit (push down) the query and get the final results in seconds. Of course, this is just a short-term solution, or a supplement to what you are doing. It would also save users time and money, because they could keep using lots of existing code without migrating in a hurry if the legacy system is powerful enough to support their needs, while still navigating and integrating data across all types of systems.

tooptoop4 commented on May 11, 2024

One idea I had (as views already work) was pushing down entire WITH blocks. This is similar to how ETL tools like IBM DataStage have separate connectors per DBMS: lots of small queries are run completely pushed down to the source db, and only the results are brought back into memory for joining with the results of other pushed-down queries. For example:

WITH o AS (
    select o_custkey, count(1) c
    from pgtpch100g.public.orders
    where to_char(o_orderdate, 'mm/yyyy') = '03/1997'
    group by o_custkey
    having count(*) > 4)
select *
from o
join hive.a.b b on o.o_custkey = b.ckey
where b.type = 'abc' and b.ckey < 10;

In the above scenario, the 150 million rows in the Oracle table 'pgtpch100g.public.orders' are not brought back to Presto; only the small result set (e.g., 10k rows) is brought back, which is then joined against Hive. Sure, it would be nice to push the < 10 filter into Oracle too, but it's still much better than returning the entire 150 million rows from Oracle into Presto.

prestodb/presto#4874 (comment) mentions a similar idea of pushing down an entire WITH block.

One place that would need to change for this idea is
https://github.com/prestosql/presto/blob/332/presto-base-jdbc/src/main/java/io/prestosql/plugin/jdbc/QueryBuilder.java#L104-L122 and its callers.

Some other notes:

  • Somehow let only one thread/worker send the SQL to the source; otherwise the results might double up?
  • Make the parser happy with the syntax. There are DBMS-specific functions that aren't in Presto, so the Presto parser should not reject those functions in the WITH part of the SQL, as they can be valid in the JDBC source db.
  • Don't allow the planner/optimizer to touch the raw SQL in the WITH block; just send it to the JDBC connector as-is. Perhaps it needs to run as the very first thing, sequentially.
  • Distinguish WITH blocks over a single JDBC catalog (which should push down the entire query) from WITH blocks over Hive or multiple JDBC catalogs (which should let the planner optimise).
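
The last note, telling single-catalog WITH blocks apart from mixed ones, could be sketched as follows. This assumes the tables referenced by a block are already known as fully qualified catalog.schema.table names. The class name, method names, and the set of JDBC catalogs are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class WithBlockPushdownSketch {
    private WithBlockPushdownSketch() {}

    // Distinct catalogs referenced by names of the form catalog.schema.table.
    static Set<String> catalogsOf(List<String> qualifiedTables) {
        Set<String> catalogs = new TreeSet<>();
        for (String name : qualifiedTables) {
            int dot = name.indexOf('.');
            if (dot <= 0) {
                throw new IllegalArgumentException("expected catalog.schema.table: " + name);
            }
            catalogs.add(name.substring(0, dot));
        }
        return catalogs;
    }

    // A block is a candidate for as-is pushdown only if it touches exactly one
    // catalog and that catalog is backed by a JDBC connector.
    static boolean eligibleForRawPushdown(List<String> qualifiedTables, Set<String> jdbcCatalogs) {
        Set<String> catalogs = catalogsOf(qualifiedTables);
        return catalogs.size() == 1 && jdbcCatalogs.containsAll(catalogs);
    }
}
```

For the example query, the WITH block references only pgtpch100g.public.orders, so it would qualify, while the outer query, which also touches hive.a.b, would stay with the planner.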

RugratsJ commented on May 11, 2024

I had already tested the above method before I submitted this request. It's one way of writing optimized Presto SQL under the current design. However, in some cases we need to aggregate over the whole table, and in reality we may even have tables with trillions of records, for which the above method will not work.

RugratsJ commented on May 11, 2024

@tooptoop4, what do you mean by how fast the query is using WITH? Do you mean how fast the query runs, or how soon we will be using this kind of query? We expect lots of data scientists to use notebooks to run ad-hoc queries.

RugratsJ commented on May 11, 2024

@tooptoop4, it didn't improve much. It's still very slow. I modified the connector to use a different fetch size, which helped. However, it's still a lot slower than running the query directly in the source database.

RugratsJ commented on May 11, 2024

I added a session property for the fetch size, so I can change the default fetch size per session. It helped the speed.
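
A minimal sketch of resolving such a per-session fetch size, assuming session properties arrive as a string map. The property name fetch_size and the class are hypothetical; the actual connector change is not shown in this thread.

```java
import java.util.Map;

final class FetchSizeSessionProperty {
    private FetchSizeSessionProperty() {}

    // Hypothetical session property name.
    static final String PROPERTY = "fetch_size";

    // Use the session value when present, otherwise fall back to the connector default.
    static int resolve(Map<String, String> sessionProperties, int defaultFetchSize) {
        String raw = sessionProperties.get(PROPERTY);
        if (raw == null) {
            return defaultFetchSize;
        }
        int value = Integer.parseInt(raw.trim());
        if (value <= 0) {
            throw new IllegalArgumentException(PROPERTY + " must be positive, got " + value);
        }
        return value;
    }
}
```

The resolved value would then be passed to java.sql.Statement#setFetchSize(int), which the JDBC API defines as a hint to the driver about how many rows to fetch per round trip.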

vishalya commented on May 11, 2024

Is there any short-term solution for 'pushing down' the aggregation?
I think we can rely on session properties to 'force' the flow in the short term. @mbasmanova @RugratsJ, is there any existing connector code I can take a look at?

RugratsJ commented on May 11, 2024

@tooptoop4, @skyahead, has the aggregation-pushdown been fully tested? Can I integrate it with the current 332 release?

skyahead commented on May 11, 2024

@tooptoop4, @skyahead, has the aggregation-pushdown been fully tested? Can I integrate it with the current 332 release?

@RugratsJ My code in its current form cannot be merged into the master branch. It is my first, dirty experiment at implementing what is proposed here: https://github.com/prestosql/presto/wiki/Pushdown-of-complex-operations#aggregation-pushdown. At best, it covers 1/3 of the work needed.

Also, my project's need is to push aggregations down to Druid, so that was my main focus.

tooptoop4 commented on May 11, 2024

Another item not mentioned in the description is pushdown of ORDER BY (i.e., to take advantage of indexes/partitions in the source db) and HAVING.

josecanciani commented on May 11, 2024

Another item not mentioned in the description is pushdown of ORDER BY (i.e., to take advantage of indexes/partitions in the source db)

I agree. I don't know how much of this is possible, but problems that require finding the top N rows based on an ordering could be greatly improved.

For example, something like:

select c1, c2, c3
from (
   select o1, o2, c1, c2, c3 from mysql.xxx.yyy
   union all
   select o1, o2, c1, c2, c3 from mysql.zzz.vvv
)
order by o1, o2
limit 1000;

could be transformed to

select c1, c2, c3
from (
   (select o1, o2, c1, c2, c3 from mysql.xxx.yyy order by o1, o2 limit 1000)
   union all
   (select o1, o2, c1, c2, c3 from mysql.zzz.vvv order by o1, o2 limit 1000)
)
order by o1, o2
limit 1000;

This would allow the MySQL servers to quickly find the first 1000 rows using an index, so Presto doesn't have to transfer the full tables.

Jose

infearOnTheWay commented on May 11, 2024

Hi all,

AFAIK Presto already supports many geo functions, such as st_within, st_distance, and so on. Such functions are typically used in projections or filters in geospatial SQL queries. If we use Presto to connect to a geo datastore, these calculations could be optimized by the spatial indexes that the data store supports. So can Presto push down these geo functions, or functions in general?

ankuragarawal commented on May 11, 2024

Hi Maintainers,
I was looking into the PushAggregationIntoTableScan class. It requires the source to be a TABLE_SCAN, so aggregations are never pushed down to the connector if filters exist. I can see that with a different syntax (adding FILTER in the select clause) we can push down the aggregations along with the filters, but we are using Trino as a data source in a few BI tools (e.g., Tableau) and cannot change the syntax. Could you please share your thoughts on adding a new class, PushAggregationIntoScanFilterProject, that enables connectors to handle both, until the ability to push aggregations and filters to the connector together is added? Are there any issues you can think of with the resulting optimized plan?

findepi commented on May 11, 2024

@ankuragarawal such capability already exists, see PushPredicateIntoTableScan.
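
The interplay between the two rules can be pictured with a toy model. It assumes the connector fully absorbs the filter, so no remaining filter is left above the scan. The plan is a top-down list of operator names ending in the scan, and the class and method names are hypothetical stand-ins for PushPredicateIntoTableScan and PushAggregationIntoTableScan.

```java
import java.util.ArrayList;
import java.util.List;

final class RuleOrderingSketch {
    private RuleOrderingSketch() {}

    // Stand-in for a predicate pushdown rule: matches Filter directly above the scan.
    static List<String> pushFilterIntoScan(List<String> plan) {
        return absorb(plan, "Filter");
    }

    // Stand-in for an aggregation pushdown rule: matches Aggregate directly above the scan.
    static List<String> pushAggregationIntoScan(List<String> plan) {
        return absorb(plan, "Aggregate");
    }

    // If the operator directly above the scan is op, fold it into the scan;
    // otherwise return the plan unchanged (the rule does not match).
    private static List<String> absorb(List<String> plan, String op) {
        int scan = plan.size() - 1;
        if (scan < 1 || !plan.get(scan - 1).equals(op)) {
            return plan;
        }
        List<String> out = new ArrayList<>(plan.subList(0, scan - 1));
        out.add(plan.get(scan) + "+" + op);
        return out;
    }
}
```

On the plan Aggregate, Filter, TableScan, the aggregation rule alone does not match. Once the filter has been folded into the scan, the aggregation sits directly above the scan and the same rule applies.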

ankuragarawal commented on May 11, 2024

Thanks @findepi. Your suggestion about remainingFilter on the Slack channel helped me get past the issue.
Do we have an open PR for pushing down complex filters? I see that it's in the plan, but I could not find any open issues or PRs for it. Thanks again!

michaelcnchang commented on May 11, 2024

@mbasmanova Based on this feature (https://prestodb.io/blog/2019/12/23/improve-presto-planner), suppose a connector wants to replace a complete subtree and returns the new subtree as below:

public class DruidConnectorPlanOptimizer
        implements ConnectorPlanOptimizer {
    public PlanNode optimize(PlanNode maxSubplan, ConnectorSession session, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator) {
        return maxSubplan.accept(new Visitor(scanNodes, session, idAllocator), null);
    }
}

Will the upper layers of Presto then handle the replacement returned by the connector appropriately, without any additional changes?
If a complete pushdown of the query is done like this, are there any limitations, or additions needed in Presto, such as updating CONNECTOR_ACCESSIBLE_PLAN_NODES, etc.?
I'm just beginning to explore, so I'd appreciate being educated about pushdown of the whole query. Thanks much!
