Giter VIP home page Giter VIP logo

kowalski's Introduction

Kowalski

Kowalski

a multi-survey data archive and alert broker for time-domain astronomy



Kowalski is an API-driven multi-survey data archive and alert broker. Its main focus is the Zwicky Transient Facility.

Technical details

A schematic overview of the functional aspects of Kowalski and how they interact is shown below:

data/img/system.png

  • A non-relational (NoSQL) database MongoDB powers the data archive, the alert stream sink, and the alert handling service.
  • An API layer provides an interface for the interaction with the backend: it is built using a python asynchronous web framework, aiohttp, and the standard python async event loop serves as a simple, fast, and robust job queue. Multiple instances of the API service are maintained using the Gunicorn WSGI HTTP Server.
  • A programmatic python client is also available to interact with Kowalski's API (we recommend versions>=2.3.2, which support multi-instance querying).
  • Incoming and outgoing traffic can be routed through traefik, which acts as a simple and performant reverse proxy/load balancer.
  • An alert brokering layer listens to Kafka alert streams and uses a dask.distributed cluster for distributed alert packet processing, which includes data preprocessing, execution of machine learning models, catalog cross-matching, and ingestion into MongoDB. It also executes user-defined filters based on the augmented alert data and posts the filtering results to a SkyPortal instance.
  • Kowalski is containerized using Docker software and orchestrated with docker compose allowing for simple and efficient deployment in the cloud and/or on-premise. However, it can also run without Docker especially for development purposes.

Interacting with a Kowalski instance

Kowalski is an API-first system. The full OpenAPI specs can be found here. Most users will only need the queries section of the specs.

The easiest way to interact with a Kowalski instance is by using a python client penquins.

Cloning and Environment configuration

Clone the repository

Start off by creating your own kowalski fork and github, and cloning it, then cd into the cloned directory:

git clone https://github.com/<your_github_id>/kowalski.git
cd kowalski

Environment setup on Linux amd64

First, you'll need to install few system dependencies:

sudo apt install -y default-jdk wget

Make sure you have a version of python that is 3.8 or above before following the next steps.

Now, in the same terminal, run:

sudo pip install virtualenv
virtualenv env
source env/bin/activate

to create your virtual environment. If you are told that pip is not found, try using pip3 instead. For the following steps however (in the virtualenv), pip should work.

The python dependencies will be install automatically when you start the app. The same will happen for Kafka and the ML models.

Environment setup on MacOS arm64 (M1 or M2)

First, you need to install several system dependencies using homebrew:

brew install java librdkafka wget

After installing java, run the following to make sure it is accessible by kafka later on:

sudo ln -sfn /opt/homebrew/opt/openjdk/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk.jdk
echo 'export PATH="/opt/homebrew/opt/openjdk/bin:$PATH"' >> ~/.zshrc

Seperately, we install hdf5:

brew install hdf5

At the end of hdf5's installation, the path where it has been installed will be displayed in your terminal. Copy that path and make sure that you save it somewhere. You will need it when installing or updating python dependencies. We suggest you save it to your .bashrc or .zshrc file, by adding the following line:

export HDF5_DIR=<path_to_hdf5>

Don't forget to source your .zshrc file after adding the above line, or else the path will not be accessible. You can also simply restart your terminal.

Make sure you have a version of python that is 3.8 or above before following the next steps. You can consider installing a newer version with homebrew if needed.

To install a new version with homebrew, run:

brew install [email protected]

and then add the following line to your .bashrc or .zshrc file:

alias python='python3.10'
alias pip='pip3.10'

These lines will allow you to use that binary when calling python in your terminal. You can also use python3.10 instead of python in the following steps.

If you added it to your .zshrc file, don't forget to source it.

Now, in the same terminal, run:

sudo pip install virtualenv
virtualenv env
source env/bin/activate

to create your virtual environment. If you are told that pip is not found, try using pip3 instead. For the following steps however (in the virtualenv), pip should work.

The python dependencies will be install automatically when you start the app. The same will happen for Kafka and the ML models.

Spin up your own kowalski without Docker

Setting up config files

Similar to the Docker setup, you need config files in order to run Kowalski. You can start off by copying the default config/secrets over. Here however, the default config file is config.local.yaml:

cp config.defaults.yaml config.yaml

Setting up the MongoDB database

Running a local MongoDB instance

If you are running a local mongodb instance, the default config file should work out of the box, except if you are using a different port, a replica set, different database name or different usernames/passwords. In that case, you will need to edit the database section of the config file.

You can find detailed instructions on how to set up a MongoDB cluster here.

We also need to set the admin and user roles for the database. To do so, login to mongdb:

mongosh --host 127.0.0.1 --port 27017

and then from within the mongo terminal, set (using the default values from the config):

use admin
db.createUser( { user: "mongoadmin", pwd: "mongoadminsecret", roles: [ { role: "userAdmin", db: "admin" } ] } )
db.createUser( { user: "ztf", pwd: "ztf", roles: [ { role: "readWrite", db: "admin" } ] } )
use kowalski
db.createUser( { user: "mongoadmin", pwd: "mongoadminsecret", roles: [ { role: "userAdmin", db: "kowalski" } ] } )
db.createUser( { user: "ztf", pwd: "ztf", roles: [ { role: "readWrite", db: "kowalski" } ] } )
exit

Using MongoDB Atlas or a remote MongoDB instance

If you are using a mongodb atlas cluster, kowalski won't be able to create admin users, so you will need to do so manually on the cluster's web interface. You will need to create 2 users: admin user and user, based on what usernames and passwords you've set in the config file in the database section. Don't forget to also allow access from your IP address, or simply allow access from anywhere.

Start and test the app

Starting the app

To start the app, run:

make run

This will start the API, the dask clusters and alert brokers.

Run the tests

To run the tests, run:

make test

If you want to run a specific test, you can do so by running:

PYTHONPATH=. pytest -s kowalski/tests/<test_file.py>

Ingester (Pushing alerts to a local kafka topic)

Once the broker is running, you might want to create a local kafka stream of alerts to test it. To do so, you can run the ingester with

cd kowalski

and running:

PYTHONPATH=. python kowalski/tools/kafka_stream.py --topic="<topic_listened_by_your_broker>" --path="<alerts_folder_in_data>" --test=True

where <topic_listened_by_your_broker> is the topic listened by your broker (ex: ztf_20200301_programid3 for the ztf broker) and <alerts_folder_in_data> is the path to the alerts in the data/ directory of the kowalski app (ex: ztf_alerts/20200202 for the ztf broker).

To stop the broker, you can simply press Ctrl+C in the terminal where you started it.

Spin up your own kowalski using Docker

Setting up config files

You need config files in order to run Kowalski. When running in docker, this is done with the docker.yaml file, which is already configured to use a database running in a mongodb container. If you need to make any other changes, you can copy the relevant sections from the config.defaults.yaml file into docker.yaml. You also need to create a docker-compose.yaml file. You can start off by copying the default config/secrets over:

cp docker-compose.defaults.yaml docker-compose.yaml

config.defaults.yaml contains the API and ingester configs, together with all the secrets. So be careful when committing code / pushing docker images.

However, if you want to run in a production setting, be sure to modify docker.yaml and choose strong passwords!

docker-compose.yaml serves as a config file for docker compose, and can be used for different Kowalski deployment modes. Kowalski comes with several template docker compose configs (see below for more info).

Building Kowalski

Finally, once you've set the config files, you can build an instance of Kowalski. You can do this with the following command:

make docker_build

You have now successfully built a Kowalski instance! Any time you want to rebuild kowalski, you need to re-run this command.

Running Kowalski

  • make docker_up to start up a pre-built Kowalski instance

Running the tests

You can check that a running docker Kowalski instance is working by using the Kowalski test suite:

make docker_test

Shutting down Kowalski

make docker_down

Different Deployment scenarios (using Docker)

Kowalski uses docker compose under the hood and requires a docker-compose.yaml file. There are several available deployment scenarios:

  • Bare-bones
  • Bare-bones + broker for SkyPortal / Fritz
  • Behind traefik

Bare-bones

Use docker-compose.defaults.yaml as a template for docker-compose.yaml. Note that the environment variables for the mongo service must match admin_* under kowalski.database in config.yaml.

Bare-bones + broker for SkyPortal / Fritz

Use docker-compose.fritz.defaults.yaml as a template for docker-compose.yaml. If you want the alert ingester to post (filtered) alerts to SkyPortal, make sure {"misc": {"broker": true}} in config.yaml.

Behind traefik

Use docker-compose.traefik.defaults.yaml as a template for docker-compose.yaml.

If you have a publicly accessible host allowing connections on port 443 and a DNS record with the domain you want to expose pointing to this host, you can deploy kowalski behind traefik, which will act as the edge router -- it can do many things including load-balancing and getting a TLS certificate from letsencrypt.

In docker-compose.yaml:

  • Replace [email protected] with your email.
  • Replace private.caltech.edu with your domain.

API Docs

OpenAPI specs are to be found under /docs/api/ once Kowalski is up and running.

Developer guide

How to contribute

Contributions to Kowalski are made through GitHub Pull Requests, a set of proposed commits (or patches).

To prepare, you should:

Then, for each feature you wish to contribute, create a pull request:

  1. Download the latest version of Kowalski, and create a new branch for your work.

    Here, let's say we want to contribute some documentation fixes; we'll call our branch rewrite-contributor-guide.

    git checkout master
    git pull upstream master
    git checkout -b rewrite-contributor-guide
  2. Make modifications to Kowalski and commit your changes using git add and git commit. Each commit message should consist of a summary line and a longer description, e.g.:

    Rewrite the contributor guide
    
    While reading through the contributor guide, I noticed several places
    in which instructions were out of order. I therefore reorganized all
    sections to follow logically, and fixed several grammar mistakes along
    the way.
    
  3. When ready, push your branch to GitHub:

    git push origin rewrite-contributor-guide

    Once the branch is uploaded, GitHub should print a URL for turning your branch into a pull request. Open that URL in your browser, write an informative title and description for your pull request, and submit it. There, you can also request a review from a team member and link your PR with an existing issue.

  4. The team will now review your contribution, and suggest changes. To simplify review, please limit pull requests to one logical set of changes. To incorporate changes recommended by the reviewers, commit edits to your branch, and push to the branch again (there is no need to re-create the pull request, it will automatically track modifications to your branch).

  5. Sometimes, while you were working on your feature, the master branch is updated with new commits, potentially resulting in conflicts with your feature branch. To fix this, please merge in the latest upstream/master branch:

    git merge rewrite-contributor-guide upstream/master

Developers may merge master into their branch as many times as they want to.

  1. Once the pull request has been reviewed and approved by at least two team members, it will be merged into Kowalski.

Pre-commit hook

Install our pre-commit hook as follows:

pip install pre-commit
pre-commit install

This will check your changes before each commit to ensure that they conform with our code style standards. We use black to reformat Python code and flake8 to verify that code complies with PEP8.

Add a new alert stream to Kowalski

To add a new alert stream to kowalski, see the PR associated with the addition of WINTER to Kowalski. A brief summary of the changes required (to add WINTER into Kowalski, but hopefully can be extended to any other survey) is given below -

  1. A new kowalski/alert_brokers/alert_broker_<winter>.py needs to be created for the new alert stream. This can be modelled off the existing alert_broker_ztf.py or alert_broker_pgir.py scripts, with the following main changes -

    a. watchdog needs to be pointed to pull from the correct topic associated with the new stream

    b. topic_listener needs to be updated to use the correct dask-ports associated with the new stream from the config file (every alert stream should have different dask ports to avoid conflicts). topic_listener also needs to be updated to use the <WNTR>AlertConsumer asociated with the new stream.

    c. <WNTR>AlertConsumer needs to be updated per the requirements of the survey. For example, WINTER does not require MLing prior to ingestion, so that step is excluded unlike in the ZTFAlertConsumer. The WNTRAlertConsumer also does a cross-match to the ZTF alert stream, a step that is obviously not present in ZTFAlertConsumer.

    d. <WNTR>AlertWorker needs to be updated to use the correct stream from SkyPortal. alert_filter__xmatch_ztf_alerts needs to be updated with the new survey-specific cross-match radius (2 arcsec for WINTER).

  2. In kowalski/alert_brokers/alert_broker.py, make_photometry needs to be updated with the filterlist and zeropoint system appropriate for the new stream.

  3. A new kowalski/dask_clusters/dask_cluster_<winter>,py needs to be created, modeled on dask_cluster.py but using the ports for the new stream from the config file.

  4. The config file config.defaults.yaml needs to be updated to include the collections, upstream filters, crossmatches, dask ports, and ml_models (if MLing is necessary) for the new stream. No two streams should use the same ports for dask to avoid conflicts. Entries also need to be made in the supervisord_<api_or_ingester.conf.template configs found in the conf directory.

  5. Some alerts need to be added to data/ for testing. Tests for alert ingestion (tests/test_ingester_<wntr>.py) and alert processing (tests/test_alert_broker_wntr.py) can be modeled on the ZTF tests, with appropriate changes for the new stream. The ingester test is where you will be able to create a mock kafka stream to test your broker.

  6. Need to edit ingester.Dockerfile so that all new files are copied into the docker container (add or modify the COPY lines).

Add a new ML model to Kowalski

For now, only the ZTF alert stream has a method implement to run ML models on the alerts. However, this can be extended as reused as a basis to run ML models on other streams as well.

To add a new ML model to run on the ZTF alert stream, you simply need to add the model to the models directory, where you create a directory named after your instrument, which will contain the models you want to add. The models need to be referenced in the config file in the ml section, in a key named after your instrument. The name of that key lowered will be the name of the directory added in the models directory. The model will then be automatically loaded and will run on the alerts. In the configm you cqn provide an URL for your model. This will ensure that the model is downloaded and added in the right directory when Kowalski is started.

Here are the exact steps to add a new ML model to Kowalski:

  1. Add the model in .h5 format, or if you are using a .pb format you can also add the model files and directories in a folder called <instrument_name_lowered> in the models directory.

  2. Add the model name to models.<instrument_name>.<model_name> in the config file. All models need to have at least the following fields:

    • triplet: True or False, whether the model uses the triplet (images) or not as an input to the model
    • feature_names: list of features used by the model as a tuple, they need to be a subset of the ZTF_ALERT_NUMERICAL_FEATURES found in kowalski/utils.py. Ex: ('drb', 'diffmaglim', 'ra', 'dec', 'magpsf', 'sigmapsf')
    • version: version of the model
    • url: URL to download the model from. This is useful so that Kowalski can download the model when it starts, and you don't need to add the model to the models directory. If you don't provide a URL, Kowalski will look for the model in the models directory.
  3. Then, you might want to provide additional information about the model, such as:

    • feature_norms: dictionary of feature names and their normalization values, if the model was trained with normalized features
    • order: in which order do the triplet and features need to be passed to the model. ex: ['triplet', 'features'] or ['features', 'triplet']
    • format: format of the model, either h5 or pb. If not provided, the default is h5.

The best way to see if the model is being loaded correctly is to run the broker tests. These tests will show you the models that are running, and the errors encountered when loading the models (if any).

Ingest Catalogs in Kowalski (fits, csv)

To ingest a new catalog into Kowalski from a fits, csv, or parquet file, you can use the ingest_catalog.py script in the ingesters directory. To use it, run:

PYTHONPATH=. python kowalski/ingesters/ingest_catalog.py --catalog_name="TEST" --path="data/catalogs/<your_file.fits_csv_or_parquet>" --max_docs=500 --ra_col="RA" --dec_col="DEC" --format="<fits_csv_or_parquet>"

The max_docs argument is optional, and if not provided, the entire catalog will be ingested. We advise setting it to somewhere between 1 and 10 the first time, to assess if the catalog is being ingested as expected before ingesting the rest. If you want to specify the RA and Dec columns, you can use the --ra_col and --dec_col arguments; if not provided, the script will try to find the RA and Dec columns in the catalog. The format argument is also optional, and if not provided, the script will try to process the file as a fits file. The catalog name argument and the path are required.

Once a catalog has been added, you can perform cross-matches between the catalog and candidates. To do so, add the catalog and the cross-match conditions (like the search radius) in the database.xmatch.<instrument_name> section of the config file.

Here is an example of how to add a cross-match between the ZTF alert stream and a catalog called TEST, which has some columns called j_m, h_m, and k_m, and for which we want to perform a cross-match with a search radius of 2 arcsec:

ZTF:
   TEST:
      cone_search_radius: 2
      cone_search_unit: "arcsec"
      filter: {}
      projection: #column for your catalog that you want to have in the crossmatch results
         _id: 1
         coordinates.radec_str: 1
         j_m: 1
         h_m: 1
         k_m: 1

kowalski's People

Contributors

andytza avatar bfhealy avatar dannygoldstein avatar dekishalay avatar dependabot[bot] avatar dmitryduev avatar guiga004 avatar guynir42 avatar kmshin1397 avatar mcoughlin avatar robertdstein avatar rstrausb avatar stefanv avatar theodlz avatar virajkaram avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

kowalski's Issues

How to ingest local data into kowalski

Looking for a tutorial on what to do after adding a new stream to kowalski. I am adding a new instrument to kowalski called TURBO. I have followed the steps in the tutorial about adding WINTER to kowalski. I think I have things set up correctly as I can successfully complete the test step using TURBO data. I now don't know what to do to have kowalski ingest local turbo data and then post candidates to skyportal.

TNS on Fritz front end reports "No matches found", but when TNS searched, object found

Describe the bug
Front end source display shows "No matches found" for TNS, but when you click the TNS catalog button, a source is found.

To Reproduce
Steps to reproduce the behavior:

  1. Go to source page for ZTF21abctnoh, or ZTF21abebphe
  2. Click on TNS catalog button
  3. Scroll down to search results
  4. See error

Expected behavior
Consistency between TNS match display and search results from catalog

Screenshots
image

image

Platform information:

  • Fritz version: v0.9.dev0+git20210603.ab93174+fritz.716e016.
  • Browser
  • I'm browsing on a desktop
  • I'm browsing on my phonePlease fill out relevant sections below; remove those that are unused.

Context

Originally reported on fritz as Issue #105

iterrows in catalog ingestion casts all dtypes to float

When ingesting a parquet file using ingest_catalog.py, this line casts all columns' dtypes to float. Thus the downstream checks for different dtypes will not work as intended.

Perhaps using itertuples instead would help by preserving the dtypes (at the expense of some re-working of the loop)? Note this could introduce a new issue where columns beginning with an underscore are renamed (e.g. _id, which gets mapped to _1). This might be avoided by setting name=None within itertuples.

Non-docker-based build system

Feature Summary
In order to increase the dev speed, we need a non-docker-based build system.

Implementation details
Use a virtual environment for the python stuff. For devs, run the app with the reload parameter or use gunicorn with the --reload option. Keep mongo and the java/kafka stuff containerized.
Avoid make at all costs!

Additional context
The docker stuff should not be scrapped since it'll still be used in prod.

Add VLASS as a database option

VLASS is a useful point source catalog of radio sources:

https://cirada.ca/vlasscatalogueql0

Can be loaded as, for example:

    df = pd.read_csv(catalog_file)
    df = df[df["Duplicate_flag"] < 2]
    df = df[df["Quality_flag"] == 0]
    df = df[df["Total_flux"] >= 1]
    df = df[df["Total_flux"]/df["E_Total_flux"] >= 5]

Test ZTF matchfile ingestion

Feature Summary
Create several small matchfiles with public data only (from real matchfiles) and test ingesting them with ingest_ztf_matchfiles.py.

Assessing completeness of ingested catalogs

It may be useful to have code that quickly assesses the completeness of ingested ZTF source catalogs. This code in scope (https://github.com/ZwickyTransientFacility/scope/blob/dbf2c31719ec04cb7e86338d7fa6f9c0fd8a01cf/tools/generate_features_slurm.py#L51) could be adapted for this purpose. It iterates over ZTF field/ccd/quad combinations, performing find_one queries to identify whether any sources exist. Comparing the results to previous data releases is a quick way to see if a new catalog is complete (if the new catalog has sources in fewer fields than an older catalog, something likely went wrong during the ingestion).

Add mypy pre-commit hook

Feature Summary
As suggested by @stefanv, add mypy to pre-commit hooks.

Implementation details

  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: 'v0.782'
    hooks:
    -   id: mypy

A file is missing

When I'm using ./kowalski.py up to build, there is an error failed to compute cache key: "/version.txt" not found: not found.
In fact there is no version.txt file.

Add PTF data as database option

As preparation for practicing combining ZTF with future surveys like LSST (and to facilitate long baseline period searches now), PTF would be a really nice value-added catalog for our period searches.

CI action to deploy to staging on PR merges/commits to master

Feature Summary
A GH action that would deploy K to a/the staging server on PR merges/commits to master.

Implementation details
Mimic Ansible and just ssh into, say, skipper, then cd ..., pull latest master, check/fix config if necessary, build images, deploy.

Alternative Solutions
Build, upload, and deploy images similar to how it's done for Fritz?

Primary/secondary instances of Kowalski for querying

This should be discussed with the team to see what the implementation should look like.

Feature Summary
Implement a mode that would allow multiple instances of Kowalski to operate under one "umbrella".

Usage / behavior
The user would query a primary instance of K. If the latter does not find the requested catalog in its db, it would ask secondary instances to see if they have it, and query them if they do.

Alternative Solutions

  • MongoDB sharding.
  • Sync the users across instances, report the "routing" info to the user; decide under the hood on the penquins side what instance to query

Implementation details
In config.yaml, set up

{
  primary_instance: <false|true>
  secondary_instances: <list of known secondary instances access info>
}

Refactor ZTF matchfiles pre-processing and ingestion

Feature Summary
ZTF switches to a 2-month data release schedule in Spring 2021. This means that the ZTF matchfiles that contain absolute photometry for over a billion sources in multiple filters will also be re-generated every 2 months. The matchfiles are pre-processed and ingested into Kowalski feeding many of the variable science and machine learning applications. The matchfiles take up well over 20 TB (in a binary format) and it takes quite a bit of time to pre-process and ingest that data. We need to refactor the pipeline so that it is faster and complies with the constraints that we have in terms of the available space and compute.

Usage / behavior
[to be filled in]

Implementation details
Currently, we are using a set of scripts:
https://github.com/dmitryduev/kowalski/blob/master/tools/ztf_mf2dump.py

Applying filter changes immediately on subsequent alerts

Hi,

We had a discussion about whether filter modifications are immediately applied to subsequent alerts. You said that the filters are only loaded once per day. Would it be possible to reduce the delay time, ideally no delay at all? This would be very useful for debugging filters, in case a problem was not caught by the Compass.

Cheers,

Steve

Brokering service for Gattini IR alert stream

Feature Summary
Implement a brokering service for the Palomar Gattini IR (PGIR) alert stream.

Usage / behavior
The service should mimic the behavior of the ZTF alert brokering service.

Implementation details

  • config.defaults.yaml
    • Add alerts_pgir: "PGIR_alerts" and alerts_pgir_aux: "PGIR_alerts_aux" to database.collections
    • Add necessary indices to database.indexes.PGIR_alerts
    • Define upstream filters in database.filters.PGIR_alerts
    • Refactor xmatch section?
    • Add dask_pgir section; use a different port for the dask scheduler
    • Add supervisor programs to supervisord.ingester for program:dask-cluster-pgir and program:alert-broker-pgir
  • ingester.Dockerfile
    • Copy kowalski/alert_broker_pgir.py and kowalski/dask_cluster_pgir.py to /app
  • kowalski/alert_broker_pgir.py and kowalski/dask_cluster_pgir.py
    • Essentially copy over kowalski/alert_broker_ztf.py and kowalski/dask_cluster.py, keep the relevant parts of the code, and make the necessary adjustments.
  • tests/test_ingester_pgir.py
    • Implement an end-to-end PGIR alert processing test; use tests/test_ingester.py as an example. The latter pulls sample alerts from a public Google Cloud bucket. You may either do the same or post several examples to a GH repo.
    • add corresponding code to kowalski.py::test (use the section for ZTF alerts as an example)

Note that we will need to implement several things on the F side to enable filtering (create a stream, handle permissioning, PGIR alert page etc.)

Ingest PS1 STRM into Kowalski

Please fill out relevant sections below; remove those that are unused.

Feature Summary
It would be useful if the PS1 STRM catalog can be ingested into Kowalski. The catalog is described and can be downloaded here. It provides photo-z and morphological classes (galaxy/star/qso) for all point sources detected in the PS1 catalog.

Usage / behavior
The user should be able to query this Table using ra, dec, and a cross match radius.

Additional context
Currently I query this table using CasJobs, as is shown here. If this table is ingested into Kowalski, then the different groups on Fritz can cross match their ZTF candidates with this catalog in the filtering and add auto-annotation into the source page.

Add some salt to generated tokens

Feature Summary
...otherwise if expiration date is not set in the config, the API will always generate the same token.

Alternative Solutions
Care not?

Implementation details
Add the "created_at" field or something.

Store info on successful/failed ops in broker mode

Keep track of alerts that should have been posted to SP, but the operation failed for whatever reason. Retry posting next time the ingester pulls the alert in from the Kafka topic.

  • store info on failed candids in a dedicated collection

Refactor all data ingest scripts into a general class + subclassing

Feature Summary
There's quite a bit of duplicate code in those ingest scripts, so it would be useful to refactor them into a general Ingestor class + subclassing for the custom logic.

Notes on implementation

  • _id type conversion:
df["_id"] = df["_id"].astype(int)
  • search and remove debug statements

TestIngester::test_ingester Failed while testing local version of kowalski

Describe the bug
When testing a local version of Kowalski (using ./kowalski.py test), the test_ingester.py::TestIngester::test_ingester - assert 0 == 313 test fails.
The cause of this could possibly be the following failures on connecting to the Kafka cluster (see screenshot, full log attached)-

  1. Message delivery failed: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}

2. Message delivery failed: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}%3|1615087455.390|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 15 identical error(s) suppressed)

To Reproduce

  • Execute ./kowalski.py test

Expected behavior

  • All tests should pass, and the different mongo-db collections should be populated with a few ZTF alerts.

Screenshots
Screen Shot 2021-03-09 at 11 38 12 AM
Screen Shot 2021-03-09 at 11 41 38 AM

Platform information:

  • macOS Catalina 10.15.7
  • anaconda3 version 3.8.5
  • docker version 20.10.5
  • docker-compose version 1.28.5

Additional context
Full log of the tests
kowalski_test_log.log

Ingest VSX catalog

The VSX catalog contains 2M variable sources. For automatic filtering and crossmatching, this catalogue is useful to have.

Feature Summary
Ingest VSX catalogue from vizier: http://vizier.u-strasbg.fr/viz-bin/VizieR?-source=B%2Fvsx The vizier catalog is regularly updated, and we like to keep to Kowalski version up to date too (can be done manually).

Implementation details
requires a python script that takes the downloaded Vizier catalog and ingests it into Kowalski. Similar to other catalogs.

Codec problem

Hi Dima,

I am using python 3.6 and try to install kowalski. When I do

./kowalski.py up

I get the following error message:

Traceback (most recent call last):
  File "./kowalski.py", line 194, in <module>
    getattr(sys.modules[__name__], args.command)(args)
  File "./kowalski.py", line 32, in up
    print('Spinning up Kowalski \U0001f680')
UnicodeEncodeError: 'ascii' codec can't encode character '\U0001f680' in position 21: ordinal not in range(128)

How can I fix this problem?

Cheers,

Steve

Reticles are off for non-square thumbnails posted to SP in broker mode

We need a way to handle this situation:
image

These are events detected close to the CCD edge. They are always centered on the longer side. We need to use the (x, y) position of the transient to pad the shorter side in such a way that it always lands in the center.

A nice idea from @stefanv: make the thumbnails transparent where there is no data!

The relevant code lives here: https://github.com/dmitryduev/kowalski/blob/master/kowalski/alert_broker.py#L434.

add IGAPS dataset

IPHAS dr2 has been superseded by the IGAPS catalog and the survey should be updated

Feature Summary
The IPHAS dr2 catalog has been superseded by IGAPS (http://www.star.ucl.ac.uk/IGAPS/survey.shtml). IGAPS included both IPHAS and UVEX; and contains data in the Ugriz-filters. Especially the U-band data is important for compact object science.

Usage / behavior
This is simply an update of a catalog available for crossmatching

Implementation details
This is simply an update of a catalog. If needed, some catalog columns can be omitted.

Additional context
IGAPS is a Galactic Plane catalog, having this available soon would be nice since the Galactic plane is rising again in the sky

New query type: "near"

Feature Summary
Implement a new query type near with a footprint similar to that of cone_search, which would return the nearest object from catalog(s).

Usage / behavior
User will be able to specify min_distance and max_distance in radians in kwargs.

Implementation details
Use MongoDB's $nearSphere operator.

Consider passing image directly to frontend

Adam Miller requested that we add median centered inverse hyperbolic sine scaling. However, currently LogNorm is hardcoded.

    if ztftype != 'Difference':
        img[img <= 0] = np.median(img)
        plt.imshow(img, cmap=plt.cm.bone, norm=LogNorm(), origin='lower')
    else:
        plt.imshow(img, cmap=plt.cm.bone, origin='lower')

With this scaling, we could potentially get rid of the if clause.

However, perhaps it would be simplest to leave these scalings to the frontend, paving the way for using JS9.

Update TNS "mirror" service

To reduce the number of API calls to the TNS, we should use the public_timestamp parameter in our queries, see here for details. The weekly sync job could then be excluded.

ZTFTriggerHandler should gracefully handle repeated queue name

Kowalski only currently knows about a 201 response from ZTF's scheduler. However, 200 is another useful error code indicating the queue already exists. We should have that returned separately.

check if the queue already exists--don't replace it if so

# (PUT should be idempotent)
if data['queue_name'] in request.app['scheduler'].queues:
    msg = f"Submitted queue {data['queue_name']} already exists"
    [logging.info](http://logging.info/)(msg)
    return web.Response(status=200, text=msg)

Rate limiting for filters in broker mode

Feature Summary
With broker mode activated, should impose certain rate limits, e.g. don't allow auto-saving more than 5,000 sources in a 24-hour period to SP.

Usage / behavior
Stop posting new sources to SP once the limit is reached.

Alternative Solutions
Limit rate on the SP side?

Implementation details
Current thinking: implement #18 and this on top of it, i.e. store the info on what has been posted to SP (objectId, candid, jd, passing filter ids, save/post state)

Add option to update annotations when alert passes filter again

Currently, in broker mode, K posts annotations to SP only the first time an alert from an object passes a filter. We need to add an option to update annotations in case a new alert from the same object passes the filter.
[As a side note: we do update the passing_alert_id and passed_at fields every time]

Queries API refactor

Feature Summary
Refactor the queries API to be more clean and concise.

Usage / behavior
Noting should change from the enduser perspective.

Implementation details

  • Refactor the queries API to do data validation and preprocessing using odmantic
  • Assemble the relevant handlers into a class
  • Ditch saving queries to db and executing them in the background: no one has used this capability for about a year.

Merge all config files into a single yaml file

  • config_api.json and config_ingester.json should just use the new config.yaml
  • merge secrets.json into config.yaml as well
  • supervisord config files should be auto-generated at build time

Performance report

Feature Summary
Inspect the broker logs on a daily basis and generate and post a summary pdf report with plots to a Slack channel.

Implementation details
Run it every day or so; add a supervisor program to config.defaults.yaml in superviosrd.ingester

Use something like:

import datetime
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pathlib
from tqdm.auto import tqdm

p_base = "/data/logs"
start_date = datetime.datetime(2021, 2, 16)
end_date = datetime.datetime(2021, 2, 17)

log_dir = pathlib.Path(p_base)
log_files = list(log_dir.glob("dask_cluster*"))

actions = {
    "mongification": [],
    "mling": [],
    "ingesting": [],
    "xmatch": [],
    "clu_xmatch": [],
    "aux_updating": [],
    "filtering": [],
    "is_candidate": [],
    "is_source": [],
    "get_candidate": [],
    "make_photometry": [],
    "post_photometry": [],
    "post_annotation": [],
    "make_thumbnail": [],
    "post_thumbnail": [],
}

for log_file in tqdm(log_files):
    with open(log_file) as f_l:
        lines = f_l.readlines()
        
    for line in tqdm(lines):
        if len(line) > 5:
            try:
                tmp = line.split()
                t = datetime.datetime.strptime(tmp[0], "%Y%m%d_%H:%M:%S:")
                if start_date <= t <= end_date:
                    if ("mongification" in line) and ("took" in line):
                        actions["mongification"].append(float(tmp[-2]))
                    if ("mling" in line) and ("took" in line):
                        actions["mling"].append(float(tmp[-2]))
                    if ("Xmatch" in line) and ("took" in line):
                        actions["xmatch"].append(float(tmp[-2]))
                    if ("CLU xmatch" in line) and ("took" in line):
                        actions["clu_xmatch"].append(float(tmp[-2]))
                    if ("ingesting" in line) and ("took" in line):
                        actions["ingesting"].append(float(tmp[-2]))
                    if ("aux updating" in line) and ("took" in line):
                        actions["aux_updating"].append(float(tmp[-2]))
                    if ("Filtering" in line) and ("took" in line):
                        actions["filtering"].append(float(tmp[-2]))
                    if ("Checking if object is Candidate" in line) and ("took" in line):
                        actions["is_candidate"].append(float(tmp[-2]))
                    if ("Checking if object is Source" in line) and ("took" in line):
                        actions["is_source"].append(float(tmp[-2]))
                    if ("Getting candidate info" in line) and ("took" in line):
                        actions["get_candidate"].append(float(tmp[-2]))
                    if ("Making alert photometry" in line) and ("took" in line):
                        actions["make_photometry"].append(float(tmp[-2]))
                    if ("Posting photometry" in line) and ("took" in line):
                        actions["post_photometry"].append(float(tmp[-2]))
                    if ("Posting annotation" in line) and ("took" in line):
                        actions["post_annotation"].append(float(tmp[-2]))
                    if ("Making" in line) and ("thumbnail" in line) and ("took" in line):
                        actions["make_thumbnail"].append(float(tmp[-2]))
                    if ("Posting" in line) and ("thumbnail" in line) and ("took" in line):
                        actions["post_thumbnail"].append(float(tmp[-2]))
            except Exception as e:
                continue
    
for action, values in actions.items():
    actions[action] = np.array(values)
    print(action)
    print(
        f"median: {np.median(values):.5f}, "
        f"std: {np.std(values):.5f}, "
        f"min: {np.min(values):.5f}, "
        f"max: {np.max(values):.5f}"
    )
    print(f"total: {np.sum(values):.5f} seconds / {np.sum(values)/60:.2f} minutes")
    fig = plt.figure(figsize=(8, 2), dpi=120)
    ax = fig.add_subplot(111)
    ax.hist(values, bins=100, range=(0, np.median(values) + 3*np.std(values)))
    ax.grid(alpha=0.3)
    plt.show()

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.