Hedera ETL

Hedera ETL populates a BigQuery dataset with transactions and records generated by the Hedera Mainnet (or Testnet, if so configured).

  • Extract: A stream of transactions (and records) is ingested from a GCP PubSub topic
  • Transform: Important fields are filtered, data types are formatted, etc.
  • Load: Rows are inserted into the BigQuery dataset using streaming inserts

Overview

Ingestion

  • The PubSub topic contains JSON-serialized Hedera transactions published by the Hedera Mirror Node. More details can be found here.

  • An Apache Beam pipeline pulls transactions from PubSub and inserts them into BigQuery. GCP Dataflow is used as the runner for the pipeline.

  • Deduplication: The ingestion pipeline above gives an at-least-once guarantee for persisting transactions into BigQuery. Duplicates, if inserted, are removed by a deduplication task.

Setup

BigQuery

The schema for the BigQuery table that stores Hedera transactions is in the transactions-schema.json file. Please refer to the corresponding fields' documentation in hedera-protobuf for more information about the columns.

Creating tables

The bq CLI is needed to create the tables. /create-tables.sh can be used to create all the tables together. Alternatively, tables can be created individually using the commands below.

Transactions table
bq mk \
  --table \
  --description "Hedera network transactions" \
  --time_partitioning_field consensusTimestampTruncated \
  --time_partitioning_type DAY \
  --clustering_fields transactionType \
  project_id:dataset.transactions \
  hedera-etl-bigquery/src/main/resources/transactions-schema.json
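
The DAY partitioning on consensusTimestampTruncated and clustering on transactionType keep scans small for queries that filter on those columns. As an illustrative example (not part of the repository):

# Illustrative query only; table name matches the setup above
bq query --use_legacy_sql=false \
  'SELECT transactionType, COUNT(*) AS transactions
   FROM `project_id.dataset.transactions`
   WHERE consensusTimestampTruncated >= TIMESTAMP "2020-01-01"
   GROUP BY transactionType'
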
Errors table

If an error is encountered when inserting a transaction into BigQuery, the insert is retried. However, errors for which a retry would not help (for example, a row violating the table schema) are not retried and are instead logged to the errors table.

bq mk \
  --table \
  --description "Hedera ETL Errors" \
  project_id:dataset.errors \
  hedera-etl-bigquery/src/main/resources/errors-schema.json
Deduplication state table

The deduplication task's state is stored in a BigQuery table for persistence. The task already relies on BigQuery being available, so adding a dependency on a persistent volume or another database would only add operational overhead.

bq mk \
  --table \
  --description "BigQuery deduplication task state" \
  --description "Hedera Dedupe " \
  project_id:dataset.dedupe_state \
  hedera-etl-bigquery/src/main/resources/state-schema.json

ETL to BigQuery

Requirements
  1. BigQuery tables for transactions and errors should exist
  2. PubSub topic should exist

For requirements to deploy on GCP Dataflow, refer to deployment.

Common parameters

Configure the GCP project id, PubSub subscription/topic, and BigQuery tables:

PROJECT_ID=... # Set your project id
SUBSCRIPTION=projects/${PROJECT_ID}/subscriptions/subscriptionName
TRANSACTIONS_TABLE=${PROJECT_ID}:dataset.transactions
ERRORS_TABLE=${PROJECT_ID}:dataset.errors
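
If the PubSub topic and subscription from the requirements above do not exist yet, they can be created with gcloud. The topic name below is only an illustration; the subscription name matches the placeholder used in ${SUBSCRIPTION}:

# Hypothetical names; adjust to your environment
gcloud pubsub topics create transactions-topic --project=${PROJECT_ID}
gcloud pubsub subscriptions create subscriptionName \
  --topic=transactions-topic --project=${PROJECT_ID}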

Running locally

cd hedera-etl-bigquery

mvn compile exec:java -PdirectRunner -Dexec.args=" \
  --inputSubscription=${SUBSCRIPTION} \
  --outputTransactionsTable=${TRANSACTIONS_TABLE} \
  --outputErrorsTable=${ERRORS_TABLE}"

Running on GCP Dataflow

  1. Set up a GCS bucket to be used for staging, templates, and temp location.
BUCKET_NAME=... # Set your bucket name
PIPELINE_FOLDER=gs://${BUCKET_NAME}/etl-bigquery
  2. Build and upload the template to the GCS bucket
cd hedera-etl-bigquery

mvn compile exec:java \
 -Dexec.args=" \
 --project=${PROJECT_ID} \
 --stagingLocation=${PIPELINE_FOLDER}/staging \
 --tempLocation=${PIPELINE_FOLDER}/temp \
 --templateLocation=${PIPELINE_FOLDER}/template \
 --runner=DataflowRunner"
  3. Start the Dataflow job using the template
gcloud dataflow jobs run etl-bigquery-`date +"%Y%m%d-%H%M%S%z"` \
 --gcs-location=${PIPELINE_FOLDER}/template \
 --parameters "inputSubscription=${SUBSCRIPTION},outputTransactionsTable=${TRANSACTIONS_TABLE},outputErrorsTable=${ERRORS_TABLE}"
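
After the job is submitted, it can optionally be checked from the CLI (the --region flag may be needed depending on where the job runs):

# Optional: confirm the pipeline is running
gcloud dataflow jobs list --status=active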

A controller service account can be configured by adding --service-account-email=my-service-account-name@<project-id>.iam.gserviceaccount.com. See Controller service account for more details.

Deduplication

The deduplication task trails the transactions table to ensure that no two rows ever have the same consensusTimestamp. Due to the at-least-once guarantees of PubSub and the Hedera Mirror Node (which publishes to PubSub), it is possible in rare cases for a single transaction to be inserted more than once. The deduplication task removes these duplicates to provide an exactly-once guarantee. See the class comments on DedupeRunner for more details.
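
A minimal sketch, not DedupeRunner's actual logic, of how such duplicates can be detected with the bq CLI (table name from the setup section):

# Illustrative only: list consensusTimestamp values that appear more than once
bq query --use_legacy_sql=false \
  'SELECT consensusTimestamp, COUNT(*) AS copies
   FROM `project_id.dataset.transactions`
   GROUP BY consensusTimestamp
   HAVING COUNT(*) > 1'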

Development

Build: mvn clean compile -DskipTests

Running tests: mvn test

Running deduplication tests

Due to the lack of an emulator for BigQuery, the deduplication tests require GCP BigQuery. To run the tests, you'll need a GCP project with the BigQuery API enabled and the JSON key of a service account with the BigQuery Editor role.

Set up application.yml as follows:

hedera:
  dedupe:
    projectId: projectName
    credentialsLocation: file:/path/to/key.json
    transactionsSchemaLocation: /path/to/hedera-etl/hedera-etl-bigquery/src/main/resources/transactions-schema.json
    stateSchemaLocation: /path/to/hedera-etl/hedera-etl-bigquery/src/main/resources/state-schema.json

Use the following command to run the deduplication tests:

mvn test -PgcpBigquery -Dspring.config.additional-location=file:/path/to/dir/with/yaml/file/

Note that this assumes the current directory is the project's root. If that is not the case, change the schema location values accordingly.

More documentation

  • Deployment
  • Configurations

Code of Conduct

This project is governed by the Contributor Covenant Code of Conduct. By participating, you are expected to uphold this code of conduct. Please report unacceptable behavior to [email protected].

License

Apache License 2.0

hedera-etl's Issues

Add table for transactionType

Add a table containing the mapping from transactionType id to transactionType name; a sketch of one possible approach follows the listing below.

mirror_node=# select * from t_transaction_types ;
 proto_id |          name
----------+------------------------
       23 | FREEZE
       14 | CRYPTOTRANSFER
       15 | CRYPTOUPDATEACCOUNT
       12 | CRYPTODELETE
        7 | CONTRACTCALL
        8 | CONTRACTCREATEINSTANCE
        9 | CONTRACTUPDATEINSTANCE
       17 | FILECREATE
       16 | FILEAPPEND
       19 | FILEUPDATE
       18 | FILEDELETE
       11 | CRYPTOCREATEACCOUNT
       20 | SYSTEMDELETE
       21 | SYSTEMUNDELETE
       22 | CONTRACTDELETEINSTANCE
       24 | CONSENSUSCREATETOPIC
       25 | CONSENSUSUPDATETOPIC
       26 | CONSENSUSDELETETOPIC
       27 | CONSENSUSSUBMITMESSAGE
       10 | CRYPTOADDLIVEHASH
       13 | CRYPTODELETELIVEHASH
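
A minimal sketch of how such a mapping table could be created and seeded in BigQuery. The table name is assumed, the columns follow the listing above, and the full value list would follow the same pattern as the rows shown:

# Hypothetical sketch only
bq query --use_legacy_sql=false \
  'CREATE TABLE `project_id.dataset.transaction_types` (proto_id INT64, name STRING);
   INSERT INTO `project_id.dataset.transaction_types` (proto_id, name)
   VALUES (14, "CRYPTOTRANSFER"), (7, "CONTRACTCALL"), (27, "CONSENSUSSUBMITMESSAGE")'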

Debug the bigQueryInsertErrors

In the test run, 1 row insertion into BigQuery failed (out of 24M rows so far).
Error:

{"errors":[{"debugInfo":"","location":"transaction.body.contractcreateinstance.memo","message":"Failed to decode bytes input. Byte fields must be base64 encoded.","reason":"invalid"}],"index":0}

Transaction

{
  "consensusTimestamp": 1568833603515197000,
  "entity": {
    "shardNum": 0,
    "realmNum": 0,
    "entityNum": 19416,
    "type": 2
  },
  "transactionType": 8,
  "transaction": {
    "body": {
      "transactionID": {
        "transactionValidStart": {
          "seconds": "1568833591",
          "nanos": 876370500
        },
        "accountID": {
          "shardNum": "0",
          "realmNum": "0",
          "accountNum": "13477"
        }
      },
      "nodeAccountID": {
        "shardNum": "0",
        "realmNum": "0",
        "accountNum": "3"
      },
      "transactionFee": "510139189",
      "transactionValidDuration": {
        "seconds": "30"
      },
      "generateRecord": true,
      "memo": "Test Contract tx-2",
      "contractCreateInstance": {
        "fileID": {
          "shardNum": "0",
          "realmNum": "0",
          "fileNum": "19415"
        },
        "gas": "250000",
        "initialBalance": "0",
        "proxyAccountID": {
          "shardNum": "0",
          "realmNum": "0",
          "accountNum": "0"
        },
        "autoRenewPeriod": {
          "seconds": "7890000"
        },
        "constructorParameters": "",
        "memo": "Test Contract-2"
      }
    },
    "sigMap": {
      "sigPair": [
        {
          "pubKeyPrefix": "",
          "ed25519": "e1hx4GTuf05gBBYn/kLOL/iQPSdNk189i9lDln9WwiYanbDVHDw0XjTkvbEWtLOeKy0uOvC/GtvOK5qfQmrRBQ=="
        }
      ]
    }
  },
  "transactionRecord": {
    "receipt": {
      "status": "SUCCESS",
      "contractID": {
        "shardNum": "0",
        "realmNum": "0",
        "contractNum": "19416"
      },
      "exchangeRate": {
        "currentRate": {
          "hbarEquiv": 30000,
          "centEquiv": 320235,
          "expirationTime": {
            "seconds": "1568768400"
          }
        },
        "nextRate": {
          "hbarEquiv": 30000,
          "centEquiv": 298800,
          "expirationTime": {
            "seconds": "1568772000"
          }
        }
      },
      "topicSequenceNumber": "0",
      "topicRunningHash": "",
      "topicRunningHashVersion": "0"
    },
    "transactionHash": "HK59cRMNMURRaXuPM6Niw3iMeOJEUGwopQHImsidNMS3HnlLgIsAls3SQREG82f2",
    "consensusTimestamp": {
      "seconds": "1568833603",
      "nanos": 515197000
    },
    "transactionID": {
      "transactionValidStart": {
        "seconds": "1568833591",
        "nanos": 876370500
      },
      "accountID": {
        "shardNum": "0",
        "realmNum": "0",
        "accountNum": "13477"
      }
    },
    "memo": "Test Contract tx-2",
    "transactionFee": "493596009",
    "contractCreateResult": {
      "contractID": {
        "shardNum": "0",
        "realmNum": "0",
        "contractNum": "19416"
      },
      "contractCallResult": "",
      "errorMessage": "",
      "bloom": "",
      "gasUsed": "96",
      "logInfo": [
        
      ],
      "createdContractIDs": [
        
      ]
    },
    "transferList": {
      "accountAmounts": [
        {
          "accountID": {
            "shardNum": "0",
            "realmNum": "0",
            "accountNum": "13477"
          },
          "amount": "-486655787"
        },
        {
          "accountID": {
            "shardNum": "0",
            "realmNum": "0",
            "accountNum": "3"
          },
          "amount": "10320881"
        },
        {
          "accountID": {
            "shardNum": "0",
            "realmNum": "0",
            "accountNum": "98"
          },
          "amount": "476334906"
        },
        {
          "accountID": {
            "shardNum": "0",
            "realmNum": "0",
            "accountNum": "13477"
          },
          "amount": "-6940222"
        },
        {
          "accountID": {
            "shardNum": "0",
            "realmNum": "0",
            "accountNum": "98"
          },
          "amount": "6940222"
        },
        {
          "accountID": {
            "shardNum": "0",
            "realmNum": "0",
            "accountNum": "13477"
          },
          "amount": "-75648"
        },
        {
          "accountID": {
            "shardNum": "0",
            "realmNum": "0",
            "accountNum": "98"
          },
          "amount": "75648"
        }
      ]
    }
  },
  "nonFeeTransfers": [
    {
      "accountID": {
        "shardNum": "0",
        "realmNum": "0",
        "accountNum": "13477"
      },
      "amount": "0"
    },
    {
      "accountID": {
        "shardNum": "0",
        "realmNum": "0",
        "accountNum": "19416"
      },
      "amount": "0"
    }
  ],
  "consensusTimestampTruncated": "2019-09-18T19:06:43.515197Z"
}
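
A quick way (illustrative only; table name from the setup section) to check whether this transaction eventually made it into the transactions table:

# Illustrative check for the failed row by its consensusTimestamp
bq query --use_legacy_sql=false \
  'SELECT COUNT(*) AS copies FROM `project_id.dataset.transactions`
   WHERE consensusTimestamp = 1568833603515197000'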

Fix deduplication task

@medvedev1088 mentioned here the limitations of the UPDATE query.

The deduplication task as is won't work. It will fail on step 2, where it tries to update rows with consensusTimestamp > lastValidTimestamp, since the update job will span rows still in the streaming buffer.

Need to fix this.

Add Full deduplication

It takes about an hour or more for a newly inserted row to move from the streaming buffer to the regular partitions (where it becomes accessible to DML operations). Our deduplication logic assumes that rows won't be delayed by an hour: if the deduplication logic has checked data up to point X, where X = (now - 90min), then no new row with consensusTimestamp < X will appear. This is a fair assumption since, in a normally functioning system, delays due to PubSub, transient errors in Dataflow, or transient errors in BigQuery streaming inserts won't be more than a few minutes. The assumption also implies that no duplicates should arrive as late as an hour.

However, the assumption breaks down when the system is not functioning normally. To name some scenarios:

  • If the importer (publishing to PubSub) crashes, it can publish duplicates when it comes back up much later.
  • If a bug in the Dataflow pipeline causes a crash, it will likely take more than an hour to fix it and restart the pipeline.

For such cases, we want to run a "watcher" job every 24 hours to check for duplicates in data older than 1 day and alert if any are found (so that they can be fixed by a manual run of deduplication, or a state reset).
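
One possible shape for that check, sketched with the bq CLI (hypothetical, not the project's actual implementation):

# Illustrative watcher query: duplicates in data older than one day
bq query --use_legacy_sql=false \
  'SELECT consensusTimestamp, COUNT(*) AS copies
   FROM `project_id.dataset.transactions`
   WHERE consensusTimestampTruncated < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
   GROUP BY consensusTimestamp
   HAVING COUNT(*) > 1'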

Add packaging

Build packages for etl-pipeline and deduplication.
Needed for standalone deployments.

Token Transfer Not Included

The transfer list contains only HBAR-related transfers; we are not able to find details for fungible token and NFT transfers in the transactions table.
