smallrye-mutiny's Introduction

Contributing

Want to contribute? Great! We try to make it easy, and all contributions, even the smaller ones, are greatly welcome. This includes bug reports, fixes, documentation.

All SmallRye projects use GitHub Issues to manage issues. Open an issue directly in GitHub within the appropriate SmallRye project repository.

Becoming a Project Committer

Through continued contributions to SmallRye projects, other committers on the project can nominate you to become a committer too!

Check out the nomination process for full details.

All original contributions to SmallRye projects are licensed under the ASL - Apache License, version 2.0 or later, or, if another license is specified as governing the file or directory being modified, such other license.

All contributions are subject to the Developer Certificate of Origin (DCO). The DCO text is also included verbatim in the dco.txt file in the root directory of each repository.

Before you Contribute

To contribute, use GitHub Pull Requests, from your own fork.

Code Reviews

All submissions, including submissions by project members, need to be reviewed before they are merged.

Continuous Integration

We’re all human, so SmallRye projects use continuous integration to ensure consistency, particularly as most SmallRye projects need to pass the applicable MicroProfile TCK for that specification. Each pull request triggers a full build of the project. Please make sure to monitor the output of the build and act accordingly.

Tests and Documentation are not optional

Don’t forget to include tests in your pull requests. Also, don’t forget the documentation (Javadoc).

Setup

If you have not done so on your machine, you need to:

  • Install Git and configure your GitHub access

  • Install Java SDK (OpenJDK recommended)

  • Install Maven

IDE Config and Code Style

SmallRye projects have a strictly enforced code style. Code formatting is done by the Eclipse code formatter, using the config files found in Code Rules. By default when you run mvn install the code will be formatted automatically. When submitting a pull request the CI build will fail if running the formatter results in any code changes, so it is recommended that you always run a full Maven build before submitting a pull request.

Eclipse Setup

Open the Preferences window, and then navigate to Java > Code Style > Formatter. Click Import and then select the eclipse-format.xml file in the coderules directory.

Next, navigate to Java > Code Style > Organize Imports. Click Import and select the eclipse.importorder file.

IDEA Setup

Open the Preferences window, navigate to Plugins and install the [Eclipse Code Formatter Plugin](https://plugins.jetbrains.com/plugin/6546-eclipse-code-formatter).

Restart your IDE, open the Preferences window again and navigate to Other Settings > Eclipse Code Formatter.

Select Use the Eclipse Code Formatter, then change the Eclipse Java Formatter Config File to point to the eclipse-format.xml file in the coderules directory. Make sure the Optimize Imports box is ticked, and select the eclipse.importorder file as the import order config file.

Signing Commits

Signing commits is required to make sure that the contributor matches the author. To be able to sign commits, use the following instructions: https://docs.github.com/en/authentication/managing-commit-signature-verification/signing-commits

The small print

This is an open source project; please act responsibly, be nice and polite, and enjoy!

smallrye-mutiny's People

Contributors

acanda, alexandreguidin, andreas-eberle, andrezimmermann, avivmu, cescoffier, dependabot-preview[bot], dependabot[bot], edeandrea, fromage, geoand, gwenneg, heubeck, inego, jponge, kenfinnigan, lhauspie, machi1990, manofthepeace, nmcl, nryanov, oliver-brm, ozangunalp, pcasaes, radcortez, rgmz, sanne, smallrye-ci, stepan-romankov, stuartwdouglas

smallrye-mutiny's Issues

Implement the MultiOnResult group

It's the group retrieved using multi.onResult(). It should contain:

<R> Multi<R> mapToResult(Function<? super T, ? extends R> mapper)
<S> Multi<S> scan(Supplier<S> initialStateProducer, BiFunction<S, ? super T, S> scanner)
Multi<T> scan(BiFunction<T, T, T> scanner)
<R> Multi<R> castTo(Class<R> clazz)
Multi<T> delay() - same as uniDelay
Multi<T> filterWith(Predicate<? super T>)
Multi<T> filterWith(Function<? super T, Uni<Boolean>>)

MultiFlattenGroup<T> mapToMulti(Function<? super T, Multi<? extends R>> mapper) // Same with Publisher?
MultiFlattenGroup<T> mapToUni(Function<? super T, Uni<? extends R>> mapper)
MultiFlattenGroup<T> mapToIterable(Function<? super T, Iterable<? extends R>> mapper)
MultiFlattenGroup<T> mapToMulti(BiConsumer<? super T, MultiEmitter<? super R>> mapper)


FlatMapGroup<T>:

FlatMapGroup<T> withConcurrency(...)
FlatMapGroup<T> preserveOrder()
FlatMapGroup<T> awaitCompletionToFireFailure() // or just awaitCompletion ?
Multi<T> flatten()
  • Should something like keepIfNotSame be added (like distinctUntilChanged)?
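To make the proposed scan semantics concrete, here is a minimal plain-Java sketch that works on a List rather than a Multi. The names ScanSketch and scan are illustrative only and are not Mutiny API. Emitting the initial state first is an assumption borrowed from how seeded scan operators usually behave in other reactive libraries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

final class ScanSketch {
    // Assumption: the initial state is emitted first, then one accumulated state per item.
    static <T, S> List<S> scan(List<T> items,
                               Supplier<S> initialStateProducer,
                               BiFunction<S, ? super T, S> scanner) {
        List<S> out = new ArrayList<>();
        S state = initialStateProducer.get();
        out.add(state);
        for (T item : items) {
            state = scanner.apply(state, item);
            out.add(state);
        }
        return out;
    }
}
```

With a running sum as the scanner, the items 1, 2, 3 and an initial state of 0 give the states 0, 1, 3, 6.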

Consider adding a `onItem().ifNotNull()` group

That would provide a symmetric API for ifNull().

We should also consider changing map to be only called if the item is non-null. This would ease migration from RX Maybe or Reactor Mono.

Uni with Optional

How to map to Optionals using Unis?

  @Override
  public Uni<Optional<PageDTO>> getPage(UUID pageID, UUID sessionID, UUID deviceID) {
    Tuple values = Tuple.of(pageID, sessionID, deviceID);
    return pgPool
        .preparedQuery(SELECT_PAGE_RAW_SWL, values)
        .map(
            rowSet -> {
              if (!rowSet.iterator().hasNext()) {
                return Optional.empty();
              }
              Row row = rowSet.iterator().next();
              return Optional.of(
                  new PageDTO(
                      row.getString("org_id"),
                      row.getUUID("uid"),
                      row.getString("url"),
                      row.getString("referrer"),
                      row.getString("doctype"),
                      row.getInteger("screen_width"),
                      row.getInteger("screen_height"),
                      row.getInteger("width"),
                      row.getInteger("height"),
                      row.getInteger("compiled_timestamp")));
            })
        .onFailure()
        .invoke(
            throwable -> {
              log.error("Failed to get page id={}", pageID);
              throw new DatabaseException();
            });
  }

This code fails with the error:

Required type: Uni<Optional<PageDTO>>
Provided: Uni<Optional<?>>
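A mismatch like this is likely a Java type-inference effect rather than something specific to Uni: once more calls are chained after the mapping step, the lambda with two different return statements no longer has a target type to be checked against. The sketch below reproduces the same shape with the JDK's CompletableFuture (a stand-in, not the Mutiny API) and pins the result type with an explicit type witness on the mapping call. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class OptionalMappingSketch {
    // Maps a list of rows to its first element, or to an empty Optional.
    static CompletableFuture<Optional<String>> firstRow(CompletableFuture<List<String>> rows) {
        return rows
            // The explicit <Optional<String>> witness fixes the result type,
            // so both return statements are checked against it.
            .<Optional<String>>thenApply(list -> {
                if (list.isEmpty()) {
                    return Optional.empty();
                }
                return Optional.of(list.get(0));
            })
            // A further chained call, as in the issue, no longer disturbs inference.
            .whenComplete((value, failure) -> {
                if (failure != null) {
                    System.err.println("Failed to get first row: " + failure);
                }
            });
    }
}
```

Writing Optional.<String>empty() in the first branch is another way to give the compiler the same information.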

RxJava's PublishSubject in Mutiny

What is the idiomatic way to recreate RxJava's PublishSubject in Mutiny?
(http://reactivex.io/RxJava/javadoc/io/reactivex/subjects/PublishSubject.html)
Suppose we need to have a Multi<Long> that starts broadcasting ticks every second upon its creation, without any subscribers, so that subsequent subscribers start receiving the ongoing ticks.

Multi<Long> multi = Multi
                .createFrom().ticks().every(Duration.ofSeconds(1))
                .broadcast().toAllSubscribers();

TimeUnit.SECONDS.sleep(3);

multi.subscribe().with(tick -> System.out.println("tick " + tick));

Here, even after waiting for 3 seconds, the printed out ticks start from 0, obviously because that's when the subscription happens. How can I make multi a hot broadcast without any subscribers?
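The behaviour being asked for can be sketched without any reactive library. HotBroadcaster below is a hypothetical, deliberately simplified class (no backpressure, completion or failure handling, and not Mutiny API); it only shows what "hot" means here: items are emitted whether or not anyone listens, and a late subscriber sees only what is emitted after it subscribes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class HotBroadcaster<T> {
    private final List<Consumer<? super T>> subscribers = new CopyOnWriteArrayList<>();

    // Late subscribers only receive items emitted after this call.
    void subscribe(Consumer<? super T> subscriber) {
        subscribers.add(subscriber);
    }

    // Emits to the current subscribers; with no subscribers the item is simply dropped.
    void emit(T item) {
        for (Consumer<? super T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }
}
```

In the tick scenario, a scheduled task would call emit once per second from creation time, so a subscriber arriving after three seconds would start at tick 3 rather than 0.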

A side question. Are this repository's issues the proper place to ask questions about Mutiny? There is no "mutiny" tag on StackOverflow. Also, the official homepage does not mention such a place.

Why not RxJava for smallrye-*?

Why are you introducing another reactive library? I thought that well-adopted reactive libraries like RxJava3 were good enough. I can see that Mutiny is simpler, but is there any other reason behind your choice not to go with RxJava? I'm just trying to wrap my head around this.

Finalize the vocabulary for events

Right now we use:

  • result - could be element, item, value
  • failure - could be error, exception
  • completion - I don't have alternatives
  • cancellation - I don't have alternatives
  • requests - I don't have alternatives

Add Uni.flatMap

A flatMap method providing the default traditional behavior should be added.

How to collect a list of Unis?

What is the best way to collect a list of Unis? For example, I have a List<Uni> and would like to collect those and produce a Uni<List>. Another task might be to sum all the integers and produce a single Uni.
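As a library-neutral illustration of the two tasks, the sketch below does the same thing with the JDK's CompletableFuture instead of Uni. CollectSketch and its methods are hypothetical names; the point is only the shape of the transformation from a list of pending values to one pending list, and then to one pending sum.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CollectSketch {
    // Turns a list of pending results into one pending list, preserving the input order.
    static <T> CompletableFuture<List<T>> collect(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(done -> {
                List<T> results = new ArrayList<>(futures.size());
                for (CompletableFuture<T> future : futures) {
                    results.add(future.join()); // already completed here, so join() does not block
                }
                return results;
            });
    }

    // Sums the collected integers once all of them are available.
    static CompletableFuture<Integer> sum(List<CompletableFuture<Integer>> futures) {
        return collect(futures).thenApply(values -> {
            int total = 0;
            for (int value : values) {
                total += value;
            }
            return total;
        });
    }
}
```

If any input fails, the combined result fails as well, which is usually the behaviour wanted when every value is required.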

java.lang.IllegalStateException: The subscription to events has been cancelled

Hi!

I'm trying to use mutiny to achieve reactive/parallel execution of multiple operations and merge them into a single async operation as seen from caller:

  • single write to a SQL database
  • multiple writes to a Kafka topic

I've been struggling with the API (it seems a bit quirky) to achieve this, and the typing seems wrong -- I would just like a single Uni<Void> as a return type.

@ApplicationScoped
@Slf4j
public class BeaconService {

  @Inject
  BeaconDatasource beaconDatasource;

  @Inject
  PageDatasource pageDatasource;

  @Inject
  @Channel(EventsChannel.NAME)
  Emitter<AbstractBrowserEvent> eventsEmitter;

  public Uni<Uni<Void>> process(UUID sessionID, UUID uid, UUID pageID, Beacon beacon) {
    return pageDatasource.pageExists(sessionID, uid, pageID).onItem().produceUni(exists -> {
      if (!exists) {
        log.warn("Unlinked beacon sessionID={} uid={} pageId={}", sessionID, uid, pageID);
        throw Boom.badRequest().message("Unlinked beacon").exception();
      }

      Multi<Uni<Void>> beaconWrite = Multi.createFrom()
          .uni(Uni.createFrom().item(beaconDatasource.store(beacon)));

      Multi<Uni<Void>> eventWrites = Multi.createFrom().iterable(beacon.getEvents())
          .onItem()
          .apply(event -> Uni.createFrom().completionStage(eventsEmitter.send(event))
              .onFailure()
              .apply(throwable -> {
                log.error("Something went wrong while sending event to Kafka topic", throwable);
                return null;
              })
              .onItem()
              .apply(item -> null));

      return Multi
          .createBy()
          .concatenating()
          .streams(eventWrites, beaconWrite)
          .collectItems()
          .last();
    });
  }
}
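The nested return type above appears because each asynchronous write is wrapped as an item instead of being flattened into the chain. The sketch below shows the flattening idea with the JDK's CompletableFuture rather than Uni: thenCompose removes the nesting and allOf merges all writes into a single Void result. The parameters check, store and send are hypothetical stand-ins for pageExists, the database write and the emitter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

final class FlattenSketch {
    static <E> CompletableFuture<Void> process(
            CompletableFuture<Boolean> check,           // stand-in for pageExists(...)
            Supplier<CompletableFuture<Void>> store,    // stand-in for the single SQL write
            List<E> events,
            Function<E, CompletableFuture<Void>> send)  // stand-in for one Kafka write per event
    {
        // thenCompose flattens the inner future, so the result is not nested.
        return check.thenCompose(exists -> {
            if (!exists) {
                throw new IllegalArgumentException("Unlinked beacon");
            }
            List<CompletableFuture<Void>> writes = new ArrayList<>();
            writes.add(store.get());
            for (E event : events) {
                writes.add(send.apply(event));
            }
            // Completes when every write has completed, and fails if any of them fails.
            return CompletableFuture.allOf(writes.toArray(new CompletableFuture<?>[0]));
        });
    }
}
```

The caller then sees one asynchronous operation that either completes or fails, which matches the single Void-typed result asked for in this issue.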

Moreover, I'm seeing some errors:

2020-04-14 16:40:34,362 ERROR [org.jbo.res.res.i18n] (vert.x-eventloop-thread-22) RESTEASY002020: Unhandled asynchronous exception, sending back 500: java.lang.IllegalStateException: The subscription to events has been cancelled
	at io.smallrye.reactive.messaging.extension.EmitterImpl.verify(EmitterImpl.java:138)
	at io.smallrye.reactive.messaging.extension.EmitterImpl.send(EmitterImpl.java:112)
	at com.meemaw.rec.beacon.service.BeaconService.lambda$process$2(BeaconService.java:49)
	at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:42)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
	at io.smallrye.mutiny.operators.AbstractMulti$1.onNext(AbstractMulti.java:90)
	at io.smallrye.mutiny.subscription.SerializedSubscriber.onItem(SerializedSubscriber.java:69)
	at io.smallrye.mutiny.operators.multi.builders.IterableBasedMulti$IteratorSubscription.fastPath(IterableBasedMulti.java:118)
	at io.smallrye.mutiny.operators.multi.builders.IterableBasedMulti$BaseRangeSubscription.request(IterableBasedMulti.java:68)
	at io.smallrye.mutiny.subscription.SerializedSubscriber.request(SerializedSubscriber.java:121)
	at io.smallrye.mutiny.operators.AbstractMulti$1$1.request(AbstractMulti.java:63)
	at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.request(MultiOperatorProcessor.java:73)
	at io.smallrye.mutiny.subscription.SerializedSubscriber.request(SerializedSubscriber.java:121)
	at io.smallrye.mutiny.operators.AbstractMulti$1$1.request(AbstractMulti.java:63)
	at io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber.setOrSwitchUpstream(SwitchableSubscriptionSubscriber.java:189)
	at io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber.onSubscribe(SwitchableSubscriptionSubscriber.java:95)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onSubscribe$0(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onSubscribe(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onSubscribe$0(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onSubscribe(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.mutiny.operators.AbstractMulti$1.onSubscribe(AbstractMulti.java:55)
	at io.smallrye.mutiny.subscription.SerializedSubscriber.onSubscribe(SerializedSubscriber.java:43)
	at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onSubscribe(MultiOperatorProcessor.java:44)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onSubscribe$0(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onSubscribe(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onSubscribe$0(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onSubscribe(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.mutiny.operators.AbstractMulti$1.onSubscribe(AbstractMulti.java:55)
	at io.smallrye.mutiny.subscription.SerializedSubscriber.onSubscribe(SerializedSubscriber.java:43)
	at io.smallrye.mutiny.operators.multi.builders.IterableBasedMulti.subscribe(IterableBasedMulti.java:48)
	at io.smallrye.mutiny.operators.multi.builders.IterableBasedMulti.subscribe(IterableBasedMulti.java:32)
	at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:48)
	at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:48)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.lambda$subscribe$0(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.subscribe(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:37)
	at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:68)
	at io.smallrye.mutiny.operators.multi.MultiMapOp.subscribe(MultiMapOp.java:24)
	at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:48)
	at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:48)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.lambda$subscribe$0(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.subscribe(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.mutiny.operators.multi.MultiConcatOp$ConcatArraySubscriber.onCompletion(MultiConcatOp.java:109)
	at io.smallrye.mutiny.operators.multi.MultiConcatOp.subscribe(MultiConcatOp.java:62)
	at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:48)
	at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:48)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.lambda$subscribe$0(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.subscribe(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:37)
	at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:68)
	at io.smallrye.mutiny.operators.multi.MultiLastItemOp.subscribe(MultiLastItemOp.java:18)
	at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:48)
	at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:48)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.lambda$subscribe$0(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.subscribe(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.mutiny.operators.UniCreateFromPublisher.subscribing(UniCreateFromPublisher.java:64)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
	at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:51)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:51)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
	at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
	at io.smallrye.mutiny.operators.UniFlatMapOnItem.invokeAndSubstitute(UniFlatMapOnItem.java:48)
	at io.smallrye.mutiny.operators.UniFlatMapOnItem$2.onItem(UniFlatMapOnItem.java:65)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:35)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:35)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:72)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:35)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:35)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:72)
	at io.smallrye.mutiny.operators.UniMapOnResult$1.onItem(UniMapOnResult.java:39)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:35)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:35)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:72)
	at io.smallrye.mutiny.vertx.AsyncResultUni.lambda$subscribing$1(AsyncResultUni.java:34)
	at io.vertx.mutiny.pgclient.PgPool$3.handle(PgPool.java:212)
	at io.vertx.mutiny.pgclient.PgPool$3.handle(PgPool.java:209)
	at io.vertx.sqlclient.impl.SqlResultBuilder.handle(SqlResultBuilder.java:92)
	at io.vertx.sqlclient.impl.SqlResultBuilder.handle(SqlResultBuilder.java:33)
	at io.vertx.sqlclient.impl.PoolBase$1$1.lambda$schedule$0(PoolBase.java:97)
	at io.vertx.sqlclient.impl.SocketConnectionBase.handleMessage(SocketConnectionBase.java:210)
	at io.vertx.sqlclient.impl.SocketConnectionBase.lambda$init$0(SocketConnectionBase.java:81)
	at io.vertx.core.net.impl.NetSocketImpl.lambda$new$2(NetSocketImpl.java:100)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
	at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:356)
	at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:369)
	at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:232)
	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:173)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.vertx.pgclient.impl.codec.PgEncoder.lambda$write$0(PgEncoder.java:78)
	at io.vertx.pgclient.impl.codec.PgCommandCodec.handleReadyForQuery(PgCommandCodec.java:124)
	at io.vertx.pgclient.impl.codec.PgDecoder.decodeReadyForQuery(PgDecoder.java:229)
	at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:87)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:830)

Fix documentation update during the release process

At the moment it fails with:

2019-12-19T15:11:07.9347234Z Cloning repo
2019-12-19T15:11:07.9364989Z Cloning into 'site'...
2019-12-19T15:11:15.6335143Z Warning: Permanently added the RSA host key for IP address '192.30.253.112' to the list of known hosts.
2019-12-19T15:11:15.6939235Z git@github.com: Permission denied (publickey).
2019-12-19T15:11:15.6957046Z fatal: Could not read from remote repository.
2019-12-19T15:11:15.6958973Z Copy content
2019-12-19T15:11:15.6960007Z 
2019-12-19T15:11:15.6962044Z Please make sure you have the correct access rights
2019-12-19T15:11:15.6964436Z and the repository exists.
2019-12-19T15:11:15.6982052Z cp: target 'site' is not a directory
2019-12-19T15:11:15.6984101Z yes: standard output: Broken pipe
2019-12-19T15:11:15.6985964Z Pushing
2019-12-19T15:11:15.6987855Z .build/doc.sh: line 14: cd: site: No such file or directory
2019-12-19T15:11:15.7140754Z HEAD detached at 0.1.2
2019-12-19T15:11:15.7142481Z nothing to commit, working tree clean
2019-12-19T15:11:15.7223939Z error: src refspec gh-pages does not match any
2019-12-19T15:11:15.7226207Z error: failed to push some refs to 'https://github.com/***/***-mutiny'

This happens because it does not use the correct Git URL (it should use the HTTPS URL with a token).

Invoke CompletionStage

It is hard to invoke a function that returns a CompletionStage (in comparison to producing from a CompletionStage). An example would be calling reactive messaging from a stream.

I tried to simulate it like this:

.onItem().produceCompletionStage(entity -> 
    methodReturningCompletionStage()
        .thenApply(aVoid -> shiftEntity)
)
.concatenate()

Please add an invoke variant that accepts a CompletionStage.
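The requested behaviour can be described as "run an asynchronous side effect, wait for it, then pass the original item on". The sketch below expresses that with plain CompletionStage composition. InvokeSketch.invoke is a hypothetical helper, not an existing Mutiny method.

```java
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class InvokeSketch {
    // Runs the asynchronous action for the item, waits for it to complete,
    // then forwards the original item unchanged.
    static <T> CompletionStage<T> invoke(CompletionStage<T> upstream,
                                         Function<? super T, ? extends CompletionStage<?>> action) {
        return upstream.thenCompose(item -> action.apply(item).thenApply(ignored -> item));
    }
}
```

The result of the action is discarded on purpose; only its completion or failure matters to the downstream stage.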

Mutiny 0.4.0

Implement the MultiOnEmpty group

Group retrieved using multi.onCompletion().ifEmpty() with the following methods:

* failWith (Throwable, Supplier<Throwable>)
* continueWith (T, Supplier<T>)
* switchTo (Multi<T>, Supplier<Multi<T>>)

Implement the MultiCollect group

It would be the group retrieved with: Multi.collect()

Here are some of the methods from this group (on a Multi<T>):

Uni<T> first(); 
Uni<T> last();
Uni<List<T>> asList(); // Accumulate the results in a list
Uni<X> with(Collector<? super T, A, ? extends X>) // see https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html
Uni<X> in(Supplier<X> producer, BiConsumer<X, T> add)
Uni<Map<K, T>> asMap(Function<? super T, ? extends K> keyProducer)
Uni<Map<K, V>> asMap(Function<? super T, ? extends K> keyProducer, Function<? super T, ? extends V> valueProducer)
Uni<Map<K, Collection<T>>> asMultiMap(...)
Uni<Map<K, Collection<V>>> asMultiMap(...)
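Several of these methods mirror what java.util.stream collectors already do, so their intended results can be sketched with a plain Stream. CollectGroupSketch is a hypothetical helper, and letting the last item win on a duplicate key is an assumption, since the issue does not say how asMap should treat collisions.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class CollectGroupSketch {
    // Analogue of asMap(keyProducer). Assumption: the last item wins when keys collide.
    static <T, K> Map<K, T> asMap(List<T> items, Function<? super T, ? extends K> keyProducer) {
        return items.stream()
            .collect(Collectors.toMap(keyProducer, Function.identity(), (first, second) -> second));
    }

    // Analogue of asMultiMap(keyProducer): items sharing a key are kept together.
    static <T, K> Map<K, List<T>> asMultiMap(List<T> items, Function<? super T, ? extends K> keyProducer) {
        return items.stream().collect(Collectors.groupingBy(keyProducer));
    }
}
```

A with(Collector) method on the group would accept exactly the kind of Collector passed to collect here.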

Can you please help me understanding how to handle Uni in these cases?

Hi,
I am creating a Quarkus app and therefore I am using Mutiny. I am still getting used to it but it seems interesting.
Nevertheless, I think I am missing something, because there are two things I cannot figure out.
Mutiny uses Uni and Multi, ok, fine.

1- How do I create a method that "just completes"? I would like to use something like a Completable: it has no items, but I need to know when it completes and whether it failed or not.
2- How do I generate a Uni that completes but with no item? (Uni, as defined in the documentation, can return 0 or 1 items.) I could only generate a Uni that returns a null item, Uni.createFrom().nullItem() (but that is still an item), or a Uni that never completes (Uni.createFrom().nothing()). How can I create a Uni that returns 0 items?

thanks!

PS:
Case 1 is useful for "void" methods. Let's say that I have an update method that either returns or fails.
Case 2 is harder to find an example for off the top of my head, but it should be possible since the documentation says it is possible.

thanks!
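For comparison, the JDK has no dedicated "completable" type either: a CompletableFuture<Void> fills that role, and since Void has no instances, its only possible successful value is null. The sketch below shows such a "just completes or fails" method. CompletionOnlySketch.update is a hypothetical example and says nothing about how Mutiny itself models this.

```java
import java.util.concurrent.CompletableFuture;

final class CompletionOnlySketch {
    // Signals only success or failure; on success the carried value is necessarily null.
    static CompletableFuture<Void> update(boolean shouldFail) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (shouldFail) {
            result.completeExceptionally(new IllegalStateException("update failed"));
        } else {
            result.complete(null);
        }
        return result;
    }
}
```

Under this reading, "completes with a null item" and "completes without an item" describe the same observable event for a Void-typed result.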

Implement Multi.onTimeout

Same method as UniOnTimeOut, but first the group access must be clarified between:

  • onTimeout().of(Duration).on(Executor).... and onTimeout().before(Uni<X>)...
  • onNoResult().after(Duration).on(Executor)... and onNoResult().before(Uni<X>)...

Is grouping too much?

The question has been slightly discussed in #1, but better having an issue for this.

Right now, we do something like this:

getSome() // produces a Uni
   .onNoResult().after(ofHours(1)).fail()
   .onResult().mapToResult(i -> i*2)
   .onFailure().recoverWith(42)

Should we consider removing the grouping to be like:

getSome() // produces a Uni
   .onTimeout(ofHours(1)).fail()
   .onResultMap(i -> i*2)
   .onFailureRecoverWith(42)

The second approach has the advantage of using a single class, which could make the API a bit easier to use than the current chaining. Most of the methods would start with onX, where X is the type of event triggering the behavior. It would also slightly reduce object creation (at assembly time).

So, to be discussed....
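Whichever naming wins, the behaviour of the three steps is the same: fail if no result arrives in time, map a result, and recover from any failure. The sketch below spells out those semantics with the JDK's CompletableFuture (orTimeout requires Java 9 or later). TimeoutPipelineSketch is a hypothetical name and the code is an analogy, not either of the proposed Mutiny APIs.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class TimeoutPipelineSketch {
    static CompletableFuture<Integer> pipeline(CompletableFuture<Integer> source, Duration timeout) {
        return source
            // Fails the stage with a TimeoutException if nothing arrives in time.
            .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
            // Runs only when a result is available.
            .thenApply(i -> i * 2)
            // Recovers from any failure, including the timeout, with a fallback value.
            .exceptionally(failure -> 42);
    }
}
```

Note that the JDK API is already "flat" in the sense of the second proposal: each step is a single method on one class.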

smallrye-reactive-types instead of -operators?

IMHO, Uni and Multi are reactive types, not operators. Operators are things like filter, map, flatMap, window, debounce, etc. Reactive Streams Operators are indeed operators for already existing reactive types (Publisher etc.), but here, we mainly create new reactive types.

Thoughts?

Checked exceptions in subscribe().with(...) consumer

When subscribing to an item, we provide a java.util.function.Consumer which does not declare any throws. Because of that, any calls made from that consumer which might throw checked exceptions must be wrapped in try blocks.
In RxJava, the item subscription is done by providing their own Consumer which supports calling any code that might throw checked exceptions:

/**
 * A functional interface (callback) that accepts a single value.
 * @param <T> the value type
 */
@FunctionalInterface
public interface Consumer<@NonNull T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Throwable if the implementation wishes to throw any type of exception
     */
    void accept(T t) throws Throwable;
}

I wonder whether something like this can be supported in Mutiny.
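Until something like that exists, a small adapter can bridge the gap on the caller's side. The sketch below is one possible shape: ThrowingConsumer and unchecked are hypothetical names, and wrapping checked exceptions in IllegalStateException is an arbitrary choice made for the example.

```java
import java.util.function.Consumer;

final class CheckedConsumers {
    @FunctionalInterface
    interface ThrowingConsumer<T> {
        void accept(T t) throws Exception;
    }

    // Adapts a throwing callback to java.util.function.Consumer.
    // Unchecked exceptions pass through; checked ones are wrapped.
    static <T> Consumer<T> unchecked(ThrowingConsumer<? super T> consumer) {
        return item -> {
            try {
                consumer.accept(item);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        };
    }
}
```

A call site would then look like subscribe().with(CheckedConsumers.unchecked(item -> codeThatThrows(item))), where codeThatThrows is a placeholder for the caller's own method.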

Rethink release process

  • master should use 999-SNAPSHOT
  • versions should be computed during the release process in a reliable way (right now it works by accident)
  • the documentation upload should be fixed (#58)

Need TupleX

Hi,

Currently, the API has Tuple from 1 to 5. This means that we can join at most 5 async calls. I work on a project where I need more API calls (async calls to a PostgreSQL DB).
It would be great if the API extended this with Tuple up to 10 or more (6 in my case, but it could be more in the future).
I'm not able to extend Tuple5 because its constructor is package-private.
Maybe a varargs solution is possible?
Thank you

Blocking operation causes exception although `subscribeOn` is used.

A blocking operation causes an exception although subscribeOn is used. It might be due to multiple blocking operations, but I cannot be sure.

This causes an exception:

.onItem().apply(file -> blockingOperation(file))
.subscribeOn(Infrastructure.getDefaultWorkerPool())

Exception:

2020-03-17 16:31:17,831 ERROR [asc.import-aservice] () [vert.x-eventloop-thread-2] [ApiUploadMessageListener.java:115] - CollectorID 6: Exception while trying to process tlv file from a queue.: java.lang.IllegalStateException: You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.

This does not cause an exception:

.onItem().produceUni(file1 -> Uni.createFrom().item(file1)
		.onItem().apply(file -> blockingOperation(file))
		.subscribeOn(Infrastructure.getDefaultWorkerPool()))
.concatenate()

The whole chain looks somewhat like this:

return Multi.createFrom().item(status)
		.transform().byFilteringItemsWith(status1 -> status1.getStatus() == Status.UPLOADED)
		.onItem().apply(status1 -> getFile(fileId))
		.subscribeOn(Infrastructure.getDefaultWorkerPool())
		.onItem().produceCompletionStage(fileEntity -> update().thenApply(aVoid -> fileEntity))
		.concatenate()
		.onItem().produceUni(file1 -> Uni.createFrom().item(file1)
				.onItem().apply(file -> process(file))
				.subscribeOn(Infrastructure.getDefaultWorkerPool()))
		.concatenate()
		.onItem().produceCompletionStage(someEntity ->
				update().thenApply(aVoid -> someEntity)
		)
		.concatenate()
		.transform().byFilteringItemsWith(SomeEntity::isValid)
		.onItem().apply(someEntity -> KafkaRecord.of(key, value))
		.onFailure(throwable -> throwable instanceof InvalidDataException)
		.recoverWithMulti(throwable -> Multi.createFrom().empty())
		.onFailure().invoke(throwable -> {
			// some processing
			if (throwable instanceof RuntimeException) {
				throw (RuntimeException) throwable;
			} else {
				throw new RuntimeException(throwable);
			}
		});
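
One possible reading, offered as an assumption rather than a confirmed diagnosis: a stage placed after an asynchronous step runs on whichever thread completed that step, regardless of where the subscription happened. The sketch below shows that effect in plain Java with CompletableFuture; it does not use Mutiny, and ThreadHopDemo, "event-loop" and "worker" are hypothetical names for this example. A callback attached without an executor runs on the completing thread, while one attached with an explicit executor hops to that executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopDemo {
    /** Returns the thread names seen by a plain callback and by an executor-bound callback. */
    static String[] callbackThreads() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            CompletableFuture<String> asyncStep = new CompletableFuture<>();

            // Attached before completion, no executor: runs on the completing thread.
            CompletableFuture<String> sameThread =
                    asyncStep.thenApply(v -> Thread.currentThread().getName());

            // Attached with an explicit executor: runs on the worker thread.
            CompletableFuture<String> hopped =
                    asyncStep.thenApplyAsync(v -> Thread.currentThread().getName(), worker);

            // Stand-in for an I/O thread completing the asynchronous step.
            new Thread(() -> asyncStep.complete("file"), "event-loop").start();

            return new String[] { sameThread.join(), hopped.join() };
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = callbackThreads();
        System.out.println(names[0]); // prints event-loop
        System.out.println(names[1]); // prints worker
    }
}
```

If the same mechanism applies here, it would be consistent with the observation above that wrapping the blocking call in its own inner Uni with subscribeOn avoids the exception.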

Add subscribe method with callbacks on both Uni and Multi

The following overloads need to be added:

  • .subscribe().with((val, x) -> {}) - Uni and Multi
  • .subscribe().with((val, x) -> {}, f -> {}) - Uni and Multi
  • .subscribe().with((val, x) -> {}, f -> {}, () -> {}) - Multi
  • .subscribe().with(onsub, onval, onerr, oncomplete) - Multi
  • .subscribe().with(onsub, onval, onerr) - Uni

Converters switch from SPI to converter method

Currently, we handle conversions between types by requiring that an SPI be implemented and registered as a service.

We want to switch to a model where we can write something like:

Uni<T> uni = Uni.createFrom().converter(mySingleToUniConverter, instance);
Single<T> single = uni.adapt().with(myUniToSingleConverter);
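
The shape of that model can be sketched in plain Java, with converters passed as ordinary functions instead of being discovered as services. MiniUni, fromConverter, adaptWith and awaitItem are hypothetical names for this example and are not Mutiny classes or methods; a Supplier plays the role of the foreign type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical stand-in for Uni; not the Mutiny class. */
final class MiniUni<T> {
    private final CompletableFuture<T> future;

    private MiniUni(CompletableFuture<T> future) {
        this.future = future;
    }

    static <T> MiniUni<T> item(T value) {
        return new MiniUni<>(CompletableFuture.completedFuture(value));
    }

    /** Builds a MiniUni from a foreign instance using an explicit converter function. */
    static <I, T> MiniUni<T> fromConverter(Function<I, MiniUni<T>> converter, I instance) {
        return converter.apply(instance);
    }

    /** Adapts this MiniUni to another type using an explicit converter function. */
    <R> R adaptWith(Function<MiniUni<T>, R> converter) {
        return converter.apply(this);
    }

    T awaitItem() {
        return future.join();
    }
}

class ConverterDemo {
    public static void main(String[] args) {
        // Converters are plain values: no SPI lookup or service registration.
        Function<Supplier<String>, MiniUni<String>> supplierToUni = s -> MiniUni.item(s.get());
        Function<MiniUni<String>, Supplier<String>> uniToSupplier = u -> u::awaitItem;

        Supplier<String> source = () -> "hello";
        MiniUni<String> uni = MiniUni.fromConverter(supplierToUni, source);
        Supplier<String> back = uni.adaptWith(uniToSupplier);
        System.out.println(back.get()); // prints hello
    }
}
```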

Renaming to Mutiny

The name has been chosen: SmallRye Mutiny it will be.

  • Rename repository
  • Rename package
  • Rename artifactId and groupId

Document difference between MultiFlatten concatenate and merge

Both methods have the same signature, the same return type, and almost the same description. They call the same MultiFlatMapOp constructor with a different concurrency parameter. It is hard to figure out what the difference is and which one to use.

Version: 0.4.0
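
For readers with the same question, the usual meaning of the two terms in reactive libraries can be shown with a small simulation. This is a sketch in plain Java using virtual emission times, not Mutiny code, and it assumes that Mutiny follows the usual semantics: concatenation consumes inner streams one after another, while merge subscribes to all of them at once. FlattenOrderDemo, Timed and sample are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class FlattenOrderDemo {
    /** An item plus the virtual time (ms after subscription) at which its inner stream emits it. */
    static final class Timed {
        final int delayMillis;
        final String item;

        Timed(int delayMillis, String item) {
            this.delayMillis = delayMillis;
            this.item = item;
        }
    }

    /** Concatenation: inner streams are consumed one after another, so source order is kept. */
    static List<String> concatenate(List<List<Timed>> inners) {
        List<String> out = new ArrayList<>();
        for (List<Timed> inner : inners) {
            for (Timed timed : inner) {
                out.add(timed.item);
            }
        }
        return out;
    }

    /** Merge: all inner streams start at once, so items arrive in emission-time order. */
    static List<String> merge(List<List<Timed>> inners) {
        List<Timed> all = new ArrayList<>();
        for (List<Timed> inner : inners) {
            all.addAll(inner);
        }
        all.sort(Comparator.comparingInt(timed -> timed.delayMillis)); // stable sort
        List<String> out = new ArrayList<>();
        for (Timed timed : all) {
            out.add(timed.item);
        }
        return out;
    }

    /** Two inner streams: "a" emits at 0 ms and 30 ms, "b" emits at 10 ms and 20 ms. */
    static List<List<Timed>> sample() {
        return List.of(
                List.of(new Timed(0, "a1"), new Timed(30, "a2")),
                List.of(new Timed(10, "b1"), new Timed(20, "b2")));
    }

    public static void main(String[] args) {
        System.out.println(concatenate(sample())); // prints [a1, a2, b1, b2]
        System.out.println(merge(sample()));       // prints [a1, b1, b2, a2]
    }
}
```

Under that assumption, concatenation is the choice when item order must follow source order, and merge is the choice when throughput matters more than order.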
