Giter VIP home page Giter VIP logo

Comments (23)

YarShev avatar YarShev commented on August 30, 2024 1

@yx367563, we are currently working on this in #7259 to allow for execution with native pandas under the hood. You could try to set cfg.NPartitions.put(1) but it still involves data transfer between processes for a single partition.

from modin.

YarShev avatar YarShev commented on August 30, 2024 1

When you set NPartitions to 1, you have a single partitioned Modin dataframe and thus it will be proccessed on a single core (but on a worker process). Note that it requres Ray to pull data onto that worker process to be able to process the operation. We are trying to avoid this overhead in #7258 and operate directly on a pandas dataframe in the main process.

from modin.

YarShev avatar YarShev commented on August 30, 2024

Hi @yx367563, thanks for filing this issue! You should not wrap process_data into ray.remote decorator. Modin itself takes care of distributing computation. If you remove ray.remote decorator and still see performance worse pandas, check out these posts on how to boost performance (1, 2).

from modin.

yx367563 avatar yx367563 commented on August 30, 2024

@YarShev Thank you for your reply, in fact I only added ray.remote at the very beginning, it was to allow the initial procedure to be executed on a non-head node.
I found that the performance optimisations provided are mainly for modin dataframe calculations, but is it possible to improve the performance of operations like to_parquet, I think a possible reason for slow performance is the generation of too many small files.

from modin.

YarShev avatar YarShev commented on August 30, 2024

@yx367563, can you try calling this before to_parquet?

import modin.config as cfg

with cfg.context(NPartitions=1):
    df = df._repartition(axis=0)

df.to_parquet(...)

This should write a single parquet file.

from modin.

yx367563 avatar yx367563 commented on August 30, 2024

@YarShev I tried executing the following code but it still generates a lot of small files

with cfg.context(NPartitions=1):
    df._repartition(axis=0)
    df.to_parquet("xxx.parquet")

from modin.

YarShev avatar YarShev commented on August 30, 2024

@yx367563, sorry, I didn't put the code correctly. Please see the updated comment above.

from modin.

yx367563 avatar yx367563 commented on August 30, 2024

@YarShev Yes, currently only one part file will be generated in the folder, but the performance is still very poor. Can I assume that modin has poor performance for operations such as storage and inserting columns, and is more suitable for some computationally intensive operations?

from modin.

YarShev avatar YarShev commented on August 30, 2024

@yx367563, the operations such as storage and inserting columns should also perform well depending on the data size. It would be great if you could share the exact script and data you are using so we could reproduce the performance issue.

from modin.

yx367563 avatar yx367563 commented on August 30, 2024

@YarShev I'm sorry I can't provide the exact data file, but I can tell you it's a simple file with 100 columns of data, about 1 million rows, and a file size of about 500M.
The operation is to read the file into a modin dataframe, select 50 columns, and concat 60 such dataframes, and then execute:

source_df["xxx"].apply(round)
target_df = source_df.groupby(['xxx', 'yyy'])[select_cols].sum().reset_index()
target_df.insert(1, 'new-column', date)
with cfg.context(NPartitions=1):
            target_df = target_df._repartition(axis=0)
            target_df.to_parquet("xxx.parquet")

from modin.

yx367563 avatar yx367563 commented on August 30, 2024

@YarShev There is another very strange phenomenon. I tested the time taken to call each method. I found that if the operation of inserting columns is added, the time taken by to_parquet will be very short (about 2 seconds), but if the operation of inserting columns is deleted, the time taken by to_parquet will be very long (more than 30 seconds).

from modin.

yx367563 avatar yx367563 commented on August 30, 2024

In addition, I found that the total number of CPUs in the Ray cluster is used to set NPartitions in the initialize_ray method. Will it affect performance when there are too many CPUs? Can other configurations modify this value? Would you happen to have any suggestions or best practices?

from modin.

YarShev avatar YarShev commented on August 30, 2024

In addition, I found that the total number of CPUs in the Ray cluster is used to set NPartitions in the initialize_ray method. Will it affect performance when there are too many CPUs? Can other configurations modify this value? Would you happen to have any suggestions or best practices?

NPartitions is intentionally set with the value of CPUs count in the Ray cluster to have maximum performance. If the CPUs count is low (e.g., <=4), you will have a low number of partitions to be processed in parallel. You can modify the number of partitions on your own with the following config.

import modin.config as cfg

cfg.NPartitions.put(<N>)

Note that if you set a value that is much greater than the number of CPUs, you will get a dataframe that is overpartitioned, which also affects performance. You can find some performance tips on Optimizations Notes page.

from modin.

YarShev avatar YarShev commented on August 30, 2024

@YarShev There is another very strange phenomenon. I tested the time taken to call each method. I found that if the operation of inserting columns is added, the time taken by to_parquet will be very short (about 2 seconds), but if the operation of inserting columns is deleted, the time taken by to_parquet will be very long (more than 30 seconds).

We will try to reproduce your performance issue with a generated file of the size you mentioned.

The operation is to read the file into a modin dataframe, select 50 columns, and concat 60 such dataframes, and then execute:

Could you provide a script with the operations you perform? That would be helpful because your sentence and the script in the issue description seem to have some discrepancies.

from modin.

YarShev avatar YarShev commented on August 30, 2024

Could you also share the following info?

  • python version:
  • Modin version:
  • Engine:
  • CPUs number:

from modin.

yx367563 avatar yx367563 commented on August 30, 2024

Could you provide a script with the operations you perform? That would be helpful because your sentence and the script in the issue description seem to have some discrepancies.

@YarShev The general logic is the same. The script provided at the beginning slightly simplifies some unimportant parts. You can refer to this script

import ray
# import pandas as pd
import modin.pandas as pd
import time

@ray.remote
def process_data():
    modin_df_list = [] # stored 59 modin_dfs. The logic of reading and preprocessing is the same as below
    for i in range(59, size):
        # Read file and do some preprocess
        modin_df = pd.read_csv("xxx.csv")
        select_cols = list(modin_df.columns[-40:])
        select_cols_plus = select_cols[:]
        select_cols_plus.extend(['a', 'b', 'c'])
        modin_df = modin_df[select_cols_plus]
        modin_df_list.append(modin_df)

        # Start data analysis
        source_df = pd.concat([df for df in modin_df_list])
        source_df["a"].apply(round)
        target_df = source_df.groupby(['a', 'b'])[select_cols].sum().reset_index()
        target_df.insert(1, 'new-line', 'test')
        target_df.to_parquet(f"xxx.parquet")
        del(modin_df_list[0])

if __name__ == '__main__':
    ray.init()
    ray.get(process_data.remote())

python version: 3.10.12
Modin version: 0.30.1
Engine: Ray 2.23.0
CPUs number: 256

from modin.

yx367563 avatar yx367563 commented on August 30, 2024

@YarShev If I have a large number of CPUs in the cluster, say around 300, and the processing logic is as described above, will I still achieve better performance by setting it to 300?
In addition, if I enable auto-scaling in Ray cluster, then the number of CPUs will change dynamically with different loads, how should I configure it better?

from modin.

YarShev avatar YarShev commented on August 30, 2024

How many nodes do you have in the Ray cluster?

@YarShev If I have a large number of CPUs in the cluster, say around 300, and the processing logic is as described above, will I still achieve better performance by setting it to 300?

Actually, you should adjust this value for each concrete case.

  1. If you are running in a single node, it is not always beneficial to use all CPUs cores since it depends on the workload.
  2. If you are running in a cluster, there might be data transfers across nodes, which can affect performance. The less data transfers occur, the better performance you get. Modin relies on Ray scheduling mechanism, which is quite mature and should take into account data placement. You could try to use RayInitCustomResources and RayTaskCustomResources to specify task parallelism and Ray nodes where tasks will be scheduled on.

In addition, if I enable auto-scaling in Ray cluster, then the number of CPUs will change dynamically with different loads, how should I configure it better?

There are no certain recommendations here as well. You should adjust CPUs and NPartitions values for each concrete case.

from modin.

yx367563 avatar yx367563 commented on August 30, 2024

@YarShev OK. In Ray Cluster I used 8~10 nodes, each with 32 cores.

from modin.

yx367563 avatar yx367563 commented on August 30, 2024

@YarShev By the way, is there a way to turn off the parallelism of Modin and only keep the dataframe data structure used by Modin, and if so, will the performance be the same as pandas?

from modin.

yx367563 avatar yx367563 commented on August 30, 2024

@YarShev It is not necessary to call the implementation logic of pandas. Just keep the dataframe data structure of modin, but remove the parallel logic and execute it serially on a single core. Is this possible?

from modin.

Retribution98 avatar Retribution98 commented on August 30, 2024

Hi @yx367563,
Thanks for the details, I was able to reproduce your problem, but unfortunately I don't have a solution for you.

First of all, "achieving performance as good as native pandas, at least" is the goal for Modin, but right now it doesn't always work for some operations or some pipelines. The reason for your slowdown and phenomenons, as you said, is the materialization of data from workers. (This happens when necessary for correct executing and wastes a lot of time.) In your pipeline, this happens on the insert operation, and if you remove it, it goes into the to_parquet operation.

The next point I would like to pay your attention to working on a Ray cluster. Modin expects it to run on the head node of the cluster, allowing it to manage resources correctly. If you use Modin in a remote function, it may cause performance slowdown, so we heigly recommend not using it. If you want to avoid executing on the local machine, you can create a head node on the remote machine and connect to it via ssh or ray submit. You can find more information about using modin in a cluster here.

Will it affect performance when there are too many CPUs?

Since using a cluster increases the slowdowns caused by data transfer to/from workers, we recommend using it if local execution is not possible (For example if data is very large).
This note also applies to the number of CPUs, because if there are too many partitions, the time overhead increases. Sometimes it is better to choose fewer CPUs, which will give a better perfomance. Just try different options and find the optimal configuration for your pipeline.

from modin.

yx367563 avatar yx367563 commented on August 30, 2024

@Retribution98 In fact, I set the CPU of the head node to 0 on the Ray Cluster and turned on the autoscaler, so if I run it directly on the head node, I will encounter the error No KeyError CPU, because in the Modin code, the number of partitions will be determined according to the number of CPUs in the current cluster during initialization.
To solve this problem, I can only convert the corresponding logic to the Worker Node. In fact, this configuration method of Ray Cluster is officially recommended and many users are using it. Maybe you can fix this problem and try to consider the scenario with Ray autoscaler and make some optimizations.
In any case, thank you for your feedback and efforts!

from modin.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.