- Build the Docker image:

  ```
  make image
  ```
- Run the container (replace `mountPoint` with a host directory to mount as the work directory):

  ```
  docker run --rm -p 8888:8888 -p 4040:4040 -v mountPoint:/home/jovyan/work balis/openaq-streamer
  ```
- Create an AWS SQS queue and subscribe it to the SNS topic
  `arn:aws:sns:us-east-1:470049585876:OPENAQ_NEW_MEASUREMENT`
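The queue and subscription can also be created from the AWS CLI; a sketch with an assumed queue name and an `ACCOUNT_ID` placeholder (note that, in addition, the queue's access policy must allow the SNS topic to send messages to it):

```shell
# Create the queue (the queue name 'openaq-measurements' is an example)
aws sqs create-queue --queue-name openaq-measurements

# Subscribe the queue to the OpenAQ SNS topic
# (use the queue ARN returned by 'aws sqs get-queue-attributes')
aws sns subscribe \
  --topic-arn arn:aws:sns:us-east-1:470049585876:OPENAQ_NEW_MEASUREMENT \
  --protocol sqs \
  --notification-endpoint arn:aws:sqs:us-east-1:ACCOUNT_ID:openaq-measurements
```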
- To reduce the volume of data, go to the created SNS subscription and set up a filter policy, e.g.:

  ```json
  {
    "country": ["PL"]
  }
  ```
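SNS filter policies do exact string matching of message attributes against the listed values, so only messages whose `country` attribute equals `PL` are delivered to the queue. A minimal local sketch of that matching semantics (the `matches` helper and the sample attributes are illustrative, not part of the deployment):

```python
# Filter policy as configured on the SNS subscription
filter_policy = {"country": ["PL"]}

def matches(policy, attributes):
    """Return True if the message attributes satisfy the filter policy
    (simplified: every policy key must be present with a listed value)."""
    return all(attributes.get(key) in allowed for key, allowed in policy.items())

# Hypothetical message attributes as attached by the publisher
assert matches(filter_policy, {"country": "PL"})      # delivered
assert not matches(filter_policy, {"country": "DE"})  # filtered out
```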
- Run a Spark structured streaming query. Python example:

  ```python
  # Read AWS credentials from the 'credentials' file (format as generated by 'aws configure').
  # Note: the file needs to be copied to the directory mounted in the Docker container.
  # Alternatively, environment variables can be used via 'docker run --env AWS_ACCESS_KEY=...'.
  import configparser

  config = configparser.ConfigParser()
  config.read('credentials')
  access_key = config.get('default', 'aws_access_key_id')
  secret_key = config.get('default', 'aws_secret_access_key')
  session_token = config.get('default', 'aws_session_token')

  from pyspark.sql import SparkSession

  spark = SparkSession \
      .builder \
      .config("spark.sql.streaming.schemaInference", True) \
      .getOrCreate()

  # Insert your SQS queue URL in the 'queueUrl' option.
  stream = spark \
      .readStream \
      .format("sqs") \
      .option("queueUrl", "https://sqs.us-east-1.amazonaws.com/...") \
      .option("accessKey", access_key) \
      .option("secretKey", secret_key) \
      .option("sessionToken", session_token) \
      .option("region", "us-east-1") \
      .load()

  stream.select("city", "parameter", "value", "date.local") \
      .writeStream \
      .format("console") \
      .outputMode("append") \
      .start()
  ```
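The `select` above relies on schema inference finding `city`, `parameter`, `value`, and the nested `date.local` field in the message body. A plain-Python sketch of extracting the same fields from one payload (the sample JSON is shaped like an OpenAQ measurement but is illustrative, not real data):

```python
import json

# Illustrative measurement payload shaped like an OpenAQ SNS message body
raw = '''{
  "city": "Krakow",
  "parameter": "pm25",
  "value": 31.5,
  "date": {"local": "2019-03-01T12:00:00+01:00", "utc": "2019-03-01T11:00:00Z"}
}'''

measurement = json.loads(raw)
# The nested 'date.local' column corresponds to measurement["date"]["local"]
row = (measurement["city"], measurement["parameter"],
       measurement["value"], measurement["date"]["local"])
```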