Giter VIP home page Giter VIP logo

Comments (1)

hlteoh37 avatar hlteoh37 commented on July 18, 2024

Adding dump of replication Flink code here:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.flink.example</groupId>
    <artifactId>beam</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>Apache Flink Beam Application</name>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.15.2</flink.version>
        <logback.version>1.4.14</logback.version>
        <main-class>org.apache.flink.example.BeamApplication</main-class>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-math3</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-flink-1.15</artifactId>
            <version>2.56.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
            <version>2.56.0</version>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-core</exclude>
                                    <exclude>org.apache.flink:flink-annotations</exclude>
                                    <exclude>org.apache.flink:flink-metrics-core</exclude>
                                    <exclude>org.apache.flink:flink-shaded-*</exclude>
                                    <exclude>org.apache.flink:flink-table-*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>${main-class}</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Java class

package org.apache.flink.example;


import io.opentelemetry.instrumentation.annotations.WithSpan;
import lombok.Data;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisPartitioner;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.joda.time.Duration;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;

import java.nio.charset.StandardCharsets;
import java.time.Instant;

public class BeamApplication {

    @Data
    @JsonPropertyOrder({"timestamp", "location", "quantity"})
    public static final class Event {
        private Instant timestamp;
        private String location;
        private long quantity;
    }

    @WithSpan
    public static void main(final String... args) throws Exception {
        FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setAttachedMode(false);

        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply("KDS Source", KinesisIO.read()
                .withStreamName("ExampleInputStream")
                        .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                        .withInitialTimestampInStream(org.joda.time.Instant.now().minus(Duration.standardMinutes(30)))
                        .withClientConfiguration(ClientConfiguration.builder()
                                .region(Region.US_EAST_1)
                                .build()))
                .apply(ParDo.of(new DoFn<KinesisRecord, String>() {
                    @ProcessElement
                    public void processElement(@Element KinesisRecord record, OutputReceiver<String> out) {
                        System.out.println(record.toString());
                        out.output(record.toString());
                    }
                }))
                .apply("KDS Sink", KinesisIO.<String>write()
                        .withStreamName("ExampleOutputStream")
                        .withClientConfiguration(ClientConfiguration.builder()
                                .region(Region.US_EAST_1)
                                .build())
                        .withSerializer((SerializableFunction<String, byte[]>) input -> input.getBytes(StandardCharsets.UTF_8))
                        .withPartitioner(KinesisPartitioner.explicitRandomPartitioner(1))
                );


        pipeline.run();

    }
}

from beam.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.