Giter VIP home page Giter VIP logo

alibaba-rsocket-broker's People

Contributors

alibaba-oss avatar anuger avatar chenggangpro avatar fox1ck avatar he-pin avatar hero-zhanghao avatar jackie1in avatar jimichan avatar kevinten10 avatar linux-china avatar sawravchy avatar slowrookie avatar szihai avatar xiaoboey avatar yiyanghua avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

alibaba-rsocket-broker's Issues

服务路由性能提升

目前路由的算法是 group + service name + version,然后对其进行hashcode,然后在进行bitmap匹配查找。 可以考虑在调用方提前将路由信息进行HashCode化,然后基于Integer Hashcode进行bitmap匹配,这样只需要读取composite metadata的前8个字节(1 + 3 + 4)就可以完成路由匹配。 考虑到Hashcode各个语言的一致性性,还是采用 MurmurHash3 算法。

当Responder端的service未指定version时,Broker的控制台报错

1. Stack

	at com.alibaba.rsocket.broker.web.ui.ServicesView.lambda$services$0(ServicesView.java:50) ~[classes/:na]
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[na:1.8.0_231]
	at java.util.concurrent.ConcurrentHashMap$ValueSpliterator.forEachRemaining(ConcurrentHashMap.java:3566) ~[na:1.8.0_231]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[na:1.8.0_231]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[na:1.8.0_231]
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[na:1.8.0_231]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_231]
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[na:1.8.0_231]

支持Mon<ByteBuffer>避免反复序列化的问题

在不少场景中,我们调用远程服务, 获取资源对象,然后再对象输出。 如下述代码中,我们调用远程服务,获取用户,然后再将用户信息以REST API方式输出。 但是这里有序列化的问题: RPC网络调用有两次序列化和反序列化,然后REST API输出又再进行JSON序列化,代码如下:

    @GetMapping(value = "/user/{id}", produces = "application/json")
    public Mono<User> jsonBytes(@PathVariable Integer id) {
        return userService.findUserById(id);
    }

能否有一个机制,在服务端就输出对应的数据格式,如JSON,而通讯的过程中都不要涉及多次序列化的问题,而是将服务端输出的bytes直接输出给最终的调用者,代码如下:

    @GetMapping(value = "/user/{id}", produces = "application/json")
    public Mono<ByteBuffer> jsonBytes(@PathVariable Integer id) {
        return userService.findUserById(id);
    }

Kotlin Coroutines & Flow支持

目前RSocket Service接口支持Reactor,RxJava2, RxJava 3,还没有对Kolin Coroutines和 Flow的支持。 Kotlin Coroutines和Reactive都可以相互转换的,这里调研一下大家是否有在Kotlin? 是否有意愿使用Kotlin的原生接口?

interface UserService {
    suspend fun getAdmin(): String
    suspend fun getNickById(id: Int): String
    fun getAllNames(): Flow<String>
}

RSocket Broker Banner

目前使用Spring Boot默认的banner,调整为粉色的 Alibaba RSocket Broker

手动执行 Example 代码长时间未操作后再次调用服务出现 500 错误

代码为 Example 中的代码,按照文档步骤操作后访问正常,
过了六小时未操作后再次调用接口服务器会报 500 异常,手动将 Broker、Respones、Request 重启后正常访问

2019-12-11 10:27:20.625  INFO 97481 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8181
2019-12-11 10:27:20.628  INFO 97481 --- [           main] c.a.s.b.r.demo.RSocketRequesterApp       : Started RSocketRequesterApp in 1.702 seconds (JVM running for 2.302)
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 65000 ms
	at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:115)
	at io.rsocket.keepalive.KeepAliveSupport.tryTimeout(KeepAliveSupport.java:110)
	at io.rsocket.keepalive.KeepAliveSupport$ClientKeepAliveSupport.onIntervalTick(KeepAliveSupport.java:146)
	at io.rsocket.keepalive.KeepAliveSupport.lambda$start$0(KeepAliveSupport.java:54)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
	at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
	at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
	at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-12-11 16:14:48.833 ERROR 97481 --- [ctor-http-nio-4] a.w.r.e.AbstractErrorWebExceptionHandler : [fc5bd7d9]  500 Server Error for HTTP GET "/user/1"

io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 65000 ms
	at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:115) ~[rsocket-core-1.0.0-RC5.jar:na]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ Handler com.alibaba.spring.boot.rsocket.demo.PortalController#user(Integer) [DispatcherHandler]
	|_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
	|_ checkpoint ⇢ HTTP GET "/user/1" [ExceptionHandlingWebHandler]
Stack trace:
		at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:115) ~[rsocket-core-1.0.0-RC5.jar:na]
		at io.rsocket.keepalive.KeepAliveSupport.tryTimeout(KeepAliveSupport.java:110) ~[rsocket-core-1.0.0-RC5.jar:na]
		at io.rsocket.keepalive.KeepAliveSupport$ClientKeepAliveSupport.onIntervalTick(KeepAliveSupport.java:146) ~[rsocket-core-1.0.0-RC5.jar:na]
		at io.rsocket.keepalive.KeepAliveSupport.lambda$start$0(KeepAliveSupport.java:54) ~[rsocket-core-1.0.0-RC5.jar:na]
		at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
		at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
		at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
		at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_212]
		at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_212]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_212]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_212]
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_212]
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_212]
		at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_212]

RSocket HTTP Gateway的调整

考虑到开发和部署的便捷性,RSocket Broker Server内置支持RSocket Service的HTTP访问,格式如下,请注意 "/api" 前缀。

POST http://127.0.0.1:9998/api/com.alibaba.user.UserService/findById
Authorization: Bearer jwt_token
Content-Type: application/json

[
  1
]

这里有一个小问题: Broker Console目前是用Vaadin开发,而Vaadin现在还不能支持WebFlux,所以RSocket REST API并不能很好地利用WebFlux的Reactive特性,性能和thread模型还是Spring MVC的。
Vaadin的reactive支持要等待这里: vaadin/spring#565

而alibaba-broker-http-gateway还继续保留,这个是完全Reactive的,同时方便开发者对其他协议进行集成,如gRPC, Dubbo等。 如果大家有gRPC -> RSocket的需求,我们会考虑在http gateway中添加gRPC的集成。

如果大家有更好的建议,欢迎留言。

Hessian序列化增加Java 8数据类型支持,同时兼容Dubbo Hessian Lite

Hessian序列化增加Java 8数据类型支持,如Java time类型,Optional支持。

兼容Dubbo Hessian Lite,相关的代码来自Dubbo Hessian Lite,去除Java反射逻辑,RSocket支持Java 8+版本,不需要使用反射来支持Java 8数据类型。

实现逻辑: META-INF/hessian/serializers添加对应的Serializer类。

其他类型支持,如Joda time, 需要自己添加对应的Serializer类。

Java Proxy机制调整

目前RSocket Service是基于Java Interface的,所以需要通过Proxy机制代理进行RSocket接口调用。
对比JDK Proxy, Javassist 和 Byte Buddy 发现,ByteBuddy的性能最高。 一个简单的性能测试数据如下,而且ByteBuddy对Java Interface的default method处理也比较友好。

ByteBuddyProxyTest.testOperation   thrpt    2  1391028488.716         ops/s
JavassistProxyTest.testOperation   thrpt    2  582515791.244          ops/s
JdkProxyTest.testOperation         thrpt    2  233792969.032          ops/s

所以Java Proxy代理机制调整到ByteBuddy上。

Scalecube-cluster支持broker到broker的rpc调用

在某些情况下,需要支持broker到broker直接的RPC调用,例如访问broker的一些配置等,目前一个场景就是broker要支持外部应用应用接入,应用无法直接访问broker的内部IP,如office的机器访问云vpc内部的rsocker broker集群,这个时候broker要有两个ip,所以对外部应用推送就需要走外部IP或者域名推送,内部应用则走内部IP推送。

解决思路:

  • 添加json rpc支持,这个比较简单一些,scalecube-cluster已经支持request/response,添加一个规范就可以。
  • 添加自定义的scalecube-jackson-codec支持,支持broker直接的对象序列化,目前主要要支持CloudEvents的JSON序列化

Broker RPC特性可以方便后续对broker的功能的扩展,broker之间通讯也比较简洁。

alibaba-rsocket-spring-boot-starter-1.0.0.M1问题反馈

  1. 自动化配置RSocketListenerAutoConfiguration中,当properties中没有配置rsocket.port时,以下表达式会返回true,导致应用启动了监听处理逻辑。建议改为 ${rsocket.port:0}!=0
@Configuration
@ConditionalOnExpression("'${rsocket.port}'!='0'")
public class RSocketListenerAutoConfiguration {
  1. 没有生成spring-autoconfigure-metadata.properties文件导致RSocketAutoConfiguration中@ConditionalOnClass加载了类而报错
    @Bean
    @ConditionalOnClass(PrometheusMeterRegistry.class)
    public MetricsService metricsService(PrometheusMeterRegistry meterRegistry) {
        return new MetricsServicePrometheusImpl(meterRegistry);
    }

文档建议添加如下依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure-processor</artifactId>
            <optional>true</optional>
        </dependency>

参考文档
报错信息如下

java.lang.IllegalStateException: Failed to introspect Class [com.alibaba.spring.boot.rsocket.RSocketAutoConfiguration] from ClassLoader [sun.misc.Launcher$AppClassLoader@18b4aac2]
	at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:481) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:358) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.util.ReflectionUtils.getUniqueDeclaredMethods(ReflectionUtils.java:414) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.lambda$getTypeForFactoryMethod$2(AbstractAutowireCapableBeanFactory.java:743) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660) ~[na:1.8.0_241]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.getTypeForFactoryMethod(AbstractAutowireCapableBeanFactory.java:742) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.determineTargetType(AbstractAutowireCapableBeanFactory.java:681) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.predictBeanType(AbstractAutowireCapableBeanFactory.java:649) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.isFactoryBean(AbstractBeanFactory.java:1605) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.doGetBeanNamesForType(DefaultListableBeanFactory.java:523) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeanNamesForType(DefaultListableBeanFactory.java:494) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeansOfType(DefaultListableBeanFactory.java:616) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeansOfType(DefaultListableBeanFactory.java:608) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.getBeansOfType(AbstractApplicationContext.java:1242) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.boot.SpringApplication.getExitCodeFromMappedException(SpringApplication.java:880) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.getExitCodeFromException(SpringApplication.java:868) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.handleExitCode(SpringApplication.java:855) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.handleRunFailure(SpringApplication.java:806) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at com.example.spring.webflux.SpringWebfluxDemoApplication.main(SpringWebfluxDemoApplication.java:29) [classes/:na]
Caused by: java.lang.NoClassDefFoundError: io/micrometer/prometheus/PrometheusMeterRegistry
	at java.lang.Class.getDeclaredMethods0(Native Method) ~[na:1.8.0_241]
	at java.lang.Class.privateGetDeclaredMethods(Class.java:2701) ~[na:1.8.0_241]
	at java.lang.Class.getDeclaredMethods(Class.java:1975) ~[na:1.8.0_241]
	at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	... 21 common frames omitted
Caused by: java.lang.ClassNotFoundException: io.micrometer.prometheus.PrometheusMeterRegistry
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_241]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_241]
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) ~[na:1.8.0_241]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_241]
	... 25 common frames omitted

Disconnected from the target VM, address: '127.0.0.1:1064', transport: 'socket'

Process finished with exit code 1

Alibaba RSocket Broker 1.0.0.M2

准备1.0.0.M2发布:

  • RSocket
    • RSocket Java SDK 1.0.0
    • Spring Boot 2.3.0
  • Development
    • Performance testing for memory leak
  • Deployment
    • Maven Central Deployment
  • Documents
    • M2 Release Note

Prometheus Service Discovery支持

目前RSocket Broker已经提供对各个应用的Prometheus的采集支持,主要是通过MetricsService这个RSocket服务完成的。 而且提供了MetricsScrapeController可以输出Prometheus的格式。

实现一个基于RSocket Broker的Prometheus Service Discovery,完成对接入到RSocket Broker的所有应用进行Prometheus的metrics抓取。

https://prometheus.io/blog/2018/07/05/implementing-custom-sd/

https://github.com/prometheus/prometheus/tree/master/discovery

添加Travis CI支持

使用Travis CI进行持续集成,同时在README.md添加build status图标。

编译错误:npm ERR! Unexpected end of JSON input while parsing near '...2oYSQ4\nrAd/cAHSAmngL'

[INFO] Added 40 dependencies to 'D:\IdeaProjects\alibaba-rsocket-broker\alibaba-broker-server\target\frontend\package.json'
[INFO] Updated npm D:\IdeaProjects\alibaba-rsocket-broker\alibaba-broker-server\target\frontend\package.json.
[INFO] Running npm install ...
npm ERR! Unexpected end of JSON input while parsing near '...2oYSQ4\nrAd/cAHSAmngL'

npm ERR! A complete log of this run can be found in:
npm ERR! E:\Program Files\nodejs\node_cache_logs\2019-12-16T05_49_17_435Z-debug.log
[ERROR] >>> Dependency ERROR. Check that all required dependencies are deployed in npm repositories.

maven 版本 :3.63
nodejs 版本:v12.13.1

是不是我本地网络比较慢,包下载的不完整?

为RSocketRemoteServiceBuilder添加Interceptor支持

RSocket默认支持Interceptor机制,你可以通过ClientRSocketFactory.addRequesterPlugin() 添加你自定义的RSocketInterceptor,但是这些interceptor相对比较低级一些,如果你要获取具体的信息还需要进行ByteBuf解析,尤其是metadata。 如目前的Zipkin Trace实现,实现相对麻烦一些,如果以Interceptor实现,就会简单很多。

应用端到Broker集群的Load Balance架构和稳定性

目前RSocket主要涉及到的Load Balance到Broker 集群的多连接管理。 Alibaba RSocket Broker采用share nothing架构,也就是集群中broker相互之间不通讯,不承担消息转发。 目前客户端SDK的设计思路是通过集群推过来的拓扑结构变更,SDK端完成连接的重连和路由更新等。 目前代码只有基础功能,需要再细致完善,提升稳定性。

Reactive兼容RxJava 3

目前RSocket Broker支持RxJava 2, RxJava 3 估计在在1月底发布,要兼容 RxJava 3。 主要在RSocketRequestRpcProxy 和 RSocketResponderSupport 进行调整。

Upgrade to RSocket 1.0.0-RC7

还是有一些变化,主要是UriHandler.java 和 UriTransportRegistry.java 被删除啦,目前RSocket Broker要根据schema做transport层识别,可能需要将这些代码转换到broker内部,RSocket-CLI是添加了这些代码。

如下改变:

  • 保留 UriHandler和UriTransportRegistry
  • RSocketFactory调整到 RSocketServer 和 RSocketConnector

Alibaba RSocket Broker的Docker镜像

提供RSocket Broker 开发环境Docker镜像,方便使用Docker或者Docker Compose的开发者快速启动RSocket Broker进行相关的测试。 目前已经RSocket Broker已经使用Jib进行镜像制作,但是还没有提供到docker hub上。

Alpha Release Roadmap

准备Alpha1的版本发布,一些要解决的核心问题:

  • RSocket
    • RSocket Java SDK 1.0.0
  • Development
    • Deploy artifacts to Maven central repository(进行中)
    • External apps integration from outside of the cluster network
    • Integration testing for all features
    • JDK 8, 11 & 14 compatible testing
  • Deployment
    • Dev deployment
    • Gossip cluster deployment
  • Documents
    • Alibaba RSocket Broker Website
    • Alibaba RSocket Broker Wiki

如果大家还有一些比较关注的问题,这里可以留言一下。

Vaadin to Vue or React?

目前RSocket Broker的控制台使用Vaadin开发,主要是方便Java程序员,同时减少各种REST API调用的问题,不知道多少同学对其他JS框架的了解程度。 这里调查一下,是否需要将控制台UI调整到Vue、React等框架? 但是Node环境和基本开发这个都是需要的。

有个报错的信息, RST-600500: Failed to parse composite metadata

2020-04-28 00:02:11.427 INFO 1 --- [tor-tcp-epoll-4] b.r.b.r.RSocketBrokerHandlerRegistryImpl : RST-500200: Succeed to accept connection from rsocket-user-service
2020-04-28 00:02:11.460 ERROR 1 --- [tor-tcp-epoll-4] c.a.r.listen.CompositeMetadataRSocket : RST-600500: Failed to parse composite metadata

java.lang.NullPointerException: null
at java.util.Objects.requireNonNull(Objects.java:203) ~[na:1.8.0_242]
at io.micrometer.core.instrument.ImmutableTag.(ImmutableTag.java:35) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.Tag.of(Tag.java:29) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.Tags.of(Tags.java:254) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.MeterRegistry.counter(MeterRegistry.java:363) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.Metrics.counter(Metrics.java:76) ~[micrometer-core-1.3.2.jar:1.3.2]
at com.alibaba.rsocket.listen.CompositeMetadataRSocket.metrics(CompositeMetadataRSocket.java:155) ~[alibaba-rsocket-core-0.1.0-SNAPSHOT.jar:na]
at com.alibaba.rsocket.listen.CompositeMetadataRSocket.requestResponse(CompositeMetadataRSocket.java:70) ~[alibaba-rsocket-core-0.1.0-SNAPSHOT.jar:na]
at io.rsocket.RSocketResponder.requestResponse(RSocketResponder.java:193) [rsocket-core-1.0.0-RC5.jar:na]
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:299) [rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:8174) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116) ~[rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]

2020-04-28 00:02:11.463 ERROR 1 --- [tor-tcp-epoll-4] c.a.r.listen.impl.RSocketListenerImpl : RST-200501: Exception during rsocket call

io.rsocket.exceptions.InvalidException: RST-600500: Failed to parse composite metadata
at com.alibaba.rsocket.listen.CompositeMetadataRSocket.requestResponse(CompositeMetadataRSocket.java:76) ~[alibaba-rsocket-core-0.1.0-SNAPSHOT.jar:na]
at io.rsocket.RSocketResponder.requestResponse(RSocketResponder.java:193) [rsocket-core-1.0.0-RC5.jar:na]
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:299) [rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:8174) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116) ~[rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]

使用state machine来管理应用的状态

目前的应用状态主要包括 connected, serving, paused, stopped,同时触发不同的逻辑。 使用状态机管理逻辑可能更清晰,代码也容易管理。 目前spring state machine的flux支持还没有发布,不过目前可以做一些测试,性能不用担心,主要是StateMachine对象的大小。

spring-projects/spring-statemachine#397

gRPC到RSocket协议转换gateway

和HTTP REST API到RSocket转换一样,是否要增加gRPC到RSocket协议转换的gateway? 有需求的留言一下,目前内部已经有一个原型版本。

RSocket Payload自定数据类型支持

在RSocket Broker的场景中,一个RSocket长连接会传输各种类型的Payload,虽然RSocket长连接支持全局的Payload的data mime type,但是实际中会存在不用的业务场景使用不同的数据编码格式,如RPC调用和Kafka消息可能就是不同的数据类型,这个时候就需要给每一个Payload设置独立的数据编码格式,对于RPC场景,可能还存在对返回数据的数据编码需求,如以下签名:

User findUserByNick(String nick);

有可能是请求的时候是Text/Plain编码,而返回的对象结果要求是JSON编码,这种不同的编码需求主要是出于个性化和性能要求。 在RSocket 281的Metadata规范中,增加了data encoding和accept data encoding两者元数据类型,这样可以保证满足该场景的要求。

在RSocket Broker中,建议服务提供方支持多种数据类型,目前主要包括:

  • 数字类型编码支持: 如果Integer, long, double等
  • 字符串支持: String
  • byte数组: 如在文件上传的场景
  • 自定义序列化方式: JSON, Hessian, Protobuf, Avor这四者

Composite Metadata Extension: data Mime/type definition per-stream: rsocket/rsocket#281

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.