Giter VIP home page Giter VIP logo

Comments (2)

taureandyernv avatar taureandyernv commented on June 9, 2024

hey @m946107011 , great question and thanks for providing some code as best you could. While we usually may want some data (even fake data is fine), I will take a shot, and assume your for loop captures the entirety of your code.

  1. Move the local cuda cluster creation outside of the for loop. You only need it once. You can run multiple models without having to redo the cluster. Assuming you really do want to redo your cluster for each iteration, you did not shut down the cluster in each iteration. To do that, client.restart() may work for you. If you want to burn it all, between iterations, you can also use client.shutdown() at the end of your run before rebuilding it on your next run.
  2. When we were doing some large benchmarking, we actually put gc.collect() before we ran our model, not at the end. Right before start=time.time() would be appropriate. This should clean up everything before you start timing and hurts nothing if there is nothing to clean up. Where you call the deletion of your dataframes is appropriate, as i'm assuming it happens at the end of your for loop.

Outside of this, while it sounds like you're using a single A100 40GB, if you are using a multi GPU set up, below is some code that you can play with that will utilize UCX, which is a faster interconnect than the default tcp. It might help with performance. Tuning most likely will be required for best performance.

  # limit work-stealing as much as possible
  dask.config.set({'distributed.scheduler.work-stealing': False})
  dask.config.set({'distributed.scheduler.bandwidth': 1})

  cluster = LocalCUDACluster(rmm_pool_size=1024440,
                             device_memory_limit=1024440*.85, # 85% of pool size.  you may also want to change this.
                             protocol="ucx",
                             enable_tcp_over_ucx=True,
                             enable_nvlink=True)

Also, in terms of formatting, I know its your first question, you may want to edit it a bit to make it one solid block. I don't have edit access. You can use Preview to see what it will look like before you post.

from cuml.

m946107011 avatar m946107011 commented on June 9, 2024

hey @m946107011 , great question and thanks for providing some code as best you could. While we usually may want some data (even fake data is fine), I will take a shot, and assume your for loop captures the entirety of your code.

  1. Move the local cuda cluster creation outside of the for loop. You only need it once. You can run multiple models without having to redo the cluster. Assuming you really do want to redo your cluster for each iteration, you did not shut down the cluster in each iteration. To do that, client.restart() may work for you. If you want to burn it all, between iterations, you can also use client.shutdown() at the end of your run before rebuilding it on your next run.
  2. When we were doing some large benchmarking, we actually put gc.collect() before we ran our model, not at the end. Right before start=time.time() would be appropriate. This should clean up everything before you start timing and hurts nothing if there is nothing to clean up. Where you call the deletion of your dataframes is appropriate, as i'm assuming it happens at the end of your for loop.

Outside of this, while it sounds like you're using a single A100 40GB, if you are using a multi GPU set up, below is some code that you can play with that will utilize UCX, which is a faster interconnect than the default tcp. It might help with performance. Tuning most likely will be required for best performance.

  # limit work-stealing as much as possible
  dask.config.set({'distributed.scheduler.work-stealing': False})
  dask.config.set({'distributed.scheduler.bandwidth': 1})

  cluster = LocalCUDACluster(rmm_pool_size=1024440,
                             device_memory_limit=1024440*.85, # 85% of pool size.  you may also want to change this.
                             protocol="ucx",
                             enable_tcp_over_ucx=True,
                             enable_nvlink=True)

Also, in terms of formatting, I know its your first question, you may want to edit it a bit to make it one solid block. I don't have edit access. You can use Preview to see what it will look like before you post.

Hi:
I appreciate your advice!
I've encountered an issue when trying to utilize NVLink with the provided code.
I'm using A600 x6 with NVLink connections, and my CUDA Version is 11.2.
Do you have any suggestions or insights on how to address this? Thank you!


KeyError Traceback (most recent call last)
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/core.py:664, in Server.start(self)
663 try:
--> 664 await wait_for(self.start_unsafe(), timeout=timeout)
665 except asyncio.TimeoutError as exc:

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/utils.py:1940, in wait_for(fut, timeout)
1939 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
-> 1940 return await asyncio.wait_for(fut, timeout)

File ~/anaconda3/envs/rapids/lib/python3.9/asyncio/tasks.py:442, in wait_for(fut, timeout, loop)
441 if timeout is None:
--> 442 return await fut
444 if timeout <= 0:

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/scheduler.py:4039, in Scheduler.start_unsafe(self)
4038 for addr in self._start_address:
-> 4039 await self.listen(
4040 addr,
4041 allow_offload=False,
4042 handshake_overrides={"pickle-protocol": 4, "compression": None},
4043 **self.security.get_listen_args("scheduler"),
4044 )
4045 self.ip = get_address_host(self.listen_address)

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/core.py:859, in Server.listen(self, port_or_addr, allow_offload, **kwargs)
858 assert isinstance(addr, str)
--> 859 listener = await listen(
860 addr,
861 self.handle_comm,
862 deserialize=self.deserialize,
863 allow_offload=allow_offload,
864 **kwargs,
865 )
866 self.listeners.append(listener)

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/comm/core.py:256, in Listener.await.._()
255 async def _():
--> 256 await self.start()
257 return self

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/comm/ucx.py:527, in UCXListener.start(self)
525 await self.comm_handler(ucx)
--> 527 init_once()
528 self.ucp_server = ucp.create_listener(serve_forever, port=self._input_port)

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/comm/ucx.py:158, in init_once()
153 with patch.dict(os.environ, ucx_environment):
154 # We carefully ensure that ucx_environment only contains things
155 # that don't override ucx_config or existing slots in the
156 # environment, so the user's external environment can safely
157 # override things here.
--> 158 ucp.init(options=ucx_config, env_takes_precedence=True)
160 pool_size_str = dask.config.get("distributed.rmm.pool-size")

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/ucp/core.py:938, in init(options, env_takes_precedence, blocking_progress_mode)
935 logger.debug(
936 f"Ignoring environment {env_k}={env_v}; using option {k}={v}"
937 )
--> 938 _ctx = ApplicationContext(options, blocking_progress_mode=blocking_progress_mode)

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/ucp/core.py:223, in ApplicationContext.init(self, config_dict, blocking_progress_mode)
222 # For now, a application context only has one worker
--> 223 self.context = ucx_api.UCXContext(config_dict)
224 self.worker = ucx_api.UCXWorker(self.context)

File ucp/_libs/ucx_context.pyx:78, in ucp._libs.ucx_api.UCXContext.init()

KeyError: 'TLS'

The above exception was the direct cause of the following exception:

RuntimeError Traceback (most recent call last)
File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/deploy/spec.py:325, in SpecCluster._start(self)
324 self.scheduler = cls(**self.scheduler_spec.get("options", {}))
--> 325 self.scheduler = await self.scheduler
326 self.scheduler_comm = rpc(
327 getattr(self.scheduler, "external_address", None)
328 or self.scheduler.address,
329 connection_args=self.security.get_connection_args("client"),
330 )

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/core.py:672, in Server.start(self)
671 await _close_on_failure(exc)
--> 672 raise RuntimeError(f"{type(self).name} failed to start.") from exc
673 if self.status == Status.init:

RuntimeError: Scheduler failed to start.

The above exception was the direct cause of the following exception:

RuntimeError Traceback (most recent call last)
Cell In[1], line 647
645 print('Type:'+str(problem_mode))
646 print('Evaluation Metrics:'+str(main_perform))
--> 647 MTForestNet_Multiprocess(folder_name,seed,problem_mode,main_perform,save_file_name,fast)
648 end=time.time()
649 print('Time: ',(end - start)/60,'minutes')

Cell In[1], line 56, in MTForestNet_Multiprocess(folder_name, seed, problem_mode, main_perform, save_file_name, fast)
53 dask.config.set({'distributed.scheduler.work-stealing': False})
54 dask.config.set({'distributed.scheduler.bandwidth': 1})
---> 56 cluster = LocalCUDACluster(rmm_pool_size=102444,
57 device_memory_limit=102444*.75, # 85% of pool size. you may also want to change this.
58 protocol="ucx",
59 enable_tcp_over_ucx=True,
60 enable_nvlink=True)
62 c = Client(cluster)
63 workers = c.has_what().keys()

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/dask_cuda/local_cuda_cluster.py:352, in LocalCUDACluster.init(self, CUDA_VISIBLE_DEVICES, n_workers, threads_per_worker, memory_limit, device_memory_limit, data, local_directory, shared_filesystem, protocol, enable_tcp_over_ucx, enable_infiniband, enable_nvlink, enable_rdmacm, rmm_pool_size, rmm_maximum_pool_size, rmm_managed_memory, rmm_async, rmm_release_threshold, rmm_log_directory, rmm_track_allocations, jit_unspill, log_spilling, worker_class, pre_import, **kwargs)
348 worker_class = partial(Nanny, worker_class=worker_class)
350 self.pre_import = pre_import
--> 352 super().init(
353 n_workers=0,
354 threads_per_worker=threads_per_worker,
355 memory_limit=self.memory_limit,
356 processes=True,
357 data=data,
358 local_directory=local_directory,
359 protocol=protocol,
360 worker_class=worker_class,
361 config={
362 "distributed.comm.ucx": get_ucx_config(
363 enable_tcp_over_ucx=enable_tcp_over_ucx,
364 enable_nvlink=enable_nvlink,
365 enable_infiniband=enable_infiniband,
366 enable_rdmacm=enable_rdmacm,
367 )
368 },
369 **kwargs,
370 )
372 self.new_spec["options"]["preload"] = self.new_spec["options"].get(
373 "preload", []
374 ) + ["dask_cuda.initialize"]
375 self.new_spec["options"]["preload_argv"] = self.new_spec["options"].get(
376 "preload_argv", []
377 ) + ["--create-cuda-context", "--protocol", protocol]

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/deploy/local.py:253, in LocalCluster.init(self, name, n_workers, threads_per_worker, processes, loop, start, host, ip, scheduler_port, silence_logs, dashboard_address, worker_dashboard_address, diagnostics_port, services, worker_services, service_kwargs, asynchronous, security, protocol, blocked_handlers, interface, worker_class, scheduler_kwargs, scheduler_sync_interval, **worker_kwargs)
250 worker = {"cls": worker_class, "options": worker_kwargs}
251 workers = {i: worker for i in range(n_workers)}
--> 253 super().init(
254 name=name,
255 scheduler=scheduler,
256 workers=workers,
257 worker=worker,
258 loop=loop,
259 asynchronous=asynchronous,
260 silence_logs=silence_logs,
261 security=security,
262 scheduler_sync_interval=scheduler_sync_interval,
263 )

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/deploy/spec.py:284, in SpecCluster.init(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close, scheduler_sync_interval)
282 if not self.called_from_running_loop:
283 self._loop_runner.start()
--> 284 self.sync(self._start)
285 try:
286 self.sync(self._correct_state)

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/utils.py:358, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
356 return future
357 else:
--> 358 return sync(
359 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
360 )

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/utils.py:434, in sync(loop, func, callback_timeout, *args, **kwargs)
431 wait(10)
433 if error is not None:
--> 434 raise error
435 else:
436 return result

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/utils.py:408, in sync..f()
406 awaitable = wait_for(awaitable, timeout)
407 future = asyncio.ensure_future(awaitable)
--> 408 result = yield future
409 except Exception as exception:
410 error = exception

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/tornado/gen.py:767, in Runner.run(self)
765 try:
766 try:
--> 767 value = future.result()
768 except Exception as e:
769 # Save the exception for later. It's important that
770 # gen.throw() not be called inside this try/except block
771 # because that makes sys.exc_info behave unexpectedly.
772 exc: Optional[Exception] = e

File ~/anaconda3/envs/rapids/lib/python3.9/site-packages/distributed/deploy/spec.py:335, in SpecCluster._start(self)
333 self.status = Status.failed
334 await self._close()
--> 335 raise RuntimeError(f"Cluster failed to start: {e}") from e

RuntimeError: Cluster failed to start: Scheduler failed to start.

from cuml.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.