rocketmq-streams's Introduction

RocketMQ Streams

RocketMQ Streams is a lightweight stream processing framework. An application gains stream processing ability by depending on RocketMQ Streams as an SDK.

It offers a variety of features:

  • Function:
    • One-to-one transform function, such as: filter, map, foreach
    • Aggregate function, such as: sum, min, max, count, aggregate
    • Generating function, such as: flatMap
  • Group by aggregate and window aggregate
  • Join stream
  • Custom serialization
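
The three function categories above can be illustrated without a broker. The following is a minimal sketch that uses only the JDK's java.util.stream API, not the RocketMQ Streams API; the class name FunctionKindsSketch and its method names are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FunctionKindsSketch {
    // One-to-one transform: each input yields at most one output (filter, then map).
    static List<Integer> evenSquares(List<Integer> input) {
        return input.stream()
                .filter(n -> n % 2 == 0)
                .map(n -> n * n)
                .collect(Collectors.toList());
    }

    // Aggregate: many inputs are folded into a single result.
    static int sum(List<Integer> input) {
        return input.stream().mapToInt(Integer::intValue).sum();
    }

    // Generating function: one input may yield several outputs (flatMap).
    static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evenSquares(Arrays.asList(1, 2, 3, 4))); // [4, 16]
        System.out.println(sum(Arrays.asList(1, 2, 3, 4)));         // 10
        System.out.println(words(Arrays.asList("a b", "c")));       // [a, b, c]
    }
}
```

In a real stream the inputs are unbounded, so an aggregate is typically emitted as a running value per key or per window rather than as a single final number.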

Quick Start

This section guides you through running a stream processing application with RocketMQ Streams.

Run RocketMQ 5.0 locally

RocketMQ quick-start

RocketMQ runs on all major operating systems and requires only JDK 8 or higher. To check the installed version, run java -version:

$ java -version
java version "1.8.0_121"

1) Download RocketMQ

$ wget https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip

# Unpack the release
$ unzip rocketmq-all-5.0.0-bin-release.zip

$ cd rocketmq-all-5.0.0-bin-release/bin

2) Start NameServer

NameServer listens on 0.0.0.0:9876. Make sure that no other process on the local machine is using this port, then proceed as follows.

### start Name Server
$ nohup sh mqnamesrv &

### check whether Name Server is successfully started
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

3) Start Broker

### start Broker (run from the bin directory entered above)
$ nohup sh mqbroker -n localhost:9876 &

### check whether Broker is successfully started, e.g. Broker's IP is 192.168.1.2, Broker's name is broker-a
$ tail -f ~/logs/rocketmqlogs/broker.log
The broker[broker-a, 192.168.1.2:10911] boot success...

Build stream processing application

1) Build application in IDE

2) Add RocketMQ Streams dependency

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-streams</artifactId>
        <version>{current.version}</version>
    </dependency>

3) Build stream processing application

  • Create the topic in RocketMQ before starting the stream processing application. Run the command from the bin directory:
sh mqadmin updateTopic -c ${clusterName} -t ${topicName} -r 8 -w 8 -n 127.0.0.1:9876
NOTE: the default clusterName in this quick-start doc is DefaultCluster; change it to match your RocketMQ cluster.
  • Add your stream processing code. The following is an example; more examples are available in the repository.
public static void main(String[] args) {
    StreamBuilder builder = new StreamBuilder("wordCount");

    // Decode each raw message body as a UTF-8 string; the record has no key yet.
    builder.source("sourceTopic", total -> {
                String value = new String(total, StandardCharsets.UTF_8);
                return new Pair<>(null, value);
            })
            // Split each line into lowercase words.
            .flatMap((ValueMapperAction<String, List<String>>) value -> {
                String[] splits = value.toLowerCase().split("\\W+");
                return Arrays.asList(splits);
            })
            // Use the word itself as the key and count occurrences per key.
            .keyBy(value -> value)
            .count()
            .toRStream()
            .print();

    TopologyBuilder topologyBuilder = builder.build();

    // Point the application at the local NameServer started earlier.
    Properties properties = new Properties();
    properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");

    RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);

    final CountDownLatch latch = new CountDownLatch(1);

    // Stop the stream cleanly when the JVM shuts down (for example on Ctrl+C).
    Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {
        @Override
        public void run() {
            rocketMQStream.stop();
            latch.countDown();
        }
    });

    try {
        rocketMQStream.start();
        latch.await();
    } catch (final Throwable e) {
        // Report the failure before exiting instead of swallowing it silently.
        e.printStackTrace();
        System.exit(1);
    }
    System.exit(0);
}
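
To see what the split, key, and count steps compute without starting a broker, the same logic can be run on a single in-memory line. This is a minimal sketch using only the JDK; WordCountSketch and countWords are hypothetical names, and the result is the final total per word rather than the running counts a streaming job would emit. It also shows one detail of split("\\W+"): a line that starts with a non-word character produces a leading empty token, which the sketch filters out.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountSketch {
    // Mirrors the flatMap -> keyBy -> count steps for one in-memory line.
    static Map<String, Long> countWords(String line) {
        return Arrays.stream(line.toLowerCase().split("\\W+"))
                // split() yields a leading empty token when the line starts
                // with a non-word character, so drop empty tokens here.
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(
                        Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // prints {hello=2, streams=1, world=1}
        System.out.println(countWords("Hello world, hello streams"));
    }
}
```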

rocketmq-streams's Issues

Does FilterFunction returning false mean that the message will be processed?

AFAIK, in Flink, a FilterFunction returning true means that the message is kept and processed.

    public <O> DataStream filter(final FilterFunction<O> filterFunction) {
        StageBuilder mapUDFOperator = new StageBuilder() {

            @Override
            protected <T> T operate(IMessage message, AbstractContext context) {
                try {
                    boolean isFilter = filterFunction.filter((O)message.getMessageValue());
                    if (isFilter) {
                        context.breakExecute();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
        };
        ChainStage stage = this.mainPipelineBuilder.createStage(mapUDFOperator);
        this.mainPipelineBuilder.setTopologyStages(currentChainStage, stage);
        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, stage);
    }
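
The difference between the two conventions can be made concrete with plain Java. This is a minimal sketch, not code from either project: FilterSemanticsSketch, keepWhenTrue, and dropWhenTrue are hypothetical names. The first method follows the Flink-style reading (true keeps the message); the second follows the quoted snippet, where a true result calls breakExecute and the message goes no further.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class FilterSemanticsSketch {
    // Flink-style convention: a message is kept when the predicate returns true.
    static <T> List<T> keepWhenTrue(List<T> messages, Predicate<T> predicate) {
        List<T> kept = new ArrayList<>();
        for (T message : messages) {
            if (predicate.test(message)) {
                kept.add(message);
            }
        }
        return kept;
    }

    // Convention in the quoted snippet: processing stops when the function
    // returns true, so only messages that return false continue downstream.
    static <T> List<T> dropWhenTrue(List<T> messages, Predicate<T> predicate) {
        List<T> kept = new ArrayList<>();
        for (T message : messages) {
            if (!predicate.test(message)) {
                kept.add(message);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4);
        Predicate<Integer> isEven = n -> n % 2 == 0;
        System.out.println(keepWhenTrue(input, isEven)); // [2, 4]
        System.out.println(dropWhenTrue(input, isEven)); // [1, 3]
    }
}
```

The same predicate therefore selects complementary sets of messages under the two conventions, which is why code ported from Flink may need its condition negated.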

Usage documentation

Hello, is there any detailed usage documentation?

BatchSize is not settable from JoinWindow

public abstract class AbstractSink extends BasedConfigurable implements ISink<AbstractSink>, ILifeCycle {

    private static final Log logger = LogFactory.getLog(AbstractSink.class);
    public static String TARGET_QUEUE = "target_queue"; // specifies the queue to send to
    public static final int DEFAULT_BATCH_SIZE = 3000;
    protected transient IMessageCache<IMessage> messageCache;
    protected volatile int batchSize = DEFAULT_BATCH_SIZE;

batchSize cannot be set from JoinWindow, so a batch of sent messages sometimes exceeds the 4 MB message size limit in RocketMQ.
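
One way to avoid oversized sends, independent of how batchSize is exposed, is to bound a batch by total bytes as well as by message count. The following is a minimal sketch under that assumption; SizeBoundedBatcher and its parameters are hypothetical and not part of rocketmq-streams. It counts only payload bytes, whereas a real RocketMQ message also carries properties and headers, so a production limit would need some headroom below 4 MB.

```java
import java.util.ArrayList;
import java.util.List;

public class SizeBoundedBatcher {
    // Splits payloads into batches limited by both message count and total bytes.
    static List<List<byte[]>> split(List<byte[]> payloads, int maxCount, long maxBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long currentBytes = 0;
        for (byte[] payload : payloads) {
            boolean full = current.size() >= maxCount
                    || currentBytes + payload.length > maxBytes;
            if (full && !current.isEmpty()) {
                // Close the current batch before it would exceed a limit.
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            // A single payload larger than maxBytes still gets a batch of its own.
            current.add(payload);
            currentBytes += payload.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> payloads = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            payloads.add(new byte[3]);
        }
        // Three 3-byte payloads with a 6-byte cap: two batches (2 messages, then 1).
        System.out.println(split(payloads, 3000, 6).size());
    }
}
```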

fix RemoteCheckpointTest bug

The root cause is that the SQL is not right:

Caused by: org.springframework.jdbc.CannotGetJdbcConnectionException: Could not get JDBC Connection; nested exception is com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server.

#120

Pull Consumer integration

Compared with the push consumer, the pull consumer is more flexible in offset management, so it is recommended to use PullConsumer instead of PushConsumer.

There is no log output when rocketmq-streams is running.

Currently rocketmq-streams produces no log output when an application based on it is running, which makes troubleshooting inconvenient.
We can implement an inner logger that does not conflict with the user's local log dependencies. RocketMQ has a client logger:
org.apache.rocketmq.client.log.ClientLogger

which solves this problem based on InnerLoggerFactory.

rocketmq-streams also has other log dependencies, such as log4j; we need to remove them and use only one log dependency.

Support Aliyun SLS ?

I found some docs about how to use rocketmq-streams-sql with Aliyun SLS here, but there is no SLS channel in this repository.

Are you using RocketMQ Streams?

Are you using RocketMQ Streams?

If you are using RocketMQ Streams, first we would like to thank you. We sincerely invite you to take a minute to give feedback on your usage scenario.

The purpose of this issue

We are always interested in finding out who is using Streams, what attracted you to use it, how we can listen to your needs, and if you are interested, help promote your organization.

What we would like from you

Please submit a comment in this issue that includes the following information:

  • your company, school, or organization
  • your country and city
  • your contact info, such as email, WeChat, and Twitter (optional).
  • usage scenario
  • expectations(optional)

You can refer to the following sample answer for the format:

* Organization: XX Company
* Location: Seoul, South Korea
* Contact: [email protected]
* Version: v1.0.0
* Status: production
* Expectations(optional): Data ingest service

Thanks again for your participation!
Apache RocketMQ Community

fromFile not exist cause queue NPE

Please close this issue if it is the same as #90.

org.apache.rocketmq.streams.examples.source.FileSourceExample

If the file passed to fromFile does not exist, an NPE is thrown as below.

java.lang.NullPointerException
	at org.apache.rocketmq.streams.common.channel.impl.file.FileSource.startSource(FileSource.java:90)
	at org.apache.rocketmq.streams.common.channel.source.AbstractSource.start(AbstractSource.java:114)
	at org.apache.rocketmq.streams.common.topology.ChainPipeline.startChannel(ChainPipeline.java:150)
	at org.apache.rocketmq.streams.client.transform.DataStream$7.run(DataStream.java:714)
	at java.lang.Thread.run(Thread.java:748)

How about failing fast by throwing a FileNotFoundException?
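
The fail-fast idea can be sketched as a small guard that validates the path before any reader is created. This is an illustration only, not the actual FileSource code: FileSourceGuard and requireReadableFile are hypothetical names, and where such a check would be called inside startSource is an assumption.

```java
import java.io.FileNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileSourceGuard {
    // Validates the path up front so that a missing file fails with a clear
    // message instead of a NullPointerException later in the source thread.
    static Path requireReadableFile(String filePath) throws FileNotFoundException {
        if (filePath == null) {
            throw new FileNotFoundException("source file path is null");
        }
        Path path = Paths.get(filePath);
        if (!Files.isRegularFile(path) || !Files.isReadable(path)) {
            throw new FileNotFoundException(
                    "source file does not exist or is not readable: " + path.toAbsolutePath());
        }
        return path;
    }

    public static void main(String[] args) {
        try {
            requireReadableFile("no-such-file-for-sketch.txt");
        } catch (FileNotFoundException e) {
            System.out.println("failed fast: " + e.getMessage());
        }
    }
}
```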

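Dual-stream join problem

1. The files used by the dual-stream test cases in JoinTest are paths on the developers' own machines. I created two files myself to run the join, but no join result was output.
2. If both the left source and the right source are from RocketMQ, an exception occurs.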

[bug] RocketMQ source doesn't work because no namesrv address is set

public DataStream fromRocketmq(String topic, String groupName) {
    return fromRocketmq(topic, groupName, null, false);
}

public DataStream fromRocketmq(String topic, String groupName, boolean isJson) {
    return fromRocketmq(topic, groupName, null, isJson);
}

public DataStream fromRocketmq(String topic, String groupName, String tags, boolean isJson) {
    RocketMQSource rocketMQSource = new RocketMQSource();
    rocketMQSource.setTopic(topic);
    rocketMQSource.setTags(tags);
    rocketMQSource.setGroupName(groupName);
    rocketMQSource.setJsonData(isJson);
    this.mainPipelineBuilder.setSource(rocketMQSource);
    return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
}
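
None of the overloads above accepts a NameServer address. One possible remedy, sketched here as an assumption rather than the project's actual fix, is to resolve the address from an explicit argument first and then fall back to the JVM system property rocketmq.namesrv.addr and the NAMESRV_ADDR environment variable, failing fast if none is set. NamesrvAddressResolver and its methods are hypothetical names; how the resolved value would be passed to RocketMQSource is not shown because the source does not describe such a setter.

```java
public class NamesrvAddressResolver {
    // Returns the first non-blank candidate in priority order:
    // explicit argument, then system property value, then environment value.
    static String resolve(String explicit, String systemProperty, String envValue) {
        for (String candidate : new String[] {explicit, systemProperty, envValue}) {
            if (candidate != null && !candidate.trim().isEmpty()) {
                return candidate.trim();
            }
        }
        // Fail fast with a clear message instead of starting a source that cannot connect.
        throw new IllegalStateException(
                "No NameServer address set: pass one explicitly, or set "
                        + "-Drocketmq.namesrv.addr or the NAMESRV_ADDR environment variable");
    }

    // Convenience overload that reads the fallbacks from the running JVM.
    static String resolve(String explicit) {
        return resolve(explicit,
                System.getProperty("rocketmq.namesrv.addr"),
                System.getenv("NAMESRV_ADDR"));
    }

    public static void main(String[] args) {
        System.out.println(resolve("127.0.0.1:9876")); // prints 127.0.0.1:9876
    }
}
```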

RAT check failed

Running mvn -B clean apache-rat:check fails because some files are missing the license header.
