Comments (11)
I see. We need to check decompressive transcoding for the GCS file to determine whether the content is compressed rather than relying on the file extension.
# standard libraries
import logging
# third party libraries
import apache_beam as beam
from apache_beam import Create, Map
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Count
logger = logging.getLogger()
logger.setLevel(logging.INFO)
elements = [
    # "gs://apache-beam-samples/gcs/bigfile.txt.gz",
    # "gs://apache-beam-samples/gcs/bigfile_with_encoding.txt.gz",
    "gs://apache-beam-samples/gcs/bigfile_with_encoding_plain.txt.gz",
]
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    (
        p
        | Create(elements)
        | "Read File from GCS"
        >> ReadAllFromText(
            compression_type=beam.io.filesystem.CompressionTypes.UNCOMPRESSED
        )
        | Count.Globally()
        | "Log" >> Map(lambda x: logging.info("Total lines %d", x))
    )
This only loads 75,601 lines.
#19413 could be related to uploading the file to GCS.
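The check suggested at the top of the thread (look at the object's contentEncoding metadata instead of the file extension) could be sketched like this. `is_transcoded` is a hypothetical helper, not a Beam API; the dicts mimic the shape of a GCS `objects.get` JSON response:

```python
# Hypothetical helper (not Beam API): decide whether GCS will apply
# decompressive transcoding, i.e. serve the object decompressed, by
# inspecting its contentEncoding metadata instead of the file extension.
def is_transcoded(metadata: dict) -> bool:
    return metadata.get("contentEncoding") == "gzip"

# Dicts mimic the shape of a GCS objects.get JSON response.
print(is_transcoded({"name": "bigfile.txt.gz", "contentEncoding": "gzip"}))  # True
print(is_transcoded({"name": "bigfile.txt.gz"}))                             # False
```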
Thanks for reporting. Agreed, this is a P1 bug as it causes data loss.
Is it possible to provide a working example that reproduces the issue? That would help triage.
@shunping FYI
Is it possible to provide a working example that reproduces the issue? That would help triage.
@Abacn I don't have a working example; however, the steps to reproduce are:
- Upload a gzip file to GCS. Make sure that the unzipped file is large enough, e.g. a few MB.
- Create a Beam pipeline using the Python SDK that reads the file from step 1 using ReadAllFromText.
- Print or write the output of ReadAllFromText.
- Observe that the file is not fully read.
EDIT: This issue will probably appear for any compression type. I just encountered it with gzip but did not test with other compression algorithms.
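The steps above can be sketched as follows. The upload itself needs google-cloud-storage and credentials, so it is left as a hedged comment; the bucket and object names are placeholders, and (as later comments in this thread show) the key detail is uploading with Content-Encoding: gzip:

```python
import gzip
import io

NUM_LINES = 100_000

# Step 1: build a gzip file whose uncompressed size is a few MB.
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
    for i in range(NUM_LINES):
        gz.write(f"line-{i:06d} {'x' * 40}\n".encode())
payload = buf.getvalue()
print(f"compressed size: {len(payload)} bytes")

# Upload with Content-Encoding: gzip so GCS applies decompressive
# transcoding on download (sketch; requires google-cloud-storage,
# bucket/object names are placeholders):
#
#   from google.cloud import storage
#   blob = storage.Client().bucket("my-bucket").blob("bigfile.txt.gz")
#   blob.content_encoding = "gzip"
#   blob.upload_from_string(payload, content_type="text/plain")
```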
I uploaded one test file here: gs://apache-beam-samples/gcs/bigfile.txt.gz (~7 MB), which has 100,000 lines, but I cannot reproduce this:
# standard libraries
import logging
# third party libraries
import apache_beam as beam
from apache_beam import Create, Map
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Count
logger = logging.getLogger()
logger.setLevel(logging.INFO)
elements = [
    "gs://apache-beam-samples/gcs/bigfile.txt.gz",
]
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    (
        p
        | Create(elements)
        | "Read File from GCS" >> ReadAllFromText()
        | Count.Globally()
        | "Log" >> Map(lambda x: logging.info("Total lines %d", x))
    )
This shows:
INFO:root:Total lines 100000
So I double-checked, and there are differences between your example and our case.
- We use content encoding gzip while saving our files to GCS; you don't have an encoding specified.
- This leads us to using ReadAllFromText with the parameter compression_type=CompressionTypes.UNCOMPRESSED, since per GCS policy the downloaded file seems to be already uncompressed (it doesn't work with CompressionTypes.AUTO).
- This further results in reading only a fragment of the file.
Furthermore, after removing the encoding type from our file and using CompressionTypes.AUTO on it, everything worked properly.
To get your example to represent our situation, please add content encoding gzip to your file metadata.
For a quick patch we use the following solution:
class ReadAllFromTextNotSplittable(ReadAllFromText):
    """This class doesn't take advantage of splitting files into bundles,
    because when doing so Beam was taking the compressed file size as
    reference, resulting in reading only a fraction of the uncompressed
    file."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._read_all_files._splittable = False
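A local, Beam-free simulation of the mechanism the docstring describes: with decompressive transcoding the stored object size is the compressed size, while the served bytes are decompressed, so a reader that splits or stops at the stored size sees only part of the file. This is an illustration under that assumption, not Beam's actual read path:

```python
import gzip

# Build a file of 100,000 known lines and gzip it.
lines = [f"line-{i:06d} {'x' * 40}\n".encode() for i in range(100_000)]
uncompressed = b"".join(lines)
compressed = gzip.compress(uncompressed)

stored_size = len(compressed)     # the object size GCS reports
served = uncompressed             # what a transcoded download returns
truncated = served[:stored_size]  # reading only up to the "object size"

print(len(lines), "lines in the file;",
      truncated.count(b"\n"), "lines read before hitting the stored size")
```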
What does your metadata look like?
Then I got this error:
ERROR:apache_beam.runners.common:Error -3 while decompressing data: incorrect header check [while running '[6]: Read File from GCS/ReadAllFiles/ReadRange']
This is expected, as I mentioned earlier:
This leads us to using ReadAllFromText with parameter compression_type=CompressionTypes.UNCOMPRESSED since the downloaded file seems to be already uncompressed (it doesn't work with CompressionTypes.AUTO), as in gcs policy
I presume that while downloading the file from GCS it's already decompressed, hence the decompression error in Beam.
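That would also explain the "incorrect header check" error above: with AUTO the codec is picked from the .gz extension, so Beam tries to gunzip bytes that GCS already decompressed. A simplified sketch of extension-based detection (an illustration, not Beam's actual CompressionTypes code):

```python
import zlib

# Simplified extension-based detection, in the spirit of
# CompressionTypes.AUTO (illustration only, not Beam's implementation).
def detect_by_extension(path: str) -> str:
    return "gzip" if path.endswith(".gz") else "uncompressed"

# What a transcoded download returns: bytes GCS already decompressed.
plain = b"already-decompressed text\n"

if detect_by_extension("gs://bucket/bigfile.txt.gz") == "gzip":
    try:
        zlib.decompress(plain)  # attempt to decompress plain bytes
    except zlib.error as exc:
        print(exc)  # e.g. "incorrect header check", as in the Beam error above
```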
Metadata is as follows (also please note we checked both text/plain and application/x-gzip; both were only partially read):