
lakesoul-io / lakesoul


LakeSoul is an end-to-end, realtime and cloud native Lakehouse framework with fast data ingestion, concurrent update and incremental data analytics on cloud storages for both BI and AI applications.

Home Page: https://lakesoul-io.github.io/

License: Apache License 2.0

Python 3.03% Scala 33.53% Java 40.91% Shell 0.48% Rust 18.60% Dockerfile 0.07% PLpgSQL 0.19% JavaScript 0.39% CSS 0.17% MDX 1.60% CMake 0.20% C++ 0.55% Cython 0.11% ANTLR 0.13% TypeScript 0.05% Just 0.01%
lakesoul datalake lakehouse spark flink streaming big-data postgresql rust sql

lakesoul's People

Contributors

asakiny, bakey, ceng23333, chenyunhey, clouddea, codingfun2022, dependabot[bot], dmetasoul-opensource, dmetasoul01, f-phantam, hades-888, intelligencegear, javyxu, lypnaruto, mag1c1an1, moresun, xuchen-plus, yangzh-v2, yuchanghui, zhaishuangszszs


lakesoul's Issues

Support database (namespace)

Currently there is only one default database under the lakesoul catalog. We should support multiple databases in the catalog, as sketched below.
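
A hypothetical Scala sketch of what this could look like from Spark SQL once supported, assuming a SparkSession already configured with the LakeSoul catalog registered under the name lakesoul (the namespace, table, and provider names are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Create and use a second database (namespace) under the lakesoul catalog.
spark.sql("CREATE NAMESPACE IF NOT EXISTS lakesoul.sales")
spark.sql("CREATE TABLE lakesoul.sales.orders (id BIGINT, amount DOUBLE) USING lakesoul")
spark.sql("SHOW NAMESPACES IN lakesoul").show()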

[Flink] Optimize Serde performance

Currently in Flink CDC, we use JSON to (de)serialize SourceRecord from Debezium, which has several issues:

  1. The JSON serde is slow, especially for large schemas with many columns
  2. Each SourceRecord is serialized/deserialized several times, e.g. during source record splitting, RowData conversion, hash partition computation, etc.
  3. For an Update operation in the CDC source, we have to write both the before and after records, because the partition fields may have changed.

A standard benchmark dataset for performance comparison

LakeSoul has high performance for upsert, metadata management, and concurrent writes compared with other products such as Iceberg, Delta Lake, and Hudi. Could we provide a standard benchmark dataset for testing? I think it would help spread LakeSoul's influence.

Upsert for different fields with primary key in a df

A table has four columns (hash string, range string, op string, value string); hash is the primary key and range is the partition key.

LakeSoul should support upserting a DataFrame in which rows carry different subsets of fields (always including the primary key) within one batch from a Kafka topic, as sketched after the sample messages below.

{"columnList":{"hash":"6000","value":"6000","range":"6000"},"op":"0"}
{"columnList":{"hash":"6000","range":"6000"},"op":"1"}
{"columnList":{"hash":"6000",value,"hello":"6000","value":"6000"}}

[EPIC][WIP] Native IO Layer in Rust Tracking Issue

Motivations

The IO layer is a critical part of a table storage framework. However, the current IO implementation suffers from several drawbacks:

  1. The IO stack is tightly coupled with Spark, making it difficult to adapt to other computing frameworks, including both SQL engines and AI engines (Flink, Presto, Pandas, Dask, PyTorch, etc.).
  2. The file IO still relies on Hadoop FileSystem, which is inefficient on high-latency storage such as S3.
  3. The lack of expression evaluation capability makes it difficult to implement a compute-engine-independent MergeInto SQL with merge on read.

Goals

  1. Compute engine neutral. The native IO layer implements self-contained IO logic such as merge on read and provides interfaces for compute engines to invoke.
  2. Native. Compute engines are not only in the Java world. We would also like to support popular Python data processing frameworks such as Pandas/Dask, and AI frameworks such as PyTorch with C++ at their core.
  3. Fast. IO on object stores usually has high latency and lacks file semantics, which drags down the overall execution pipeline. Things get worse when there are multiple files to merge. We would like the IO layer to enable concurrency and asynchrony on both the read and write paths.
  4. Feature-rich. The native IO layer should support commonly required data source features such as predicate pushdown, index filtering, atomic writes, etc. We would also like the reader to support MergeInto SQL within the IO layer so that the merge logic is transparent to the compute engines.
  5. Easy to use (embed). The native IO layer is itself a library and exposes its functionality via C interfaces with Java/Python wrappers. It should be easy to embed into compute engines.

NonGoals

  1. The native IO layer is NOT a distributed execution framework. It is a library inside a single execution unit and is not itself aware of any distributed execution context. It is up to the higher-level engines whether and how to read/write data in parallel (e.g. partitions in Spark, splits in Presto).
  2. The native IO layer is NOT a SQL engine. It is not designed to be a SQL execution engine. Though it will have some expression evaluation capability, it primarily aims to provide table data reads and writes on data lakes. It acts as a data source for compute engines and should be used together with LakeSoul's meta layer.

Design

We use Arrow (arrow-rs) + DataFusion to implement the new IO layer for the following reasons:

  • Parquet reads and writes are asynchronous and the pipeline is executed asynchronously, which is in line with our design goals;
  • DataFusion brings a relatively complete implementation of physical plans and expressions, and can easily support MergeInto SQL with merge on read;
  • Rust is efficient and memory-safe, with compiled native vectorized execution, and it is easy to provide bindings for other languages.

According to this design idea, the overall modules and execution logic are divided as follows:
[Diagram: logical hierarchy of the native IO layer]
The above diagram shows the logical hierarchy of the native IO layer. It has (roughly) the following parts from bottom up:

  • IO based on DataFusion's object store, with async reader/writer traits.

  • File format readers and writers built on top of the object store, with async support.

  • Merge on read execution plan. The execution plan combines DataFusion's built-in hash join or sort-merge join with customized projection/filter plans to support MergeInto SQL. A typical plan with multiple files to merge would take the following form:

    [Diagram: example merge-on-read execution plan]

  • Reader interface in Rust and C. Provides a simple interface to iterate over the merged Arrow record batches asynchronously. The Rust interface could look like the following:

    pub struct LakeSoulReaderConfig {
      files: Vec<String>,
      primary_keys: Vec<String>, // primary keys
      columns: Vec<String>, // column (projection) pushdown
      filters: Vec<Expr>, // predicate filters pushdown
      object_store_options: HashMap<String, String>, // object store options
    }

    pub struct LakeSoulReader {
      config: LakeSoulReaderConfig,
    }

    impl LakeSoulReader {
      pub fn new(config: LakeSoulReaderConfig) -> Self {
        Self { config }
      }

      pub fn read(&self) -> SendableRecordBatchStream {
        // build the merge-on-read plan for the configured files and return its output stream
        unimplemented!()
      }
    }

    We could also expose an extern "C" interface with a callback to support async IO.

  • JNI/Python wrappers. In the JNI wrapper, we could provide a native method that accepts the Arrow schema and array pointers together with a callback object, like the following:

    public native void nextBatch(Consumer<Boolean> callback, long schemaAddr, long arrayAddr);

    The native implementation (asynchronously) iterates the native stream to get the next available Arrow record batch, populates the Arrow C data structs through their pointers, and calls the callback with a boolean argument indicating whether the stream has ended. The expected usage pattern would look like this (in Scala):

    val p = Promise[Option[VectorSchemaRoot]]()

    // Allocate Arrow C Data Interface structs and hand their addresses to the native reader.
    tryWithResource(ArrowSchema.allocateNew(allocator)) { consumerSchema =>
      tryWithResource(ArrowArray.allocateNew(allocator)) { consumerArray =>
        val schemaPtr: Long = consumerSchema.memoryAddress
        val arrayPtr: Long = consumerArray.memoryAddress

        // The callback is invoked from native code once the next batch has been written
        // into the structs at schemaPtr/arrayPtr; hasNext == false signals end of stream.
        reader.nextBatch((hasNext) => {
          if (hasNext) {
            val root: VectorSchemaRoot = Data.importVectorSchemaRoot(allocator, consumerArray, consumerSchema, provider)
            p.success(Some(root))
          } else {
            p.success(None)
          }
        }, schemaPtr, arrayPtr)
      }
    }

    val fut: Future[Option[VectorSchemaRoot]] = p.future
    // Obtain the record batch from the future either synchronously or asynchronously.
  • Compute engine adapters. For example, in Spark we could implement a vectorized reader based on the above interface and implement the DataSource V2 interfaces.

Plan

We plan to first implement a reader with default overwrite merge logic, tracked under this issue. Further support for MergeInto SQL will be covered in a separate tracking issue.

Development Branch

develop/native_io

Tasks

Could you provide complete example code and the related Maven configuration?

I tried to set up a LakeSoul example on an internal network. I set up Hadoop, Spark, Java, and Scala on Windows, but when running the provided example code with Scala I got an exception about AWS credentials (No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider). After some research, it seems authentication against AWS is required, but that conflicts with our offline environment. Is there something wrong with my attempt? I originally tried to set up the environment on CentOS 7 and also ran into a series of problems, and there are almost no setup tutorials for this environment on Chinese blogs. Could you provide a complete tutorial for setting up an example project? We would like to move to domestic open source projects, but the documentation is really scarce.

[Flink] Timestamp column misses min-max stats

The Parquet writer currently doesn't produce min-max stats for the int96 type, which is the hardcoded type for timestamps in flink-parquet.
This causes queries with filters on a timestamp column to perform badly.

[EPIC] Catalog refactor

Overview

An overall refactor of the catalog implementation, to address some known issues and pave the way for future development.
The catalog interaction code is currently tightly coupled with Spark. We need a separate, 'clean' implementation that is independent of compute engines. This enables further and easier integration with query engines like Flink, Presto, etc.

Problems

  1. Users may not have a Cassandra deployment or may not be familiar with Cassandra.
  2. Some complex multi-partition/multi-table transaction scenarios are hard to implement on top of Cassandra's LWT. We would like to extend the catalog to support more databases, such as PostgreSQL with its dialect.
  3. More complex merge semantics and concurrency control semantics require more metadata in the catalog, and we need to encapsulate the details within an independent catalog implementation.

Proposal

Design Goals

Catalog metadata management refers to the management of metadata such as all tables, schemas, partitions, and data directories or files in a data warehouse. Similar to the Hive MetaStore, metadata management needs to be connected to the compute engine's catalog interface to implement SQL query resolution for LakeSoul tables.
LakeSoul's metadata management aims to achieve the following goals:

  • High performance and scalability. Metadata management systems such as Hive's have performance problems. For example, when Hive queries partitions it needs to join the two tables PARTITIONS and PARTITION_KEY_VALS in MySQL, which performs poorly when a single table has thousands of partitions.
    LakeSoul's metadata read and write operations go through primary key indexes, preventing full table scans.
  • Atomicity. LakeSoul's metadata atomicity provides atomic data commit operations, i.e. partitioned micro-batch writes, ensuring that a batch is fully committed before it becomes visible and that readers never see data in an intermediate state.
  • Consistency. LakeSoul implements tunable consistency through Cassandra, and the default is eventual consistency.
  • Isolation. LakeSoul separates read and write versions through a multi-version mechanism, and can provide version backtracking based on timestamps (i.e. time travel).
  • Multiple concurrent writes. LakeSoul supports multiple concurrent writes and determines whether data conflicts occur by checking write conditions. For conflict-free writes (Append/Upsert), concurrent updates are allowed. For conflicting writes (Update), the retry logic is performed by the compute framework.
  • Other extension points. Metadata can also provide table-level special semantic control (CDC, SCD), file-level statistics, and data distribution (bucketing, ordering).

Meta Operations

1. Data Format Definition

1. Table information. Stores the table name, path, schema, properties, and partition names and types (range, hash)

CREATE TABLE table_info (
    table_id text,
    table_name text,
    table_path text,
    table_schema text,
    properties json, -- entity corresponds to JSONObject
    partitions text,
    PRIMARY KEY (table_id)
)

CREATE TABLE table_name_id (
    table_name text,
    table_id text,
    PRIMARY KEY (table_name)
)

CREATE TABLE table_path_id (
    table_path text,
    table_id text,
    PRIMARY KEY (table_path)
)

See PostgreSQL 14 documentation: Chapter 8 (Data Types); Section 8.15 (Arrays)

2. File information. Stores file names and file operations (file_op), such as add and delete

3. Commit. A commit corresponds to a set of file information and records the type of the current commit (commit_op). Commits and file information are stored in one table

CREATE TYPE data_file_op AS (
    path text,
    file_op text,
    size bigint,
    file_exist_cols text
);

CREATE TABLE data_commit_info (
    table_id text,
    partition_desc text,
    commit_id UUID, -- a timestamp could be used instead
    file_ops data_file_op[],
    commit_op text,
    timestamp bigint,
    PRIMARY KEY (table_id, partition_desc, commit_id)
)

See PostgreSQL 14 documentation: Section 8.16 (Composite Types); https://www.postgresql.org/docs/current/datatype-uuid.html

The types of commit_op are update, compaction, append, and merge. These four types determine the semantic relationship among multiple commits in a snapshot.

4. Snapshots.

A snapshot contains a commit sequence organized in chronological order. From a snapshot we can recover which files need to be read, which should be ignored (deleted or invalidated after an update), which need to be merged (merge on read), and the ordering relationship between them.

5. Partition information.

Stores all historical snapshots of a partition, each with a corresponding version number (used for read-write isolation, multi-versioning, etc.). partition_desc is a combined description generated from the multi-level partition values and is used to uniquely locate a leaf partition of a table.

CREATE TABLE partition_info (
    table_id text,
    partition_desc text, -- e.g. year=2022,month=04,day=20,bucket=000
    version int, -- continuously and monotonically increasing
    commit_op text,
    snapshot UUID[], -- organized in commit time order
    expression text, -- entity corresponds to JSONArray
    PRIMARY KEY (table_id, partition_desc, version)
)
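
For reference, a rough Scala model of the rows defined above (types simplified; these are not LakeSoul's actual classes):

import java.util.UUID

// Mirrors data_file_op, data_commit_info and partition_info above; illustrative only.
case class DataFileOp(path: String, fileOp: String, size: Long, fileExistCols: String)

case class DataCommitInfo(tableId: String, partitionDesc: String, commitId: UUID,
                          fileOps: Seq[DataFileOp], commitOp: String, timestamp: Long)

case class PartitionInfo(tableId: String, partitionDesc: String, version: Int,
                         commitOp: String, snapshot: Seq[UUID], expression: String)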

Write logic

Each time the data of a table partition is updated, the overall process is as follows:

  1. The compute framework generates a list of DataFileOp file operation pairs; each pair is a tuple of (file name, operation), where the operation is add or delete
  2. Generate a UUID as the commit id, and write the file operation pair list and the current commit_op into the data_commit_info table (a timestamp could be used instead of the UUID)
  3. At the same time, generate the corresponding partition_info data according to the generated data_commit_info
  4. Get the current state of each partition: the current version (current_version) and the current snapshot (current_snapshot)
  5. Calculate the new version and snapshot of each partition (a sketch follows the conflict-resolution steps below):
  6. new_version = current_version + 1
  7. new_snapshot =
    1. append/merge: current_snapshot.append(commit_id)
    2. update, compaction: [commit_id]
  8. Ideally, the new partition version can be written in a single attempt. Since concurrent writers may cause version conflicts, a conflict detection mechanism is needed. PostgreSQL transactions are used so that conflicting writes can be safely rolled back, and different operations handle conflicts differently.
  9. Fine-grained concurrent conflict resolution:
    In some cases a conflict is resolvable. For example, two concurrent append operations do not conflict; the second writer can simply retry, fetch the newest commit sequence, and build a new snapshot. In other cases, such as two concurrent updates, the conflict cannot be resolved and the second operation should fail. The resolution rules are summarized in the following table.
This op \ Concurrent op | append                                | compaction                            | update           | merge
append                  | Retry commit                          | Reorder as compaction+append sequence | Fail             | Fail
compaction              | Reorder as compaction+append sequence | Discard the last one                  | Keep update only | Reorder as compaction+merge sequence
update                  | Fail                                  | Keep update only                      | Fail             | Fail
merge                   | Fail                                  | Reorder as compaction+merge sequence  | Fail             | Fail

Conflict resolving steps:

  1. If there is a conflict, re-fetch the current partition information (call it cur_1) and resolve according to this commit's operation (see the sketch after these steps):
    1. If this is an append operation: a conflict with a concurrent update/merge is not allowed, so the append fails directly. If it conflicts with a compaction/append, then snapshot = cur_1.snapshot + partition_info.snapshot;
    2. If this is a compaction operation: when it conflicts with an update, the update prevails; the compaction is abandoned and true is returned directly. If it conflicts with another compaction, the operation is invalid and also returns true. If it conflicts with an append/merge, then snapshot = partition_info.snapshot.append(cur_1.snapshot - current.snapshot);
    3. If this is an update operation: a conflict with a concurrent append/merge/update is not allowed, so it fails. If it conflicts with a compaction, just overwrite the snapshot: snapshot = partition_info.snapshot;
    4. If this is a merge operation: conflicts with a concurrent append/merge/update are not allowed, so it fails. If it conflicts with a compaction, then snapshot = cur_1.snapshot + partition_info.snapshot;
  2. The resubmitted version = cur_1.version + 1
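
A pure-logic Scala sketch (not the actual implementation) of how the new version and snapshot are derived from the commit op, following the write steps above; conflict retries and the PostgreSQL transaction are left out:

import java.util.UUID

case class PartitionVersion(version: Int, snapshot: Vector[UUID])

// Derive the next partition version for a commit, per the write-logic steps above.
def nextVersion(current: PartitionVersion, commitOp: String, commitId: UUID): PartitionVersion =
  commitOp match {
    // append/merge extend the existing commit sequence
    case "append" | "merge"      => PartitionVersion(current.version + 1, current.snapshot :+ commitId)
    // update/compaction replace the whole snapshot with a single commit
    case "update" | "compaction" => PartitionVersion(current.version + 1, Vector(commitId))
    case other                   => throw new IllegalArgumentException(s"unknown commit_op: $other")
  }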

Read logic

1. Normal reading process

  1. Get the latest partition version number and snapshot of each partition. Conceptually:

SELECT max(version), snapshot FROM partition_info
WHERE table_id = id and partition_desc in (descs...);

In practice this is expressed as a groupwise-max join:

select m.table_id, t.partition_desc, m.version, m.commit_op, m.snapshot, m.expression
from (
    select table_id, partition_desc, max(version)
    from partition_info
    where table_id = 'tableId' and partition_desc in ('partitionlist')
    group by table_id, partition_desc) t
left join partition_info m
on t.table_id = m.table_id and t.partition_desc = m.partition_desc and t.max = m.version

Note: since version is part of the composite primary key, PostgreSQL automatically maintains a btree index on it, so this query requires no table scan and is actually very fast.

  2. Create a read logical plan from the commit_id list in the snapshot
  3. For each commit id, read the list of DataFileOp from data_commit_info
  4. According to the commit order, decide the scan plan (a sketch follows this list):
    1. If there is a merge commit, a MergeScan needs to be created
    2. In the remaining cases, create a ParquetScan directly
    There is actually an implicit requirement here: only one file can be retained in each partition after an update, while a merge can have any number of delta files. For example, if a partition has been updated several times, only one file may remain in that partition. Therefore, we can actually keep only the latest commit id after an update.
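
A small Scala sketch of this decision, assuming the commit_op of each commit id in the snapshot can be looked up:

import java.util.UUID

sealed trait ScanPlan
case object ParquetScan extends ScanPlan
case object MergeScan extends ScanPlan

// A merge-on-read scan is required as soon as the snapshot contains a merge commit.
def chooseScan(snapshot: Seq[UUID], commitOpOf: UUID => String): ScanPlan =
  if (snapshot.exists(id => commitOpOf(id) == "merge")) MergeScan else ParquetScan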

2. Time Travel reading process

Time Travel supports reading at a given version or at a given timestamp. When a version is given, that version is looked up directly, and the rest of the logic is the same as the normal reading process above. When a timestamp is given, for each partition we traverse the versions' commit timestamps to find the latest version whose commit timestamp is <= the given timestamp.
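
A small Scala sketch of the timestamp lookup, assuming a partition's versions are given as (version, commit timestamp) pairs:

// Pick the largest version whose commit timestamp is <= the requested timestamp.
def versionAsOf(versions: Seq[(Int, Long)], asOfTimestamp: Long): Option[Int] =
  versions.collect { case (version, ts) if ts <= asOfTimestamp => version }
          .reduceOption(_ max _)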

Exception handling logic

Possible exceptions when submitting file updates:

  1. After writing the file, the writing job fails and no commit record is created.
  2. After the commit record is created, updating the partition version fails.
    Logically, both failure cases can be ignored because the final version number is not modified and reads are unaffected. However, they leave invalid data in storage and metadata that needs to be cleaned up.

TTL processing logic

TTL is a common requirement for data warehouses.

  1. For tables with a TTL set, apply the same TTL to the partition_info and commit records, and set the TTL on file storage at the same time.
  2. Where file storage does not support TTL, such as HDFS, cleanup can be performed asynchronously by listening to metadata TTL events (via CDC).

Schema Evolution (DDL)

The sections above do not take schema evolution into account. For schema evolution, the main problem to handle is what happens when the schema is changed (adding, deleting, or modifying columns).

Development

Branch

develop/catalog_refactor

Tasks

New Feature: New MergeOperator for upsert to ignore null fields by default

Here is a table with columns (A, B, C, D); column A is the primary key, and all fields are of type string.
Spark reads a batch of JSON data from Kafka and converts it to a DataFrame to upsert. But the DataFrame may contain "null" or null for some cells because of missing fields in the Kafka messages (e.g. {A:A1,B:B4,C:C4,D:D4} {A:A2,C:C5} {A:A3,B:B5,D:D5}), which could lead to unexpected results in the table.
Since LakeSoul has the MergeOperator feature, we could create a new merge operator that handles this case by ignoring null values, as sketched below.
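
A hedged Scala sketch of such an operator. The trait below only mirrors the shape of LakeSoul's MergeOperator (merging a sequence of candidate cell values into one); the real trait, its package, and the exact signature are assumptions here.

trait MergeOperator[T] {
  def mergeData(input: Seq[T]): T
}

// Hypothetical operator: keep the latest non-null candidate for a cell and fall back to the
// latest value only when every candidate is null or the literal string "null".
class MergeNonNullOp extends MergeOperator[String] {
  override def mergeData(input: Seq[String]): String = {
    val nonNull = input.filter(v => v != null && v != "null" && v.nonEmpty)
    if (nonNull.nonEmpty) nonNull.last else input.lastOption.orNull
  }
}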
