Giter VIP home page Giter VIP logo

piper's People

Contributors

ccamel avatar muranoya avatar runabol avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

piper's Issues

The Coordinator Shutsdown

On starting coordinator service it keeps shutting down

019-04-30 23:51:51.119 INFO 27376 --- [ main] uration$$EnhancerBySpringCGLIB$$39cad2e7 : Registring AMQP Listener: completions -> com.creactiviti.piper.core.Coordinator:complete
2019-04-30 23:51:51.125 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-04-30 23:51:51.392 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7b84fcf8:0/SimpleConnection@363042d7 [delegate=amqp://[email protected]:5672/, localPort= 62381]
2019-04-30 23:51:51.409 INFO 27376 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (x.control.af0fa68f536441fea2329a0491b1d68a) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2019-04-30 23:51:51.468 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-04-30 23:51:51.475 ERROR 27376 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2019-04-30 23:51:51.483 WARN 27376 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Connection reset)
2019-04-30 23:51:51.488 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7b84fcf8:1/SimpleConnection@663411de [delegate=amqp://[email protected]:5672/, localPort= 62382]
2019-04-30 23:51:52.504 INFO 27376 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (x.control.af0fa68f536441fea2329a0491b1d68a) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2019-04-30 23:51:52.509 ERROR 27376 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2019-04-30 23:51:52.510 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-04-30 23:51:52.517 WARN 27376 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Connection reset)
2019-04-30 23:51:52.565 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7b84fcf8:2/SimpleConnection@33b1c5c5 [delegate=amqp://[email protected]:5672/, localPort= 62383]
2019-04-30 23:51:54.575 INFO 27376 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (x.control.af0fa68f536441fea2329a0491b1d68a) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2019-04-30 23:51:54.579 ERROR 27376 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2019-04-30 23:51:54.580 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-04-30 23:51:54.598 WARN 27376 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Connection reset)
2019-04-30 23:51:54.612 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7b84fcf8:3/SimpleConnection@9ef8eb7 [delegate=amqp://[email protected]:5672/, localPort= 62385]
2019-04-30 23:51:58.617 INFO 27376 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (x.control.af0fa68f536441fea2329a0491b1d68a) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2019-04-30 23:51:58.619 ERROR 27376 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2019-04-30 23:51:58.619 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-04-30 23:51:58.633 WARN 27376 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Connection reset)
2019-04-30 23:51:58.650 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7b84fcf8:4/SimpleConnection@305a0c5f [delegate=amqp://[email protected]:5672/, localPort= 62388]
2019-04-30 23:52:03.659 INFO 27376 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (x.control.af0fa68f536441fea2329a0491b1d68a) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2019-04-30 23:52:03.661 ERROR 27376 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2019-04-30 23:52:03.661 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-04-30 23:52:03.663 WARN 27376 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Connection reset)
2019-04-30 23:52:03.670 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7b84fcf8:5/SimpleConnection@4372b9b6 [delegate=amqp://[email protected]:5672/, localPort= 62391]
2019-04-30 23:52:03.674 INFO 27376 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2019-04-30 23:52:03.707 INFO 27376 --- [ main] utoConfigurationReportLoggingInitializer :

Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2019-04-30 23:52:03.729 ERROR 27376 --- [ main] o.s.boot.SpringApplication : Application startup failed

org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:71) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1467) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1417) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1393) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:579) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin$11$1.doWithRetry(RabbitAdmin.java:486) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin$11.onCreate(RabbitAdmin.java:481) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:33) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:592) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1436) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1417) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1393) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.declareQueue(RabbitAdmin.java:232) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.registerListenerEndpoint(AmqpMessengerConfiguration.java:168) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.registerListenerEndpoint(AmqpMessengerConfiguration.java:164) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.configureRabbitListeners(AmqpMessengerConfiguration.java:142) ~[classes!/:0.0.1-SNAPSHOT]
at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(RabbitListenerAnnotationBeanPostProcessor.java:230) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:781) ~[spring-beans-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at com.creactiviti.piper.PiperApplication.main(PiperApplication.java:37) [classes!/:0.0.1-SNAPSHOT]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_171]
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:763) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:50) ~[amqp-client-4.0.3.jar!/:4.0.3]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_171]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1027) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at com.sun.proxy.$Proxy92.exchangeDeclare(Unknown Source) ~[na:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.declareExchanges(RabbitAdmin.java:630) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.access$000(RabbitAdmin.java:72) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin$12.doInRabbit(RabbitAdmin.java:583) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1461) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
... 33 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.3.jar!/:4.0.3]
... 45 common frames omitted

2019-04-30 23:52:03.734 INFO 27376 --- [ main] ationConfigEmbeddedWebApplicationContext : Closing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@5ef04b5: startup date [Tue Apr 30 23:51:34 PDT 2019]; root of context hierarchy
2019-04-30 23:52:03.747 INFO 27376 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown

MongoDB support

I started working on MongoDB database support in Piper. I think it's a very interesting NoSQL database for Piper due to its ability to persist very easily JSON (/BSON) documents.

I would like to know if you would be interested in this contribution?

Switch not working as per document

Issue:
Switch/Cases never executes "default" block.

Created below pipeline. On executing Switch Cases, when "year" value is not 2019 or 2020 it NEVER reaches Default block to set respective value to "output" variable, and the coordinator status always comes FAILED

2019-05-03 22:27:28.001 DEBUG 20812 --- [cTaskExecutor-1] c.c.piper.core.event.LogEventListener : {jobId=a9cabddd12f3469d9c68505fc7da6428, createTime=2019-05-04T05:27:27.993+0000, id=5097b47a74204e38af03b181fdd1cd9b, type=job.status, status=FAILED}

Request:

{
"pipelineId":"myswitch",
"inputs":
{
"year":"2019"
}
}

Response:

{
"createTime": "2019-05-04T05:27:27.490+0000",
"webhooks": [],
"inputs": {
"year": "2090"
},
"id": "a9cabddd12f3469d9c68505fc7da6428",
"label": "Switch",
"priority": 0,
"pipelineId": "switch2",
"status": "CREATED",
"tags": []
}

switch2.yaml

label: Switch

inputs:
  - name: year
    type: string
    required: true
    
tasks:          

  - name: selector
    type: var
    value: ${year}

  - type: switch
    expression: ${selector}
    cases: 
      - key: "2019"
        tasks: 
          - name: output
            type: var
            value: Requested this year ${selector} schedule 
      - key: "2020"
        tasks: 
          - name: output
            type: var
            value: Requested next year ${selector} schedule 
    default: 
          -  name: output
             type: var
             value: Schedule not available for year ${selector}
             
  - type: print
    label: ${selector}-${output}
    text: ${output}  

Bash type tasks sometimes fail to execute

There is an intermittent problem with bash type tasks that sometimes fail to execute, throwing a Permission Denied error.
I have isolated the fault and will shortly submit a pull request.

Fork/Join results not returning

Hi Arik,

            I’m trying to retrieve (join) the two branches back together after they have forked and run.  How can I accomplish that and retrieve the results from each branch in the fork after the join?

            I attempted to do the same with using type “parallel”, but no luck.  I want to be able to merge the branches back, that way I can go to task named “checkAdapterResults” and pull in adapter1 and adapter2 results in the checkAdapterResults handler.

            I’m getting the error “2019-02-13 15:24:45.778 DEBUG 9396 --- [enerContainer-1] c.c.piper.core.task.SpelTaskEvaluator    : EL1008E: Property or field 'adapter1' cannot be found on object of type 'com.creactiviti.piper.core.context.MapContext' - maybe not public or not valid?”

And the same error for “adapter2”.

  • type: fork
    branches:

      • name: adapter1
        label: Web Service Call
        type: adapter1
      • name: adapter2
        label: Web Service Call
        type: adapter2
  • type: checkAdapterResults
    name: checkAdapterResults
    label: Checking Adapter Results
    adapter1: ${adapter1}
    adapter2: ${adapter2}

Thank you ,
Kevin

Branch fork/join documentation issue

The one bolded should be replaced with "tasks" not "branches". Branches doesn't work.

Fork/Join
Executes each branch in the branches as a seperate and isolated sub-flow. Branches are executed internally in sequence.

- type: fork
  **branches**: 
     - - name: randomNumber                 <-- branch 1 start here
         label: Generate a random number
         type: randomInt
         startInclusive: 0
         endInclusive: 5000

Improve separation worker/coordinator

As far as I understand, the commit 73c8345 has simplified project code structure and put all in one place, in a single jar.

We have a strategy of implementation of the workers in which we need to separate the workers from the coordinator (µService architecture - wrt separation of concerns) and we do not want to embed the code of the coordinator (server) for each of them.

In addition, we plan to implement some workers in go (now that piper supports Kafka). That's why it might be interesting to have some sort of SDK in java, aside of the the piper server project, for the implementation of the workers, who would contain the minimum of code required for that, and a declination for other languages: go, python...

What do you think ?

two sequential events for same task are send to two different queues

Hi Team,

Please can anyone clarify me regarding the below two points?

  1. Two sequential events of a task "STARTED" and "COMPLETED" are going to two different queues "Queues.EVENTS" and "Queues.COMPLETIONS".
    Because of two queues for same task state, "Queues.COMPLETIONS" will be processed before "Queues.EVENTS". Though in
    "TaskStartedEventListener.java" we are updating the DB only if startTime is null. I feel this is a temporary fix.
    So my question is can we push "Completion" event also to "Queues.Events" queue?

  2. During handling of the Event, I am setting few more properties to "TaskExecution" Object, So while creating PiperEvent, can we send TaskExecution Object also as part of PiperEvent?

Regards,
Ravi Kishore. K

Add Travis/Circle CI build

It'd be great to have some continuous integration (in order to run builds for regression testing, and so on).

Your first pipeline example does not work

So i am following the steps as mentioned at https://github.com/creactiviti/piper#writing-your-first-pipeline & get an error -

/piper$ curl -s -X POST -H Content-Type:application/json -d '{"pipelineId":"mypipeline","inputs":{"name":"Arik"}}' http://localhost:8080/jobs
{"timestamp":"2019-03-21T20:57:15.006+0000","status":400,"error":"Bad Request","exception":"java.lang.IllegalArgumentException","message":"Unknown pipeline: mypipeline","path":"/jobs"}

I have tried this with the mypipeline.yaml in

/piper$ ll src/main/resources/pipelines/
total 1
drwxrwxrwx 1 xxx xxx 256 Mar 21 16:53 ./
drwxrwxrwx 1 xxx xxx 0 Mar 21 16:24 ../
drwxrwxrwx 1 xxx xxx 0 Mar 21 16:24 demo/
-rwxrwxrwx 1 xxx xxx 240 Mar 21 16:45 mypipeline.yaml*

and at

/piper$ ll piper/pipelines/
total 1
drwxrwxrwx 1 xxx xxx 160 Mar 21 16:45 ./
drwxrwxrwx 1 xxx xxx 152 Mar 21 16:49 ../
-rwxrwxrwx 1 xxx xxx 240 Mar 21 16:45 mypipeline.yaml*

to the same error.

Where should this yaml file go?

java.lang.IllegalArgumentException: Unknown task handler: subflow

As the title mentions, i'm running the latest version of docker container "creactiviti/piper"
I wanted to use the subflow task in order to make a modular code, but an exception was raised

$ docker pull creactiviti/piper
Using default tag: latest
latest: Pulling from creactiviti/piper
Digest: sha256:1d9e5465304805191e0a7322e7c7b36ca2a4f3dc577f02d7bb6f7d46f82726ef
Status: Image is up to date for creactiviti/piper:latest
        {
            "jobId": "3559b511ff3e48ff9baa5548c04c785d",
            "createTime": "2019-01-25T12:08:26.118+0000",
            "inputs": [
                {
                    "source": "/path/to/source/dir"
                },
                {
                    "destination": "/path/to/destination/dir"
                }
            ],
            "taskNumber": 2,
            "id": "c072543876024b89a50d753164d0201e",
            "endTime": "2019-01-25T12:08:26.128+0000",
            "type": "subflow",
            "priority": 0,
            "error": {
                "stackTrace": [
                    "java.lang.IllegalArgumentException: Unknown task handler: subflow",
                    "\tat org.springframework.util.Assert.notNull(Assert.java:134)",
                    "\tat com.creactiviti.piper.core.task.DefaultTaskHandlerResolver.resolve(DefaultTaskHandlerResolver.java:33)",
                    "\tat com.creactiviti.piper.core.Worker.lambda$handle$0(Worker.java:90)",
                    "\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)",
                    "\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)",
                    "\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)",
                    "\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)",
                    "\tat java.lang.Thread.run(Thread.java:748)"
                ],
                "message": "Unknown task handler: subflow"
            },
            "pipelineId": "job",
            "status": "FAILED"
        }

The Docker Hub's image seems to be outdated

/scripts/build.sh fails two unit tests

[ERROR] Failures:
[ERROR] SpelTaskEvaluatorTests.test32:289 expected: </> but was:
[ERROR] MkdirTests.test2:27 Expected java.nio.file.FileSystemException to be thrown, but nothing was thrown.
[INFO]
[ERROR] Tests run: 98, Failures: 2, Errors: 0, Skipped: 0

Assertions.assertEquals(FilenameUtils.getFullPathNoEndSeparator(System.getProperty("java.io.tmpdir")),evaluated.get("tempDir"));
java.io.tmpdir is /tmp on my machine, and getFullPathNoEndSeparator gives "/", which is not equal to "/tmp"

task.set("path", "/no/such/thing");
Files.createDirectories() will not throw expected exception.

Am I missing anything here?

Thanks

adding control flow types

Hi,
Piper control flow seems to be missing flow control types such as loop & next. Actually a loop can be executed by a switch & next kind of control types.
Any suggestions on how to add a next kind of control or any other means to achieve that?
Thanks.

Logical control type support

Hi Team,

I am trying to find the option, how can i fit IF ELSE control.
We have SWITCHand FOR LOOP already.

Please advise.

Thanks in advance.
Sreenadh.

Resuming a task within switch executes all tasks in hierarchy of parent switch

I am executing the following yaml
label: one-level-switch

inputs:

  • name: idType
    label: Type of Id - User/Service
    type: string
    required: true

  • name: retainId
    label: Name of IdType
    type: string
    required: true

  • name: leadApproval
    label: Platform Lead Approval
    type: string
    required: true

  • name: targetServerIP
    label: Target Server IP
    type: string
    required: true

  • name: requestorId
    label: Requestor Id
    type: string
    required: true

  • name: wfRequestId
    label: WorkFlow Id
    type: string
    required: true

  • name: transactionType
    label: Transaction Type
    type: string
    required: true

  • name: requestId
    label: Request Id
    type: string
    required: true

outputs:

  • name: myMagicNumber
    value: ${randomNumber}

tasks:

  • label: Is ServiceId configured
    name: serviceIdOutcome
    type: jio/ngo/checkIdConfiguration
    id_type: service
    retainId: ${retainId }
    wfRequestId: ${wfRequestId}
    exec: async
    route:
    endpoint: /people/myspan/checkIdConfiguration
    endpointType: REST
    inbox:
    stat:
    checkIdOutput: ${checkIdOutput}

  • label: Print a greeting 1
    type: io/print
    text: At ${serviceIdOutcome.checkIdOutput} - ${serviceIdOutcome}

  • label: Service Switch
    type: switch
    expression: ${serviceIdOutcome.checkIdOutput}
    cases:

    • key: Y
      tasks:

      • label: Print a greeting
        type: io/print
        text: No Change
    • key: N
      tasks:

      • label: Domain Lead Approval
        name: domainLeadApproval
        type: jio/ngo/DomainLeadApproval
        requestId: ${requestorId}
        inbox: Domain-Lead-Inbox
        userId: ${requestorId}
        domainLeadOutcome: ${domainLeadOutcome}
        approver: ${leadApproval}

      • type: jio/ngo/id/ProfileChange
        label: Profile Change On Target Server
        text: Profile Change On Target Server
        serverIp: ${targetServerIP}
        retainId: ${retainId }
        wfRequestId: ${wfRequestId}
        exec: async
        route:
        endpoint: /people/myspan/profileChange
        endpointType: REST
        inbox:
        stat:
        default:

    • tasks:

      • label: Print a greeting
        type: io/print
        text: ServiceId Default Case
  • name: updateIdMaster
    type: jio/ngo/CloseTransaction
    value: "Pre: ${message}"
    url: api-url(to be put)
    requestId: ${requestId}
    domainLeadResponse: ${domainLeadApproval.domainLeadOutcome}
    exec: async
    route:
    endpoint: /people/myspan/updateIdMasterAndSendMail
    endpointType: REST
    inbox:
    stat:

I am facing below errors -

  • Firstly the resolution of variable "serviceIdOutcome.checkIdOutput" is uneven. It sometimes resolves sometimes not.
  • Secondly I am pausing task( using cordinator.stop() ) at intermediate level (in my case - Domain Lead Approval ) for approval as per my requirement. When i resume task at this stage, the workflow executes once again from start( Which is incorrect in terms of execution, Ideally it should complete my existing task and move on to next one.)
  • Third, if I have multiple switch cases , each task in switch starts with taskNumber 1. So as in "task_exection" table for task "Is ServiceId configured" we have tasknumber as 1 and also for "Domain Lead Approval" we have taskNumber 1. Ideally it should be incremental one, which is also one of the case in which my workflow fails to execute in expected flow.

[Question] Access to the JobId via piper pipelines

Hello,

As i have mentioned in the title, I want to get access to the JobId variable in order to persist some data that will be used after the notification of the registered webhooks.
I have checked the content of ${execution} & ${this} according to the Spring Expression Language, but without success.

Thanks for your answer.

Unknown pipeline: video/hls_single

Hi,

I have setup Coordinator and Worker based setup. My problem is sometimes its working and sometimes I am getting the error " Execution of Rabbit message listener failed." in Worker-node. Also, I am attaching the error below.
Can you help me to solve the issue?

Worker Node Log:

2019-06-06 07:57:32.150 WARN 1 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'onApplicationEvent' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:395) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:856) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1381) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1324) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1294) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1550) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.NullPointerException: null
at com.creactiviti.piper.core.task.SubflowJobStatusEventListener.onApplicationEvent(SubflowJobStatusEventListener.java:46) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.core.event.EventListenerChain.onApplicationEvent(EventListenerChain.java:35) ~[classes!/:0.0.1-SNAPSHOT]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:265) ~[spring-core-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:387) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
... 12 common frames omitted

2019-06-06 07:57:32.153 WARN 1 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'start' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:395) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:856) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1381) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1324) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1294) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1550) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.NullPointerException: null
at com.creactiviti.piper.core.context.MapContext.(MapContext.java:39) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.core.DefaultJobExecutor.executeNextTask(DefaultJobExecutor.java:73) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.core.DefaultJobExecutor.execute(DefaultJobExecutor.java:52) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.core.Coordinator.start(Coordinator.java:133) ~[classes!/:0.0.1-SNAPSHOT]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:265) ~[spring-core-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:387) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
... 12 common frames omitted

Pipeline task

Is it possible to call other pipelines by their "pipelineId" as Tasks from another pipeline, and pass them parameters?

Job.status webhooks don't notify of STARTED status

I've started using Webhooks (thanks a lot for the updated documentation btw) but have encountered the following issue:
I get notified of CREATED and COMPLETED events, but not of STARTED ones.
I believe there is a bug on this line, which publishes the event before changing the status. Is this it or am I understanding incorrectly?
Also the notification of the CREATED status seems unnecessary, we already get it on the response to the create job request.

Docker setup help, Failed to obtain JDBC Connection

I've reached this amazing project and I want to give it a try, but I'm unable to setup the enviroment with docker. I've tried two tutorials without success. If I follow the docker instructions at the end of this repository, I just end up with this message if I perform a get request to the /jobs url:

image

The same if I make a POST request with postman, same message:

image

My docker-compose config looks like this:

rabbit:
  image: creactiviti/rabbitmq:3.7.7-management
  hostname: rabbit
  ports:
    - 15672:15672
    - 5672:5672
  restart: always

postgres:
  image: postgres:11
  ports: 
    - 5432:5432
  environment:
    - POSTGRES_DB=piper
    - POSTGRES_USER=piper
    - POSTGRES_PASSWORD=piper

coordinator:
  image: creactiviti/piper:latest
  ports:
    - 8080:8080
  environment:
    - piper.worker.enabled=true
    - piper.coordinator.enabled=true
    - piper.worker.subscriptions.tasks=1
    - piper.pipeline-repository.filesystem.enabled=true
    - piper.pipeline-repository.filesystem.location-pattern=/app/pipelines/**/*.yaml
    - spring.datasource.initialize=true
    - spring.datasource.name=piper
    - spring.datasource.platform=postgres
    - spring.datasource.url=jdbc:postgresql://localhost:5432/piper
    - spring.datasource.username=piper
    - spring.datasource.password=piper
  volumes:
    - D:/test/docker/piper/pipelines:/app/pipelines
  restart: always

I also tried above setup but without any of the spring options, as the docker tutorial in this repository states with same results.

I have some questions:

  1. Is it mandatory having a DB for following the hello world tutorial?
  2. Is there any other tutorial might I follow?
  3. Is it compatible with Oracle, or just h2 and postgres?

I also tried this tutorial without success, I think I'm missing some steps.
https://medium.com/@arik.cohen/transcoding-video-at-scale-with-piper-dca23eb26fd2

Thanks for your work!

How to run project after importing in STS

I have successfully imported piper project and when i run it it got started on pot 8080.
But i want to know how i should check project is running by hitting URL on browser

Parallel task handler is executing tasks sequentially

As the title describe, the second command's output date is always greater than the first commmand's

Pipeline code :

label: Transcode 

inputs:
  - name: input
    label: Input File
    type: string
    required: true

tasks: 

- type: parallel
  tasks:

    - type: bash
      label: First executed command
      script: sleep 10; date
          
    - type: bash
      label: Second executed command
      script: date

Job description :

{
    "outputs": {},
    "execution": [
        {
            "jobId": "635d3f8490664d2b8cadd1eb434cd09c",
            "createTime": "2019-01-27T20:17:23.757+0000",
            "startTime": "2019-01-27T20:17:23.824+0000",
            "taskNumber": 1,
            "id": "2a22422dfdde49dfbad12e5985bebcea",
            "type": "parallel",
            "priority": 0,
            "tasks": [
                {
                    "label": "First executed command",
                    "type": "bash",
                    "script": "sleep 10; date"
                },
                {
                    "label": "Second executed command",
                    "type": "bash",
                    "script": "date"
                }
            ],
            "status": "COMPLETED"
        },
        {
            "label": "First executed command",
            "type": "bash",
            "priority": 0,
            "script": "sleep 10; date",
            "parentId": "2a22422dfdde49dfbad12e5985bebcea",
            "output": "Sun Jan 27 20:17:33 UTC 2019\n",
            "executionTime": 10008,
            "jobId": "635d3f8490664d2b8cadd1eb434cd09c",
            "createTime": "2019-01-27T20:17:23.795+0000",
            "progress": 100,
            "startTime": "2019-01-27T20:17:23.801+0000",
            "id": "3fe36d3f85f74513b0ad80f2ab377ed0",
            "endTime": "2019-01-27T20:17:33.808+0000",
            "status": "COMPLETED"
        },
        {
            "label": "Second executed command",
            "type": "bash",
            "priority": 0,
            "script": "date",
            "parentId": "2a22422dfdde49dfbad12e5985bebcea",
            "output": "Sun Jan 27 20:17:33 UTC 2019\n",
            "executionTime": 5,
            "jobId": "635d3f8490664d2b8cadd1eb434cd09c",
            "createTime": "2019-01-27T20:17:23.798+0000",
            "progress": 100,
            "startTime": "2019-01-27T20:17:33.810+0000",
            "id": "0d5e03f8e3dc44269f90e58a1595e697",
            "endTime": "2019-01-27T20:17:33.815+0000",
            "status": "COMPLETED"
        }
    ],
    "inputs": {
        "input": "lol"
    },
    "currentTask": -1,
    "label": "Transcode",
    "priority": 0,
    "pipelineId": "video/transcode",
    "tags": [],
    "createTime": "2019-01-27T20:17:23.620+0000",
    "webhooks": [],
    "startTime": "2019-01-27T20:17:23.750+0000",
    "id": "635d3f8490664d2b8cadd1eb434cd09c",
    "endTime": "2019-01-27T20:17:33.843+0000",
    "status": "COMPLETED"
}

Incorrect HTTP status code (500)

The HTTP status code returned by the API is incorrect in some cases.

For instance, when launching a pipeline with a bad parameter:

> curl -v -s -X POST -H Content-Type:application/json -d '{"pipelineId":"demo/hello","inputs":{"badprop":"Joe Jones"}}' http://localhost:8080/jobs

I got:

< HTTP/1.1 500 
< X-Application-Context: application
< Content-Type: application/json;charset=UTF-8
< Transfer-Encoding: chunked
< Date: Tue, 12 Jun 2018 10:17:03 GMT
< Connection: close
< 
* Closing connection 0
{"timestamp":"2018-06-12T10:17:03.794+0000","status":500,"error":"Internal Server Error","exception":"java.lang.IllegalArgumentException","message":"Missing required param: yourName","path":"/jobs"}⏎          

I would expect to have a 400 error code instead.

400 Bad Request: The request cannot be fulfilled due to bad syntax.

Application startup failed due to org.springframework.amqp.AmqpIOException: java.io.IOException

I am facing following exceptions (window env) Please advice. Thanks in advance.

ror starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2019-05-05 00:49:05.105 ERROR 17532 --- [ main] o.s.boot.SpringApplication : Application startup failed

org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:71) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1467) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1417) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1393) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:579) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin$11$1.doWithRetry(RabbitAdmin.java:486) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin$11.onCreate(RabbitAdmin.java:481) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:33) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:592) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1436) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1417) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1393) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.declareQueue(RabbitAdmin.java:232) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.registerListenerEndpoint(AmqpMessengerConfiguration.java:168) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.registerListenerEndpoint(AmqpMessengerConfiguration.java:164) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.lambda$configureRabbitListeners$0(AmqpMessengerConfiguration.java:150) ~[classes!/:0.0.1-SNAPSHOT]
at java.util.HashMap.forEach(Unknown Source) ~[na:1.8.0_201]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.configureRabbitListeners(AmqpMessengerConfiguration.java:150) ~[classes!/:0.0.1-SNAPSHOT]
at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(RabbitListenerAnnotationBeanPostProcessor.java:230) ~[spring-rabbit-1.7.9.RELEA
SE.jar!/:na]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:781) ~[spring-beans-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at com.creactiviti.piper.PiperApplication.main(PiperApplication.java:37) [classes!/:0.0.1-SNAPSHOT]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_201]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_201]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_201]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_201]
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:763) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:50) ~[amqp-client-4.0.3.jar!/:4.0.3]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_201]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_201]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_201]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_201]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1027) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at com.sun.proxy.$Proxy87.exchangeDeclare(Unknown Source) ~[na:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.declareExchanges(RabbitAdmin.java:630) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.access$000(RabbitAdmin.java:72) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin$12.doInRabbit(RabbitAdmin.java:583) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1461) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
... 35 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message'
, class-id=40, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.3.jar!/:4.0.3]
... 47 common frames omitted

2019-05-05 00:49:05.110 INFO 17532 --- [ main] ationConfigEmbeddedWebApplicationContext : Closing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@46fbb2
c1: startup date [Sun May 05 00:48:39 PDT 2019]; root of context hierarchy
2019-05-05 00:49:05.116 INFO 17532 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown

Distributed computing issue when deployed on multiple nodes with kubernetes

I have deployed piper in kubernetes using 5 piper workers/pods .

After proceeding with the tests, I have noticed that all workers are responsible for a single job which is a problem in case I want to make some POST-TRANSCODING actions with the video.

Here's an explained scenario :

  1. worker X gets the job and started transcoding the file and saving the output to a temporary file
  2. worker X finished the transcoding
  3. worker Y tries to make some POST-TRANSCODING actions with the video => Here come the problem, file not found in the worker's tmp directory.

I did a hack using a mounted NFS volume to sync the workers processing but this is very Network IO consuming.

So my question: Is it possible to bind a unique worker to a unique job ?

Apache Kafka support

Hi,

It looks like the current implementation supports jms, rabbitmq. Do you have any future roadmap to integrate with Apache Kafka as the message broker?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.