Comments (2)
Hey @m946107011, great question, and thanks for providing what code you could. While we'd usually want some data (even fake data is fine), I'll take a shot and assume your for loop captures the entirety of your code.
- Move the `LocalCUDACluster` creation outside of the for loop. You only need it once, and you can run multiple models without rebuilding the cluster. If you really do want to rebuild the cluster on each iteration, note that you never shut it down between iterations. `client.restart()` may work for you; if you want to burn it all down between iterations, you can also call `client.shutdown()` at the end of a run before rebuilding on the next one.
- When we were doing some large benchmarking, we actually put `gc.collect()` before we ran our model, not at the end. Right before `start = time.time()` would be appropriate. This cleans everything up before you start timing and hurts nothing if there is nothing to clean up. Where you call the deletion of your dataframes is appropriate, as I'm assuming it happens at the end of your for loop.
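The two points above can be sketched together as a loop skeleton; `param_grid`, `run_model`, and the per-iteration `result` are hypothetical placeholders for your own code:

```python
import gc
import time

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Create the cluster and client once, outside the loop.
cluster = LocalCUDACluster()
client = Client(cluster)

for params in param_grid:               # placeholder for your iteration variable
    gc.collect()                        # clean up *before* timing, not after
    start = time.time()
    result = run_model(client, params)  # placeholder for your model call
    print('Time:', time.time() - start)
    del result                          # drop per-iteration references at the end
    # client.restart() here only if you need a completely clean slate

client.shutdown()                       # tear the cluster down once, at the very end
```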
Outside of this: it sounds like you're using a single A100 40GB, but if you are on a multi-GPU setup, below is some code you can play with that uses UCX, a faster interconnect than the default TCP. It might help with performance; tuning will most likely be required for best results.
```python
# limit work-stealing as much as possible
dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.set({'distributed.scheduler.bandwidth': 1})

cluster = LocalCUDACluster(rmm_pool_size=1024440,
                           device_memory_limit=1024440 * .85,  # 85% of pool size; you may also want to change this
                           protocol="ucx",
                           enable_tcp_over_ucx=True,
                           enable_nvlink=True)
```
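A side note based on the dask-cuda documentation rather than your snippet: `rmm_pool_size` and `device_memory_limit` also accept human-readable size strings, which are harder to get wrong than a raw byte count. The sizes here are placeholders to adjust for your GPU:

```python
# Same cluster, but with explicit size strings (values are examples only)
cluster = LocalCUDACluster(rmm_pool_size="30GB",
                           device_memory_limit="25GB",  # ~85% of the pool
                           protocol="ucx",
                           enable_tcp_over_ucx=True,
                           enable_nvlink=True)
```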
Also, in terms of formatting: I know it's your first question, but you may want to edit it a bit to make the code one solid block (I don't have edit access). You can use Preview to see what it will look like before you post.
Hi,
I appreciate your advice! I've encountered an issue when trying to utilize NVLink with the provided code. I'm using A600 x6 with NVLink connections, and my CUDA version is 11.2. Do you have any suggestions or insights on how to address this? Thank you!
KeyError Traceback (most recent call last)
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/core.py:664, in Server.start(self)
663 try:
--> 664 await wait_for(self.start_unsafe(), timeout=timeout)
665 except asyncio.TimeoutError as exc:
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/utils.py:1940, in wait_for(fut, timeout)
1939 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
-> 1940 return await asyncio.wait_for(fut, timeout)
File ~/anaconda3/envs/rapids/lib/python3.9/asyncio/tasks.py:442, in wait_for(fut, timeout, loop)
441 if timeout is None:
--> 442 return await fut
444 if timeout <= 0:
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/scheduler.py:4039, in Scheduler.start_unsafe(self)
4038 for addr in self._start_address:
-> 4039 await self.listen(
4040 addr,
4041 allow_offload=False,
4042 handshake_overrides={"pickle-protocol": 4, "compression": None},
4043 **self.security.get_listen_args("scheduler"),
4044 )
4045 self.ip = get_address_host(self.listen_address)
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/core.py:859, in Server.listen(self, port_or_addr, allow_offload, **kwargs)
858 assert isinstance(addr, str)
--> 859 listener = await listen(
860 addr,
861 self.handle_comm,
862 deserialize=self.deserialize,
863 allow_offload=allow_offload,
864 **kwargs,
865 )
866 self.listeners.append(listener)
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/comm/core.py:256, in Listener.__await__.<locals>._()
255 async def _():
--> 256 await self.start()
257 return self
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/comm/ucx.py:527, in UCXListener.start(self)
525 await self.comm_handler(ucx)
--> 527 init_once()
528 self.ucp_server = ucp.create_listener(serve_forever, port=self._input_port)
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/comm/ucx.py:158, in init_once()
153 with patch.dict(os.environ, ucx_environment):
154 # We carefully ensure that ucx_environment only contains things
155 # that don't override ucx_config or existing slots in the
156 # environment, so the user's external environment can safely
157 # override things here.
--> 158 ucp.init(options=ucx_config, env_takes_precedence=True)
160 pool_size_str = dask.config.get("distributed.rmm.pool-size")
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/ucp/core.py:938, in init(options, env_takes_precedence, blocking_progress_mode)
935 logger.debug(
936 f"Ignoring environment {env_k}={env_v}; using option {k}={v}"
937 )
--> 938 _ctx = ApplicationContext(options, blocking_progress_mode=blocking_progress_mode)
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/ucp/core.py:223, in ApplicationContext.__init__(self, config_dict, blocking_progress_mode)
222 # For now, a application context only has one worker
--> 223 self.context = ucx_api.UCXContext(config_dict)
224 self.worker = ucx_api.UCXWorker(self.context)
File ucp/_libs/ucx_context.pyx:78, in ucp._libs.ucx_api.UCXContext.__init__()
KeyError: 'TLS'
The above exception was the direct cause of the following exception:
RuntimeError Traceback (most recent call last)
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/deploy/spec.py:325, in SpecCluster._start(self)
324 self.scheduler = cls(**self.scheduler_spec.get("options", {}))
--> 325 self.scheduler = await self.scheduler
326 self.scheduler_comm = rpc(
327 getattr(self.scheduler, "external_address", None)
328 or self.scheduler.address,
329 connection_args=self.security.get_connection_args("client"),
330 )
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/core.py:672, in Server.start(self)
671 await _close_on_failure(exc)
--> 672 raise RuntimeError(f"{type(self).__name__} failed to start.") from exc
673 if self.status == Status.init:
RuntimeError: Scheduler failed to start.
The above exception was the direct cause of the following exception:
RuntimeError Traceback (most recent call last)
Cell In[1], line 647
645 print('Type:'+str(problem_mode))
646 print('Evaluation Metrics:'+str(main_perform))
--> 647 MTForestNet_Multiprocess(folder_name,seed,problem_mode,main_perform,save_file_name,fast)
648 end=time.time()
649 print('Time: ',(end - start)/60,'minutes')
Cell In[1], line 56, in MTForestNet_Multiprocess(folder_name, seed, problem_mode, main_perform, save_file_name, fast)
53 dask.config.set({'distributed.scheduler.work-stealing': False})
54 dask.config.set({'distributed.scheduler.bandwidth': 1})
---> 56 cluster = LocalCUDACluster(rmm_pool_size=102444,
57 device_memory_limit=102444*.75, # 85% of pool size. you may also want to change this.
58 protocol="ucx",
59 enable_tcp_over_ucx=True,
60 enable_nvlink=True)
62 c = Client(cluster)
63 workers = c.has_what().keys()
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/dask_cuda/local_cuda_cluster.py:352, in LocalCUDACluster.__init__(self, CUDA_VISIBLE_DEVICES, n_workers, threads_per_worker, memory_limit, device_memory_limit, data, local_directory, shared_filesystem, protocol, enable_tcp_over_ucx, enable_infiniband, enable_nvlink, enable_rdmacm, rmm_pool_size, rmm_maximum_pool_size, rmm_managed_memory, rmm_async, rmm_release_threshold, rmm_log_directory, rmm_track_allocations, jit_unspill, log_spilling, worker_class, pre_import, **kwargs)
348 worker_class = partial(Nanny, worker_class=worker_class)
350 self.pre_import = pre_import
--> 352 super().__init__(
353 n_workers=0,
354 threads_per_worker=threads_per_worker,
355 memory_limit=self.memory_limit,
356 processes=True,
357 data=data,
358 local_directory=local_directory,
359 protocol=protocol,
360 worker_class=worker_class,
361 config={
362 "distributed.comm.ucx": get_ucx_config(
363 enable_tcp_over_ucx=enable_tcp_over_ucx,
364 enable_nvlink=enable_nvlink,
365 enable_infiniband=enable_infiniband,
366 enable_rdmacm=enable_rdmacm,
367 )
368 },
369 **kwargs,
370 )
372 self.new_spec["options"]["preload"] = self.new_spec["options"].get(
373 "preload", []
374 ) + ["dask_cuda.initialize"]
375 self.new_spec["options"]["preload_argv"] = self.new_spec["options"].get(
376 "preload_argv", []
377 ) + ["--create-cuda-context", "--protocol", protocol]
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/deploy/local.py:253, in LocalCluster.__init__(self, name, n_workers, threads_per_worker, processes, loop, start, host, ip, scheduler_port, silence_logs, dashboard_address, worker_dashboard_address, diagnostics_port, services, worker_services, service_kwargs, asynchronous, security, protocol, blocked_handlers, interface, worker_class, scheduler_kwargs, scheduler_sync_interval, **worker_kwargs)
250 worker = {"cls": worker_class, "options": worker_kwargs}
251 workers = {i: worker for i in range(n_workers)}
--> 253 super().__init__(
254 name=name,
255 scheduler=scheduler,
256 workers=workers,
257 worker=worker,
258 loop=loop,
259 asynchronous=asynchronous,
260 silence_logs=silence_logs,
261 security=security,
262 scheduler_sync_interval=scheduler_sync_interval,
263 )
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/deploy/spec.py:284, in SpecCluster.__init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close, scheduler_sync_interval)
282 if not self.called_from_running_loop:
283 self._loop_runner.start()
--> 284 self.sync(self._start)
285 try:
286 self.sync(self._correct_state)
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/utils.py:358, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
356 return future
357 else:
--> 358 return sync(
359 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
360 )
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/utils.py:434, in sync(loop, func, callback_timeout, *args, **kwargs)
431 wait(10)
433 if error is not None:
--> 434 raise error
435 else:
436 return result
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/utils.py:408, in sync.<locals>.f()
406 awaitable = wait_for(awaitable, timeout)
407 future = asyncio.ensure_future(awaitable)
--> 408 result = yield future
409 except Exception as exception:
410 error = exception
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/tornado/gen.py:767, in Runner.run(self)
765 try:
766 try:
--> 767 value = future.result()
768 except Exception as e:
769 # Save the exception for later. It's important that
770 # gen.throw() not be called inside this try/except block
771 # because that makes sys.exc_info behave unexpectedly.
772 exc: Optional[Exception] = e
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/deploy/spec.py:335, in SpecCluster._start(self)
333 self.status = Status.failed
334 await self._close()
--> 335 raise RuntimeError(f"Cluster failed to start: {e}") from e
RuntimeError: Cluster failed to start: Scheduler failed to start.
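One cheap check before digging further (an assumption on my part, not a confirmed diagnosis: a `KeyError: 'TLS'` inside `ucp.init` can indicate a mismatch between the ucx-py package and the underlying UCX library): print the UCX version ucx-py is linked against and compare it with what `ucx_info -v` reports on the system.

```python
import ucp

# Version of the UCX library that ucx-py is actually linked against;
# compare this against the output of `ucx_info -v` on the command line.
print(ucp.get_ucx_version())
print(ucp.__version__)  # the ucx-py package version itself
```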