Comments (11)

liferoad commented on July 17, 2024

I see. We need to check the GCS object's decompressive transcoding settings (its Content-Encoding metadata) to determine whether the content is compressed, rather than relying on the file extension; see the sketch after the pipeline below.

# standard libraries
import logging

# third party libraries
import apache_beam as beam
from apache_beam import Create, Map
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Count

logger = logging.getLogger()
logger.setLevel(logging.INFO)

elements = [
    # "gs://apache-beam-samples/gcs/bigfile.txt.gz",
    # "gs://apache-beam-samples/gcs/bigfile_with_encoding.txt.gz",
    "gs://apache-beam-samples/gcs/bigfile_with_encoding_plain.txt.gz",
]

options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    (
        p
        | Create(elements)
        | "Read File from GCS"
        >> ReadAllFromText(
            compression_type=beam.io.filesystem.CompressionTypes.UNCOMPRESSED
        )
        | Count.Globally()
        | "Log" >> Map(lambda x: logging.info("Total lines %d", x))
    )

This only loads 75,601 lines.
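
A minimal sketch of what such a metadata-based check could look like, assuming the google-cloud-storage client library is available; the bucket and object names are just the ones used in this thread:

# third party libraries
from apache_beam.io.filesystem import CompressionTypes
from google.cloud import storage


def pick_compression_type(bucket_name, blob_name):
    """Choose a compression type from the object's Content-Encoding
    metadata instead of the file extension (sketch only)."""
    client = storage.Client()
    blob = client.bucket(bucket_name).get_blob(blob_name)
    if blob is not None and blob.content_encoding == "gzip":
        # GCS serves this object through decompressive transcoding,
        # so the bytes Beam receives are already decompressed.
        return CompressionTypes.UNCOMPRESSED
    return CompressionTypes.AUTO


print(pick_compression_type("apache-beam-samples", "gcs/bigfile_with_encoding.txt.gz"))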

#19413 could be related to how the file was uploaded to GCS.

Abacn commented on July 17, 2024

Thanks for reporting. Agreed, this is a P1 bug since it causes data loss.

Abacn commented on July 17, 2024

Is it possible to provide a working example that reproduces the issue? That would help with triage.

liferoad commented on July 17, 2024

@shunping FYI

janowskijak commented on July 17, 2024

    Is it possible to provide a working example that reproduces the issue? That would help with triage.

@Abacn I don't have a working example, but the steps to reproduce are:

  1. Upload a gzip file to GCS. Make sure that the unzipped file is large enough, e.g. a few MB.
  2. Create a Beam pipeline using the Python SDK that reads the file from step 1 using ReadAllFromText.
  3. Print or write the output of ReadAllFromText.
  4. Observe that the file is not fully read.

EDIT: This issue will probably appear for any compression type. I just encountered it with gzip and did not test other compression algorithms.

liferoad commented on July 17, 2024

I uploaded one test file here: gs://apache-beam-samples/gcs/bigfile.txt.gz (~7 MB), which has 100,000 lines, but I cannot reproduce this:

# standard libraries
import logging

# third party libraries
import apache_beam as beam
from apache_beam import Create, Map
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Count

logger = logging.getLogger()
logger.setLevel(logging.INFO)

elements = [
    "gs://apache-beam-samples/gcs/bigfile.txt.gz",
]

options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    (
        p
        | Create(elements)
        | "Read File from GCS" >> ReadAllFromText()
        | Count.Globally()
        | "Log" >> Map(lambda x: logging.info("Total lines %d", x))
    )

This shows:

INFO:root:Total lines 100000

Michal-Nguyen-airspace-intelligence commented on July 17, 2024

I double-checked, and there are differences between your example and our case.

  • We set Content-Encoding: gzip when saving our files to GCS; your file has no encoding specified.
  • This leads us to use ReadAllFromText with compression_type=CompressionTypes.UNCOMPRESSED, since the downloaded file seems to be already decompressed (it does not work with CompressionTypes.AUTO), per the GCS decompressive transcoding policy.
  • This further results in reading only a fragment of the file.

Furthermore, after removing the content encoding from our file, CompressionTypes.AUTO worked properly.
To get your example to represent our situation, please add Content-Encoding: gzip to your file's metadata (see the upload sketch below).
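
For reference, a minimal sketch of how that metadata might be set when uploading, assuming the google-cloud-storage client library; the bucket and file names are placeholders:

# third party libraries
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("my-bucket")  # placeholder bucket name
blob = bucket.blob("bigfile.txt.gz")  # placeholder object name

# Marking the object as gzip-encoded makes GCS apply decompressive
# transcoding on download, which is what triggers the behaviour above.
blob.content_encoding = "gzip"
blob.upload_from_filename("bigfile.txt.gz")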

Michal-Nguyen-airspace-intelligence commented on July 17, 2024

As a quick patch, we use the following solution:

from apache_beam.io.textio import ReadAllFromText


class ReadAllFromTextNotSplittable(ReadAllFromText):
    """This class doesn't take advantage of splitting files into bundles,
    because when doing so Beam uses the compressed file size as the
    reference, resulting in reading only a fraction of the uncompressed
    file."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Disable splitting on the underlying ReadAllFiles transform
        # (a private attribute, hence only a quick patch).
        self._read_all_files._splittable = False
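
A sketch of how this patched transform might be dropped into the pipeline from the first comment, reusing the imports, elements, and options defined there:

with beam.Pipeline(options=options) as p:
    (
        p
        | Create(elements)
        | "Read File from GCS"
        >> ReadAllFromTextNotSplittable(
            compression_type=beam.io.filesystem.CompressionTypes.UNCOMPRESSED
        )
        | Count.Globally()
        | "Log" >> Map(lambda x: logging.info("Total lines %d", x))
    )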

liferoad commented on July 17, 2024

What does your metadata look like?

I tried this:
[screenshot: GCS object metadata]

Then I got this error:

ERROR:apache_beam.runners.common:Error -3 while decompressing data: incorrect header check [while running '[6]: Read File from GCS/ReadAllFiles/ReadRange']

Michal-Nguyen-airspace-intelligence commented on July 17, 2024

This is expected, as I mentioned earlier:

    This leads us to use ReadAllFromText with compression_type=CompressionTypes.UNCOMPRESSED, since the downloaded file seems to be already decompressed (it does not work with CompressionTypes.AUTO), per the GCS decompressive transcoding policy.

I presume that when the file is downloaded from GCS it is already decompressed, hence the decompression error in Beam.
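
One quick way to confirm this is to check whether the bytes actually handed to the pipeline still start with the gzip magic number (0x1f 0x8b); a minimal sketch, assuming a hypothetical local copy of the downloaded object:

# Check the first two bytes of a (hypothetical) local copy of the object.
with open("bigfile_downloaded", "rb") as f:
    magic = f.read(2)

if magic == b"\x1f\x8b":
    print("still gzip-compressed")
else:
    print("already decompressed, e.g. by GCS decompressive transcoding")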

Michal-Nguyen-airspace-intelligence commented on July 17, 2024

Metadata is as follows (also, please note we checked both text/plain and application/x-gzip content types; both were only partially read):
[screenshot: GCS object metadata]
