A multi-tenant server for securely deploying and managing Dask clusters.
Home Page: https://gateway.dask.org/
License: BSD 3-Clause "New" or "Revised" License
@jacobtomlinson made a comment here on externalizing services. I would suggest that the default be a NodePort, for two reasons:
NodePorts are built in to k8s, so we can easily test, and the setup should work out of the box without additional configuration.
When using a NodePort, I believe the pod/service is accessible via any nodeIP:
https://kubernetes.io/docs/tutorials/services/source-ip/#source-ip-for-services-with-type-nodeport
For configuring a NodePort I would recommend looking at:
https://kubernetes.io/docs/tasks/access-application-cluster/service-access-application-cluster/
or the following:
kind: Service
apiVersion: v1
metadata:
  name: SERVICENAME
spec:
  type: NodePort
  selector:
    app: APPNAME
  ports:
    # nodePort - a static port assigned on each node
    # port - port exposed internally in the cluster (assigned on creation)
    # targetPort - the container port to send requests to
    - nodePort: 30163
      port: 8080
cc @jcrist
@jacobtomlinson do you have thoughts here?
When misconfigured, under some conditions the proxy server will exit immediately after starting, with no error log and an exit code of 0. This seems to correlate with an inability to bind to the specified addresses.
At large enterprises/institutions it's common to have proxies configured via environment variables -- out of the box this causes trouble with the gateway client, as it doesn't pay attention to those.
However, telling it to use CurlAsyncHTTPClient seems to fix the issue, by adding
AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")
after the initial import in https://github.com/jcrist/dask-gateway/blob/master/dask-gateway/dask_gateway/client.py
Any concerns with making this the default? According to https://www.tornadoweb.org/en/stable/httpclient.html#module-tornado.httpclient it will require newish versions of libcurl and pycurl:
"Currently the minimum supported version of libcurl is 7.22.0, and the minimum version of pycurl is 7.18.2."
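For reference, a minimal sketch of the workaround on the client side (the gateway address here is a placeholder):

from tornado.httpclient import AsyncHTTPClient

# Use tornado's curl-based client, which honors http_proxy/https_proxy
# environment variables (requires pycurl/libcurl to be installed).
AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")

from dask_gateway import Gateway

gateway = Gateway("http://mygateway.example.com")  # placeholder address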
With #29, all communication external to the cluster is secured with TLS. However, there are still routes internal to the cluster that are unencrypted. These include:
We should be able to secure these automatically by minting temporary credentials to use for each (as we're already doing for the schedulers). The trick here will be distributing these in a secure way. I propose the following: a generate-certificate command, which outputs a certificate/key pair using the internal CA. A configuration option for setting the internal CA and persisting it somewhere must be present for this to work. The proxy certificates can then be managed and configured separately, and won't be automatically generated on startup. If the proxies aren't being externally managed, securing internal communications is transparent to the user, and could even be on by default.
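As a rough sketch of what minting such credentials could look like with the cryptography package (ca_key and ca_cert stand in for the internal CA's key and certificate; all names here are illustrative, not settled design):

import datetime
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa

def generate_certificate(ca_key, ca_cert, common_name, days=30):
    """Mint a short-lived key/certificate pair signed by the internal CA."""
    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    now = datetime.datetime.utcnow()
    cert = (
        x509.CertificateBuilder()
        .subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, common_name)]))
        .issuer_name(ca_cert.subject)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=days))
        .sign(ca_key, hashes.SHA256())
    )
    return key, cert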
I've been working on adding support for job queue (PBS, Slurm) systems to dask-gateway, and have the following questions:
How do users distribute software throughout the cluster (if at all)?
For other cluster systems there's a clear artifact (docker image, tar archive, etc...) that the cluster manager (YARN, kubernetes, etc...) uses to distribute software to all nodes. How do people accomplish this for job queueing systems? Rely on a networked file system, and store custom software there? Manually scp during the job? Rely on the cluster manager's concept of stagein files (PBS has this, Slurm doesn't seem to)? Rely on administrators?
How should log files be handled?
Other systems have tools for managing log files externally to the users (e.g. YARN stores them on HDFS in an admin-managed directory). Job queueing systems seem to default to outputting the log files in a user-owned directory. I'm fine doing this (currently they're stored in ~/.dask-gateway/<cluster-id>/*), but am worried that this may require users to periodically clean out their log files to avoid ever-growing disk usage. Ideally the gateway wouldn't be in the business of managing log files, and would leave that to the backing system.
In what directory should jobs be run?
To match the behavior of other cluster managers, I'd prefer this directory to have the following attributes (e.g. being named after dask-gateway, falling back if the cluster backend doesn't support this option). PBS has the -Wsandbox=PRIVATE attribute to create a temporary directory to run the job in (admin configurable). For other systems I'm currently running in ~/.dask-gateway/<cluster-id>/, which may be on a network file system and isn't ideal.
One option that could be consistent across all job queue backends is to provide a configurable local_directory that has fast storage, and manually create a user-owned sub-directory of it on scheduler/worker startup (and clean it up on shutdown). I'm not sure what a good default for this should be (/tmp? ~/.dask-gateway?), or if it would require templating with user information (e.g. "{user_home}/.dask-gateway").
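A hypothetical config sketch of that option (the class name, option name, and template fields are all assumptions, not existing settings):

# dask_gateway_config.py -- illustrative only; these option names don't exist yet
c.JobQueueClusterManager.local_directory = "/tmp/dask-gateway-{username}"
# The gateway would expand {username}/{user_home} and create/remove a
# user-owned per-cluster sub-directory on scheduler/worker startup and shutdown.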
In #54 we added an API that allows users to configure admin-defined fields when starting a cluster. This API provides an endpoint for discovering what options exist (name, type, description). In the Python client library we use this information to build an ipywidgets interface for configuring the cluster. We could do the same here with a JupyterLab plugin.
I'm not sure what this would look like. I suspect it'd be easier to build a new plugin than modify the existing dask plugin, as we have features they don't need, and vice versa (e.g. we don't need any of the proxying support).
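For reference, this is roughly how the Python client consumes that endpoint today (a sketch; exact signatures may differ, and the option field is admin-defined):

from dask_gateway import Gateway

gateway = Gateway()
options = gateway.cluster_options()   # queries the server for admin-defined options
options.worker_cores = 2              # field names come from the admin's config
cluster = gateway.new_cluster(options)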
To handle potential database migrations, we should be using alembic to manage database versions.
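A sketch of how the server might invoke alembic programmatically at startup (the ini path is an assumption):

from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")   # assumed location of the migration config
command.upgrade(cfg, "head")  # upgrade the database to the latest revision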
Maybe a silly question: would dask-gateway be a good home for a dask submit command, analogous to dask-yarn's https://yarn.dask.org/en/latest/submit.html#submitting-an-application?
Should have a docker-compose setup demonstrating usage on an example Hadoop cluster. This would be similar to the setup found in the jupyterhub-on-hadoop project here.
Not 100% sure why this happened or how to gather more information, but I happened to be watching my k8s pods (watch kubectl get po -n dask-gateway) and saw that my scheduler pod got OOMKilled. The gateway log has the following line:
[W 2019-10-07 09:54:22.473 DaskGateway] Cluster b51b5f2b687d4b5eb9c020de892ebf07 stopped unexpectedly: Container stopped with exit code 137
And in my notebook I got a StreamClosedError:
---------------------------------------------------------------------------
StreamClosedError Traceback (most recent call last)
~/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/comm/tcp.py in read(self, deserializers)
183 try:
--> 184 n_frames = await stream.read_bytes(8)
185 n_frames = struct.unpack("Q", n_frames)[0]
StreamClosedError: Stream is closed
During handling of the above exception, another exception occurred:
CommClosedError Traceback (most recent call last)
<ipython-input-210-e6a074d7e0bd> in <module>
----> 1 ret = df.compute()
~/anaconda/envs/airflow/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
173 dask.base.compute
174 """
--> 175 (result,) = compute(self, traverse=False, **kwargs)
176 return result
177
~/anaconda/envs/airflow/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
444 keys = [x.__dask_keys__() for x in collections]
445 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446 results = schedule(dsk, keys, **kwargs)
447 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
448
~/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2518 should_rejoin = False
2519 try:
-> 2520 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2521 finally:
2522 for f in futures.values():
~/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1818 direct=direct,
1819 local_worker=local_worker,
-> 1820 asynchronous=asynchronous,
1821 )
1822
~/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
752 else:
753 return sync(
--> 754 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
755 )
756
~/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
335 if error[0]:
336 typ, exc, tb = error[0]
--> 337 raise exc.with_traceback(tb)
338 else:
339 return result[0]
~/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/utils.py in f()
319 if callback_timeout is not None:
320 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 321 result[0] = yield future
322 except Exception as exc:
323 error[0] = sys.exc_info()
~/anaconda/envs/airflow/lib/python3.6/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1703 else:
1704 self._gather_future = future
-> 1705 response = await future
1706
1707 if response["status"] == "error":
~/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py in _gather_remote(self, direct, local_worker)
1755
1756 else: # ask scheduler to gather data for us
-> 1757 response = await self.scheduler.gather(keys=keys)
1758 finally:
1759 self._gather_semaphore.release()
~/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
736 name, comm.name = comm.name, "ConnectionPool." + key
737 try:
--> 738 result = await send_recv(comm=comm, op=key, **kwargs)
739 finally:
740 self.pool.reuse(self.addr, comm)
~/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
529 await comm.write(msg, serializers=serializers, on_error="raise")
530 if reply:
--> 531 response = await comm.read(deserializers=deserializers)
532 else:
533 response = None
~/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/comm/tcp.py in read(self, deserializers)
202 self.stream = None
203 if not shutting_down():
--> 204 convert_stream_closed_error(self, e)
205 else:
206 try:
~/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/comm/tcp.py in convert_stream_closed_error(obj, exc)
130 raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
131 else:
--> 132 raise CommClosedError("in %s: %s" % (obj, exc))
133
134
CommClosedError: in <closed TLS>: Stream is closed
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.utils - ERROR -
Traceback (most recent call last):
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/utils.py", line 666, in log_errors
yield
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py", line 1276, in _close
await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR -
Traceback (most recent call last):
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/utils.py", line 666, in log_errors
yield
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py", line 1005, in _reconnect
await self._close()
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py", line 1276, in _close
await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR -
Traceback (most recent call last):
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/utils.py", line 666, in log_errors
yield
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py", line 1276, in _close
await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR -
Traceback (most recent call last):
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/utils.py", line 666, in log_errors
yield
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py", line 1005, in _reconnect
await self._close()
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py", line 1276, in _close
await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR -
Traceback (most recent call last):
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/utils.py", line 666, in log_errors
yield
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py", line 1276, in _close
await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR -
Traceback (most recent call last):
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/utils.py", line 666, in log_errors
yield
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py", line 1005, in _reconnect
await self._close()
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py", line 1276, in _close
await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR -
Traceback (most recent call last):
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/utils.py", line 666, in log_errors
yield
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py", line 1276, in _close
await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR -
Traceback (most recent call last):
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/utils.py", line 666, in log_errors
yield
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py", line 1005, in _reconnect
await self._close()
File "/opt/continuum/anaconda/envs/airflow/lib/python3.6/site-packages/distributed/client.py", line 1276, in _close
await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
This is the code that was being executed (well, some of the relevant code that had been executed prior):
import dask.dataframe as dd
small = dd.read_parquet('gs://edill-data/flights/parquet/2006')#, gather_statistics=False, validate_schema=False)
small = client.persist(small)
df['ComputedAirTime'] = df['ArrTime'] - df['DepTime']
df['AirTimeDelta'] = df['ComputedAirTime'] - df['AirTime']
And here's the cell that I ran when the scheduler got OOMKilled:
ret = df.compute()
Anything else I can provide to help figure out whether there's a bug on the dask-gateway side?
Sorry I don't have many details, but I'm noticing that when I create a new cluster and call .scale(n), I get n-1 workers.
But it's only on the first call to .scale... If I do:
cluster = gateway.new_cluster()
cluster.scale(2) # gets 1 worker
cluster.scale(0) # down to 0
cluster.scale(2) # gets 2 workers
I think I wait long enough in between each .scale call. I can reliably reproduce, so I can show you when you get in today.
It would be useful to support users with admin permissions. This would allow admins to see the state of the whole gateway, as well as stop or scale down users' clusters, among other possibilities.
Probably user error, but I see this in my logs with dask-gateway 0.4.1 when deploying on kubernetes:
$ kubectl -n dask-gateway log gateway-dask-gateway-6b77565b47-jhcss 366ms
[I 2019-10-02 13:02:44.454 DaskGateway] Starting dask-gateway-server - version 0.4.1
[E 2019-10-02 13:02:44.455 DaskGateway] Exception while loading config file /etc/dask-gateway/dask_gateway_config.py
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/traitlets/config/application.py", line 562, in _load_config_files
config = loader.load_config()
File "/opt/conda/lib/python3.7/site-packages/traitlets/config/loader.py", line 457, in load_config
self._read_file_as_dict()
File "/opt/conda/lib/python3.7/site-packages/traitlets/config/loader.py", line 489, in _read_file_as_dict
py3compat.execfile(conf_filename, namespace)
File "/opt/conda/lib/python3.7/site-packages/ipython_genutils/py3compat.py", line 198, in execfile
exec(compiler(f.read(), fname, 'exec'), glob, loc)
File "/etc/dask-gateway/dask_gateway_config.py", line 106, in <module>
api_url = "http://{HUB_SERVICE_HOST}:{HUB_SERVICE_PORT}/hub/api".format(**os.environ)
KeyError: 'HUB_SERVICE_HOST'
[I 2019-10-02 13:02:44.457 DaskGateway] Cluster manager: 'dask_gateway_server.managers.local.UnsafeLocalClusterManager'
[I 2019-10-02 13:02:44.457 DaskGateway] Authenticator: 'dask_gateway_server.auth.DummyAuthenticator'
[I 2019-10-02 13:02:44.472 DaskGateway] Starting the Dask gateway scheduler proxy...
[I 2019-10-02 13:02:44.476 DaskGateway] Dask gateway scheduler proxy started at 'tls://gateway-dask-gateway-6b77565b47-jhcss:8786', api at 'http://127.0.0.1:35285'
Here's my helm config
gateway:
  proxyToken: "<>"
  auth:
    type: jupyterhub
    jupyterhub:
      apiToken: "<>"
  clusterManager:
    image:
      name: "gcr.io/dask-demo-182016/anaconda-gartner"
      tag: latest
    scheduler:
      extraPodConfig:
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: cloud.google.com/gke-preemptible
                      operator: DoesNotExist
    worker:
      extraPodConfig:
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: cloud.google.com/gke-preemptible
                      operator: Equal
        tolerations:
          Toleration:
            - key: "preemptible"
              operator: "Equal"
              value: "true"
              effect: "NoSchedule"
It would be good to have a demo video in the docs demonstrating usage (similar to the one I did for jupyterhub-on-hadoop). This should discuss the following:
It would be good to get this out before SciPy to have something to point people to.
Leaving this note here to figure out this problem later. To figure out how to run this against an actual EMR cluster with Livy enabled, the shortest path forward is to reuse the infrastructure that AWS handles for you -- it creates the Livy user and the Livy keytab, and adds the right proxyuser bits to the core-site.xml files so that impersonation works properly. Ideally, we'd figure out how to do this with the dask user too, but that's a reasonably big lift and not really that interesting in the scope of "let's make sure this works against a real EMR cluster".
The TODO here is to figure out what the necessary bootstrap script is to add:
For now, the docs will reflect reusing the Livy user.
It seems skein tries to deploy a YARN cluster with my username (benschreck), rather than dask or hadoop? I'm getting the following error:
[E 2019-09-25 20:34:46.415 DaskGateway] Error while starting cluster 19f76a979aa3457bab7f1d39a0bcd20a
Traceback (most recent call last):
File "/mnt/var/dask-gateway/miniconda/lib/python3.6/site-packages/dask_gateway_server/app.py", line 722, in start_cluster
timeout=cluster.manager.cluster_start_timeout,
File "/mnt/var/dask-gateway/miniconda/lib/python3.6/asyncio/tasks.py", line 358, in wait_for
return fut.result()
File "/mnt/var/dask-gateway/miniconda/lib/python3.6/site-packages/dask_gateway_server/app.py", line 707, in _start_cluster
async for state in cluster.manager.start_cluster():
File "/mnt/var/dask-gateway/miniconda/lib/python3.6/site-packages/dask_gateway_server/managers/yarn.py", line 293, in start_cluster
app_id = await loop.run_in_executor(None, skein_client.submit, spec)
File "/mnt/var/dask-gateway/miniconda/lib/python3.6/concurrent/futures/thread.py", line 56, in run
result = self.fn(*self.args, **self.kwargs)
File "/mnt/var/dask-gateway/miniconda/lib/python3.6/site-packages/skein/core.py", line 509, in submit
resp = self._call('submit', spec.to_protobuf())
File "/mnt/var/dask-gateway/miniconda/lib/python3.6/site-packages/skein/core.py", line 289, in _call
raise self._server_error(exc.details())
skein.exceptions.DriverError: Failed to submit application, exception:
Permission denied: user=benschreck, access=WRITE, inode="/user":hdfs:hadoop:drwxr-xr-x
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:219)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:189)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1663)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1647)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1606)
at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:60)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3049)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1079)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:652)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489)
My Gateway is running using the following params (some are environment variables I set):
c.DaskGateway.cluster_manager_class = "dask_gateway_server.managers.yarn.YarnClusterManager"
c.DaskGateway.authenticator_class = "dask_gateway_server.auth.DummyAuthenticator"
c.DaskGateway.public_url = "http://:$DASK_GATEWAY_APPLICATION_PORT"
c.DaskGateway.gateway_url = "tls://:$DASK_SCHEDULER_PORT"
# The resource limits for a worker
c.YarnClusterManager.worker_memory = '$WORKER_MEM G'
c.YarnClusterManager.worker_cores = $WORKER_VCORES
c.YarnClusterManager.scheduler_memory = '$SCHEDULER_MEM G'
c.YarnClusterManager.scheduler_cores = $SCHEDULER_VCORES
c.YarnClusterManager.localize_files = {
    'environment': {
        'source': '$HDFS_PACKED_ENV',
        'visibility': 'public'
    }
}
c.YarnClusterManager.environment = {'HADOOP_HOME': '/usr'}
c.YarnClusterManager.scheduler_setup = 'source default/bin/activate'
c.YarnClusterManager.worker_setup = 'source default/bin/activate'
We should add documentation on installing and using the server/gateway -- possibly noting the Go requirement for developers.
Currently Kerberos authentication is implemented and works, but lacks tests.
We currently support yarn and local processes, it would be good to also add support for Kubernetes. See dask-kubernetes for reference.
e.g. on https://gateway.dask.org/install-kube.html, the link "documentation provided by zero-to-jupyterhub-k8s" should link to https://zero-to-jupyterhub.readthedocs.io/en/latest/create-k8s-cluster.html, but it tries to link to a local file instead.
My current workflow involves somewhat regularly starting a new dask cluster via dask-gateway. I'm finding that I then need to copy-paste the URL of that cluster into a few new windows so I can watch the task graph / status / workers / etc. It would be convenient if I could provide a cluster alias when starting the cluster, so that instead of needing to copy/paste the URL into a few new windows, I could just refresh the existing browser windows (or maybe they'd refresh automatically?) when I create a new cluster.
So, for example, when I start a cluster I'm handed a UUID like addcbd20eea44364a9484a89a67dc2da, which means the status endpoint is:
http://<gateway url>/gateway/clusters/addcbd20eea44364a9484a89a67dc2da/status
If, when creating the cluster (or I guess at any point down the line, really), I could also provide a kwarg alias='my_alias', then I could reference that cluster via:
http://<gateway url>/gateway/clusters/my_alias/status
I'd say this can be filed under "minor usability tweaks" (i.e., not super high priority)
The server currently uses one DB session per application (from what I can see). This led to complex performance problems in JupyterHub, since sqlalchemy holds a cache of everything you touch in memory. Creating a new object can be much slower when it is object 1000 vs object 1, and this hurts a lot at high scale, since you want creation to be a constant-time operation.
jupyterhub/jupyterhub#1291 and the issues linked from there have information on how we ran into this in JupyterHub, and how we worked around it (merged as jupyterhub/jupyterhub#1809). However, I think the right thing to do is to have a DB session per request - this forces you into better architectural patterns, useful both in this case and for HA-related work. In JupyterHub we had too much code that would have needed to change - I think the gateway server is young enough for this to happen. It makes life a lot better in the long run!
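A minimal sketch of the session-per-request pattern with sqlalchemy (the engine URL and handler names are placeholders):

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("sqlite:///gateway.db")  # placeholder URL
Session = sessionmaker(bind=engine)

def handle_request(request):
    # One short-lived session per request keeps the identity map small
    session = Session()
    try:
        result = do_work(session, request)  # placeholder application logic
        session.commit()
        return result
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()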
We should be able to secure both the external and internal communication with HTTPS. This will mostly be handled by the go web proxy.
I may be missing something, but I can't find documentation on how to install this or get a basic setup running on my local machine. I think there used to be something like this (perhaps in a README), but I'm no longer able to find it.
(dev) MROCKLIN-MLT:dask-gateway-server mrocklin$ pip install -e .
Obtaining file:///Users/mrocklin/workspace/dask-gateway/dask-gateway-server
Requirement already satisfied: cryptography in /Users/mrocklin/miniconda/envs/dev/lib/python3.7/site-packages (from dask-gateway-server==0.1.0+16.g210ce7b) (2.4.1)
Requirement already satisfied: tornado in /Users/mrocklin/miniconda/envs/dev/lib/python3.7/site-packages (from dask-gateway-server==0.1.0+16.g210ce7b) (6.0.2)
Requirement already satisfied: traitlets in /Users/mrocklin/miniconda/envs/dev/lib/python3.7/site-packages (from dask-gateway-server==0.1.0+16.g210ce7b) (4.3.2)
Requirement already satisfied: sqlalchemy in /Users/mrocklin/miniconda/envs/dev/lib/python3.7/site-packages (from dask-gateway-server==0.1.0+16.g210ce7b) (1.3.4)
Requirement already satisfied: six>=1.4.1 in /Users/mrocklin/miniconda/envs/dev/lib/python3.7/site-packages (from cryptography->dask-gateway-server==0.1.0+16.g210ce7b) (1.11.0)
Requirement already satisfied: idna>=2.1 in /Users/mrocklin/miniconda/envs/dev/lib/python3.7/site-packages (from cryptography->dask-gateway-server==0.1.0+16.g210ce7b) (2.7)
Requirement already satisfied: asn1crypto>=0.21.0 in /Users/mrocklin/miniconda/envs/dev/lib/python3.7/site-packages (from cryptography->dask-gateway-server==0.1.0+16.g210ce7b) (0.24.0)
Requirement already satisfied: cffi!=1.11.3,>=1.7 in /Users/mrocklin/miniconda/envs/dev/lib/python3.7/site-packages (from cryptography->dask-gateway-server==0.1.0+16.g210ce7b) (1.12.3)
Requirement already satisfied: decorator in /Users/mrocklin/miniconda/envs/dev/lib/python3.7/site-packages (from traitlets->dask-gateway-server==0.1.0+16.g210ce7b) (4.3.0)
Requirement already satisfied: ipython-genutils in /Users/mrocklin/miniconda/envs/dev/lib/python3.7/site-packages (from traitlets->dask-gateway-server==0.1.0+16.g210ce7b) (0.2.0)
Requirement already satisfied: pycparser in /Users/mrocklin/miniconda/envs/dev/lib/python3.7/site-packages (from cffi!=1.11.3,>=1.7->cryptography->dask-gateway-server==0.1.0+16.g210ce7b) (2.19)
Installing collected packages: dask-gateway-server
Found existing installation: dask-gateway-server 0.1.0
Uninstalling dask-gateway-server-0.1.0:
Successfully uninstalled dask-gateway-server-0.1.0
Running setup.py develop for dask-gateway-server
Complete output from command /Users/mrocklin/miniconda/envs/dev/bin/python -c "import setuptools, tokenize;__file__='/Users/mrocklin/workspace/dask-gateway/dask-gateway-server/setup.py';f=getattr(tokenize, 'open', open)(__file__);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, __file__, 'exec'))" develop --no-deps:
running develop
running build_go
error: [Errno 2] No such file or directory: 'go': 'go'
Currently clusters/workers go through the following states on startup:
STARTING: Waiting on the ClusterManager.start_* method
STARTED: ClusterManager.start_* has returned; waiting for the process to connect to the gateway
RUNNING: Process has connected, and is deemed functional
To quickly catch failures (rather than just timing out on process startup), cluster managers are responsible for tracking process state until it's deemed STARTED. For some cluster backends this can be expensive, and would be good to avoid. During perfect execution (no failures), tracking job status isn't even necessary; it's only really useful for getting quick feedback on failures.
As such, it would be good to split this up. I propose the following states:
SUBMITTING: Waiting on the ClusterManager.submit_cluster/ClusterManager.submit_worker method. This can return as soon as some other system is responsible for managing the process (e.g. YARN/kubernetes/job queues).
STARTING: ClusterManager.submit_* has returned
STARTED: Process is deemed started (more on this later)
RUNNING: Process has connected, and is deemed functional
Once a process (cluster or worker) is in the STARTING state, the gateway starts a task to wait for the process to start (which could use polling or something more efficient, depending on the cluster backend). If polling, this period should be configurable, and should be longer than the current one (we want it small enough that an interactive user won't grow impatient when debugging failing jobs, but large enough that it won't take too much time; 15-30 seconds could be a good default). The method could be called wait_for_cluster/wait_for_worker or something like that.
If the waiting task returns true, the process will be moved to the STARTED state, and a timeout will be started for the process to connect. If the process connects before it's moved to the STARTED state (with a large polling interval this isn't unlikely), the waiting task can be cancelled, and the process can move directly to RUNNING. If the waiting task times out, the normal timeout logic is applied.
For cluster backends that have no "submit" concept (such as local processes), the wait_for_* methods would be no-ops.
The reason we can't decrease the polling frequency with the current design is that start_* tasks block until the process is started, and can't be cancelled on a successful startup (since they're responsible for returning any process state to be saved). This means that if a process connects before the start_* method returns, we still have to wait for the start_* method to return before moving the process into the RUNNING state. By splitting the state into SUBMITTING and STARTING, we divide this into a task that may return state that needs saving (e.g. job_id), and a task that's just waiting on a job and may be cancelled.
It would be good to provide a web frontend. This should expose:
Before this is done, we'd need to decide on a method for web authentication (see #22).
We should add gateway.dask.org on cloudflare to point to our docs. @quasiben, I believe you have the rights for this? The docs are hosted at https://dask.github.io/dask-gateway/.
Sometimes a user may want to look at logs of completed workers/clusters. Right now all log handling is backend-specific - users need to be familiar with the cluster backend and the particularities of how logs are handled for that backend. For example, YARN logs are stored to HDFS and can be accessed with the yarn CLI tool.
It may be useful for dask-gateway to provide a LogProvider class that different log backends could implement. This might look like:
class LogProvider(LoggingConfigurable):
    def get_logs_for_cluster(self, cluster_name, cluster_state):
        """Get the logs for a completed cluster

        Parameters
        ----------
        cluster_name : str
            The cluster name.
        cluster_state : dict
            Any backend-specific information (e.g. application id, pod name, ...)

        Returns
        -------
        logs : dict[str, str]
            A mapping from job id to logs for that job.
        """

    def get_logs_for_worker(self, cluster_name, cluster_state, worker_name, worker_state):
        """Get the logs for a completed worker"""
I'd prefer that dask-gateway not manage the storage of these logs (although we could if needed); rather, it should be an abstraction around accessing the logs wherever they're held by some other service/convention.
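As an illustration, a YARN implementation might look roughly like this (a sketch; the state key and log-fetching helper are assumptions, not settled design):

class YarnLogProvider(LogProvider):
    def get_logs_for_cluster(self, cluster_name, cluster_state):
        # YARN aggregates container logs to HDFS after an application
        # finishes; fetch them by application id and return per-container text.
        app_id = cluster_state["app_id"]            # assumed key in the stored state
        return fetch_aggregated_yarn_logs(app_id)   # hypothetical helper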
Possible implementations for our cluster backends:
~/dask-gateway-logs per user, or some directory managed directly by dask-gateway?

I'm probably doing something dumb here, but at the very least this should help to identify a user speedbump.
(dev) ngvpn01-163-171:~ mrocklin$ dask-gateway-server
[I 2019-09-09 19:57:16.247 DaskGateway] Generating new cookie secret
[I 2019-09-09 19:57:16.249 DaskGateway] Generating new auth token for scheduler proxy
[I 2019-09-09 19:57:16.249 DaskGateway] Starting the Dask gateway scheduler proxy...
[I 2019-09-09 19:57:16.423 DaskGateway] Dask gateway scheduler proxy running at 'tls://:8786', api at 'http://127.0.0.1:54456'
[I 2019-09-09 19:57:16.430 DaskGateway] Generating new auth token for web proxy
[I 2019-09-09 19:57:16.430 DaskGateway] Starting the Dask gateway web proxy...
[I 2019-09-09 19:57:16.433 DaskGateway] Dask gateway web proxy running at 'http://:8000', api at 'http://127.0.0.1:54461'
[I 2019-09-09 19:57:16.648 DaskGateway] Gateway API listening on http://127.0.0.1:8081
In [1]: from dask_gateway import Gateway
In [2]: gateway = Gateway("127.0.0.1:8081")
In [3]: gateway.list_clusters()
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-3-f8cbd85b6152> in <module>
----> 1 gateway.list_clusters()
~/miniconda/envs/dev/lib/python3.7/site-packages/dask_gateway/client.py in list_clusters(self, status, **kwargs)
329 clusters : list of ClusterReport
330 """
--> 331 return self.sync(self._clusters, status=status, **kwargs)
332
333 async def _cluster_options(self):
~/miniconda/envs/dev/lib/python3.7/site-packages/dask_gateway/client.py in sync(self, func, *args, **kwargs)
277 return future
278 else:
--> 279 return sync(self.loop, func, *args, **kwargs)
280
281 async def _fetch(self, req):
~/miniconda/envs/dev/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
335 e.wait(10)
336 if error[0]:
--> 337 six.reraise(*error[0])
338 else:
339 return result[0]
~/miniconda/envs/dev/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
~/miniconda/envs/dev/lib/python3.7/site-packages/distributed/utils.py in f()
320 if callback_timeout is not None:
321 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 322 result[0] = yield future
323 except Exception as exc:
324 error[0] = sys.exc_info()
~/miniconda/envs/dev/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/miniconda/envs/dev/lib/python3.7/site-packages/dask_gateway/client.py in _clusters(self, status)
308 url = "%s/gateway/api/clusters/%s" % (self.address, query)
309 req = HTTPRequest(url=url)
--> 310 resp = await self._fetch(req)
311 return [
312 ClusterReport._from_json(self.address, r)
~/miniconda/envs/dev/lib/python3.7/site-packages/dask_gateway/client.py in _fetch(self, req)
281 async def _fetch(self, req):
282 self._cookie_jar.pre_request(req)
--> 283 resp = await self._http_client.fetch(req, raise_error=False)
284 if resp.code == 401:
285 context = self._auth.pre_request(req, resp)
~/miniconda/envs/dev/lib/python3.7/site-packages/tornado/simple_httpclient.py in run(self)
291 self.parsed = urllib.parse.urlsplit(_unicode(self.request.url))
292 if self.parsed.scheme not in ("http", "https"):
--> 293 raise ValueError("Unsupported url scheme: %s" % self.request.url)
294 # urlsplit results have hostname and port results, but they
295 # didn't support ipv6 literals until python 2.7.
ValueError: Unsupported url scheme: 127.0.0.1:8081/gateway/api/clusters/
Currently the database grows unbounded. We should add support for a periodic task that removes stopped clusters older than a threshold (24 hours might be a good default). This would allow looking at recent history, without needing to support an increasingly large database.
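A sketch of such a periodic task using tornado's PeriodicCallback (the db handle, ORM model, and field names are assumptions):

from datetime import datetime, timedelta
from tornado.ioloop import PeriodicCallback

def cleanup_stopped_clusters(db, threshold_hours=24):
    """Delete stopped clusters older than the threshold."""
    cutoff = datetime.utcnow() - timedelta(hours=threshold_hours)
    # `Cluster`, `status`, and `stop_time` are hypothetical schema names
    (db.query(Cluster)
       .filter(Cluster.status == "STOPPED", Cluster.stop_time < cutoff)
       .delete(synchronize_session=False))
    db.commit()

# run hourly on the server's event loop; `db` is a placeholder session
PeriodicCallback(lambda: cleanup_stopped_clusters(db), 60 * 60 * 1000).start()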
Currently we support REST API authentication via:
We could also handle REST API authentication by validating a JupyterHub API token with JupyterHub. This would allow nice integration with JupyterHub, which provides an API token in the kernel environment.
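A rough sketch of that validation against JupyterHub's REST API (the endpoint is from JupyterHub's documented API; the URL and tokens are placeholders):

import json
from tornado.httpclient import AsyncHTTPClient, HTTPRequest

async def user_for_token(token, hub_api_url, service_token):
    """Return the JupyterHub username for a token, or None if invalid."""
    req = HTTPRequest(
        url=f"{hub_api_url}/authorizations/token/{token}",
        headers={"Authorization": f"token {service_token}"},
    )
    resp = await AsyncHTTPClient().fetch(req, raise_error=False)
    if resp.code != 200:
        return None
    return json.loads(resp.body)["name"]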
Currently all docker images for dask-gateway are stored in my docker hub account (https://hub.docker.com/u/jcrist). After moving to the dask org, we probably want to move these somewhere else. Two options:
Store them in the existing daskdev account
Create a new dask-gateway org
The first is nice, since all dask things are together. The downside is that there are a lot of dask-gateway images, and it might be nice to keep them all organized together.
Right now I'm leaning slightly towards a new org for dask-gateway-specific things.
From an admin perspective it may sometimes be useful to support restrictions on user resource usage. A few examples:
We should probably provide some common metrics, but will likely also need a generic hook for operations an admin may want to restrict. This may look like:
def can_start_cluster(user):
    """Check if a user can start a cluster.

    Returns
    -------
    allowed : bool
        True if the user can start a cluster, False otherwise
    msg : str or None
        A user-facing message explaining why the operation is currently forbidden.
    """
    pass

def can_start_worker(user, cluster):
    """Check if a user can start a worker.

    Returns
    -------
    allowed : bool
        True if the user can start a worker, False otherwise
    msg : str or None
        A user-facing message explaining why the operation is currently forbidden.
    """
    pass
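For example, a hypothetical hook limiting each user to a fixed number of active clusters (the limit and the user.clusters attribute are assumptions):

MAX_ACTIVE_CLUSTERS = 5  # illustrative limit, not an existing option

def can_start_cluster(user):
    """Allow a new cluster only while the user is under the limit."""
    active = len(user.clusters)  # assumes the user record tracks its clusters
    if active >= MAX_ACTIVE_CLUSTERS:
        return False, f"Limit of {MAX_ACTIVE_CLUSTERS} active clusters reached"
    return True, None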
This is separate but related to #20. With JupyterHub for authentication we should be able to integrate nicely as a JupyterHub service. It would be good to get this working, and document the deployment steps involved.
When trying to use a cluster manager whose external dependencies aren't installed, the error message is a bit too general. For example, when trying to use KubeClusterManager I get:
The 'cluster_manager_class' trait of <dask_gateway_server.app.DaskGateway object at 0x7ff2ba00f198> instance must be a type, but 'dask_gateway_server.managers.kubernetes.KubeClusterManager' could not be imported
Installing the kubernetes python library (pip install kubernetes) resolves the issue.
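One possible improvement (a sketch, not the existing traitlets code path) is to surface the underlying ImportError when resolving the class:

import importlib

def import_class(path):
    """Import a class from a dotted path, preserving the original ImportError."""
    module_name, _, cls_name = path.rpartition(".")
    try:
        module = importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"Failed to import {path!r}; a dependency may be missing: {exc}"
        ) from exc
    return getattr(module, cls_name)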
We have some CI set up in dask-jobqueue for different job schedulers (SGE, SLURM, PBS, with some more hopefully on the way, e.g. HTCondor and LSF). It feels like we could mutualize part of the CI setup with dask-gateway to ease maintenance. Any opinion on this?
Some differences I noticed (there are likely more):
dask-jobqueue is using docker-compose and dask-gateway is using single-Dockerfile images. For dask-jobqueue, part of it is historic: that was how it was done in dask-drmaa originally. Also, with docker-compose it feels more like a "real" cluster setup, with a master node and compute nodes, rather than a toy single-node setup.
dask-jobqueue rebuilds the docker image on each CI build whereas dask-gateway uses images from Docker Hub. I am guessing images are built and pushed automatically, but I haven't figured out where that happens.

A few fields of the database contain secrets (api_token, tls certs/keys, etc...). Some of these could be hashed to avoid storing them in raw form at all, but others (the tls certs/keys) will need to be stored in an encrypted form.
I wasn't sure whether config fields like
c.KubeClusterManager.worker_memory = MemoryLimit('2 G')
and worker_cores, etc. were passed through. An initial test with
clusterManager:
  worker_cores: "4"
  worker_memory: "6G"
didn't seem to work, though I may be wrong. I think I still had 1 core / 2G per worker.
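If the helm values aren't being wired through, setting the traits directly in the gateway config should have the same effect (a sketch, mirroring the MemoryLimit('2 G') form quoted above):

# dask_gateway_config.py
c.KubeClusterManager.worker_cores = 4
c.KubeClusterManager.worker_memory = "6 G"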
Initial docs should include:
When running helm upgrade, some objects (e.g. the configmap) aren't recreated even though, when rendered, they differ from the previously submitted version. To be more specific, if I run:
helm install --name gateway --values config.yaml dask-gateway/dask-gateway
helm upgrade gateway dask-gateway/dask-gateway --values new-config.yaml
where new-config.yaml changes something like gateway.clusterManager.environment (which in turn results in a new configmap), none of the objects are recreated. If I delete the configmap and then run helm upgrade, the new configmap is written and things upgrade properly.
This may be a case of helm/helm#5915, but there's likely something we can do about it.
A Helm Chart Repository is a static site with a simple spec. We should be able to generate one as part of the docs and host it on github pages. Built chart artifacts would then be hosted in github releases. This is similar to what JupyterHub does, except we'd host them as releases and in the main repository.
This isn't strictly necessary to use the existing helm chart, but can be nicer as it avoids users having to download the chart locally.
In #116 we added a setup using chartpress for publishing helm charts to https://dask.org/dask-gateway-helm-repo/. It would be nice to automate this using CI.
The chart naming scheme is dask-gateway-{dask_gateway_server.__version__}-{git tag}, with the tag removed for releases (using the --reset flag).
Should cluster.dashboard_address have /status appended to it? That would make it easier for the user to navigate to their dashboard.
Currently kubernetes objects (e.g. pods) are named in the form dask-worker-{workername}. It may be useful for debugging/monitoring to make these templated strings, and by default include other things like the username.
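A hypothetical config sketch of such templating (the option name and template fields are illustrative, not existing settings):

# dask_gateway_config.py -- illustrative only
c.KubeClusterManager.worker_name_template = "dask-worker-{username}-{workername}"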
Right now the gateway operates only as a REST api, with no web frontend. For #21 we'll need to add support for pluggable web authentication. When deployed with JupyterHub we can bootstrap off their authentication (similar to #20), but we'll also need to provide options for deploying without JupyterHub. I'm not sure what's best here.
As of #43 we support kubernetes as a backend. It would be good to develop a helm chart for deploying this on kubernetes clusters. I have no helm experience, but may eventually be able to figure this out - others will likely have an easier time at it. Here's a high level of how I see a kubernetes deployment looking:
The gateway will have three components running and managed by a deployment:
The gateway server (dask-gateway-server)
The scheduler proxy (dask-gateway-server scheduler-proxy)
The web proxy (dask-gateway-server web-proxy)
The resources for these are all part of the dask-gateway-server python package, and could all be run using the same image (but in different containers). I think we'd want different containers rather than running in the same container, to let kubernetes handle restarts (the gateway should be robust to transient component restarts). The appropriate configuration would need to be set to allow for this (this exists, but is only documented at the traitlets config level).
We'd want to expose configuration for common things via helm. We should be able to do this similar to the way jupyterhub does it (https://github.com/jupyterhub/zero-to-jupyterhub-k8s/blob/master/images/hub/jupyterhub_config.py). A few things people may be interested in:
Options on dask_gateway_server.managers.kubernetes.KubeClusterManager
I'm not sure how people would run this with a persistent database in kubernetes. A few ideas:
We should provide default images to use. One already exists for running the scheduler/worker processes here. All deployment resources should go somewhere under this directory.
It would be good to have a way to demo this locally on minikube. We have a demo for a hadoop cluster here, it would be nice to replicate this for kubernetes.
We currently support yarn and local processes, it would be good to also add support for traditional job queues (SLURM, etc...). dask-jobqueue will be a good reference.