Comments (1)
Adding a dump of the Flink code used to reproduce the issue:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build descriptor for the Beam-on-Flink example application.
  Produces a shaded jar whose manifest entry point is ${main-class}.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.apache.flink.example</groupId>
    <artifactId>beam</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>Apache Flink Beam Application</name>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.15.2</flink.version>
        <!-- Single source of truth for both Beam artifacts below, so they cannot drift apart. -->
        <beam.version>2.56.0</beam.version>
        <logback.version>1.4.14</logback.version>
        <main-class>org.apache.flink.example.BeamApplication</main-class>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-math3</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-flink-1.15</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <!--
          BeamApplication imports lombok.Data. Lombok is an annotation processor that is only
          needed at compile time, so it is declared "provided" and stays out of the shaded jar.
        -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>provided</scope>
        </dependency>
        <!-- BeamApplication imports io.opentelemetry.instrumentation.annotations.WithSpan. -->
        <dependency>
            <groupId>io.opentelemetry.instrumentation</groupId>
            <artifactId>opentelemetry-instrumentation-annotations</artifactId>
            <version>1.32.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- Flink artifacts listed here are left out of the shaded jar. -->
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-core</exclude>
                                    <exclude>org.apache.flink:flink-annotations</exclude>
                                    <exclude>org.apache.flink:flink-metrics-core</exclude>
                                    <exclude>org.apache.flink:flink-shaded-*</exclude>
                                    <exclude>org.apache.flink:flink-table-*</exclude>
                                </excludes>
                            </artifactSet>
                            <!-- Drop signature files from every bundled artifact. -->
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>${main-class}</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Java class
package org.apache.flink.example;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import lombok.Data;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisPartitioner;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.joda.time.Duration;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
/**
 * Beam pipeline executed with the Flink runner: reads records from the Kinesis stream
 * {@code ExampleInputStream}, turns each record into its {@code toString()} text, and writes
 * that text to the Kinesis stream {@code ExampleOutputStream}.
 */
public class BeamApplication {

    /**
     * Plain data holder (accessors generated by Lombok's {@code @Data}).
     * It is not referenced by the pipeline built in {@link #main(String...)}.
     */
    @Data
    @JsonPropertyOrder({"timestamp", "location", "quantity"})
    public static final class Event {
        private Instant timestamp;
        private String location;
        private long quantity;
    }

    /**
     * Builds the source → transform → sink pipeline and submits it.
     *
     * @param args command-line arguments; not consulted when creating the pipeline options
     * @throws Exception if building or submitting the pipeline fails
     */
    @WithSpan
    public static void main(final String... args) throws Exception {
        // Runner setup: Flink runner with attached mode switched off.
        final FlinkPipelineOptions flinkOptions =
                PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
        flinkOptions.setRunner(FlinkRunner.class);
        flinkOptions.setAttachedMode(false);

        final Pipeline job = Pipeline.create(flinkOptions);

        // Source and sink use the same AWS client settings (region us-east-1).
        final ClientConfiguration awsClient =
                ClientConfiguration.builder().region(Region.US_EAST_1).build();

        // Start reading 30 minutes before "now".
        final org.joda.time.Instant readFrom =
                org.joda.time.Instant.now().minus(Duration.standardMinutes(30));

        // Outgoing strings are encoded as UTF-8 bytes.
        final SerializableFunction<String, byte[]> utf8Encoder =
                text -> text.getBytes(StandardCharsets.UTF_8);

        job.apply(
                        "KDS Source",
                        KinesisIO.read()
                                .withStreamName("ExampleInputStream")
                                .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                                .withInitialTimestampInStream(readFrom)
                                .withClientConfiguration(awsClient))
                .apply(
                        ParDo.of(
                                new DoFn<KinesisRecord, String>() {
                                    @ProcessElement
                                    public void processElement(
                                            @Element KinesisRecord record, OutputReceiver<String> out) {
                                        // Print the record's text form, then forward the same text.
                                        final String rendered = record.toString();
                                        System.out.println(rendered);
                                        out.output(rendered);
                                    }
                                }))
                .apply(
                        "KDS Sink",
                        KinesisIO.<String>write()
                                .withStreamName("ExampleOutputStream")
                                .withClientConfiguration(awsClient)
                                .withSerializer(utf8Encoder)
                                .withPartitioner(KinesisPartitioner.explicitRandomPartitioner(1)));

        job.run();
    }
}
from beam.
Related Issues (20)
- [Failing Test]: Beam ML unit tests fail with Keras 3.3.3 and TF 2.16.1 HOT 2
- [Bug]: TextIO.read().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW) still fails if no file is found HOT 4
- [Failing Test]: PreCommit YAML Xlang Direct fails on GHA HOT 2
- [Feature Request]: Include job name in GCS custom audit info
- [Bug]: windmillServiceCommitThreads option can lead to ConcurrentModificationException and stuck commits
- [Failing Test]: :sdks:python:test-suites:direct:py38:tensorflowInferenceTest fails in Python 3.8 postcommit suite
- [Bug]: Using WriteToBigQuery FILE_LOADS in a streaming pipeline does not remove temporary tables
- [Feature Request]: Manage GCS soft delete policy in temp location HOT 7
- [Bug]: JmsIOTests don't actually verify that the queue is empty HOT 1
- [Feature Request]: Enable BigQueryIO to support _CHANGE_SEQUENCE_NUMBER fixed hex string pseudo column
- [Bug][Prism]: panic: unknown coder urn key: beam:coder:nullable:v1
- [Bug][Prism]: panic: nothing in progress and no refreshes with non zero pending elements HOT 3
- [Bug][Prism]: panic: unsupported StateKey Get type: *fnexecution_v1.StateKey_MultimapKeysSideInput_
- Performance Regression or Improvement: gbk_python_batch_load_test_2gb_of_100KB_records:runtime
- Performance Regression or Improvement: sideinpts_python_batch_10gb_1kb_10workers_1000window_first_iterable:runtime
- The PreCommit Java Debezium IO Direct job is flaky
- [Bug]: Add a consistent schema format for TypedSchemaTransformProvider
- [Feature Request]: Enable withFormatRecordOnFailureFunction() equivalent for BigQuery STORAGE_WRITE_API
- [Failing Test]: Python Precommit failing due to RRIO test failures HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Something interesting about the web. A new door to the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Something interesting about visualization; use data as art.
-
Game
Something interesting about games; make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from beam.