
arthur-redshift-etl's Introduction


ETL Code for Loading Data Into a Redshift-based Data Warehouse

               _   _                  _____          _     _     _  __ _     ______ _______ _
    /\        | | | |                |  __ \        | |   | |   (_)/ _| |   |  ____|__   __| |
   /  \   _ __| |_| |__  _   _ _ __  | |__) |___  __| |___| |__  _| |_| |_  | |__     | |  | |
  / /\ \ | '__| __| '_ \| | | | '__| |  _  // _ \/ _` / __| '_ \| |  _| __| |  __|    | |  | |
 / ____ \| |  | |_| | | | |_| | |    | | \ \  __/ (_| \__ \ | | | | | | |_  | |____   | |  | |____
/_/    \_\_|   \__|_| |_|\__,_|_|    |_|  \_\___|\__,_|___/_| |_|_|_|  \__| |______|  |_|  |______|

Arthur is an ETL tool for managing a data warehouse in the AWS ecosystem.


  • Update 2023-02-13 We have decided to sunset development on this project. At this time, there are other mature offerings that allow you to build out an ELT, such as dbt, Airflow, and Meltano. We are only going to make updates necessary for production use given the current feature set and will cease support completely in 6 months.

Arthur is designed to manage a warehouse in full-rebuild mode, where the entire warehouse is rebuilt from scratch every night and then updated using refresh runs during the day. Arthur is not designed to support streaming or micro-batch ETLs. Arthur is best suited for organizations whose data are managed in a stateful transactional database and who have lots of complicated business logic for their data transformations that they want to be able to manage effectively.

If you’re interested in this approach or are in a similar situation, then we’d like to talk to you. Please reach out and let’s have a data & analytics meetup.

This README outlines how to get started with the ETL and its basic principles. This includes information about setting up Arthur, which is the driver for ETL activities.

You are probably (also) looking for the wiki pages, which include a lot more information about the ETL and what it does (and why it does what it does). And if something appears amiss, check out the issues page.

Installing the code

See the separate INSTALL.md file.

Documentation

See also our wiki pages. And here's a presentation about Arthur, given at the Startup booth during the AWS Summit in New York.

Configuring the ETL (and upstream sources)

The best approach is to have a separate repo for your data warehouse that contains the configuration files and all the table design files and transformation code in SQL. The documentation will in many places assume that you have a "sibling" repo so that when you are within the repo for your local data warehouse (with configuration, credentials, and table designs), you can simply use ../arthur-redshift-etl/ to find your way back to this ETL code.

Redshift cluster and users

Although the Redshift cluster can be administered using the AWS console and psql, some helper scripts will make setting up the cluster consistently much easier. (See below for initialize and create_user.)

Also, add the AWS IAM role that the database owner may assume within Redshift to your settings file so that Redshift has the needed permissions to access the folder in S3. (And don't forget to add the role to the list of known IAM roles in Redshift.)

Sources

See the wiki pages for a description of the configurations.

Running the ETL (arthur.py)

General notes about the CLI

  • It's easiest to use a Docker container to run arthur.py.
  • Commands will provide usage information when you use -h.
    • There is also a help command that provides introductions to various topics.
  • Commands need configuration files. They will pick up all files in a local ./config directory or from whatever directory to which DATA_WAREHOUSE_CONFIG points.
  • Commands accept a --dry-run command line flag to test without modifying the environment.
  • Most commands allow the use of glob patterns to select specific schema(s) or table(s).
  • Most commands use --prefix to select a folder in the S3 bucket.
    • A few development commands pick up local files by default; add --remote to work against S3 instead.
  • To pick a prefix without specifying it every time, set the environment variable ARTHUR_DEFAULT_PREFIX (see the sketch after this list).
  • Log files are by default in arthur.log. They are rotated and deleted so that your disk doesn't fill up.
  • Logs that are collected from data pipelines are in stderr.gz or StdErr.gz files since Arthur logs to stderr.
  • To see more log information, use --verbose. To see less, use --quiet.
  • To see log lines in the console formatted the same way as in the log files, use --prolix.
  • You could copy files manually, but you should let arthur.py sync manage them instead.
  • You can use environment variables to pass in credentials for database access, but using a credentials file is preferred.
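
For example, a typical local session might start like this (the config path and prefix below are placeholders for your own setup):

export DATA_WAREHOUSE_CONFIG=../warehouse-repo/config
export ARTHUR_DEFAULT_PREFIX=$USER
arthur.py load --dry-run --prolix  # try a load without modifying the environment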

Prerequisites for running the ETL in a cluster

Creating a credentials file

All credentials can be picked up from environment variables by the ETL. Instead of setting these variables before starting the ETL, you can also add a file with credentials to the config directory where the ETL will pick them up for you. The credentials file should be formatted just like a shell file that sets variables, meaning lines should have the form:

# Lines with '#' are ignored.
NAME=value
# Although not meaningful within the ETL code, you can use the "export" syntax from Bash
export NAME=value

The minimal credentials file contains the login information for the ETL user that Arthur will use to run statements in Redshift. Make sure this file exists in your data warehouse repo as config/credentials.sh:

DATA_WAREHOUSE_ETL=postgres://etl:<password>@<host>:<port>/<dbname>?sslmode=require

If you need to make changes in the cluster beyond schema changes, you will also need an admin connection:

DATA_WAREHOUSE_ADMIN=postgres://admin:<password>@<host>:<port>/<dbname>?sslmode=require

Starting Arthur in a Docker container

The INSTALL.md file explains how to set up a Docker image to run Arthur.

Once you have that, getting to a prompt is easy:

bin/run_arthur.sh ../warehouse-repo/config production

This command will set the path to the configuration files and default environment (a.k.a. prefix) for you.

Copying code into the S3 bucket

From within the Docker container, use:

upload_env.sh

For this to work, you have to set the object_store in one of your configuration files.
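
To double-check which bucket Arthur picked up, you can query the configuration (this uses the show_value command described further below):

arthur.py show_value object_store.s3.bucket_name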

As an alternative, you can use bin/deploy_arthur.sh from outside a container.

Starting a cluster and submitting commands

Start a cluster:

launch_emr_cluster.sh

Now check the output and pick up the cluster ID. There will be a line that looks something like this:

+ CLUSTER_ID=j-12345678

You can then use arthur.py --submit "<cluster ID>" instead of arthur.py in the examples below. Note that the --submit option must be between arthur.py and the sub-command in use, e.g.

arthur.py --submit "<cluster ID>" load --prolix --prefix $USER

Don't worry -- the script launch_emr_cluster.sh will show this information before it exits.

Don't forget to run terminate_emr_cluster.sh when you're done.

Initializing the Redshift cluster

Sub-command      Goal
initialize       Create schemas, groups and users
create_groups    Create groups that are mentioned in the configuration file
create_user      Create (or configure) users that are not mentioned in the configuration file
# The commands to set up the data warehouse users and groups (or any databases) are run by the ADMIN user (connected to `dev`)
arthur.py initialize
arthur.py initialize development --with-user-creation  # Must create users and groups on first call

Starting with design files (and managing them)

Sub-command                  Goal
bootstrap_sources            Download schemas from upstream sources and bootstrap design files
bootstrap_transformations    Bootstrap (or update) design files for transformations based on new SQL queries
explain                      Review query plan for transformations
validate                     After making changes to the design files, validate that changes are consistent with the expected format and with respect to each other
sync                         Upload your local files to your data lake
# This will upload local files related to one schema into your folder inside the S3 bucket:
arthur.py sync "<schema>"
# This will upload local files for just one table
arthur.py sync "<schema>.<table>"

Note that when a sync involves changes to source schemas or configurations, you must use:

arthur.py sync --force --deploy "<schema>.<table>"

Deploying into production

We prefer to have a short and succinct way to deploy our data warehouse files (configuration, design files, and transformations) into production. So instead of starting a shell and running sync, just do:

bin/deploy_with_arthur.sh -p aws-prod-profile ../repo/config_directory/ production

Loading and updating data

Sub-command      Goal
extract          Get data from upstream sources (databases or S3)
load, upgrade    Make data warehouse "structural" changes and let data percolate through
update           Move data from upstream sources and let it percolate through
unload           Take data from a relation in the data warehouse and extract as CSVs into S3
arthur.py extract
arthur.py load  # This will automatically create schemas and tables as necessary

Note that when a load fails, the work completed before the failed relation is still in the "staging" schemas. After fixing any query errors or input data, you can continue the load using:

arthur.py upgrade --with-staging-schemas --continue-from failed_relation.in_load_step

Within a production environment, check out install_pizza_pipeline.sh which provides a consistent interface.

Dealing with schemas (create, restore)

Sub-command        Goal
create_schemas     Create schemas; normally load will do that for you
promote_schemas    Bring back schemas from backup if load was aborted or promote staging after fixing any issues

To test permissions (granting and revoking), use this for any schema:

arthur.py create_schemas schema_name
arthur.py create_schemas --with-staging schema_name
arthur.py promote_schemas --from staging schema_name

Working with subsets of tables

Sub-command                   Goal
show_downstream_dependents    Inspect the other relations impacted by changes to the selected ones
show_upstream_dependencies    Inspect which other relations feed the selected ones

The patterns used by commands like extract or load may be provided using files. Together with show_downstream_dependents and show_upstream_dependencies, this opens up opportunities to work on a "sub-tree" of the data warehouse.

Working with just the source schemas or transformation schemas

At the beginning it might be worthwhile to focus just on tables in source schemas -- those tables that get loaded using CSV files after extract.

arthur.py show_downstream_dependents -q | grep 'kind=DATA' | tee sources.txt

# List CSV files and manifests, then continue with upgrade etc.
arthur.py ls --remote @sources.txt

Note that you should use the special value :transformations when you want to work with transformations.

Example:

arthur.py show_downstream_dependents -q --continue-from=:transformations | tee transformations.txt

arthur.py sync @transformations.txt
arthur.py upgrade --only @transformations.txt

# If you don't make changes to transformations.txt, then you might as well use
arthur.py upgrade --continue-from=:transformations

Working with a table and everything feeding it

While working on a transformation or its constraints, it might be useful to focus on just that table and the set of tables that feed data into it.

Example:

arthur.py show_upstream_dependencies -q www.users | tee www_users.txt

arthur.py sync www.users
arthur.py upgrade --only @www_users.txt

Using configuration values to fill in templates

AWS service templates can be filled out based on configuration in the ETL.

Sub-command        Goal
render_template    Return a JSON document that has values (like ${resources.vpc.id}) filled in
show_value         Show value of a single variable based on current configuration
show_vars          Show variables and their values based on current configuration

Note this leaves references like #{parameter}, which are used by AWS tools, in place.

Example:

arthur.py render_template ec2_instance
arthur.py show_value object_store.s3.bucket_name
arthur.py show_vars object_store.s3.*

Working with a staging environment

A staging environment can help you build confidence in the data before you release it into production.

arthur.py initialize staging
arthur.py initialize staging --dry-run  # In case you want to see what happens but not lose all schemas.

Once everything is working fine in staging, you can promote the code into production.

sync_env.sh "<your S3 bucket>" staging production

Don't forget to upload any credentials_*.sh or environment.sh files as needed for production.

Contributing and Releases

Creating pull requests

Pull requests are welcome!

Development takes place on the next branch. So go ahead and create a branch off next and work on the next ETL feature.

Formatting code

Please format your code using black and isort.

Also, we want to run code through flake8 and mypy.

Adding a pre-commit hook

Use pre-commit to run linters automatically on commit. This is the preferred method of running formatters and linters.

brew bundle
pre-commit install

You can also run the linters directly:

pre-commit run

Or you can re-run some linters with something like:

pre-commit run black --all-files

Running linters and formatters directly

To use the linters (isort, black, flake8, mypy) locally, install them into a virtual environment:

bin/build_virtual_env
source arthur_venv/bin/activate

Then you can run:

black python/ setup.py tests/
isort python/ setup.py tests/
flake8 python setup.py
mypy python


Releasing new versions

Here are the basic steps to release a new version. Adapt them as you deem appropriate.

Creating patches

For minor updates, use a PR to update next. Anything that requires integration tests or running code in the development cluster first should go through a release candidate.

Creating a release candidate

  • Create a new branch for the release candidate, e.g. v1_2_0 for v1.2.0. (Do yourself a favor and use underscores in branch names and periods in the tag names.)

  • Your first commit on this branch is a bump in the version number in setup.py.

  • Create a pull request for this new branch and add a message stating that it is the "Release Candidate for v1.2.0".

  • Go through open pull requests that are ready for release and change their base branch to your release branch (in our example, v1_2_0).

    • Make sure the PR message contains the Jira story (like DW-99) or the issue number from this repo, as applicable.
    • Add the changes from the story work into the comments of the PR.
      • Consider which changes are user-facing and make sure there's a summary for those.
      • List all bug fixes (especially when there are associated tickets).
      • Highlight internal changes, like changes in data structures or added control flow.
    • Then merge the ready PRs into your release candidate.
  • After testing, merge the PR with your release candidate into next.

  • Create a release under Releases.

    • Create a new release and set the release version, e.g. v1.2.0.
    • Copy the comments from the PR where you collected all the changes into the release notes.
    • Save the release which will add the tag of the release.
  • Ship the new version using upload_env.sh in development. Wait at least a day before promoting to production.

Releasing code to master branch

Once code is considered ready for production (and you've made sure there's an updated version number in setup.py):

  • Merge next into master:

    git checkout master
    git pull
    git merge --no-ff origin/next
    git push

  • Tag the latest commit on master:

    git tag "<new tag>"  # pick a tag following SemVer
    git push origin --tags

  • Then merge master back into next to ensure any hotfixes on master get picked up:

    git checkout next
    git pull
    git merge origin/master
    git push

Tips & Tricks

Miscellaneous

Using command completion in the shell

For the bash shell, there is a file that adds command completion, allowing you to tab-complete schema and table names.

source etc/arthur_completion.sh

(Within a Docker container, that happens automatically.)

EMR login / EC2 login

You can use the .ssh/config file to pick the correct user (hadoop) for the cluster and to automatically pick up a key file. Replace the IdentityFile value with the location of your key pair file.

Host ec2-*.amazonaws.com
  ServerAliveInterval 60
  User hadoop
  IdentityFile ~/.ssh/dw-dev-keypair.pem

If you find yourself using a one-off EC2 instance more often than an EMR cluster, change the User:

  User ec2-user

Virtual environment

If you want to have a virtual environment in your local directory, e.g. to make working with an IDE easier, then these steps will work:

python3 -m venv arthur_venv
source arthur_venv/bin/activate
python3 -m pip install --requirement requirements-all.txt

arthur-redshift-etl's Issues

Validation missing against external tables

Summary

When a view depends on a table inside a schema that's marked as an "external" one in the configuration,
Arthur skips checking the dependencies entirely and trusts the user's configuration.

We should, however, at least check that the external tables already exist!

Details

The SVV_EXTERNAL_TABLES system view allows us to pull all existing external tables. We should take the intersection with the schemas that are marked as "external". During validation we can then ensure that any table marked as a dependency is either known because it is managed by Arthur or is in the list of external tables.
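
As a rough sketch, the list of existing external tables could come from a query like the one below (shown here via psql using the admin DSN from the credentials file; wiring this into validate is the actual work):

psql "$DATA_WAREHOUSE_ADMIN" -c "SELECT schemaname, tablename FROM svv_external_tables ORDER BY schemaname, tablename;"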

Use tabulate from pypi

Summary

The ETL code has a custom etl.text module even though a general tabulate module is available on PyPI.

Details

Get rid of the NIH-style ("not invented here") code and replace the etl.text references with functions from tabulate.

Move log files to a dedicated prefix

We currently have log files copied (from EMR or Data Pipeline) into logs folders under a prefix, such as development/logs. This doesn't allow a simple prefix filter for log ingestion. The effect is that the lambda for log processing also gets called when CSV files are created and in many other instances.

Suggestion: Move log files into a toplevel _logs folder, so the example above would become _logs/development.

The effect is that we can have a prefix rule for lambdas on _logs. This also makes it easier to keep the log files while deleting a folder. So if there was an experiment poc, then removing files would previously delete poc/logs but now would leave _logs/poc

Support schema path templates in database source relations

Summary

Extract for database targets doesn't support the powerful config rendering that's available for static sources.

In extract, the output target directory comes from relation.data_directory, whereas for static sources and unloads, the schema-level path template is used.

Details

Both systems get at the same 'universe' of remote data file/directory addresses:

Unload:

s3_key_prefix = "{schema.s3_unload_path_prefix}/data/{schema.name}/{source.schema}-{source.table}/csv".format(
        schema=schema, source=relation.target_table_name,
    )

Sqoop:

            "--target-dir",
            '"s3n://{}/{}"'.format(relation.bucket_name, relation.data_directory()),

where data_directory is:

return os.path.join(
    from_prefix or self.prefix or ".",
    "data",
    self.source_path_name,
    (self.schema_config.s3_data_format.format or "CSV").lower(),
)

The Unload formulation is a bit more powerful. By moving extract targets onto the render-based system, the same 'archiving' use case (e.g. retain daily snapshots of relations using today/yesterday config values) that templating supports in unload can be done directly from upstream DBs at extract time.

I also see data_lake is in the config and seems related but didn't quite see how it fits in. Hopefully this could be involved in the 'harmonization' of these two systems in such a way as to allow configuration of the storage backend for extract/unload between e.g. GCS vs S3.

Labels

Please set the label on the issue so that

  • you pick bug fix, feature, or enhancement
  • you pick one of the components of Arthur, such as component: extract or component: load

feature
component: extract

Docker setup fails with permissions error

Summary

After a fresh install, attempted to run arthur:

./bin/run_arthur.sh 
++ pwd
+ docker run --rm --interactive --tty {volumes omitted} --env DATA_WAREHOUSE_CONFIG=/opt/data-warehouse/warehouse_config --env ARTHUR_DEFAULT_PREFIX=bhtucker arthur-redshift-etl:latest
+ cd /opt/src/arthur-redshift-etl
+ python3 setup.py --quiet develop
error: could not create 'python/redshift_etl.egg-info': Permission denied

Details

Prior steps were only git clone and ./bin/build_arthur.sh

Propose label: Bug, maybe documentation bug?

Simplify upgrading transformations

Summary

Implement arthur.py upgrade --continue-from=:transformations

Details

If one wants to load only transformations, the current steps are to (1) list all transformations (using show_dependents), (2) find the first non-source relation, and (3) pass that relation to --continue-from.
There are situations where we want to accept the data loaded into source tables and continue with transformations in development and responses. Especially in situations where we'll use the "pizza loader", it would be great to just indicate "start with transformations" instead of having to figure out what the first transformation is.

Refresh Pipeline Ends after Extract Retries

Summary

When using the refresh pipeline with non-zero retry attempts for extract, failed Sqoop jobs that Arthur goes on to correct will still lead the 'extract' step to show a status of FAILED. Thus, even though the data may be fully extracted, Data Pipeline will cascade the failure and arthur.py update will not run.

Details

This failure definition behavior doesn't seem to be configurable in AWS. However, the cascading failure behavior is.

Old data is (usually) better than no data for sources

In a concurrent load, extracts may be observed to fail or may stall for long enough that the load process 'times out' waiting for them.

When this happens, the last-loaded version of the data source may be available elsewhere in the warehouse (ie, if loading in staging, a copy is sitting in the non-prefixed 'standard' position). Rather than fail the relation, we should try to load our new table using the data from that existing table.

Support "uuid" type

We should explicitly call out when an upstream table or a transformation has a uuid column.
For upstream, this would allow us to enable partitioning by that column.

For now, the expectation is:

  1. Add new type of uuid to the schema for table designs.
  2. When upstream shows a uuid column, then assign that type automatically.

TableNames and Schemas share 'position' abstraction too loosely

Summary

We have a position abstraction in logic but not in our objects.

Details

Managed schemas can be promoted from one 'position' to another (ie, staging -> standard, backup -> standard).

TableNames can give their names in 'staging position', which implies a prefix if their schema is a managed schema. However, getting a 'staging' TableName involves setting a private attribute, which is bad.

The same concept is at work here, but it is not documented or implemented in a way that makes the connection obvious.

Validate should warn if not-null constraint was removed from source table design


Rotate over tables not sources for extract

Summary

Let's switch to rotating (round-robin style) over tables not sources during extracts.

The current implementation of extract runs one thread per source and then serially dumps data from tables in that source. The suggested implementation would create an initial list of tables in an interleaved fashion (see details) and then work off that list with pre-set concurrency.

Details

We have observed problems where we're issuing too many extracts and the master node, which needs to handle the many concurrent Sqoop jobs, runs out of memory. The number of concurrent Sqoop runs cannot be limited and may go as high as the number of sources defined in the ETL configuration. So the more sources are defined, the higher the probability of a failure on the master node (and also the higher the pressure on the resource allocation against the containers).

To avoid this situation and to be able to have a deterministically limited number of concurrent Sqoop runs, we should switch to an implementation that orients itself along a list of tables, not a list of sources.

In order to preserve the current advantage of having only one extract running against any upstream source at a time, we can start with an interleaved list of tables and move on to some locking mechanisms if ever needed.

Currently

  • Thread 1: source1.table1, source1.table2, source1.table3
  • Thread 2: source2.table1, source2.table2
  • Thread 3: source3.table1, source3.table2, source3.table3, source3.table4

Proposed

  • Initial list: source1.table1, source2.table1, source3.table1, source1.table2, source2.table2, source3.table2, source1.table3, source3.table3, source3.table4
    Then:
  • Thread 1: source1.table1, ....
  • Thread 2: source2.table1, ....
  • Thread 3: source3.table1, ...

Exact execution order in the threads then depends on how long the extracts take.

An additional benefit of switching to processing a list is that we can address failed extracts by re-queueing the table.

Remove files after partial unload

Summary

Problem: Unload in Redshift runs distributed over the cluster. If the unload fails on one node, the others will continue to write and leave files behind. These extraneous files then block re-running the unload.
Solution: If there is an error during unload and the manifest file is missing, then remove all CSV files.

Details

Maybe our --force option to unload is enough? This is most likely to help with incident recovery.
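
Until that exists, a manual cleanup along these lines should unblock a re-run (the bucket and key prefix are placeholders for wherever the unload wrote its CSV files; double-check the listing before deleting anything):

aws s3 ls --recursive "s3://<your S3 bucket>/<prefix>/data/<schema>/<source schema>-<source table>/csv/"
aws s3 rm --recursive "s3://<your S3 bucket>/<prefix>/data/<schema>/<source schema>-<source table>/csv/"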

Run "shellcheck" against our shell scripts.

Pretty much just that: https://github.com/koalaman/shellcheck

For example:

% shellcheck bin/build_arthur.sh

In bin/build_arthur.sh line 10:
Usage: `basename $0` [-t image_tag]
       ^-----------^ SC2006: Use $(...) notation instead of legacy backticked `...`.
                 ^-- SC2086: Double quote to prevent globbing and word splitting.

Did you mean: 
Usage: $(basename "$0") [-t image_tag]


In bin/build_arthur.sh line 17:
    exit ${1-0}
         ^----^ SC2086: Double quote to prevent globbing and word splitting.

Did you mean: 
    exit "${1-0}"

For more information:
  https://www.shellcheck.net/wiki/SC2086 -- Double quote to prevent globbing ...
  https://www.shellcheck.net/wiki/SC2006 -- Use $(...) notation instead of le...

Add CI job to check code formatting

Summary

Run a CI job to check formatting (what would black do?) and the order of imports (what would isort do?).

Details

Provide immediate feedback on PRs whether the code meets our coding style standards.
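
A minimal sketch of what such a job could run, assuming the same targets as the local formatting commands (the --check and --check-only flags make the tools report violations without rewriting any files):

black --check --diff python/ setup.py tests/
isort --check-only --diff python/ setup.py tests/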

Move to parent parser to avoid separated command line arguments

Summary

Current:

arthur.py --submit $CLUSTER_ID extract --prolix

Desired:

arthur.py extract --submit $CLUSTER_ID --prolix

Brownie points when --submit automatically turns on "prolix" mode and (maybe?) sets the "prefix".

Details

This is most apparent when submitting jobs to the cluster where "help" may not do what a user might reasonably expect:

arthur.py --submit $CLUSTER_ID extract --help

Now you have to find the help in the output of the command on the cluster.

This would be clear (and print the help message for extract in terminal):

arthur.py extract --submit $CLUSTER_ID --help

Having the --submit and --config args after the verb would also make command line expansion easier.

It gets trickier when considering "dry-run":

arthur.py  extract --submit $CLUSTER_ID --dry-run

For now, this should indeed submit "extract --dry-run". Consider the workflow:

arthur.py  upgrade --from dw.user --dry-run
arthur.py  upgrade --from --submit $CLUSTER_ID

(Which is almost great .. but might need --submit to imply --prolix and --prefix {whatever prefix is configured})

Sync should check unload targets

Summary

Arthur should check whether unload targets are valid during the sync operation so that we don't get surprised by failures when the ETL runs unload.

Details

  • During the sync from local files to S3, check whether the unload targets mentioned in any table design are actually known targets per their schema description.
  • Sync should fail if schemas are not set up but are mentioned as unload targets.
  • Currently we only validate tables against other tables but not the configuration.

Design validation allows extract-required column to be skipped

Summary

The table design allows the "split by" column to be skipped, but this leads the Sqoop extractor to fail.

Details

When trying to find the max/min IDs for splitting, Sqoop tries to run:

SELECT MIN({split_col}), MAX({split_col}) FROM (SELECT {non-skipped columns} FROM {source} WHERE TRUE AND  (1 = 1) ) AS t1

When the split_col is skipped, it isn't in the subquery, so this fails.

Split object store and data lake consistently

We should move to where:

  • "object store" is a bucket in S3 where all the code (schemas, jars, ...) and configuration (config) lives
  • "data lake" is a bucket in S3 where all (transient) data lives
    • data which is exported from upstream databases
    • data that is landed by other systems and is pulled in through Redshift Spectrum

Extract parallelism might exceed capacity and fail

Extracting with Sqoop, code must be generated and compiled each time an extract_table attempt is made.

With a thread per database source calling Sqoop, you may not have enough memory on the master node to do all this codegen/compiling. The Sqoop calls will fail, though they would work if re-tried with lower parallelism.

bootstrap_transformations on queries with external table used multiple times

Describe the bug
When the query involves an external table twice, arthur.py bootstrap_transformations fails because it creates the dependency twice in the produced design file, which fails table_design.schema validation.

To Reproduce
Steps to reproduce the behavior:

  1. Go to https://github.com/harrystech/harrys-data-warehouse/pull/1795
  2. Checkout the branch
  3. Run arthur.py bootstrap_transformations -u CTAS base__retail.daily_retailer_sales__v1
  4. See error:
jsonschema.exceptions.ValidationError: ['external__ascendio__exports.daily_retailer_sales__v1', 'external__ascendio__exports.daily_retailer_sales__v1'] has non-unique elements

Failed validating 'uniqueItems' in schema['properties']['depends_on']:
    {'description': 'List of all dependency tables of this transformation',
     'items': {'$ref': '#/definitions/table_name'},
     'minItems': 1,
     'type': 'array',
     'uniqueItems': True}

On instance['depends_on']:
    ['external__ascendio__exports.daily_retailer_sales__v1',
     'external__ascendio__exports.daily_retailer_sales__v1']

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/src/arthur-redshift-etl/python/etl/design/load.py", line 108, in validate_table_design_syntax
    etl.config.validate_with_schema(table_design, "table_design.schema")
  File "/opt/src/arthur-redshift-etl/python/etl/config/__init__.py", line 288, in validate_with_schema
    raise SchemaValidationError("failed to validate against '%s'" % schema_name) from exc
etl.errors.SchemaValidationError: failed to validate against 'table_design.schema'

Expected behavior
The script should infer a single dependency on the external table.

Getting-started guides?

Summary

I'm trying to set up a fresh project and wonder if there are any templates for the 'sibling' repo. (I have the fortunate position of vaguely remembering how this should work, and still I'm stuck!)

By banging my head against the validator, I eventually came up with a dummy warehouse config (uselessly passes the validator):

{
  "arthur_settings": {},
  "data_warehouse": {},
  "type_maps": {},
  "object_store": {
    "s3": {
      "bucket_name": "load-bucket",
      "iam_role": "arn:aws:iam::123:role/NotARole"
    }
  },
  "resources": {
    "key_name": "my-fake-ssh-key",
    "VPC": {
      "region": "us-east-1",
      "account": "123",
      "name": "MyVPC",
      "public_subnet": "PublicSubnet",
      "whitelist_security_group": "sg-123"
    },
    "DataPipeline": {
      "role": "NotARole"
    },
    "EC2": {
      "instance_type": "m5.4xlarge",
      "image_id": "",
      "public_security_group": "foobar",
      "iam_instance_profile": "instanceprofile"
    },
    "EMR": {
      "master": {
        "instance_type": "m5.4xlarge",
        "managed_security_group": "foobar"
      },
      "core": {
        "instance_type": "m5.4xlarge",
        "managed_security_group": "foobar"
      },
      "release_label": "emr-5.29.0"
    }
  },
  "etl_events": {}
}

Now I need to set up my prefix, with e.g. bootstrapping scripts as well as sync output. I guess this is upload_env.sh?

Anyway, if I'm missing existing assets, I'd love to use them -- and if not, it would be good to know, so I can write down what I do!

Details

At the moment I'm just trying to use extract.

Labels

Please set the label on the issue so that

  • you pick bug fix, feature, or enhancement
  • you pick one of the components of Arthur, such as component: extract or component: load

I don't think I have 'edit' rights on the labels

Consider configuration file during tasks other than design

Summary

The values in harrys.yaml (which tables are included or excluded) are not considered in the dump or load stage where only the files in S3 matter.

Details

This tends to be confusing. Keep a record of what's included / excluded in the setup and respect those choices in other steps.

Can't identify final extract failures in monitor payloads

In terms of the output recorded by the monitor, failures that will be retried look like final failures.

This poses a problem for concurrent load — load thinks the first failure it sees is final. Since the extract job might have different settings than load, you can't know based on the attempt_num if this is the final one.

Use parameter store instead of or in addition to credentials files

Summary

Instead of copying credentials files around and updating for everybody when passwords change, create passwords (to access upstream databases or Redshift clusters) in the AWS Systems Manager Parameter Store.

Details

Initial steps

An initial implementation could just allow a special syntax in the DSN to trigger a lookup in the parameter store.

Example:

WEBAPP_DSN="ssm://etl/dsn/webapp_dsn"

Lookup with overrides

Suggested parameter organization:

prefix: DW-ETL/
hierarchical search:
general: <dw prefix>/<env_type>/<user prefix>/<name>

e.g. looking for connection to "WEBSERVER_REPLICA" in environment development when running validation:
    look for DW-ETL/dev/development/validation/WEBSERVER_REPLICA
             DW-ETL/dev/development/WEBSERVER_REPLICA
             DW-ETL/dev/WEBSERVER_REPLICA

When do overrides occur?

  • pick different base stack
  • pick different environment (development vs. tom vs. tom/validation)
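
For reference, a parameter stored under such a path could then be fetched with the AWS CLI like this (the parameter name is just an example following the hierarchy sketched above):

aws ssm get-parameter --with-decryption --name "/DW-ETL/dev/development/WEBSERVER_REPLICA" --query "Parameter.Value" --output text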

Tie requirements.txt file to the Arthur version

Describe the bug

The upload script that deploys Arthur copies the latest version of the code into S3 while keeping a version number, so for example: dist/redshift_etl-1.56.0.tar.gz. But the file with the requirements for that version is always copied as requirements.txt. That means that an accidental upload of the requirements file or a downgrade of the version leads to the "wrong" requirements file being active.

Expected behavior
The package requirements should be part of the tar ball or a wheel so that we can run something like:

python3 -m pip install arthur-etl

without relying on an un-versioned requirements.txt file.

Additional context

See also: #574
to bring in a process around requirements.

Update user command fails

Describe the bug
Running update_user fails with:

2022-03-07 21:44:54 - CRITICAL - Something terrible happened:
Traceback (most recent call last):
  File "/opt/src/arthur-redshift-etl/python/etl/commands.py", line 64, in execute_or_bail
    yield
  File "/opt/src/arthur-redshift-etl/python/etl/commands.py", line 175, in run_arg_as_command
    args.func(args)
  File "/opt/src/arthur-redshift-etl/python/etl/commands.py", line 764, in callback
    args.username,
AttributeError: 'Namespace' object has no attribute 'username'

To Reproduce
Run: arthur.py update_user --group etl_ro tom
(You can pick any user name.)

Expected behavior
The user should be added to the given group.

Cleanup SNS topics

Summary

We multiplex success and failure notifications on the same SNS topic. We would gain more flexibility by keeping success and failure on separate SNS topics. This should be true for both ETL and validation pipelines.

Details

Here's a list of topics that receive messages based on the outcome of a pipeline run.

What happens                  Current Topic      Future Topic
ETL starts                    --                 start
ETL finishes successfully     status             success
ETL fails to finish           status, page       failure
Validation is successful      --                 validation-success
Validation is unsuccessful    validation-page    validation-failure

Add default encodings

Summary

In most cases, we should be able to pick a reasonable default for an encoding. Setting an encoding prevents us from having to have the Redshift cluster figure it out (COMPUPDATE ON) or override our choices (COMPUPDATE PRESET), which saves time during load and reduces storage.

Details

  • Use raw for columns in distribution, sort, and foreign keys
  • Use raw for boolean, real, double-precision
  • Use bytedict for statuses and anything enum-based upstream
  • Use az64 for int, date, timestamps
  • Else use zstd

Use pip-compile

Per Benson's suggestion, use pip-compile and keep the full set of dependencies in requirements.in and their pinned versions in requirements.txt.
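
A minimal sketch of that workflow (pip-compile comes from the pip-tools package; the file names match the ones mentioned above):

python3 -m pip install pip-tools
pip-compile --output-file=requirements.txt requirements.in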

Reduce noise from S3

Summary

The beginning of the log is dominated by the log lines for downloading and validating the table design files. This makes it difficult to see what's really going on or find where the interesting part starts.

Details

The beginning of the log shows many lines like this:

2017-12-22 14:09:11,980 2EFB197E672940A6 DEBUG etl.s3 (...) [s3.py:178] Downloading 's3://...'
2017-12-22 14:09:12,231 2EFB197E672940A6 DEBUG etl.s3 (...) [s3.py:184] Received response from S3: last modified: 2017-12-22 17:53:52+00:00, content length: 538, content type: binary/octet-stream

These log lines don't seem to really add anything (once past debugging of any S3 issues).

A great option would be to make them optional and only log them when requested by a command line switch. (This would start bringing the notion of a command line switch to select "debug" verbosity on specific components only.)

No warning when upgrade drops dependent view

Summary

When running an upgrade, we drop the relation with the CASCADE option which means that any VIEWs immediately depending on the relation also get dropped.
There is no warning to that effect. If upgrade could warn about the additional views being dropped, users could easily add them to the upgrade.

bootstrap_transformations does not update column type even a cast is present

Describe the bug
When running arthur.py bootstrap_transformations -u CTAS ... to update a table design, Arthur does not pick up the cast in the .sql file, instead considers it absent, and gives this warning:

 WARNING - Keeping previous definition for column 'xxxx.yyyyy.zz': 'character varying(1000)' (please add a cast)

To Reproduce
Steps to reproduce the behavior:

  1. Pick a table in the warehouse with a character varying column.
  2. Add an explicit cast in the final projection of the query to make sure it is accounted for by Arthur.
  3. Modify the length of the column in the table design to be different than the length in cast in the query.
  4. Run arthur.py bootstrap_transformations -u CTAS ... on the table.
  5. This should give you the warning suggesting to add a cast, and would not modify the length of the column to the one in the query.

Expected behavior
After running arthur.py bootstrap_transformations -u CTAS ..., if a type is explicitly specified in the final projection with a cast, the type of the column in the table design should match the cast.

Labels
bug

Include foreign references when validating dependencies

Summary

When we validate the depends_on field in table design files, we should consider that some columns have an explicit foreign reference. These references should be allowed into the depends_on field. Currently, the dependencies are coming only from the transformation's SQL.

Details

A/C

  • Foreign key references (on columns) are validated (does the column actually exist?)
  • Foreign tables are added to depends_on

Manage users access and group membership using configuration

Is your feature request related to a problem?

This proposal addresses multiple issues commonly faced in a data warehouse by extending the current approach in Arthur:

  • Needing a "system" user, which a BI tool uses
  • Adding users from the Data or Analytics team with wider permissions
  • Grouping users so that permissions can be based on groups
  • Adding users and removing users from groups, thus changing their permissions
  • Ensuring that users lose permissions (and access) when leaving the team.

(The first already exists. The rest is limited or doesn't yet exist.)

Instead of taking a manual approach of:

  • Using a SQL client like psql to run commands (which wouldn't be recorded)
  • Using arthur.py update_user to add a user to a group

this proposal suggests instead creating a configuration section

  • with users
  • and their groups

So that users, groups and memberships can be configured and automatically managed.
This means that we have one place that describes who has access to which schemas (via groups). And we know that that's reflected in the state of the database as well. We make onboarding and offboarding faster and easier, thus ensuring that we're using the right tools consistently and timely.

How this plays out:

  • If a new group is mentioned (as user's group, as a reader or as a writer group), that group is created.
  • If a new user is mentioned, that user is created.
  • The user's groups in the database are compared with the groups in the configuration file.
    • The user is removed from any group not listed in its configuration.
    • The user is added to any group, where they are not already a member.
  • A user who is no longer listed in the configuration loses access to the database. (Removing the user would require dropping any objects so managing owned objects is a follow-up feature.)

Describe the solution you'd like

Here's an example:

Example Step 1:

{
    "data_warehouse": {
        "users": [
            {
                "name": "default",
                "group": "analyst"
            },
            {
                "name": "alice",
                "groups": ["analyst", "etl_ro"]
            }
        ]
    }
}
  • This creates groups analyst and etl_ro.
  • This creates user alice.
  • This adds alice to analyst and etl_ro.

Example Step 2:

{
    "data_warehouse": {
        "users": [
            {
                "name": "default",
                "group": "analyst"
            },
            {
                "name": "alice",
                "groups": ["etl_ro"]
            },
            {
                "name": "bob",
                "groups": ["analyst"]
            }
        ]
    }
}
  • This creates the new user bob.
  • This removes alice from the analyst group.
  • This adds bob to the analyst group.

Example Step 3:

{
    "data_warehouse": {
        "users": [
            {
                "name": "default",
                "group": "analyst"
            },
            {
                "name": "bob",
                "groups": ["etl_ro", "daas_ro"]
            },
            {
                "name": "carol",
                "groups": ["analyst", "daas_ro"]
            }
        ]
    }
}
  • This removes the user alice (in this step, just by using PASSWORD DISABLE).
  • This adds the user carol.
  • This adds the group daas_ro.
  • This removes bob from analyst and adds him to etl_ro and daas_ro.
  • This adds carol to analyst and daas_ro.

Describe alternatives you've considered

  • Still looking for a tool that would do that for us.
  • Regarding database access, we should be leveraging IAM.

Also, out of scope of the configuration: managing a user's credentials by updating their passwords. (For now, continue to use arthur.py update_user.)

Additional context

We sometimes have schemas that aren't managed by Arthur or that are external. For those schemas, we have to decide whether we want Arthur to manage permissions as well.

Arthur should complain if it finds data files for tables where schema does not match design file

Scenario: a table in an upstream source moves schemas, from schema1.table to schema2.table. Suppose the relation was previously extracted when it lived in schema1. You've re-designed the file after it moved to schema2, synced the new design file, removed the old design file, but NOT removed the old data files. (You really should have used sync -f, in which case you would not be in this situation).

Arthur ought to complain when building file sets if it comes across a data file whose inferred schema name does not match that in the corresponding design file. The current behavior is to set the file set's schema to the old one from the data file, which it finds first, and then any subsequent command fails because it's looking for the table in the old schema upstream.

Fix caller information in logging of run function

Summary

When logging in run of db.py, the caller is prefixed to the message. It should be part of the log module information instead.

Details

Minor thing that I once thought about and didn't finish when pulling in calls to run.

Currently:

2017-12-22 14:09:51,432 29F95BAFCB274650 INFO etl.db (MainThread) [db.py:238] (set_redshift_wlm_slots) Using 4 WLM queue slot(s) for transformations

Should be:

2017-12-22 14:09:51,432 29F95BAFCB274650 INFO etl.load.set_redshift_wlm_slots (MainThread) [db.py:238] Using 4 WLM queue slot(s) for transformations

We should check how easy this is to find in logs or what it would improve.

Avoid Accumulo warnings in log file

Summary

Adjust EMR settings to make Accumulo warnings go away

Details

Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
2020-08-22 00:36:37,696 4C346DD468974601 DEBUG etl.extract.sqoop (Thread-178) [sqoop.py:252] Sqoop stderr:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hive/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Note: /tmp/sqoop-hadoop/compile/f637797e7f49154dcb76e389d1724d1d/QueryResult.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.

Refactor pipelines to be more modular and still easy to review

This is triggered by PR #15 . The rebuild, refresh and pizza loader pipelines are very similar.
(1) Can we merge the rebuild and refresh pipeline definitions? This would add to the rebuild the ability to specify specific schemas to rebuild. This appears desirable for development work. Now that we have arthur in the mix for evaluating variables, the merge might be possible without much loss of readability.
(2) Should we add a feature to the rendering code in Arthur that allows (at least one level of) expansion, where one template may refer to another? This way, we could break up the pipeline definitions into smaller fragments and then pull into the actual pipeline definition the fragments that are needed.
