
cc-pyspark's Introduction

Common Crawl Support Library

Overview

This library provides support code for the consumption of the Common Crawl Corpus RAW crawl data (ARC Files) stored on S3. More information about how to access the corpus can be found at https://commoncrawl.atlassian.net/wiki/display/CRWL/About+the+Data+Set .

You can take two primary routes to consuming the ARC File content:

(1) You can run a Hadoop cluster on EC2 or use EMR to run a Hadoop job. In this case, you can use the ARCFileInputFormat to drive data to your mappers/reducers. There are two versions of the InputFormat: One written to conform to the deprecated mapred package, located at org.commoncrawl.hadoop.io.mapred and one written for the mapreduce package, correspondingly located at org.commoncrawl.hadoop.io.mapreduce.

(2) You can decode data directly by feeding an InputStream to the ARCFileReader class located in the org.commoncrawl.util.shared package.

Both routes (InputFormat or ARCFileReader direct route) produce a tuple consisting of a UTF-8 encoded URL (Text), and the raw content (BytesWritable), including HTTP headers, that were downloaded by the crawler. The HTTP headers are UTF-8 encoded, and the headers and content are delimited by a consecutive set of CRLF tokens. The content itself, when it is of a text mime type, is encoded using the source text encoding.
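To make the record layout concrete, here is a minimal Python sketch of separating the headers from the content. It does not use the library's Java classes. It assumes the delimiter is the usual HTTP blank line (two consecutive CRLF pairs), and the function name `split_http_record` and the sample bytes are made up for illustration.

```python
def split_http_record(raw: bytes):
    """Split raw crawler output into (header_text, body_bytes).

    Assumption: headers and content are separated by the first
    occurrence of two consecutive CRLF pairs.
    """
    head, sep, body = raw.partition(b"\r\n\r\n")
    if not sep:
        # No delimiter found: treat everything as headers, with no body.
        return raw.decode("utf-8", errors="replace"), b""
    # Headers are UTF-8; the body stays as bytes because its encoding
    # depends on the source document.
    return head.decode("utf-8", errors="replace"), body


# Hypothetical record: UTF-8 headers followed by an ISO-8859-1 body.
raw = (
    b"HTTP/1.1 200 OK\r\n"
    b"Content-Type: text/html; charset=iso-8859-1\r\n"
    b"\r\n"
    b"<html>caf\xe9</html>"
)
headers, body = split_http_record(raw)
print(headers.splitlines()[0])
print(body.decode("iso-8859-1"))
```

The body is decoded separately because, as noted above, text content keeps the encoding of the source document rather than UTF-8.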

Build Notes:

  1. You need to define JAVA_HOME, and make sure you have Ant & Maven installed.
  2. Set hadoop.path (in build.properties) to point to your Hadoop distribution.

Sample Usage:

Once commoncrawl.jar has been built, you can validate that the ARCFileReader works for you by executing the sample command line below from the root of the commoncrawl source directory:

./bin/launcher.sh org.commoncrawl.util.shared.ARCFileReader --awsAccessKey <ACCESS KEY> --awsSecret <SECRET> --file s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690164240/1341819847375_4319.arc.gz

cc-pyspark's People

Contributors

cronoik, jaehunro, praveenr019, sebastian-nagel, xue-alex


cc-pyspark's Issues

Provide classes to use FastWARC to read WARC/WAT/WET files

FastWARC (see also FastWARC API docs) is a Python WARC parsing library

  • written in C++ for high performance
  • although inspired by warcio, not API compatible
  • without less-frequently used features, eg. reading ARC files or (as of now) chunked transfer encoding

Ideally, API differences between FastWARC and warcio should be hidden away in methods in CCSparkJob or a derived class, so that users do not have to care about the differences, except for very specific cases. Because of the differences and the required compilation of C++ components, usage of FastWARC should be optional.

boto3 credentials error when running CCSparkJob with ~100 S3 warc paths as input, but works with <10 S3 warc paths as input

  • Created a Spark job subclassing CCSparkJob to retrieve HTML text data. The job works when the input file lists fewer than 10 S3 WARC paths, but throws the error below when run with around 100 S3 WARC paths. Could you please share your thoughts on what might be causing this?
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt3/yarn/usercache/hadoop/appcache/application_1654076989914_0005/container_1654076989914_0005_03_000005/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/mnt3/yarn/usercache/hadoop/appcache/application_1654076989914_0005/container_1654076989914_0005_03_000005/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/mnt3/yarn/usercache/hadoop/appcache/application_1654076989914_0005/container_1654076989914_0005_03_000005/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/mnt3/yarn/usercache/hadoop/appcache/application_1654076989914_0005/container_1654076989914_0005_03_000005/__pyfiles__/sparkcc.py", line 355, in process_warcs
    stream = self.fetch_warc(uri, self.args.input_base_url)
  File "/mnt3/yarn/usercache/hadoop/appcache/application_1654076989914_0005/container_1654076989914_0005_03_000005/__pyfiles__/sparkcc.py", line 290, in fetch_warc
    self.get_s3_client().download_fileobj(bucketname, path, warctemp)
  File "/usr/local/lib/python3.7/site-packages/boto3/s3/inject.py", line 795, in download_fileobj
    return future.result()
  File "/usr/local/lib/python3.7/site-packages/s3transfer/futures.py", line 103, in result
    return self._coordinator.result()
  File "/usr/local/lib/python3.7/site-packages/s3transfer/futures.py", line 266, in result
    raise self._exception
  File "/usr/local/lib/python3.7/site-packages/s3transfer/tasks.py", line 269, in _main
    self._submit(transfer_future=transfer_future, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/s3transfer/download.py", line 357, in _submit
    **transfer_future.meta.call_args.extra_args,
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 508, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 895, in _make_api_call
    operation_model, request_dict, request_context
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 917, in _make_request
    return self._endpoint.make_request(operation_model, request_dict)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 116, in make_request
    return self._send_request(request_dict, operation_model)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 195, in _send_request
    request = self.create_request(request_dict, operation_model)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 134, in create_request
    operation_name=operation_model.name,
  File "/usr/local/lib/python3.7/site-packages/botocore/hooks.py", line 412, in emit
    return self._emitter.emit(aliased_event_name, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/hooks.py", line 256, in emit
    return self._emit(event_name, kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/hooks.py", line 239, in _emit
    response = handler(**kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/signers.py", line 103, in handler
    return self.sign(operation_name, request)
  File "/usr/local/lib/python3.7/site-packages/botocore/signers.py", line 187, in sign
    auth.add_auth(request)
  File "/usr/local/lib/python3.7/site-packages/botocore/auth.py", line 407, in add_auth
    raise NoCredentialsError()
botocore.exceptions.NoCredentialsError: Unable to locate credentials

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:954)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:287)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:230)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:133)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

How To: process CC NEWS warc files, most recent first

This is more a 'how would I do...' question than an issue report.

I am processing the Common Crawl news WARC files with multiple Python processes on a Kubernetes cluster. I am in the process of switching to cc-pyspark to improve the parallelism.

Now, my main issue is that the cluster should process the most recent WARC file first (new files are released continuously, every hour or so?) and only process older files when no new file is pending.
Both my own setup and, as far as I know, cc-pyspark will only work on a fixed set of WARC files. For instance, I can store a list of all 14183 news WARC files in a DataFrame to be dispatched to all executors, but I cannot dynamically add newly released WARC files to the front of that DataFrame.

One idea was to use Spark Structured Streaming with two queues, where the list of existing files is added to one queueStream and the new incoming files are appended to a second queueStream. The second one then needs to be consumed first, so it has higher priority.
I'm not sure whether that's possible with standard PySpark, or with Spark/Scala.

Is this something you (or any cc-pyspark user) have encountered? I'm looking for directions, but working code is also fine. :)
Thanks.
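
The ordering requirement itself (newest pending file first, backlog otherwise) can be sketched in plain Python with a single priority queue instead of two streams. This is only an illustration of the scheduling idea, not Spark Structured Streaming code and not part of cc-pyspark. The class name, the paths and the 14-digit timestamps are hypothetical. A driver could, for example, pop a batch from such a queue and submit it as one job, then check for new files before the next batch.

```python
import heapq


class NewestFirstQueue:
    """Hand out WARC paths so that the newest pending file comes first."""

    def __init__(self):
        self._heap = []

    def add(self, timestamp: str, path: str) -> None:
        # heapq is a min-heap, so negate the numeric timestamp
        # to make the most recent file pop first.
        heapq.heappush(self._heap, (-int(timestamp), path))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[1]

    def __len__(self) -> int:
        return len(self._heap)


queue = NewestFirstQueue()
# Backlog of older files (hypothetical names and timestamps).
queue.add("20220601010000", "news/old-1.warc.gz")
queue.add("20220601020000", "news/old-2.warc.gz")

first = queue.pop()  # newest of the backlog

# A new file arrives while the backlog is still being worked on.
queue.add("20220701090000", "news/new-1.warc.gz")
second = queue.pop()  # the new file jumps ahead of the remaining backlog
third = queue.pop()   # then the rest of the backlog

print(first, second, third)
```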

Incompatible Architecture

I am using a 2021 iMac with the Apple M1 chip and macOS Monterey 12.4.

So far to set up PySpark I have pip3 installed pyspark, plus cloned this repo and installed from the requirements.txt file, plus downloaded Java from their homepage. I'm using Python 3.8.9.

I added the path to the pip3 installation of pyspark to SPARK_HOME in my .zshrc and sourced it:

% echo $SPARK_HOME
/Users/julius/Library/Python/3.8/lib/python/site-packages/pyspark

I then executed the following command:

$SPARK_HOME/bin/spark-submit ./server_count.py \
	--num_output_partitions 1 --log_level WARN \
	./input/test_warc.txt servernames

I had to execute this from inside the cc-pyspark repo, otherwise the script could not find the program server_count.py.

It returns this error message:

julius@Juliuss-iMac cc-pyspark % $SPARK_HOME/bin/spark-submit ./server_count.py \
        --num_output_partitions 1 --log_level WARN \
        ./input/test_warc.txt servernames
Traceback (most recent call last):
  File "/Users/julius/cc-pyspark/server_count.py", line 1, in <module>
    import ujson as json
ImportError: dlopen(/Users/julius/Library/Python/3.8/lib/python/site-packages/ujson.cpython-38-darwin.so, 0x0002): tried: '/Users/julius/Library/Python/3.8/lib/python/site-packages/ujson.cpython-38-darwin.so' (mach-o file, but is an incompatible architecture (have 'arm64', need 'x86_64'))
22/07/06 15:04:13 INFO ShutdownHookManager: Shutdown hook called
22/07/06 15:04:13 INFO ShutdownHookManager: Deleting directory /private/var/folders/xv/yzpjb77s2qg14px8dc7g4m_80000gn/T/spark-80c476e9-b5ba-4710-b292-e367dd387ece

It looks like something is wrong with my installation of "ujson": it is built for arm64, but PySpark is designed for x86_64? Is that correct?

What is the simplest way to fix this issue? Should I try to run PySpark in some kind of x86 emulation like Rosetta? Has PySpark not been designed for the M1 Chip?

Is there a chance this is the fault of my Java installation? I took the first one offered; it seemed to say x86, but when I tested running PySpark on its own, it seemed to work fine.

Thanks very much

Commands to execute python files?

It would be helpful to have command examples for each .py file.
Or am I just not finding them?
For now, I need to read every line of code to understand the examples.
Still, I appreciate the examples; it would be much harder without them.

Test and update examples to work with ARC files of the 2008 - 2012 crawls

warcio is able to read ARC files as well, so it should be possible to run all examples designed to work on WARC files also on ARC files from the 2008 - 2012 crawls.

  • needs to be tested whether the WARC examples can be run successfully on ARC files, minor modifications may be necessary
  • could extend some WAT/WET examples so that they can be used also on WARC/ARC (cf. server_count.py which may use WARC or WAT)

Webgraph construction: include nodes with zero outgoing links

Pages (or hosts) without outgoing links are lost during webgraph construction:

  • during the first step (wat_extract_links.py) the graph is represented as list of edges <url_from, url_to> resp. <host_from, host_to>. Pages/hosts without outgoing links need to be represented as self-loops in order not to lose them.
  • optionally the second step (hostlinks_to_graph.py) could remove the self-loops but keep the isolated vertices in the <vertex_id, label> mapping. Note: self-loops are often ignored or must even be eliminated when ranking nodes by centrality metrics.
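
The two steps can be illustrated with a small, self-contained sketch. It is not the code of wat_extract_links.py or hostlinks_to_graph.py; the function names and the toy host names are made up, and plain Python lists stand in for the Spark data structures.

```python
def edges_with_self_loops(links):
    """Step 1: turn {node: [targets]} into an edge list.

    Nodes without outgoing links get a self-loop so that they
    still appear in the edge list.
    """
    edges = []
    for src, targets in links.items():
        if targets:
            edges.extend((src, dst) for dst in targets)
        else:
            edges.append((src, src))
    return edges


def to_graph(edges):
    """Step 2: assign vertex ids and drop the self-loops again.

    Note: this drops every self-loop, including genuine links
    from a node to itself.
    """
    labels = sorted({node for edge in edges for node in edge})
    vertex_ids = {label: i for i, label in enumerate(labels)}
    id_edges = [(vertex_ids[a], vertex_ids[b]) for a, b in edges if a != b]
    return vertex_ids, id_edges


links = {"a.example": ["b.example"], "b.example": [], "c.example": []}
edges = edges_with_self_loops(links)
vertex_ids, id_edges = to_graph(edges)
print(edges)
print(vertex_ids)
print(id_edges)
```

In this toy input, c.example has neither incoming nor outgoing links. Without the self-loop it would never reach the vertex mapping.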

Common Crawl Index Table - Need for Schema Merging to be documented

Hi,

If someone needs to load the full index table (Parquet files) into PySpark with all of the latest fields, the Spark property spark.sql.parquet.mergeSchema must be set to "true", or the following can be used:

df = spark.read.option("mergeSchema", "true").parquet('s3://commoncrawl/cc-index/table/cc-main/warc/')

Without this, fields that were added at a later stage, like content_languages, are not loaded into the Spark DataFrame.

Maybe we could also provide the complete schema to Spark, so that there is
no need to extract the schema initially from (one of) the Parquet files.

Thanks

Document dependency of CCIndexSparkJob to Java S3 file system libs

(reported by @calee88 in #12)

While CCSparkJob uses boto3 to read WARC/WAT/WET files, CCIndexSparkJob requires that the Spark installation includes the libs/jars needed to access data on S3 (s3://commoncrawl/). These are usually provided when Spark is used in a Hadoop cluster (e.g. EMR, Spark on YARN), but may be missing from other Spark packages, especially when running Spark locally (not in a Hadoop cluster). Also add information about

  • Hadoop S3 FileSystem implementations requiring the scheme part of the data URI to be adapted (s3:// on EMR, s3a:// when using s3a)
  • accessing data anonymously (s3a.AnonymousAWSCredentialsProvider).
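
A minimal sketch of both points follows. It only prepares strings and does not start Spark. The helper names are hypothetical. The property `fs.s3a.aws.credentials.provider` and the class `org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider` come from the Hadoop S3A connector; whether they apply depends on the Hadoop version bundled with the Spark installation.

```python
from urllib.parse import urlsplit, urlunsplit

# Spark passes "spark.hadoop.*" properties on to the Hadoop configuration.
ANONYMOUS_S3A_CONF = {
    "spark.hadoop.fs.s3a.aws.credentials.provider":
        "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
}


def with_s3_scheme(uri: str, scheme: str = "s3a") -> str:
    """Rewrite the scheme of an S3 URI, e.g. s3:// to s3a://."""
    parts = urlsplit(uri)
    if parts.scheme not in ("s3", "s3a", "s3n"):
        return uri  # leave non-S3 URIs untouched
    return urlunsplit(
        (scheme, parts.netloc, parts.path, parts.query, parts.fragment)
    )


def spark_submit_conf_args(conf: dict) -> list:
    """Turn a property dict into spark-submit '--conf key=value' arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


table_uri = with_s3_scheme("s3://commoncrawl/cc-index/table/cc-main/warc/")
conf_args = spark_submit_conf_args(ANONYMOUS_S3A_CONF)
print(table_uri)
print(" ".join(conf_args))
```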

Drop support for Python 2.7

Spark has dropped support for Python 2.7 (as well as 3.4 and 3.5), see SPARK-32138. The latest Spark versions supporting Python 2.7 were released in summer 2021 (2.4.8 and 3.0.3). It's time for cc-pyspark to also drop support for older Python versions. That's also a requirement for the FastWARC parser (#37).

Looks like ccspark tried to access everything from local file. What's wrong with the settings?

spark-3.3.2-bin-hadoop3/bin/spark-submit ./server_count. --num_output_partitions 1 --log_level WARN ./input/wat.gz servernames

23/02/18 09:20:39 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.41, 56238, None)
2023-02-18 09:20:52,155 INFO CountServers: Reading local file WARC/1.0
2023-02-18 09:20:52,156 ERROR CountServers: Failed to open /Users/joe/cc-pyspark/WARC/1.0: [Errno 2] No such file or directory: '/Users/joe/cc-pyspark/WARC/1.0'
2023-02-18 09:20:52,157 INFO CountServers: Reading local file WARC-Type: warcinfo
2023-02-18 09:20:52,158 ERROR CountServers: Failed to open /Users/joe/cc-pyspark/WARC-Type: warcinfo: [Errno 2] No such file or directory: '/Users/joe/cc-pyspark/WARC-Type: warcinfo'
2023-02-18 09:20:52,158 INFO CountServers: Reading local file WARC-Date: 2017-04-01T22:37:17Z
2023-02-18 09:20:52,159 ERROR CountServers: Failed to open /Users/joe/cc-pyspark/WARC-Date: 2017-04-01T22:37:17Z: [Errno 2] No such file or directory: '/Users/joe/cc-pyspark/WARC-Date: 2017-04-01T22:37:17Z'
2023-02-18 09:20:52,160 INFO CountServers: Reading local file WARC-Filename: CC-MAIN-20170322212946-00000-ip-10-233-31-227.ec2.internal.warc.gz
2023-02-18 09:20:52,161 ERROR CountServers: Failed to open /Users/joe/cc-pyspark/WARC-Filename: CC-MAIN-20170322212946-00000-ip-10-233-31-227.ec2.internal.warc.gz: [Errno 2] No such file or directory: '/Users/joe/cc-pyspark/WARC-Filename: CC-MAIN-20170322212946-00000-ip-10-233-31-227.ec2.internal.warc.gz'
2023-02-18 09:20:52,163 INFO CountServers: Reading local file WARC-Record-ID: urn:uuid:55d1a532-f91b-4461-b803-9bfc77efa410
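
The log above suggests that every line of the input file is treated as the path of a data file: "WARC/1.0" and "WARC-Type: warcinfo" are the first lines of the decompressed WAT file itself. If that reading is right, the job expects a text file that lists paths, one per line, rather than the WAT file. A small sketch of creating such a listing, with a hypothetical listing name and helper function:

```python
import tempfile
from pathlib import Path


def write_listing(listing_path, data_paths):
    """Write one data file path per line (the shape the job appears to expect)."""
    text = "".join(f"{path}\n" for path in data_paths)
    Path(listing_path).write_text(text, encoding="utf-8")
    return Path(listing_path)


# A temporary directory keeps the example self-contained; in practice the
# listing would live next to the data, e.g. under ./input/.
with tempfile.TemporaryDirectory() as tmp:
    listing = write_listing(Path(tmp) / "test_wat.txt", ["./input/wat.gz"])
    listing_lines = listing.read_text(encoding="utf-8").splitlines()

print(listing_lines)
```

The listing file, not ./input/wat.gz, would then be passed as the input argument of the spark-submit command.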

Bad Substitution

Alternatively, running get-data.sh downloads the sample data.

% ./get-data.sh
./get-data.sh: line 20: Downloading Common Crawl paths listings (${data_type^^} files of $CRAWL)...: bad substitution

Could anyone explain why my attempts at running this script result in this message?

Thank you

Can not run server_count example on Windows locally

I tried to call:

$SPARK_HOME/bin/spark-submit ./server_count.py \
	--num_output_partitions 1 --log_level WARN \
	./input/test_warc.txt servernames

But I get this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o55.saveAsTable

I installed Hadoop 3.0.0 from here https://github.com/steveloughran/winutils.

I am running this on Windows 7 with Python 3.6.6 (64 bit) and Java 8.

Full log is:

21/03/16 14:49:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/03/16 14:49:08 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
Traceback (most recent call last):
File "server_count.py", line 46, in <module>
job.run()
File "C:\Users\FA.PROJECTOR-MSK\Google Диск\Colab Notebooks\Finance\cc-pyspark\sparkcc.py", line 152, in run
self.run_job(sc, sqlc)
File "C:\Users\FA.PROJECTOR-MSK\Google Диск\Colab Notebooks\Finance\cc-pyspark\sparkcc.py", line 187, in run_job
.saveAsTable(self.args.output)
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\site-packages\pyspark\sql\readwriter.py", line 1158, in saveAsTable
self._jwrite.saveAsTable(name)
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\site-packages\py4j\java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\site-packages\pyspark\sql\utils.py", line 111, in deco
return f(*a, **kw)
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o56.saveAsTable.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:645)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1230)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1435)
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:493)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:678)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.validateTableLocation(SessionCatalog.scala:356)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:170)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:753)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:731)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:626)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)

Host link extraction does not represent every IDN as IDNA

The host link extractor (ExtractHostLinksJob in wat_extract_links.py) should represent every hostname which contains non-ASCII Unicode characters (internationalized domain names, IDN) as IDNA (the Punycode ASCII representation of the hostname). The check whether a hostname requires conversion to its IDNA equivalent is not complete. Examples of such hostnames from cc-main-2021-22-oct-nov-jan-host:

app.ıo
br.org.poſtgres
br.org.poſtgresql
top.oynayamıyor

These hostnames include very specific non-ASCII characters for which specific case-folding rules apply:

'poſtgres'.upper()  # 'POSTGRES'
'ıo'.upper().lower() # 'io'
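
One way to make the check robust against such case-folding effects is to test the hostname exactly as extracted, before any upper- or lowercasing. The sketch below is an illustration under that assumption, not the check used in wat_extract_links.py, and the function names are hypothetical. The conversion uses Python's built-in "idna" codec, which implements the older IDNA 2003 rules.

```python
def needs_idna(hostname: str) -> bool:
    """True if the hostname, as extracted, contains any non-ASCII character."""
    return any(ord(ch) > 127 for ch in hostname)


def to_idna(hostname: str) -> str:
    """Return the ASCII form of a hostname (built-in codec, IDNA 2003)."""
    if not needs_idna(hostname):
        return hostname
    return hostname.encode("idna").decode("ascii")


# Case conversion can hide the non-ASCII character, so a check
# applied after upper()/lower() would miss this hostname.
folded = "br.org.poſtgres".upper().lower()
print(needs_idna(folded))             # check after case conversion
print(needs_idna("br.org.poſtgres"))  # check on the original hostname
print(needs_idna("app.ıo"))
print(to_idna("br.org.poſtgres"))
```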

Processing English only archives

First off, thanks for the great work on this data set.

I'd like to process archives that are English only. I saw that the columnar url index contained content_languages information (https://commoncrawl.s3.amazonaws.com/cc-index/table/cc-main/index.html). However, it seems that the index is for WARC files (though I understand that WET files are derived from WARC files). Thus, my initial plan was the following:

  1. Use AWS Athena to select rows for a single month partition where content_languages = "eng" and save this as a separate table in my own s3 bucket.
CREATE TABLE "ccindex-2020-10-eng"
WITH (
  format = 'Parquet',
  external_location = 's3://my-bucket/ccindex-2020-10-eng'
)
AS SELECT url, warc_filename, warc_record_offset, warc_record_length
FROM "ccindex"."ccindex"
WHERE crawl = 'CC-MAIN-2020-10'
  AND subset = 'warc'
  AND content_languages = 'eng';
  2. Use cc_index_url_match.py to process the subset of WARC archives that are English only and use a regular expression to match URLs in the document content that are camel case (e.g. www.ThisIsATest.com). The code is pretty much the same as cc-pyspark/cc_index_word_count.py with the exception of a few minor tweaks:
  • Remove the .persist() on the input index (https://github.com/commoncrawl/cc-pyspark/blob/master/sparkcc.py#L329) since the index this time around is fairly large (>988 million rows)
  • Change output_schema to just be a single key (the matched string found in document)
  • Call .distinct() on RDD following .mapPartitions to remove duplicates
  • Change regular expression
  3. Run the Spark job (locally or on AWS EMR)
    Local version (for debugging)
export SPARK_HOME=/some/path/to/spark-2.4.4-bin-hadoop2.7

$SPARK_HOME/bin/spark-submit \
  --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 \
  ./cc_index_word_count.py \
  --query "SELECT url, warc_filename, warc_record_offset, warc_record_length FROM ccindex" \
  s3a://my-bucket/ccindex-2020-10-eng/ \
  output_table_name

I've also tried to set up the job to run on AWS EMR. The configuration I use on EMR is:
Hardware:
1 master = m5.xlarge (4 vCore, 16 GiB memory)
5 core = c5.4xlarge (16 vCore, 32 GiB memory)

Spark step:

spark-submit \
  --deploy-mode cluster \
  --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 \
  --py-files s3://my-bucket/sparkcc.py \
  s3://my-bucket/cc_index_url_match.py \
  --output_option path=s3://my-output-bucket/cc-main-2020-10-eng \
  --output_compression snappy --num_output_partitions 100 \
  --query "SELECT url, warc_filename, warc_record_offset, warc_record_length FROM ccindex"\
  s3a://my-bucket/ccindex-2020-10-eng/ \
  output_table_name

I use --output_option path=s3://my-output-bucket/cc-main-2020-10-eng to save the output table to my designated S3 bucket. However, this EMR job ran into problems. It has been running for more than 16 hours now, and the task metrics do not seem to be reported correctly: all tasks for saveAsTable are marked as pending and no executors are shown to have any active tasks. It seems to be stuck with all stages pending and none active. For more details, see eventLogs-application_1590106415063_0001-1.zip.
As a sanity check, I used the exact same setup with a smaller version of the index with only 10 rows and it worked fine.

To summarize:

  • Is it just a bad idea to try to use an input index with this many rows?
  • Is it possible to just use WET files somehow?
    (e.g. process WET files without worrying about content_languages and just produce Row(match_url="www.ThisIsATest.com", target_uri="{record.rec_headers.get_header('WARC-Target-URI')}") then JOIN this later with the index table on target_uri = ccindex-2020-10-eng.url)
  • What is an efficient way to achieve the end result of processing English only content (w/o destroying my wallet)?

Thanks for your time.
