
idap-200gbps-atlas's Introduction

IDAP 200 Gbps with ATLAS PHYSLITE

Targeting analysis at 200 Gbps with ATLAS PHYSLITE. This repository is very much a work in progress.

ATLAS has not yet released OpenData, so there isn't an AGC we can copy and try to run. As a result, this repository's main purpose is as a facilities test:

  • Run from PHYSLITE
  • Load 200 Gbps off of the PHYSLITE samples
  • Push all that data downstream to DASK (or similar) workers.

We have a loosely tracked set of lessons learned.

Description of files

  • materialize_branches.ipynb: read list of branches, distributable with Dask (use for benchmarking)

Usage

When run on the UChicago AF Jupyter Notebook, no package installs are required.

There is a requirements.txt which should allow this to be run on a bare-bones machine (modulo location of files, etc.).

If you are going to use the servicex version, you have to pin dask_awkward==2024.2.0; later versions have a bug which hasn't been fixed yet.
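For reference, the pin can live in a requirements file (a hypothetical fragment; use whatever dependency file your environment reads):

```
# requirements.txt — pin until the upstream dask_awkward bug is fixed
dask_awkward==2024.2.0
```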

Input file details

The folder input_files contains the list of input containers / files and related metadata plus scripts to produce these.

In total:

  • number of files: 219,029
  • size: 191.073 TB
  • number of events: 23,347,787,104

with additional files:

  • input_files/find_containers.py: query rucio for a list of containers, given a list of (hardcoded) DSIDs

  • input_files/container_list.txt: list of containers to run over

  • input_files/produce_container_metadata.py: query metadata for containers: number of files / events, size

  • input_files/container_metadata.json: output of input_files/produce_container_metadata.py with container metadata

  • input_files/get_file_list.py: for a given dataset creates a txt file listing file access paths that include appropriate xcache. The same kind of output can be obtained by doing:

    export SITE_NAME=AF_200
    rucio list-file-replicas mc20_13TeV:mc20_13TeV.364126.Sherpa_221_NNPDF30NNLO_Zee_MAXHTPTV500_1000.deriv.DAOD_PHYSLITE.e5299_s3681_r13145_p6026 --protocol root  --pfns --rses MWT2_UC_LOCALGROUPDISK
    
  • input_files/containers_to_files.py: process the list of containers into a list of files per container with hardcoded xcache instances, writes to input_files/file_lists/*.

Branch list determination

Branches to be read are determined with a 2018 data file.

  • input_files/size_per_branch.ipynb: produce breakdown of branch sizes for given file
  • input_files/branch_sizes.json: output of notebook above
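The notebook's size breakdown amounts to summing compressed on-disk bytes per branch. A minimal sketch of that computation (the helper names here are ours, not the notebook's actual code):

```python
def branch_sizes(path, treename="CollectionTree"):
    """Compressed on-disk size per branch of one file (needs uproot and file access)."""
    import uproot  # imported lazily so rank_branches below stays standalone
    with uproot.open(path) as f:
        return {b.name: b.compressed_bytes for b in f[treename].branches}

def rank_branches(sizes):
    """Sort a {branch: bytes} mapping largest first, attaching each branch's
    fraction of the total on-disk size."""
    total = sum(sizes.values())
    return [(name, nbytes, nbytes / total)
            for name, nbytes in sorted(sizes.items(), key=lambda kv: -kv[1])]
```

The ranked output maps directly onto the kind of breakdown stored in input_files/branch_sizes.json.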

Acknowledgements

NSF-1836650 PHY-2323298

This work was supported by the U.S. National Science Foundation (NSF) cooperative agreements OAC-1836650 and PHY-2323298 (IRIS-HEP).

idap-200gbps-atlas's People

Contributors

gordonwatts, alexander-held, ivukotic, kyungeonchoi, ponyisi, matthewfeickert


idap-200gbps-atlas's Issues

Get `[cpp_vfloat]` working properly in `getAttribute`

This code is broken, and it now seems like the problem is buried in func_adl (which did not have sufficient tests). Can we get that fixed so we can properly read out the various other branches that we are avoiding right now?

50 TB test

Ready for a week 5 data test

  • Run with dask cluster unallocated (so we don't kill CEPH, or die by it).
  • Try pre-allocated 100 workers.

See if we can understand #52 (river POD's worse) - or if it is still true.

A major new thing for running against the raw dataset: we have a large amount of the data read out.

Tracking logs of workers and errors found therein

We can forward all worker output with

import distributed

client.register_plugin(distributed.diagnostics.plugin.ForwardOutput())

This gets very noisy and is probably quite bad for performance, but it may help with debugging. When using it, it is best to run a .py script rather than a notebook and pipe its output to a file.

Here is an example output that crashes during pre-processing (with autoscaling) and with all uproot reporting disabled:
log_minimal_preprocess.txt

One thing seen in there is

  File "/venv/lib/python3.9/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
  File "/venv/lib/python3.9/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(sizeof, seq))
RecursionError: maximum recursion depth exceeded
2024-04-23 21:09:05,729 - distributed.sizeof - WARNING - Sizeof calculation for object of type 'dict' failed. Defaulting to -1 B

The closest thing to that I find is dask/distributed#8378 but it is not clear to me that this is related.
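If the recursion depth itself is the trigger, one possible mitigation is raising the interpreter's limit on every worker before pre-processing. This is an assumption to test, not a confirmed fix:

```python
import sys

def raise_recursion_limit(limit=10_000):
    # Hypothetical mitigation (not verified to fix the crash above): deeply
    # nested dask/awkward metadata can blow past the default limit of 1000
    # inside sizeof() during pre-processing.
    if sys.getrecursionlimit() < limit:
        sys.setrecursionlimit(limit)
    return sys.getrecursionlimit()

# Apply on every worker as well as locally, e.g.:
#   client.run(raise_recursion_limit)
#   raise_recursion_limit()
```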

ServiceX Client S3 tells you to slow down with too many dask workers

After fixing #20 , we were able to run with more DASK workers. But we can still trigger this error:

(venv) [bash][gwatts]:idap-200gbps-atlas > python servicex/servicex_materialize_branches.py -v --distributed-client scheduler --dask-scheduler 'tcp://dask-gwatts-ead73a76-c.af-jupyter:8786' --num-files 0
0000.0744 - INFO - Using release 21.2.231
0000.0749 - INFO - Building ServiceX query
0000.1044 - WARNING - Fetched the default calibration configuration for a query. It should have been intentionally configured - using configuration for data format PHYS
0000.1327 - INFO - Starting ServiceX query
0000.7497 - INFO - Running servicex query for f70228e6-6655-443a-a7f2-77de0937d134 took 0:00:00.278472 (no files downloaded)                                      
0000.7583 - INFO - Finished ServiceX query
0000.7593 - INFO - Using `uproot.dask` to open files
0001.2214 - INFO - Generating the dask compute graph for 27 fields
0001.3238 - INFO - Computing the total count
Traceback (most recent call last):
  File "/home/gwatts/code/iris-hep/idap-200gbps-atlas/servicex/servicex_materialize_branches.py", line 325, in <module>
    main(ignore_cache=args.ignore_cache, num_files=args.num_files,
  File "/home/gwatts/code/iris-hep/idap-200gbps-atlas/servicex/servicex_materialize_branches.py", line 228, in main
    r = total_count.compute()  # type: ignore
  File "/venv/lib/python3.9/site-packages/dask/base.py", line 375, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/venv/lib/python3.9/site-packages/dask/base.py", line 661, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/venv/lib/python3.9/site-packages/uproot/_dask.py", line 1343, in __call__
    result, _ = self._call_impl(
  File "/venv/lib/python3.9/site-packages/uproot/_dask.py", line 1266, in _call_impl
    ttree = uproot._util.regularize_object_path(
  File "/venv/lib/python3.9/site-packages/uproot/_util.py", line 962, in regularize_object_path
    file = ReadOnlyFile(
  File "/venv/lib/python3.9/site-packages/uproot/reading.py", line 761, in root_directory
    return ReadOnlyDirectory(
  File "/venv/lib/python3.9/site-packages/uproot/reading.py", line 1400, in __init__
    keys_chunk = file.chunk(keys_start, keys_stop)
  File "/venv/lib/python3.9/site-packages/uproot/reading.py", line 1185, in chunk
    return self._source.chunk(start, stop)
  File "/venv/lib/python3.9/site-packages/uproot/source/fsspec.py", line 115, in chunk
    data = self._fh.read(stop - start)
  File "/venv/lib/python3.9/site-packages/fsspec/implementations/http.py", line 598, in read
    return super().read(length)
  File "/venv/lib/python3.9/site-packages/fsspec/spec.py", line 1846, in read
    out = self.cache._fetch(self.loc, self.loc + length)
  File "/venv/lib/python3.9/site-packages/fsspec/caching.py", line 439, in _fetch
    self.cache = self.fetcher(start, bend)
  File "/venv/lib/python3.9/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/venv/lib/python3.9/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/venv/lib/python3.9/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
  File "/venv/lib/python3.9/site-packages/fsspec/implementations/http.py", line 653, in async_fetch_range
    r.raise_for_status()
  File "/venv/lib/python3.9/site-packages/aiohttp/client_reqrep.py", line 1060, in raise_for_status
    raise ClientResponseError(
Exception: ClientResponseError(RequestInfo(url=URL('https://s3.af.uchicago.edu/f70228e6-6655-443a-a7f2-77de0937d134/root:::192.170.240.145::root:::eosatlas.cern.ch:1094::eos:atlas:atlasdatadisk:rucio:mc23_13p6TeV:e5:17:DAOD_PHYSLITE.37223155._000341.pool.root.1?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=ABAOJZ4XMLKWO5H0PZJ3/20240412/af-object-store/s3/aws4_request&X-Amz-Date=20240412T190811Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=385d92df18e0cad7e071e0dc84ef8c72fc32d8ec2f02a63bf1fd97d2304083f9'), method='GET', headers=<CIMultiDictProxy('Host': 's3.af.uchicago.edu', 'Range': 'bytes=30381811-35624926', 'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'User-Agent': 'Python/3.9 aiohttp/3.9.3')>, real_url=URL('https://s3.af.uchicago.edu/f70228e6-6655-443a-a7f2-77de0937d134/root:::192.170.240.145::root:::eosatlas.cern.ch:1094::eos:atlas:atlasdatadisk:rucio:mc23_13p6TeV:e5:17:DAOD_PHYSLITE.37223155._000341.pool.root.1?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=ABAOJZ4XMLKWO5H0PZJ3/20240412/af-object-store/s3/aws4_request&X-Amz-Date=20240412T190811Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=385d92df18e0cad7e071e0dc84ef8c72fc32d8ec2f02a63bf1fd97d2304083f9')), (), status=503, message='Slow Down', headers=<CIMultiDictProxy('Date': 'Fri, 12 Apr 2024 19:08:48 GMT', 'Content-Type': 'application/xml', 'Content-Length': '211', 'Connection': 'keep-alive', 'x-amz-request-id': 'tx00000000000000002daba-00661986c0-7b36232-af-object-store', 'Accept-Ranges': 'bytes', 'Strict-Transport-Security': 'max-age=15724800; includeSubDomains')>)

This is testing with workers already setup (not dynamically scaling). It occurs with:

  • 800 workers
  • 600 workers
  • 400 workers
  • 200 workers

I think what is happening is that 200 workers hit S3 at exactly the same time, which triggers its "Slow Down" response. With dynamic scaling, the nodes come up slowly, so the S3 load is spread out a little.

Update ServiceX to use the new client (3.0bX)

The new client is out and uses databinder, which is a great way to move to reading a large number of datasets.

Process

  1. Make sure the xaodr22 type package supports the new frontend [DONE]
  2. Create a copy of the notebook that will run the data binder
  3. Upgrade servicex_materialize_branches to run with data binder (and move to running 2 datasets rather than one).
  4. Come up with a new script, perhaps based on the one above, that will run on a file based lists of datasets pulled from Alex's JSON file (or similar).

The idea is to keep a simple script around that will just run and can test everything, plus a second script that can really run the scale test.

I've been thinking that steps 2 and 3 would be a single MR, and step 4 a separate MR.

Missing branches in some files

Looking at 2015 data, this file

root://192.170.240.147:1094//root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data15_13TeV/90/62/DAOD_PHYSLITE.37001626._000001.pool.root.1

does not have electron branches (at least none we can read with uproot). It comes out of data15_13TeV:data15_13TeV.periodAllYear.physics_Main.PhysCont.DAOD_PHYSLITE.grp15_v01_p6026. Other files in that container seem fine. I have not tested this systematically across all files in all containers. This causes errors we cannot catch as easily, so we need to understand what is happening and if this file is maybe broken somehow.

Generate First Performance Report for SX Chain

  • Use 200 DASK workers
  • Use a "largeish" dataset
  • Run more than once to see if SX caching makes any difference
  • Create report in a reports/ sub-directory.
  • And the instructions should be in the servicex README.

No need to automate the report, let's just get something there.

Derive new list of branches for full Run-2 dataset

Building on #5, we'll need a new breakdown of branch sizes to get a new list. The overall behavior is presumably going to closely follow that of data, so we can derive the lists on data and correct them later as needed. I'd propose starting with lists for 10% and 25% (with 25% being the actual target to use for the demonstration). Given that we'll have the full breakdown of branch sizes, we can benchmark against other file fractions down the line too.

Convert the simple test to use PHYSLITE `func_adl` typing

Currently this uses an R21 type system to run R22 PHYSLITE. Let's get everything in order to make sure that works:

  • Alter the current notebook
  • Alter the current script that runs the performance test
  • Update all references to the pip install to make sure it is installing the right package.

This may require some work in the type packages, especially to support the qastle generation in a straightforward way.

Create a ServiceX script/notebook version

The goal is to fetch the data from ServiceX rather than a direct read.

  • Access built only for PHYSLITE
  • No calibrations are run
  • Simple basic setup

This isn't meant to be a final version as much as something that shows how to do this - and get many columns from SX.

Recursion errors at scale

When running with sufficiently many files, we hit RecursionError during preprocessing. The solution to that should be CoffeaTeam/coffea#1079. Once that is fixed we'll need a new AB-dev image. cc @matthewfeickert: I assume we don't need a coffea tag to build an image in principle? It's not critical enough to warrant building the image before this is reviewed and merged though.

Lessons learned

This issue gathers lessons learned. We might want to switch to another format eventually, but this will do as an interim solution.

  • PHYSLITE branch reading & interpretations with uproot (link to conversation): branches like AnalysisJetsAuxDyn.EnergyPerSampling (AsObjects(AsVector(True, AsVector(False, dtype('>f4'))))) can be expensive to interpret (done with Forth); the choice of branches can matter for total CPU cost
    • PrimaryVerticesAuxDyn.neutralParticleLinks seems to take an extremely long time to read
  • more PHYSLITE related issues:
  • a substantial amount of time seems to be spent towards the end of processing, either finishing up the last handful of files or doing the merging of all information
  • [FIXED] recursion errors at scale and tree reduction #37
  • we need to dig into causes for reading problems, running all of the data files (~175k) produced problems with around 300 files: read_errors.txt
  • pre-processing seems to have a decent number of gaps in the task graph; we need to understand the bottlenecks there. April 15: pre-processing all of the data files (~175k) took roughly half an hour with 1k cores
  • Some amount of time (up to a few minutes) seems to be spent after a task is completed and coincides with the time when autoscaling scales down. It may be that terminating pods is expensive / resource intensive?
  • no big difference between a single cluster with 2k workers vs 2 clusters with 1k workers each (uproot.open setup)

Figure out how to work around the awkward array bug for ServiceX

Currently Awkward array won't let you build a dask-awkward array the way the servicex frontend wants to (this is a regression - it used to work). So, we can work around this, probably:

  • Get a list of the files (html or downloaded files)
  • Use uproot.dask to open them all at once.

This should avoid the ak.concatenate bug that the servicex client library seems to be stumbling over.
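The workaround above might look roughly like this sketch; files_spec and open_all are hypothetical helpers, and the uproot.dask keyword choices are assumptions to verify against the uproot version in use:

```python
def files_spec(paths, treename="CollectionTree"):
    # Map each file path/URL to its tree name, the form uproot.dask accepts.
    return {path: treename for path in paths}

def open_all(paths, treename="CollectionTree", filter_name=None):
    """Open many files as a single lazy dask-awkward array in one call,
    sidestepping the ak.concatenate path the servicex client takes."""
    import uproot  # lazy import: only needed when actually opening files
    return uproot.dask(
        files_spec(paths, treename),
        filter_name=filter_name,  # optionally restrict which branches are read
        open_files=False,         # defer opening until tasks run on workers
    )
```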

Large Datasets get duplicated files

The problem of extra files:

image

This dataset has 64803 files. And, indeed, it gets up to that number and then stabilizes for 5 minutes. And then new files start coming in again.

image

We believe this is because it takes more than 30 minutes for the files to be inserted. RabbitMQ thinks the DID finder has died, takes the message back, and then re-sends it.

This is from #49 .

Why are some files missed in SX when the query completes?

Even with small submissions sometimes there are 4 files, but the client only sees 2. It started happening this week.

This might have to do with the elimination of the 10 second sleep in SX between the last "done" message and actually declaring the transform done.

Profile uproot performance per branch to identify possible bottlenecks

The time to read a branch seems to vary not only with branch size, but also with the type of the branch. Some cases, like PrimaryVerticesAuxDyn.neutralParticleLinks, seem to take a very long time. The size_per_branch notebook vetoes a few more branches that take a suspiciously long time. Given the potentially very big impact on throughput, we need to profile reading with the current list of branches and adjust as needed.
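A straightforward way to get per-branch numbers is to time array() per branch on a single file. A sketch with hypothetical helpers (not existing repo code):

```python
import time

def profile_branches(path, branch_names, treename="CollectionTree"):
    """Wall-clock time to materialize each branch of one file
    (needs uproot and file access)."""
    import uproot  # lazy import so slowest() below stays standalone
    timings = {}
    with uproot.open(path) as f:
        tree = f[treename]
        for name in branch_names:
            start = time.perf_counter()
            tree[name].array()  # includes any Forth-based interpretation cost
            timings[name] = time.perf_counter() - start
    return timings

def slowest(timings, n=10):
    # Top-n offenders from a {branch: seconds} mapping.
    return sorted(timings.items(), key=lambda kv: -kv[1])[:n]
```

Comparing the top offenders against branch sizes should separate "big" branches from "expensive to interpret" ones.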

uproot reads need to back off when hitting the S3 infrastructure

It seems CEPH has a read limit when being hit by http requests. We aren't sure what the problem is, but we see it with 400 dask workers pre-started, and we also see intermittent read failures.

To work around this we need some sort of backoff mechanism in the reads.
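A sketch of such a mechanism, assuming the 503 "Slow Down" responses surface as OSError from the read path (the function names here are ours):

```python
import random
import time

def read_with_backoff(read, *args, retries=5, base_delay=1.0, **kwargs):
    """Call a read function (e.g. uproot.open) with exponential backoff.

    Sketch only: retries on OSError, which is how the intermittent read
    failures tend to surface; the delay doubles per attempt, with jitter
    so that workers do not all retry in lockstep.
    """
    for attempt in range(retries):
        try:
            return read(*args, **kwargs)
        except OSError:
            if attempt == retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt) * (0.5 + random.random()))
```

The jitter matters as much as the backoff: without it, hundreds of workers that failed together will retry together.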

Understand what samples are needed to get to a full 200 TB

ATLAS has this very annoying habit of deleting p-tags out from under one, especially with modern PHYSLITE samples, as they are evolving so quickly. So it behooves us to get a list of samples that we can pin at Chicago.

  • List of the rucio samples that we will use below that total about 200 TB of overall data.
  • Place the samples in the README.md at the root directory.

Once this is done, then we can have UChicago pin them.

Generate report for pre-processing

We should generate a Dask report for the pre-processing step as well. We might also want to start time stamping the reports to keep them around; that will produce some clutter of html files, but could be useful so we do not accidentally overwrite information.
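A timestamped naming scheme could look like the following sketch (the naming convention is an assumption), paired with dask.distributed's performance_report context manager:

```python
import time

def report_filename(prefix="preprocess", directory="reports"):
    # Hypothetical naming scheme: e.g. reports/preprocess-20240415-133000.html;
    # old reports are kept around instead of being overwritten.
    stamp = time.strftime("%Y%m%d-%H%M%S")
    return f"{directory}/{prefix}-{stamp}.html"

# Usage with dask.distributed:
#   from dask.distributed import performance_report
#   with performance_report(filename=report_filename()):
#       run_preprocessing(...)  # the pre-processing step to profile
```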

Update instructions for running on `AB-dev`

We have to do some installs, and none of the current instructions cover them. This is particularly problematic due to the various version numbers involved.

  • Install the special version of the servicex library: pip install git+https://github.com/ssl-hep/ServiceX_frontend@fe6a938e75d4c85d6d0f2e0178969a81fd2ab727

That turns out to be it. Soon that code will be updated with pydantic, and then we can have that as part of the image.

Try out a large data test

Use one of Alex's very large datasets and run the simple single dataset test.

  • Save the query with a servicex_query_cache.json file so we don't have to re-run when running later.
  • Find a large dataset from Alex's file.
  • Get @ivukotic to describe what he changed in SX for posterity.

Check fraction of files read across full list of files

The branches to read 25% of a file are determined on 2018 data; however, the contribution of a given branch to the total file size varies by file. For example, the same branches result in around 13% read for a ttbar file. Check what fraction is read when processing all containers, and update the list of branches to ensure we end up around 25% overall.
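Given a per-branch size breakdown, the per-file fraction is a simple ratio; averaging it weighted by file size across all containers gives the overall number. A sketch with a hypothetical helper:

```python
def read_fraction(sizes, branch_names):
    """Fraction of a file's compressed bytes covered by branch_names,
    given a {branch: compressed_bytes} mapping (e.g. from branch_sizes.json)."""
    selected = sum(sizes.get(name, 0) for name in branch_names)
    total = sum(sizes.values())
    return selected / total if total else 0.0
```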

Specification for SX leaves to extract

Create a set of data structures and converters that will go from a declarative list of leaves we want to a query that will return them.

  • Name of each leaf in the output
  • How to access it via a lambdas
  • Depth for dealing with collections (and collections of collections).

All of this should be done in a Python file which can then be used by the system to load up a query.
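One possible shape for such a data structure, as a sketch; the event API used in the lambdas is assumed (func_adl-style), and the entries are illustrative only:

```python
from dataclasses import dataclass
from typing import Callable

@dataclass(frozen=True)
class LeafSpec:
    """One leaf to extract: a hypothetical structure for the three items above."""
    name: str         # name of the leaf in the output
    access: Callable  # how to access it via a lambda on the event
    depth: int = 1    # 1 = collection, 2 = collection of collections

# Illustrative entries only; the lambdas assume a func_adl-style event API.
LEAVES = [
    LeafSpec("jet_pt", lambda e: e.Jets().Select(lambda j: j.pt()), depth=1),
    LeafSpec("el_pt", lambda e: e.Electrons().Select(lambda el: el.pt()), depth=1),
]
```

A converter can then walk such a list and assemble the full query, keeping the declarative list separate from query-building machinery.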

Vector read errors

Full ttbar produces 11 vector read errors that seem reproducible.

import uproot

fname = "root://192.170.240.148//root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/mc20_13TeV/f5/99/DAOD_PHYSLITE.37230013._001196.pool.root.1"
treename = "CollectionTree"

with uproot.open({fname: treename}) as f:
    f["AnalysisJetsAuxDyn.SumPtChargedPFOPt500"].array()

->

OSError: File did not vector_read properly: [ERROR] Server responded with an error: [3000] Read vector is invalid

The same file can be read fine from local path after xrdcp.

ServiceX S3 tells you to "slow down"!

If you run the current SX script, you get the following slow down error when running with a 200 item DASK cluster.

Exception: ClientResponseError(RequestInfo(url=URL('https://s3.af.uchicago.edu/2e177b73-166f-4e0a-9726-8269a0f079d8/root:::xcache.af.uchicago.edu:1094::root:::clrlcgse01.in2p3.fr:1094::dpm:in2p3.fr:home:atlas:atlasdatadisk:rucio:mc23_13p6TeV:a5:67:DAOD_PHYSLITE.37223155._000309.pool.root.1?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=ABAOJZ4XMLKWO5H0PZJ3/20240408/af-object-store/s3/aws4_request&X-Amz-Date=20240408T002316Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=825154eabb34e281de98fb6efa1236c5400c6400f2e79d2209bdac242c5e845a'), method='GET', headers=<CIMultiDictProxy('Host': 's3.af.uchicago.edu', 'Range': 'bytes=292842881-292890932', 'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'User-Agent': 'Python/3.9 aiohttp/3.9.3')>, real_url=URL('https://s3.af.uchicago.edu/2e177b73-166f-4e0a-9726-8269a0f079d8/root:::xcache.af.uchicago.edu:1094::root:::clrlcgse01.in2p3.fr:1094::dpm:in2p3.fr:home:atlas:atlasdatadisk:rucio:mc23_13p6TeV:a5:67:DAOD_PHYSLITE.37223155._000309.pool.root.1?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=ABAOJZ4XMLKWO5H0PZJ3/20240408/af-object-store/s3/aws4_request&X-Amz-Date=20240408T002316Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=825154eabb34e281de98fb6efa1236c5400c6400f2e79d2209bdac242c5e845a')), (), status=503, message='Slow Down', headers=<CIMultiDictProxy('Date': 'Mon, 08 Apr 2024 00:23:26 GMT', 'Content-Type': 'application/xml', 'Content-Length': '211', 'Connection': 'keep-alive', 'x-amz-request-id': 'tx000000000000009b2faf5-00661338fe-7784132-af-object-store', 'Accept-Ranges': 'bytes', 'Strict-Transport-Security': 'max-age=15724800; includeSubDomains')>)

Actually, it worked once, and then when I repeatedly tried it, I got this error. It causes the DASK graph to fail.

Build query library

We need to have a query that does some heavy skimming of the SX query to better understand:

  • How this will behave when we are doing "real physics"
  • How much time is being lost to output compression and I/O.

Do this by putting the queries in a file that can then be easily imported. This is just to create say 3 queries for xaod:

  1. All the data (what we do now)
  2. Jet cuts as @alexander-held recommends
  3. Super tight to really exaggerate the differences

Once this is in, then we can add even more queries (from different back-ends!).

Add the module generating the message into the log messages we generate

Currently our log messages are not helpful as to where the messages are coming from:

0000.0323 - INFO - Using release 22.2.107 for type information.
0000.0538 - WARNING - Unknown type for name len
0000.4665 - INFO - Building ServiceX query
0000.4666 - INFO - Using dataset mc20_13TeV.364157.Sherpa_221_NNPDF30NNLO_Wmunu_MAXHTPTV0_70_CFilterBVeto.deriv.DAOD_PHYSLITE.e5340_s3681_r13145_p6026.
0000.4667 - INFO - Running on the full dataset.
0000.4672 - INFO - Starting ServiceX query

Add the source of the log message into our formatter.
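Assuming these messages go through Python's logging module, adding %(name)s to the formatter and using module-level loggers (logging.getLogger(__name__)) everywhere would do it. A sketch:

```python
import logging

# %(name)s carries the logger's module name, which identifies where each
# message comes from; the elapsed-time prefix is approximated here with
# relativeCreated (milliseconds since logging was configured).
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    "%(relativeCreated)09.1f - %(levelname)s - %(name)s - %(message)s"))
logging.basicConfig(level=logging.INFO, handlers=[handler], force=True)

log = logging.getLogger(__name__)
log.info("Starting ServiceX query")  # now prefixed with the emitting module
```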
