
flow's People

Contributors

aeon-automation, alexislefebvre, dawidsajdak, dependabot[bot], drupol, flavioheleno, github-actions[bot], inmanturbo, jguittard, jpiatko, mleczakm, norbertmwk, norberttech, owsiakl, peter279k, rzarno, scyzoryck, stloyd, szepeviktor, tomaszhanc, voku, wiktor6, wirone, xaviermarchegay

flow's Issues

Parquet - writing in batches

Right now Writer has only one way of writing parquet files:

Writer::write(string $path, Schema $schema, iterable $rows)

With this approach, we can pass a \Generator, and internally RowGroupBuilder will keep reading from it, splitting data into RowGroups by their size.
Once all data is written, the writer closes the file, writing all metadata at the end of it.

This approach does not let us fully integrate Writer with ETL Loaders, because the load method is executed only for a given chunk of Rows. We would need to keep appending data to the file, which would create too many RowGroups that are too small and would hurt parquet reader performance.

What we need is a method that does not close the parquet file and lets us keep adding more batches (the internal RowGroupBuilder should work exactly as it works now).
Once all data is saved, the user should call the close() method, which writes the parquet file metadata at the end of the file and also closes the file stream.
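
A minimal sketch of the shape described above, under loud assumptions: BatchWriterSketch, writeBatch() and the row-count threshold are hypothetical names, row groups are kept in memory instead of being serialized, and the real RowGroupBuilder splits by size rather than by row count.

```php
<?php

// Hypothetical sketch: rows from many writeBatch() calls share one buffer,
// so small batches do not turn into many small row groups.
// This is not the real Flow\Parquet\Writer API.
final class BatchWriterSketch
{
    /** @var array<int, mixed> rows waiting for the next row group */
    private array $buffer = [];

    /** @var array<int, array<int, mixed>> row groups flushed so far */
    public array $rowGroups = [];

    public bool $closed = false;

    public function __construct(private int $rowsPerGroup)
    {
    }

    public function writeBatch(iterable $rows) : void
    {
        if ($this->closed) {
            throw new \RuntimeException('Writer is already closed');
        }

        foreach ($rows as $row) {
            $this->buffer[] = $row;

            if (\count($this->buffer) >= $this->rowsPerGroup) {
                $this->flush();
            }
        }
    }

    public function close() : void
    {
        // Flush the remainder; a real writer would write file metadata here.
        if ($this->buffer !== []) {
            $this->flush();
        }

        $this->closed = true;
    }

    private function flush() : void
    {
        $this->rowGroups[] = $this->buffer;
        $this->buffer = [];
    }
}
```

A Loader could then call writeBatch() once per chunk of Rows and close() only when the whole pipeline has finished.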

Apply default batchSize based on Loader type

In order to improve DX, Flow should try to detect if the current BatchSize is suitable for a given Loader.

(new Flow())
    ->read(
        Dbal::from_limit_offset(
            $sourceDbConnection,
            'source_dataset_table',
            new OrderBy('id', Order::DESC)
        )
    )
    ->withEntry('id', ref('id')->cast('int'))
    ->withEntry('name', concat(ref('name'), lit(' '), ref('last_name')))
    ->drop('last_name')
    ->write(Dbal::to_table_insert($dbConnection, 'flow_dataset_table'))
    ->run();

In this example, batchSize is equal to 1. This means that Flow will try to insert rows into the db one by one.
This can be easily changed by putting batchSize(1_000) just above write, but it also requires the developer to know how loaders work internally.

What we can do is use the Optimizer to detect the current batchSize when Loaders are added, and whenever we notice that batchSize wasn't set, automatically apply one.
The exact numbers should be predefined; I think we can start from 1k for each of the following:

  • ElasticSearch
  • Dbal
  • Meilisearch

For the file-based loaders, this is irrelevant, as most of them are writing rows one by one.
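
The rule could be sketched roughly like this. The function name and the string loader identifiers are hypothetical stand-ins, not the real Optimizer or Loader classes; the fallback of 1 mirrors the behaviour described in the example above.

```php
<?php

// Hypothetical helper: picks a batch size when the user did not set one.
// Loader types are plain strings here, not real Flow classes.
function default_batch_size(?int $userBatchSize, string $loaderType) : int
{
    if ($userBatchSize !== null) {
        return $userBatchSize; // an explicit setting always wins
    }

    $defaults = [
        'elasticsearch' => 1_000,
        'dbal' => 1_000,
        'meilisearch' => 1_000,
    ];

    // File-based and unknown loaders keep the current behaviour.
    return $defaults[$loaderType] ?? 1;
}
```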

Improve benchmarks stability

We need to increase the stability & reproducibility of results provided by benchmarks.

To achieve this we need:

  • replace faker usage with a predefined "random" data setup,
  • reduce revolutions to a lower value in all heavy benchmarks,
  • use the aggregate report and keep an eye on the rstdev value, which indicates whether a benchmark is stable enough (values of 0-2% are acceptable).
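
For reference, the stability check can be sketched as below. This assumes rstdev means the population standard deviation divided by the mean, in percent; the benchmarking tool may compute it slightly differently, and both function names are hypothetical.

```php
<?php

// Relative standard deviation of benchmark timings, in percent.
// Assumption: population standard deviation (divide by n, not n - 1).
function rstdev(array $samples) : float
{
    $n = \count($samples);

    if ($n === 0) {
        throw new \InvalidArgumentException('At least one sample is required');
    }

    $mean = \array_sum($samples) / $n;

    if ($mean == 0) {
        throw new \InvalidArgumentException('Mean of samples must not be zero');
    }

    $variance = 0.0;

    foreach ($samples as $sample) {
        $variance += ($sample - $mean) ** 2;
    }

    $variance /= $n;

    return 100 * \sqrt($variance) / $mean;
}

// A benchmark counts as stable when rstdev stays within the 0-2% range.
function is_stable(array $samples, float $maxRstdev = 2.0) : bool
{
    return rstdev($samples) <= $maxRstdev;
}
```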

Parquet DataPageV2 support

Currently, only the first version of DataPage is supported. We should also bring DataPageV2 support into the parquet reader/writer.

Snappy & Parquet tests can throw warnings randomly

As noticed locally, we can see it also in CI:

Randomly happening test warnings:

2 tests triggered 2 PHP warnings:

1) /Users/stloyd/Documents/flow/src/lib/snappy/src/Flow/Snappy/SnappyCompressor.php:221
Undefined array key 2693

Triggered by:

* Flow\Parquet\Tests\Integration\IO\MapsWritingTest::test_writing_map_of_int_int
  /Users/stloyd/Documents/flow/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/MapsWritingTest.php:17

2) /Users/stloyd/Documents/flow/src/lib/parquet/src/Flow/Parquet/BinaryReader/Bytes.php:50
Undefined array key 0

Triggered by:

* Flow\ETL\Adapter\Parquet\Tests\Integration\ParquetTest::test_writing_with_partitioning
  /Users/stloyd/Documents/flow/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php:46

Fix broken examples

Currently, not all examples can be executed. Those designed to be loaded into the phar work, but there is no way to execute them through the CLI.
It would be nice to detect whether a given example is loaded through the phar or executed directly, and then decide if we should return the DataFrame or execute DataFrame::run().
Another thing is autoloading: examples/bootstrap.php no longer loads autoload.php, since it assumes it will be executed through the phar. Here it would also be good to detect if we are in the phar context and otherwise load the regular autoload.

We might also want to prepare a simple run_examples.sh or even run_examples.php script that would try to iterate over all examples and execute them one by one. Execution order is important here since some examples depend on the output from other examples.
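
One possible way to detect the context is PHP's built-in \Phar::running(), which returns an empty string outside of a phar archive. The two helper names below are hypothetical, and $dataFrame only needs a run() method in this sketch.

```php
<?php

// \Phar::running(false) returns '' when the current code is not executed
// from inside a phar archive.
function is_phar_context() : bool
{
    return \class_exists(\Phar::class) && \Phar::running(false) !== '';
}

// Hypothetical wrapper for examples: hand the DataFrame back to the phar
// runtime, or execute it right away when the example is called directly.
function return_or_run(object $dataFrame) : ?object
{
    if (is_phar_context()) {
        return $dataFrame;
    }

    $dataFrame->run();

    return null;
}
```

The same is_phar_context() check could decide in examples/bootstrap.php whether the regular autoload has to be required.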

Parquet allow user to specify column encoding

Currently, PagesBuilder decides which encoding applies to each column, and we only support two types:

  • RLE_DICTIONARY
  • PLAIN

We should allow users to override this, but keep in mind that Booleans should not allow RLE_DICTIONARY encoding.

I think encoding could be added to the NestedColumn/FlatColumn static constructors and used later in PagesBuilder, which should fall back to the default logic when it's not specified.
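
The override rule might look roughly like this. It is only a sketch: resolve_encoding() is a hypothetical name, column types and encodings are plain strings rather than the real Parquet classes, and $default stands for whatever PagesBuilder would pick on its own.

```php
<?php

// Hypothetical sketch of the override logic with a fallback to the default.
function resolve_encoding(string $columnType, ?string $requested, string $default) : string
{
    if ($requested === null) {
        return $default; // nothing specified, keep the existing decision
    }

    if (!\in_array($requested, ['PLAIN', 'RLE_DICTIONARY'], true)) {
        throw new \InvalidArgumentException('Unsupported encoding: ' . $requested);
    }

    if ($columnType === 'BOOLEAN' && $requested === 'RLE_DICTIONARY') {
        throw new \InvalidArgumentException('RLE_DICTIONARY is not allowed for Boolean columns');
    }

    return $requested;
}
```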

Parquet Reader

Currently, we are using the https://github.com/codename-hub/php-parquet/ library; however, it comes with some issues, for example with handling deeply nested complex data structures.
The goal is to replace that dependency with an internal parquet library and integrate it with a pipeline planner to reduce memory footprint while processing parquet files as much as possible.

Add batchSize method to DataFrame

Currently, we can manipulate the batch size of a DataFrame in 2 ways:

  • DataFrame::parallelize
  • DataFrame::collect

The first one splits Rows into smaller batches; the second one merges small Rows into a bigger batch.
Even though those two work fine, it might not always be easy to determine which one to use, and their names might be confusing to users who don't understand how DataFrame works.

We can solve both of those problems by adding:

DataFrame::batchSize(int $size) : self

That will split or merge processed rows based on the size.
It should also be more intuitive to the users.
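
The split-or-merge behaviour can be illustrated with a small generator. This is a sketch on plain arrays standing in for Rows; rebatch() is a hypothetical name, not part of the DataFrame API.

```php
<?php

/**
 * Re-chunks incoming batches into batches of exactly $size rows
 * (only the last one may be smaller), regardless of the input batch sizes.
 */
function rebatch(iterable $batches, int $size) : \Generator
{
    if ($size < 1) {
        throw new \InvalidArgumentException('Batch size must be greater than 0');
    }

    $buffer = [];

    foreach ($batches as $batch) {
        foreach ($batch as $row) {
            $buffer[] = $row;

            if (\count($buffer) === $size) {
                yield $buffer;
                $buffer = [];
            }
        }
    }

    // Emit whatever is left once the input is exhausted.
    if ($buffer !== []) {
        yield $buffer;
    }
}
```

Big input batches get split and small ones get merged by the same loop, which is why a single method can cover both existing use cases.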

XML Writer

Currently, Flow can only read XML, but we should start looking into saving XML files as well.

There are two libraries that could help us achieve that:

Before actual coding, we should first look into converting Rows into XML. We will need to figure out how to determine what should be saved as a value and what should be saved as an attribute.

Parquet Statistics

Parquet Writer is already collecting statistics, but it's not turning them into ColumnChunk statistics that can be later used by the Reader to reduce the number of partitions required to read when looking for specific data.

Add PHPUnit internal helper(s)

We often start to do copy-paste work in tests, thus we should begin to have some reusable helpers to make our internal tests easier to maintain.

Helpers:

  • FilesystemHelper with: create, delete for files & directories,
  • ...

Setup benchmarks at CI/CD

As the project grows, it becomes easier to miss performance degradation, like it happened here: #558

We should think about creating a set of benchmarks for each adapter/core/lib and executing those benchmarks in CI/CD so we can at least manually check which PRs introduced bottlenecks.
The ideal solution would be to store those performance benchmarks as workflow artifacts after a merge to 1.x and compare them with benchmarks from newly opened PRs.


I was thinking about creating benchmarks for specific building blocks separately, for example:

  1. Extractors - we could come up with some dataset schema, save it as all supported file types, and just benchmark extraction without doing any operations on the dataset.
  2. Transformers - since we reduced the number of transformers, keeping only critical ones, we might want to start at least from those most frequently used, like the one that evaluates expressions. Here, I think we can take a similar approach, but instead of using extractors, we can directly pass prepared Rows to it and measure the performance of transformations themselves.
  3. Expressions - just like with Transformers, but here we don't even need Rows. Single Row should be enough
  4. Loaders - similarly to Transformers, prepare Rows and execute Loading them into the destination directly

Those are very granular benchmarks, which can test all building blocks separately, providing clear insights about each element separately. However, on top of that, I would probably still try to benchmark entire Pipelines on a selected subset of the most frequently used extractors/loaders/transformers (we would need to develop a few scenarios here).

Schema inferring

When working on DataFrames without provided schema, Flow does its best to detect the entry value type to create the proper Entry type in NativeEntryFactory.

This behavior is very developer-friendly but adds significant overhead to Extractors using array_to_rows().

This could be improved by introducing schema inferring similar to the one available in Apache Spark.

We could extract the first X rows (this should be configurable), iterate over them, and detect the schema.
Then, rewind the extractor and read all rows, passing the inferred schema to NativeEntryFactory, which would no longer need to guess entry types.
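
The sampling step could be sketched like this. infer_schema() is a hypothetical name, rows are plain arrays, types are reported with PHP's get_debug_type(), and widening conflicting types to "mixed" is an assumption rather than existing Flow behaviour.

```php
<?php

// Hypothetical sketch: look at the first $sampleSize rows and record the
// PHP type seen under every key. Conflicting types are widened to "mixed".
function infer_schema(iterable $rows, int $sampleSize) : array
{
    $schema = [];
    $seen = 0;

    foreach ($rows as $row) {
        if ($seen === $sampleSize) {
            break;
        }

        $seen++;

        foreach ($row as $name => $value) {
            $type = \get_debug_type($value);

            if (!isset($schema[$name])) {
                $schema[$name] = $type;
            } elseif ($schema[$name] !== $type) {
                $schema[$name] = 'mixed';
            }
        }
    }

    return $schema;
}
```

A nullable type would probably be a better answer than "mixed" when only null conflicts with another type, but that is left out to keep the sketch short.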

New logical type `MapType`

We need to introduce a new logical type called MapType, along with the related MapKey & MapValue.

This will match how the map data structure looks in Parquet and Avro.

Loading pipelines from Json

DataFrame API:

<?php

return (new Flow())
    ->read(CSV::from(__FLOW_DATA__ . '/power-plant-daily.csv', 10, delimiter: ';'))
    ->fromJson("pipeline.json")
    ->write(To::output(truncate: false));

PHP Pipeline Example:

<?php

return (new Flow())
    ->read(CSV::from(__FLOW_DATA__ . '/power-plant-daily.csv', 10, delimiter: ';'))
        ->withEntry('unpacked', ref('row')->unpack())
        ->renameAll('unpacked.', '')
        ->withEntry('production_kwh', ref('Produkcja(kWh)'))
    ->write(To::output(truncate: false));

Json Pipeline Example:

{
  "operations": [
    {
      "name" : "withEntry",
      "args": [
        {
          "type": "string",
          "value": "unpacked"
        },
        {
          "type": "ref",
          "args": ["row"],
          "call": [
            {
              "name": "unpack",
              "args": []
            },
            {
              "name": "round",
              "args": [
                {
                  "type": "lit",
                  "args": [2]
                }
              ]
            }
          ]
        }
      ]
    },
    {
      "name" : "renameAll",
      "args": [
        {
          "type": "string",
          "value": "unpacked."
        },
        {
          "type": "string",
          "value": ""
        }
      ]
    },
    {
      "name": "withEntry",
      "args": [
        {
          "type": "string",
          "value": "production_kwh"
        },
        {
          "type": "ref",
          "args": ["Produkcja(kWh)"]
        }
      ]
    }
  ]
}

Random failure in `Flow\ETL\Tests\Integration\FlowTest::test_etl_sort_at_disk_in_memory` test

This particular test seems to be failing randomly: https://github.com/flow-php/flow/actions/runs/6533466973/job/17738710141?pr=591

There was 1 failure:

1) Flow\ETL\Tests\Integration\FlowTest::test_etl_sort_at_disk_in_memory
Failed asserting that 50 is identical to 2600.

/home/runner/work/flow/flow/src/core/etl/tests/Flow/ETL/Tests/Integration/FlowTest.php:70

FAILURES!
Tests: 1414, Assertions: 148471, Failures: 1.

Docker Images

Create flow-php docker images with the phar and the latest PHP version inside, and upload them to Docker Hub, so that a dataframe can be loaded from a single PHP file using the flow runtime.

docker run -t --rm -v $PWD:/dataframes flow-php/flow ./dataframe.php

Add examples to readme how to execute flow dataframes directly from Docker

Entry string value with space char

If my row value contains only this: " "
then because of this:
https://github.com/flow-php/etl/blob/11dd19d09b7b3f44606c97491439a8c1711dea38/src/Flow/ETL/Row/Factory/NativeEntryFactory.php#L41-L45

I end up in the ->isJson(string $string) method with $string = ""
and so this:
https://github.com/flow-php/etl/blob/11dd19d09b7b3f44606c97491439a8c1711dea38/src/Flow/ETL/Row/Factory/NativeEntryFactory.php#L306

crashes with: Uninitialized string offset 0

I think this check is not needed, because the case is already covered by the following test:
https://github.com/flow-php/etl/blob/11dd19d09b7b3f44606c97491439a8c1711dea38/src/Flow/ETL/Row/Factory/NativeEntryFactory.php#L310-L313

PS: The same applies to isXML

Limit is evaluated too early

In general, the goal of the limit is to break the pipeline at an early stage, once a specific number of rows has been reached.
However, the way it works now is a bit naive, since the limit counter is increased just after Rows are extracted from the Extractor.
This means that if Rows(1) are later expanded into Rows(50) during transformation and the limit is set to 5, Loaders will receive Rows(250) in total instead of the expected 5.
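
The difference between the two orderings can be reproduced with plain generators. This is only an illustration: expand_rows() and limit_rows() are hypothetical stand-ins, not Flow transformers.

```php
<?php

// Stand-in for a transformation that expands every row into $factor rows.
function expand_rows(iterable $rows, int $factor) : \Generator
{
    foreach ($rows as $row) {
        for ($i = 0; $i < $factor; $i++) {
            yield $row . '-' . $i;
        }
    }
}

// Counts rows at the point where it is applied and stops pulling from
// upstream as soon as the limit is reached.
function limit_rows(iterable $rows, int $limit) : \Generator
{
    if ($limit < 1) {
        return;
    }

    $count = 0;

    foreach ($rows as $row) {
        yield $row;

        if (++$count >= $limit) {
            return;
        }
    }
}

$extracted = \range(1, 10);

// Limit counted right after extraction: 5 rows * 50 = 250 rows reach the loader.
$tooEarly = \iterator_to_array(expand_rows(limit_rows($extracted, 5), 50), false);

// Limit counted after the transformation: exactly 5 rows reach the loader.
$expected = \iterator_to_array(limit_rows(expand_rows($extracted, 50), 5), false);
```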

Logger Error Handler

It would be super handy to have an ErrorHandler that would do the same thing as SkipRows handler is doing but that would first log entire rows collection to PsrLogger.

Support for CamelCase columns

I have some tables with CamelCase names and columns (I can't change them). I've run into trouble because when the query is built, the columns are not quoted, like (Name, Surname, Email), and the DB engine (Postgres) treats unquoted identifiers as case-insensitive, so I get an error saying: "Column Name does not exist".

I've fixed this by modifying the "concat" function in Flow\Doctrine\Bulk\Columns, simply like this:

return '"'.\implode('"'.$separator.'"', $this->columns).'"';

Now this works properly with both CamelCase and lowercase columns, with no errors.

Would it be possible to add an option to switch between quoted and unquoted columns?

Sorry if this is not the place to ask this question
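
An opt-in variant of that change could be sketched as a standalone function. This is a hypothetical helper, not the actual Columns::concat() signature; double quotes are the PostgreSQL/ANSI identifier quote, so a real implementation would probably delegate quoting to the database platform.

```php
<?php

// Hypothetical variant of the concat logic with an opt-in quoting flag.
function concat_columns(array $columns, string $separator = ',', bool $quote = false) : string
{
    if (!$quote) {
        return \implode($separator, $columns);
    }

    $quoted = \array_map(
        // Escape embedded double quotes by doubling them.
        static fn (string $column) : string => '"' . \str_replace('"', '""', $column) . '"',
        $columns
    );

    return \implode($separator, $quoted);
}
```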

Static analysis - avoid unnecessary counts

It would be nice to have static analysis detect the following scenario as invalid:

$array = [];

if (\count($array)) {
   foreach ($array as $element) {
        // some logic
   }
}

Since the array is not nullable, foreach over an empty array would simply do nothing, so the count check is unnecessary.

#Ref: #712 (comment)

Issue GoogleSheetExtractor with empty sheet.

Hi!

We found a weird edge case when using the GoogleSheetExtractor: when trying to load data from a completely empty sheet, getValues() on the Google response returns null.

# vendor/flow-php/etl-adapter-google-sheet/src/Flow/ETL/Adapter/GoogleSheet/GoogleSheetExtractor.php:47
        /** @var Sheets\ValueRange $response */
        $response = $this->service->spreadsheets_values->get($this->spreadsheetId, $cellsRange->toString(), $this->options);
        /** @var array[] $values */
        $values = $response->getValues();

In reality it should be

# vendor/flow-php/etl-adapter-google-sheet/src/Flow/ETL/Adapter/GoogleSheet/GoogleSheetExtractor.php:47
        /** @var Sheets\ValueRange $response */
        $response = $this->service->spreadsheets_values->get($this->spreadsheetId, $cellsRange->toString(), $this->options);
        /** @var null|array[] $values */
        $values = $response->getValues();

I feel like the issue is on the Google repository side, but maybe it would be worth handling this edge case in the package as well? I can provide a fix if needed.

Best, Marcin.

Parquet - append

Appending to an existing parquet file should be done by reading metadata from the file, then attaching new RowGroup after the last RowGroup, and writing new metadata at the end of the file with newly added RowGroups.

MapEntry - new entry type

Map is a Key/Value data structure where each key must be unique.
Keys can have any of the other Flow simple data types like:

  • String
  • Int
  • DateTime
  • Float
  • Uuid

Map Value can be literally anything, it's just an Entry.

Something like: Entry::map(string $name, Entry $key, Entry $value)

Something like: Entry::map(string $name, array $map)

This would let us create even stronger integration with Parquet and Avro and other data destinations that support Maps.

One important note on a map is that all keys must have exactly the same type, so we can't have mixed int|string keys.
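
The single-key-type rule could be enforced with a check like the one below. It is a sketch: map_key_type() is a hypothetical name, and keys are passed as a list because PHP array keys silently cast numeric strings such as '1' to integers.

```php
<?php

// Hypothetical check: all keys of a map must share exactly one type.
// Returns that type, or null for an empty map.
function map_key_type(array $keys) : ?string
{
    $types = \array_values(\array_unique(\array_map('get_debug_type', $keys)));

    if (\count($types) > 1) {
        throw new \InvalidArgumentException('Mixed map key types: ' . \implode('|', $types));
    }

    return $types[0] ?? null;
}
```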

Run tests only after code changes

Currently, the whole test suite is executed on every pull request; however, some PRs do not affect the codebase, for example:

  • updating docs
  • adding examples

It would be nice to execute tests only when code/dependencies were changed.

Apply limit directly to Extractors

The easiest way to explain it is through example:

(new Flow())
    ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv'))
    ->limit(100)
    ->write(\Flow\ETL\DSL\To::output())
    ->run();

In this case, we want to read only 100 of the X rows available in the file.
Since the limit is now implemented as a pipeline (LimitingPipeline), the extractor will still read at least 1000 rows, since this is the default value for $rows_in_batch.
We can change this value to 100; however, that requires the user to understand how Extractors work, which I think is completely unnecessary.

Instead, the limit should be passed directly to the Extractor (for example, we can implement another interface, LimitedExtractor), which would let "something" set that limit, so the extractor, regardless of how it's configured, will not read more rows than the given limit.

In theory, that's the perfect task for LogicalPlan and Execution Processor, but there is a small catch here.
In the current implementation, LogicalPlan only has access to Pipes (Transformers/Loaders). It does not have access to the Pipeline itself, which means it has no access to LimitingPipeline.

What we need is something between DataFrame and Pipeline, that something is Executor (similar concept exists in Apache Spark).

The role of the Executor would be to create a LogicalPlan, process it using Execution Processor, and execute the pipeline.
Thanks to that, we are going to get direct access to Pipeline (which might hold other pipelines under itself), which would give us more flexible way of analyzing the entire execution chain and adjusting it (optimizing/validating/changing) before it gets executed.

Now going back to applying the limit on the Extractor.

So the Processor should check if there are any transformations in the pipeline before DataFrame::limit() is used. Whenever it doesn't find anything that might change the number of rows (for example, expand), it should apply the limit directly on the Extractor, removing LimitingPipeline. Let's look at some examples:

(new Flow())
    ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv'))
    ->select("id", "name")
    ->limit(100)
    ->write(\Flow\ETL\DSL\To::output())
    ->run();

In this example, there is nothing before the limit that might change the number of results, so the limit should be applied directly on the Extractor.

(new Flow())
    ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv'))
    ->withEntry("item", ref("items")->expand()) 
    ->limit(100)
    ->write(\Flow\ETL\DSL\To::output())
    ->run();

In this case, we are expanding one row into multiple rows, so we can't predict at the Extractor level how many rows we are going to get after the transformation; we should stick to LimitingPipeline.

It's a small change for the users, but it comes with a huge DX impact, especially when dealing with large files.
Once this is done, we can implement a similar mechanism for DataFrame::select, since there are extractors, for example Parquet or Avro, that can read only specific columns.
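
The decision the Processor has to make can be sketched as follows. Operations are plain strings in this illustration, the function name is hypothetical, and "expand" is the only row-count-changing operation assumed to exist.

```php
<?php

// Hypothetical planner check: the limit can be pushed down to the extractor
// only when no operation before it changes the number of rows.
function can_push_limit_to_extractor(array $operations) : bool
{
    $rowCountChanging = ['expand'];

    foreach ($operations as $operation) {
        if ($operation === 'limit') {
            return true; // reached the limit without a row-count change
        }

        if (\in_array($operation, $rowCountChanging, true)) {
            return false; // stick to LimitingPipeline
        }
    }

    return false; // no limit in the pipeline at all
}
```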

Add PHPUnit helper for ETL testing

To improve the overall way of working with tests against data frames & other core parts of FlowPHP we would like to introduce a new PHPUnit helper trait/class.

Ideas for test methods:

  • assertEntriesExistsInEachRow(),
  • assertRowsCount(),
  • assertSchema(),
  • assertEntryType()
  • and more

Parquet - BinaryReader - accessing bytes out of range

I don't have a real sample, but the test generating the warning is: `Flow\ETL\Adapter\Parquet\Tests\Integration\ParquetTest::test_writing_with_partitioning` (see #722)

Here are a few caught (open "Test" details in jobs):

Originally posted by @stloyd in #724 (comment)

Parquet writer

This is a continuation of #506
The goal is to build a parquet writer that:

  • supports all parquet types
  • allows appending data into an existing file
  • supports remote filesystems

This is the absolute minimum needed to replace the library Flow is using right now. After that, the next step is to work on a more direct integration between Flow and Parquet in order to benefit from more advanced parquet features not available in other file formats.

Parquet - Boolean column dictionary encoding

Currently, boolean columns are automatically encoded with RLE_DICTIONARY. There is nothing wrong with that, as the parquet specification does not limit it in any way, but...

  • Apache Spark does not support that
  • It brings very little value, as booleans can be RLE encoded anyway

We should put a rule in the code that uses dictionary encoding for every column type except Booleans.

Cover all expressions with integration tests

Every expression is covered with a unit test that looks like this:

<?php

    public function test_array_unpack() : void
    {
        $row = Row::create(
            Entry::int('id', 1),
            Entry::array('array_entry', [
                'status' => 'PENDING',
                'enabled' => true,
                'array' => ['foo' => 'bar'],
            ]),
        );

        $this->assertSame(
            [
                'status' => 'PENDING',
                'enabled' => true,
                'array' => ['foo' => 'bar'],
            ],
            (new ArrayUnpack(ref('array_entry')))->eval($row)
        );
    }

However, this does not show how to use a given expression. Integration tests would solve that problem, for example:

<?php

namespace Flow\ETL\Tests\Integration\Row\Reference\Expression;

use Flow\ETL\DSL\Entry;
use Flow\ETL\DSL\From;
use Flow\ETL\DSL\To;
use Flow\ETL\Flow;
use Flow\ETL\Memory\ArrayMemory;
use Flow\ETL\Row;
use Flow\ETL\Rows;
use PHPUnit\Framework\TestCase;
use function Flow\ETL\DSL\array_expand;
use function Flow\ETL\DSL\ref;

final class ArrayUnpackTest extends TestCase
{
    public function test_array_unpack() : void
    {
        (new Flow())
            ->read(
                From::rows(new Rows(
                    Row::with(Entry::int('id', 1), Entry::array('array', ['a' => 1, 'b' => 2, 'c' => 3])),
                ))
            )
            ->withEntry('expanded', array_expand(ref('array')))
            ->write(To::memory($memory = new ArrayMemory()))
            ->run();

        $this->assertSame(
            [
                ['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3], 'expanded' => 1],
                ['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3], 'expanded' => 2],
                ['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3], 'expanded' => 3],
            ],
            $memory->data
        );
    }
}

Pivoting

GroupBy could also allow pivot tables.

From:

+------+---------+------+
| Year | Product | Sale |
+------+---------+------+
| 2019 |   A     | 100  |
| 2019 |   B     | 200  |
| 2020 |   A     | 150  |
| 2020 |   C     | 300  |
| 2021 |   B     | 100  |
| 2021 |   C     | 150  |
+------+---------+------+

Through:

  • Group by Year
  • Pivot by Product
  • Aggregate Sum by Sale

Into:

+------+----+----+----+
| Year |  A |  B |  C |
+------+----+----+----+
| 2021 |null| 100| 150|
| 2019 | 100| 200|null|
| 2020 | 150|null| 300|
+------+----+----+----+
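
The three steps can be illustrated on plain arrays. pivot_sum() is a hypothetical helper, not a proposed GroupBy API, and it returns groups in order of first appearance rather than in the order shown in the table above.

```php
<?php

// Group by $groupBy, turn distinct $pivot values into columns, sum $value.
function pivot_sum(array $rows, string $groupBy, string $pivot, string $value) : array
{
    // Distinct pivot values become the output columns.
    $columns = \array_values(\array_unique(\array_column($rows, $pivot)));
    \sort($columns);

    $result = [];

    foreach ($rows as $row) {
        $group = $row[$groupBy];

        // Start every group with null in each pivot column.
        $result[$group] ??= [$groupBy => $group] + \array_fill_keys($columns, null);
        $result[$group][$row[$pivot]] = ($result[$group][$row[$pivot]] ?? 0) + $row[$value];
    }

    return \array_values($result);
}

$sales = [
    ['Year' => 2019, 'Product' => 'A', 'Sale' => 100],
    ['Year' => 2019, 'Product' => 'B', 'Sale' => 200],
    ['Year' => 2020, 'Product' => 'A', 'Sale' => 150],
    ['Year' => 2020, 'Product' => 'C', 'Sale' => 300],
    ['Year' => 2021, 'Product' => 'B', 'Sale' => 100],
    ['Year' => 2021, 'Product' => 'C', 'Sale' => 150],
];

$pivoted = pivot_sum($sales, 'Year', 'Product', 'Sale');
```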

XML depth/previousDepth are not correctly handled

Hi everyone,
I think that XML depth is not handled correctly and the XML adapter does not work properly.

If you use a deep XML structure like this:

<?xml version="1.0" encoding="utf-8"?>
<root>
    <cars>
        <car>
            <model>
                <make>Mercedes</make>
            </model>
        </car>
        <car>
            <model>
                <make>BMW</make>
            </model>
        </car>
        <car>
            <model>
                <make>Nissan</make>
            </model>
        </car>
    </cars>
</root>

And run process

(new Flow())
    ->read(XML::from($filepath, 'root/cars/car'))
    ->write(\Flow\ETL\DSL\To::output(false))
    ->run();

The result is

+----------------------------------------------------------------------------------------------------------------------+
|                                                                                                                 node |
+----------------------------------------------------------------------------------------------------------------------+
| <?xml version="1.0"?><car>            <model>                <make>Mercedes</make>            </model>        </car> |
+----------------------------------------------------------------------------------------------------------------------+

But I expect this output

+----------------------------------------------------------------------------------------------------------------------+
|                                                                                                                 node |
+----------------------------------------------------------------------------------------------------------------------+
| <?xml version="1.0"?><car>            <model>                <make>Mercedes</make>            </model>        </car> |
|      <?xml version="1.0"?><car>            <model>                <make>BMW</make>            </model>        </car> |
|   <?xml version="1.0"?><car>            <model>                <make>Nissan</make>            </model>        </car> |
+----------------------------------------------------------------------------------------------------------------------+

Maybe I'm wrong and I don't understand the intent behind this adapter, but I think that internally it does not correctly handle the depth/previous depth.

Look at XMLReaderExtractor::extract at line 60 and suppose the reader is at depth 4 (the <make> tag, the deepest tag found).

In the next iteration previousDepth=4 and depth=2 (meaning <car>, because the reader moved back up to the parent).
So the only if statement entered is $xmlReader->depth < $previousDepth, and the \array_pop($currentPathBreadCrumbs) line is executed, which removes only one element.

So now $currentPathBreadCrumbs is ['root', 'cars', 'car', 'model'], which is not the correct path (remember that the reader moved back up to <car>).

I expect that when $depth < $previousDepth we should remove all tags with depth between $xmlReader->depth and $previousDepth; in this specific case $currentPathBreadCrumbs should be ['root', 'cars', 'car'].

Thanks, I hope I was clear :)
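
The expected behaviour can be sketched as a single truncation instead of one array_pop(). truncate_breadcrumbs() is a hypothetical helper, not the actual extractor code, and it assumes the root element is at depth 0.

```php
<?php

// Hypothetical fix sketch: when the reader moves up, keep only the path
// elements up to and including the current depth (root has depth 0).
function truncate_breadcrumbs(array $breadcrumbs, int $depth) : array
{
    return \array_slice($breadcrumbs, 0, $depth + 1);
}

// Reader was at <make> (depth 4) and moved back up to <car> (depth 2).
$path = truncate_breadcrumbs(['root', 'cars', 'car', 'model', 'make'], 2);
```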

Improve handling extractors output

Currently, when dealing with just extracted rows, we need to properly handle them, this is how it looks now:

(new Flow())
    ->read(
        From::array(
            [['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3]]],
        )
    )
    ->withEntry('row', ref('row')->unpack())
    ->renameAll('row.', '')
    ->drop('row')
    ->withEntry('array', ref('array')->arrayMerge(lit(['d' => 4])))
    ->write(To::memory($memory = new ArrayMemory()))
    ->run();

The following lines are repeated almost* always:

    ->withEntry('row', ref('row')->unpack())
    ->renameAll('row.', '')
    ->drop('row')

We should look into this and introduce an expression that will do all of those 3 things, something like:

ref('row')->rowUnpack() 
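
On plain arrays, the combined effect of those three steps is roughly the following. This is a sketch only: row_unpack() is a hypothetical function, and letting unpacked entries overwrite existing ones with the same name is an assumption.

```php
<?php

// Plain-array stand-in for: unpack the entry, strip its prefix, drop it.
function row_unpack(array $row, string $entry = 'row') : array
{
    $nested = $row[$entry] ?? [];
    unset($row[$entry]);

    // Assumption: unpacked entries win over existing ones with the same name.
    return \array_merge($row, $nested);
}
```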

One might ask why extractors are exposing the entire row under ArrayEntry.
The reason comes from the Config::shouldPutInputIntoRows() option.

When this option is set to true, extractors will also provide additional data from the input. For example, file-based extractors with that option will return something like this:

[
   'input_file_uri' => 'string',
   'row' => [...]
]

Or the Http extractor will put headers, request URL etc. there.

This is very handy when, for example, our datasets are stored like this:

  • /datasets/sales/2023/january.json
  • /datasets/sales/2023/february.json
  • /datasets/reports/report_type/123456.xml
  • /datasets/reports/report_type/789010.xml

Thanks to input_file_uri we can also parse input file path and get the month name or report id from it.
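
For the layouts listed above, PHP's built-in pathinfo() is enough to pull out that part of the path. The wrapper function name is hypothetical.

```php
<?php

// pathinfo() with PATHINFO_FILENAME returns the file name without its
// extension, which is the month name or the report id in the paths above.
function dataset_name_from_uri(string $uri) : string
{
    return \pathinfo($uri, \PATHINFO_FILENAME);
}
```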

Parquet Filtering

Parquet comes with a very handy mechanism called "Column Statistics", which records, for example, the min/max values, the total number of null values, etc.

By reading those statistics, we won't need to iterate through the entire parquet file when, for example, we are looking for data from a specific time range or value range.
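
The pruning idea can be sketched with a simple range overlap check. Statistics are plain arrays with hypothetical 'min' and 'max' keys here, and both function names are made up for illustration; they are not the Flow Parquet API.

```php
<?php

// A row group can be skipped when its [min, max] range from the column
// statistics does not overlap the requested [from, to] range.
function may_contain(int|float $min, int|float $max, int|float $from, int|float $to) : bool
{
    return $max >= $from && $min <= $to;
}

// Returns the indexes of the row groups that still have to be read.
function row_groups_to_read(array $rowGroups, int|float $from, int|float $to) : array
{
    return \array_keys(\array_filter(
        $rowGroups,
        static fn (array $stats) : bool => may_contain($stats['min'], $stats['max'], $from, $to)
    ));
}
```

Timestamps can be handled the same way once they are converted to numbers.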

New logical type `ListType`

We need to introduce a new logical type called ListType, along with the related ListElement type.

This will match how the list data structure looks in Parquet and Avro.

Parquet Viewer

Working with parquet files is not as easy as when we are working with CSV files, mostly because it's hard to browse those files.
The idea is to build a simple CLI app, that could be released as a docker image.

We can start simple, with 2 commands:

  • metadata
  • data

bin/parquet read:metadata
bin/parquet read:data

For metadata, I already created a POC, here is the output:

$ ./parquet read:metadata --help

Usage:
  ./parquet_viewer.php [options] [--] <file>

Arguments:
  file                  path to parquet file

Options:
      --columns         Display column details
      --row-groups      Display row group details
      --column-chunks   Display column chunks details
      --page-headers    Display page headers details
  -h, --help            Display help for the given command. When no command is given display help for the ./parquet_viewer.php command
  -q, --quiet           Do not output any message
  -V, --version         Display this application version
      --ansi|--no-ansi  Force (or disable --no-ansi) ANSI output
  -n, --no-interaction  Do not ask any interactive question
  -v|vv|vvv, --verbose  Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Usage example:

$ ./parquet.php read:metadata path/to/parquet.file --columns --row-groups --column-chunks --page-headers

 ----------------- -------------------- Metadata --------------------------------------- 
  file_path         path/to/parquet.file 
  parquet_version   1                                                                    
  created_by        flow-php parquet version 1.x                                                          
  rows              1                                                                    
 ----------------- --------------------------------------------------------------------- 

 -------------------------------------------- ------- Flat Columns ------ ------------ ---------------- ---------------- 
  path                                         type         logical_type   repetition   max_repetition   max_definition  
 -------------------------------------------- ------------ -------------- ------------ ---------------- ---------------- 
  id                                           INT32        -              OPTIONAL     0                1               
  aMapField.key_value.someKey                  BYTE_ARRAY   -              REQUIRED     1                1               
  aMapField.key_value.aStructField.anInteger   INT32        -              OPTIONAL     1                2               
  aMapField.key_value.aStructField.aString     BYTE_ARRAY   -              OPTIONAL     1                2               
  rootLevelStructField.anotherInteger          INT32        -              OPTIONAL     0                1               
  rootLevelStructField.anotherString           BYTE_ARRAY   -              OPTIONAL     0                1               
  aListField.list.someInteger                  INT32        -              OPTIONAL     1                2               
 -------------------------------------------- ------------ -------------- ------------ ---------------- ---------------- 

 ---------- ----- Row Groups  --------------- 
  num_rows   total_byte_size   columns_count  
 ---------- ----------------- --------------- 
  1          285               7              
 ---------- ----------------- --------------- 

 -------------------------------------------- ------------------------ Column Chunks ------------- ------------ ------------------------ ------------------ 
  path                                         encodings                compression   file_offset   num_values   dictionary_page_offset   data_page_offset  
 -------------------------------------------- ------------------------ ------------- ------------- ------------ ------------------------ ------------------ 
  id                                           [RLE,BIT_PACKED,PLAIN]   GZIP          4             1            -                        4                 
  aMapField.key_value.someKey                  [RLE,BIT_PACKED,PLAIN]   GZIP          56            2            -                        56                
  aMapField.key_value.aStructField.anInteger   [RLE,BIT_PACKED,PLAIN]   GZIP          119           2            -                        119               
  aMapField.key_value.aStructField.aString     [RLE,BIT_PACKED,PLAIN]   GZIP          179           2            -                        179               
  rootLevelStructField.anotherInteger          [RLE,BIT_PACKED,PLAIN]   GZIP          243           1            -                        243               
  rootLevelStructField.anotherString           [RLE,BIT_PACKED,PLAIN]   GZIP          295           1            -                        295               
  aListField.list.someInteger                  [RLE,BIT_PACKED,PLAIN]   GZIP          358           3            -                        358               
 -------------------------------------------- ------------------------ ------------- ------------- ------------ ------------------------ ------------------ 

 -------------------------------------------- -------------- -------- Page Headers ------ ------------------- ----------------------- ----------------- 
  path                                         type           encoding   compressed_size   uncompressed_size   dictionary_num_values   data_num_values  
 -------------------------------------------- -------------- ---------- ----------------- ------------------- ----------------------- ----------------- 
  id                                           DATA_PAGE_V2   PLAIN      26                6                   -                       1                
  aMapField.key_value.someKey                  DATA_PAGE_V2   PLAIN      37                22                  -                       2                
  aMapField.key_value.aStructField.anInteger   DATA_PAGE_V2   PLAIN      34                14                  -                       2                
  aMapField.key_value.aStructField.aString     DATA_PAGE_V2   PLAIN      38                20                  -                       2                
  rootLevelStructField.anotherInteger          DATA_PAGE_V2   PLAIN      26                6                   -                       1                
  rootLevelStructField.anotherString           DATA_PAGE_V2   PLAIN      37                17                  -                       1                
  aListField.list.someInteger                  DATA_PAGE_V2   PLAIN      35                18                  -                       3                
 -------------------------------------------- -------------- ---------- ----------------- ------------------- ----------------------- -----------------
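
Most of the output above (everything except the page headers) is decoded from the file footer. As a starting point for read:metadata, here is a minimal sketch of locating that footer in plain PHP. It relies only on the Parquet file layout: the metadata, then a 4-byte little-endian metadata length, then the PAR1 magic bytes at the very end of the file. The function name locateParquetFooter is hypothetical, and decoding the Thrift-encoded metadata itself is left out.

```php
<?php

declare(strict_types=1);

/**
 * Returns the byte offset and length of the Thrift-encoded file metadata.
 *
 * @return array{offset: int, length: int}
 */
function locateParquetFooter(string $path): array
{
    // Smallest possible file: leading magic (4) + footer length (4) + trailing magic (4).
    if (!is_file($path) || ($size = filesize($path)) === false || $size < 12) {
        throw new \RuntimeException("Not a readable parquet file: {$path}");
    }

    $handle = fopen($path, 'rb');

    if ($handle === false) {
        throw new \RuntimeException("Can't open file: {$path}");
    }

    // The last 8 bytes are: 4-byte little-endian metadata length + "PAR1".
    fseek($handle, -8, SEEK_END);
    $tail = fread($handle, 8);
    fclose($handle);

    if ($tail === false || strlen($tail) !== 8 || substr($tail, 4) !== 'PAR1') {
        throw new \RuntimeException("Missing PAR1 magic bytes: {$path}");
    }

    $length = unpack('V', substr($tail, 0, 4))[1];

    return ['offset' => $size - 8 - $length, 'length' => $length];
}
```

The returned offset and length tell the command which bytes to hand over to the metadata decoder, so the data pages never need to be touched for this view.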

read:data - this is a bit more tricky, but I think we can leverage ETL here and just read the file and display X number of results.

The ideal solution would also allow users to scroll through the data using the PgDn/PgUp/up/down keys, but that can come later; we can start simple.
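
A minimal sketch of the "display X number of results" part, in plain PHP and without ETL. readRows is a hypothetical stand-in for whatever ends up yielding rows from the parquet file, and page is a hypothetical helper; both are assumptions made for this example.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical row source standing in for a parquet reader.
 */
function readRows(): \Generator
{
    for ($id = 1; $id <= 1000; $id++) {
        yield ['id' => $id];
    }
}

/**
 * Returns at most $limit rows, skipping the first $offset rows.
 *
 * @return list<array<string, mixed>>
 */
function page(\Iterator $rows, int $offset, int $limit): array
{
    // LimitIterator walks the generator lazily, so only the requested
    // slice is collected into an array.
    return iterator_to_array(new \LimitIterator($rows, $offset, $limit), false);
}

// Rows 11, 12 and 13 of the hypothetical source.
$rows = page(readRows(), 10, 3);
```

The same helper could later back the scrolling idea, with the key presses only changing the offset.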

Question about ETL process: XML to JSON

I'm building my own extractor that parses XML and have the following snippet:

public function extract(FlowContext $context) : \Generator
{
    $client = new Client(); // new client that gives me XML records
    foreach ($client->fetchRecords() as $record) { // fetchRecords is a generator, $record is a SimpleXMLElement object
        unset($rows); // If I don't do this it will have exponentially high json records when written to json
        $rows[] = new Row(new Row\Entries(Entry::object('xml_record', $record))); // so this will add all Rows to $rows and not just the new ones.
        yield new Rows(...$rows);
    }
}

My question is: am I doing this right? It feels wrong to unset $rows.
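
The accumulation described above can be reproduced without Flow. Below is a minimal sketch in plain PHP; fetchRecords is a hypothetical stand-in for the client, and plain arrays stand in for Rows. It suggests that the growth comes from $rows living outside the loop body, so building the payload fresh for every record would avoid the need for unset.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for the client: yields one SimpleXMLElement per record.
 */
function fetchRecords(): \Generator
{
    foreach (['<r id="1"/>', '<r id="2"/>', '<r id="3"/>'] as $xml) {
        yield new \SimpleXMLElement($xml);
    }
}

/**
 * Accumulating variant: $rows survives between iterations, so every
 * yield repeats all records seen so far.
 */
function extractAccumulating(): \Generator
{
    $rows = [];

    foreach (fetchRecords() as $record) {
        $rows[] = (string) $record['id'];

        yield $rows;
    }
}

/**
 * Fresh variant: the payload is built inside the loop, so every yield
 * carries exactly one record and nothing needs to be unset.
 */
function extractFresh(): \Generator
{
    foreach (fetchRecords() as $record) {
        yield [(string) $record['id']];
    }
}
```

With three records the accumulating variant yields 1 + 2 + 3 = 6 entries in total, while the fresh variant yields 3.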

Remove batchSize from loaders

After #716 is merged, we no longer need Loaders to chunk data before saving it.
The same can be achieved through DataFrame::collect($batchSize = 10).

This will reduce the complexity of Loaders.

We need to go through all Loaders, remove the batch size, update the UPGRADE.md file, and add a short explanation to the DSL functions for those loaders stating that, to control the number of Rows saved at once, DataFrame::collect should be used just before the loader.
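
To make the intent concrete, here is a minimal sketch in plain PHP of regrouping rows upstream, so that a loader simply writes whatever it receives. This is not how DataFrame implements it; inBatches is a hypothetical helper used only to show the behaviour.

```php
<?php

declare(strict_types=1);

/**
 * Regroups any stream of rows into batches of $batchSize rows.
 * The last batch may be smaller when the stream does not divide evenly.
 *
 * @param iterable<mixed> $rows
 *
 * @return \Generator<int, list<mixed>>
 */
function inBatches(iterable $rows, int $batchSize): \Generator
{
    if ($batchSize < 1) {
        throw new \InvalidArgumentException('Batch size must be greater than 0');
    }

    $batch = [];

    foreach ($rows as $row) {
        $batch[] = $row;

        if (count($batch) === $batchSize) {
            yield $batch;
            $batch = [];
        }
    }

    // Flush whatever is left once the source is exhausted.
    if ($batch !== []) {
        yield $batch;
    }
}

// Five rows regrouped into batches of two: [1, 2], [3, 4], [5].
$batches = iterator_to_array(inBatches([1, 2, 3, 4, 5], 2), false);
```

Because the regrouping happens before the loader, every loader gets the same behaviour without carrying its own batch size option.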

array_expand - add options

Add the following options to array_expand:

  • Expand::values - expanding only values from an array (default)
  • Expand::keys - expanding only keys from an array
  • Expand::key_value - expanding both keys and values from an array

Values:

<?php
(new Flow())
    ->read(
        From::rows(new Rows(
            Row::with(Entry::int('id', 1), Entry::array('array', ['a' => 1, 'b' => 2, 'c' => 3])),
        ))
    )
    ->withEntry('expanded', array_expand(ref('array'), Expand::values))
    ->write(To::memory($memory = new ArrayMemory()))
    ->run();

$this->assertSame(
    [
        ['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3], 'expanded' => 1],
        ['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3], 'expanded' => 2],
        ['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3], 'expanded' => 3],
    ],
    $memory->data
);

Keys:

<?php
(new Flow())
    ->read(
        From::rows(new Rows(
            Row::with(Entry::int('id', 1), Entry::array('array', ['a' => 1, 'b' => 2, 'c' => 3])),
        ))
    )
    ->withEntry('expanded', array_expand(ref('array'), Expand::keys))
    ->write(To::memory($memory = new ArrayMemory()))
    ->run();

$this->assertSame(
    [
        ['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3], 'expanded' => 'a'],
        ['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3], 'expanded' => 'b'],
        ['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3], 'expanded' => 'c'],
    ],
    $memory->data
);

Key Value:

<?php
(new Flow())
    ->read(
        From::rows(new Rows(
            Row::with(Entry::int('id', 1), Entry::array('array', ['a' => 1, 'b' => 2, 'c' => 3])),
        ))
    )
    ->withEntry('expanded', array_expand(ref('array'), Expand::key_value))
    ->write(To::memory($memory = new ArrayMemory()))
    ->run();

$this->assertSame(
    [
        ['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3], 'expanded' => ['a' => 1]],
        ['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3], 'expanded' => ['b' => 2]],
        ['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3], 'expanded' => ['c' => 3]],
    ],
    $memory->data
);
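
The expected semantics of the three options can also be sketched in plain PHP, independent of Flow. The ExpandMode enum and the expandArray and expandRow helpers below are hypothetical names used only for this illustration; the real option would live in Expand as proposed above.

```php
<?php

declare(strict_types=1);

// Hypothetical enum mirroring the three proposed options.
enum ExpandMode
{
    case Values;
    case Keys;
    case KeyValue;
}

/**
 * Turns one array into the list of values that each expanded row receives.
 *
 * @param array<array-key, mixed> $array
 *
 * @return list<mixed>
 */
function expandArray(array $array, ExpandMode $mode): array
{
    return match ($mode) {
        ExpandMode::Values => array_values($array),
        ExpandMode::Keys => array_keys($array),
        // One single-element array per key/value pair.
        ExpandMode::KeyValue => array_map(
            static fn ($key, $value): array => [$key => $value],
            array_keys($array),
            $array
        ),
    };
}

/**
 * Produces one copy of $row per expanded element of $row[$entry].
 *
 * @param array<string, mixed> $row
 *
 * @return list<array<string, mixed>>
 */
function expandRow(array $row, string $entry, ExpandMode $mode): array
{
    $rows = [];

    foreach (expandArray($row[$entry], $mode) as $expanded) {
        $rows[] = $row + ['expanded' => $expanded];
    }

    return $rows;
}

$row = ['id' => 1, 'array' => ['a' => 1, 'b' => 2, 'c' => 3]];
$byKeyValue = expandRow($row, 'array', ExpandMode::KeyValue);
```

Each option changes only what ends up in the expanded entry; the number of resulting rows is always the number of elements in the source array.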
