
Sylph's Introduction

Sylph


Welcome to Sylph!

Sylph is a streaming job manager.

Sylph uses SQL queries to describe computations, binding multiple sources (inputs) and sinks (outputs) so that streaming applications can be developed and deployed visually. Its web IDE makes it easy to develop, deploy, and monitor streaming applications, and to analyze their behavior at any time.
Sylph offers rich source/sink support and flexible extension points for visually developing and deploying stream-analysis applications, along with visual lifecycle management for streaming applications.

At its core, Sylph builds distributed applications from workflow descriptions. It supports:

  • Spark-Streaming (Spark1.x)
  • Structured-Streaming (Spark2.x)
  • Flink Streaming

License

Copyright (C) 2018 The Sylph Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

StreamingSql

create function get_json_object as 'com.github.harbby.sylph.runner.flink.runtime.UDFJson';

create source table topic1(
    _topic varchar,
    _key varchar,
    _partition integer,
    _offset bigint,
    _message varchar,
    ip varchar extend '$.conntent.ip',                 -- json path
    event_time varchar extend '$.conntent.event_time'  -- json path
) with (
    type = 'kafka08',
    kafka_topic = 'event_topic',
    auto.offset.reset = latest,
    kafka_broker = 'localhost:9092',
    kafka_group_id = 'test1',
    zookeeper.connect = 'localhost:2181'
);

-- define where the data stream is written
create sink table event_log(
    key varchar,
    user_id varchar,
    offset bigint
) with (
    type = 'kudu',
    kudu.hosts = 'localhost:7051',
    kudu.tableName = 'impala::test_kudu.log_events',
    kudu.mode = 'INSERT',
    batchSize = 5000
);

insert into event_log
select _key,get_json_object(_message, 'user_id') as user_id,_offset 
from topic1;

UDF UDAF UDTF

Custom functions are registered the same way as in Hive:

create function get_json_object as 'com.github.harbby.sylph.runner.flink.runtime.UDFJson';
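
For reference, a minimal sketch of what such a function class might look like, assuming the Flink runner accepts standard Flink ScalarFunction implementations (the class name, package, and behavior here are hypothetical, not Sylph's shipped UDFJson):

import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical scalar UDF; register it with:
//   create function to_upper as 'com.example.udf.ToUpper';
public class ToUpper
        extends ScalarFunction
{
    // Flink resolves eval methods by reflection.
    public String eval(String input)
    {
        return input == null ? null : input.toUpperCase();
    }
}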

Building

Sylph builds with Gradle and requires Java 8.
If you would like deployment documentation in Chinese, 中文部署文档 may help.

# Build and install distributions
./gradlew clean assemble dist

Running Sylph in your IDE

After building Sylph for the first time, you can load the project into your IDE and run the server. We recommend using IntelliJ IDEA.

After opening the project in IntelliJ, double check that the Java SDK is properly configured for the project:

  • Open the File menu and select Project Structure
  • In the SDKs section, ensure that a 1.8 JDK is selected (create one if none exist)
  • In the Project section, ensure the Project language level is set to 8.0 as Sylph makes use of several Java 8 language features
  • Ensure the environment variables HADOOP_HOME (2.6.x+), SPARK_HOME (2.4.x+), and FLINK_HOME (1.7.x+) are set

Sylph comes with sample configuration that should work out-of-the-box for development. Use the following options to create a run configuration:

  • Main Class: com.github.harbby.sylph.main.SylphMaster
  • VM Options: -Dconfig=etc/sylph/sylph.properties -Dlogging.config=etc/sylph/logback.xml
  • ENV Options: FLINK_HOME= HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
  • Working directory: sylph-dist/build
  • Use classpath of module: sylph-main
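
Equivalently, once the build has produced sylph-dist/build, the server can be started from a terminal. A minimal sketch, assuming the sylph-main module classpath has been assembled into $CLASSPATH (for example by the IDE or a Gradle task):

# run from the sylph-dist/build working directory, with FLINK_HOME and
# HADOOP_CONF_DIR exported as in the ENV options above
java -Dconfig=etc/sylph/sylph.properties \
     -Dlogging.config=etc/sylph/logback.xml \
     -cp "$CLASSPATH" com.github.harbby.sylph.main.SylphMaster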

Useful mailing lists

  1. [email protected] - For discussions about code, design and features
  2. [email protected] - For discussions about code, design and features
  3. [email protected] - For discussions about code, design and features

Getting Help

Sylph's People

Contributors

dependabot[bot], erichetti, harbby, jeific, jfanzhao, lingya, thomasperkins1123, uzmijnlm, wcf1


Sylph's Issues

AccessControlException: Permission denied: user=kkk

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=kkk, access=WRITE, inode="/data":hdfs:supergroup:drwxr-xr-x

In a CDH environment, starting Sylph on my local machine reports that the user has no permission to access the directory. What should I do?
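
A common workaround for development clusters (a sketch only; whether it is acceptable depends on your security policy) is to either impersonate the HDFS superuser or grant the submitting user access to the target directory:

# Option 1: run Sylph as the hdfs superuser (non-Kerberos clusters only)
export HADOOP_USER_NAME=hdfs

# Option 2: give user kkk ownership of the directory being written
hdfs dfs -chown -R kkk /data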

Error when running in IDEA

19-09-23 09:20:34 ERROR[main][SylphMaster.java:83]-SERVER START FAILED...
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/conf/YarnConfiguration
at ideal.sylph.runtime.yarn.YarnModule.configure(YarnModule.java:42)
at com.github.harbby.gadtry.ioc.BindMapping.create(BindMapping.java:163)
at com.github.harbby.gadtry.ioc.BindMapping.create(BindMapping.java:92)
at com.github.harbby.gadtry.ioc.IocFactory.create(IocFactory.java:55)
at ideal.sylph.runner.flink.FlinkContainerFactory.<init>(FlinkContainerFactory.java:67)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at ideal.sylph.main.service.JobEngineManager.lambda$createRunner$1(JobEngineManager.java:117)
at com.github.harbby.gadtry.base.Throwables.noCatch(Throwables.java:94)
at ideal.sylph.main.service.JobEngineManager.createRunner(JobEngineManager.java:117)
at ideal.sylph.main.service.JobEngineManager.lambda$loadRunners$0(JobEngineManager.java:103)
at com.github.harbby.gadtry.classloader.PluginLoader$Builder.loadModule(PluginLoader.java:205)
at com.github.harbby.gadtry.classloader.PluginLoader$Builder.load(PluginLoader.java:189)
at ideal.sylph.main.service.JobEngineManager.loadRunners(JobEngineManager.java:106)
at ideal.sylph.main.SylphMaster.main(SylphMaster.java:73)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.conf.YarnConfiguration
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.github.harbby.gadtry.classloader.PluginClassLoader.loadClass(PluginClassLoader.java:83)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 18 common frames omitted

Add more documents

This is a long-term tracking issue; newly added documents will be announced here.

Error running a job on the flink_1.9 branch

com.github.harbby.gadtry.jvm.JVMException: ForKJVMError: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:627)
at ideal.sylph.runner.flink.local.MiniExecutor.executeJobBlocking(MiniExecutor.java:88)
at ideal.sylph.runner.flink.local.MiniExecutor.lambda$createVmCallable$9b2e45cb$1(MiniExecutor.java:111)
at com.github.harbby.gadtry.jvm.JVMLauncher.main(JVMLauncher.java:58)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class ideal.sylph.plugins.kafka.flink.JsonDeserializer
at java.io.ObjectStreamClass.hasStaticInitializer(Native Method)
at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1941)
at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:275)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:273)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:272)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:694)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:235)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:115)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:370)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

at com.github.harbby.gadtry.jvm.VmResult.get(VmResult.java:31)
at com.github.harbby.gadtry.jvm.VmFuture.get(VmFuture.java:88)
at ideal.sylph.runtime.local.LocalContainer.getStatus(LocalContainer.java:91)
at ideal.sylph.main.service.JobManager.lambda$null$0(JobManager.java:75)
at java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1597)
at ideal.sylph.main.service.JobManager.lambda$new$1(JobManager.java:74)
at java.lang.Thread.run(Thread.java:748)

20-08-19 17:16:43 WARN[job_monitor][JobManager.java:77]-Job 13[24196] state is STOP, Will resubmit

Does Sylph support window functions?

Hello, window functions include tumbling windows, sliding windows, session windows, and OVER windows. Does Sylph support all of these? Thanks.
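
Window support ultimately comes from the underlying engine. As a sketch, assuming the query is handed through to Flink SQL and that topic1 carries a processing-time attribute named proctime (an assumption; the source DDL above does not declare one), a tumbling-window count would look like:

-- hypothetical example; requires a time attribute on the source table
select _key, count(*) as cnt
from topic1
group by TUMBLE(proctime, INTERVAL '1' MINUTE), _key;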

Optimize the root build experience

bower ESUDO         Cannot be run with sudo

Additional error details:
Since bower is a user command, there is no need to execute it with superuser permissions.
If you're having permission errors when using bower without sudo, please spend a few minutes learning more about how your system should work and make any necessary repairs.

http://www.joyent.com/blog/installing-node-and-npm
https://gist.github.com/isaacs/579814

You can however run a command with sudo using "--allow-root" option
npm ERR! code ELIFECYCLE
npm ERR! errno 1
npm ERR! [email protected] build: `bower install && exit 0`

add connector-hbase, supporting sink and join

see:

CREATE OUTPUT TABLE hbase_table1(
  rowkey VARCHAR,
  name VARCHAR,
  age BIGINT,
  birthday DATE
)
WITH (
  column_mapping = 'name:info:name,age:info:age,birthday:info:date'
)

java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/conf/YarnConfiguration

19-05-20 17:23:02 INFO[PipelinePluginLoader.java:126]-Find PipelinePlugin:ideal.sylph.plugins.kudu.KuduSink
19-05-20 17:23:02 INFO[PipelinePluginLoader.java:181]-loading RealTime Pipeline Plugin:class ideal.sylph.plugins.kudu.KuduSink ,the name is [kudu]
19-05-20 17:23:02 INFO[PipelinePluginLoader.java:126]-Find PipelinePlugin:ideal.sylph.plugins.mysql.PrintSink
19-05-20 17:23:02 INFO[PipelinePluginLoader.java:126]-Find PipelinePlugin:ideal.sylph.plugins.mysql.TestTrans
19-05-20 17:23:02 INFO[PipelinePluginLoader.java:126]-Find PipelinePlugin:ideal.sylph.plugins.mysql.MysqlAsyncJoin
19-05-20 17:23:02 INFO[PipelinePluginLoader.java:126]-Find PipelinePlugin:ideal.sylph.plugins.mysql.MysqlSink
19-05-20 17:23:02 INFO[PipelinePluginLoader.java:181]-loading RealTime Pipeline Plugin:class ideal.sylph.plugins.mysql.PrintSink ,the name is [console]
19-05-20 17:23:02 INFO[PipelinePluginLoader.java:181]-loading RealTime Pipeline Plugin:class ideal.sylph.plugins.mysql.TestTrans ,the name is [ideal.sylph.plugins.mysql.TestTrans]
19-05-20 17:23:02 INFO[PipelinePluginLoader.java:181]-loading RealTime Pipeline Plugin:class ideal.sylph.plugins.mysql.MysqlAsyncJoin ,the name is [mysql]
19-05-20 17:23:02 INFO[PipelinePluginLoader.java:181]-loading RealTime Pipeline Plugin:class ideal.sylph.plugins.mysql.MysqlSink ,the name is [mysql]
19-05-20 17:23:02 INFO[RunnerManager.java:106]-Found module dir directory modules/sylph-runner-flink Try to loading the runner
19-05-20 17:23:02 INFO[RunnerManager.java:113]-Installing runner ideal.sylph.runner.flink.FlinkRunner with dirideal.sylph.runner.flink.FlinkRunner@6a400542
19-05-20 17:23:02 INFO[RunnerManager.java:124]-Runner: ideal.sylph.runner.flink.FlinkRunner starts loading ideal.sylph.etl.PipelinePlugin
19-05-20 17:23:02 ERROR[SylphMaster.java:80]-SERVER START FAILED...
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/conf/YarnConfiguration
	at ideal.sylph.runtime.yarn.YarnModule.configure(YarnModule.java:42)
	at com.github.harbby.gadtry.ioc.BindMapping.create(BindMapping.java:158)
	at com.github.harbby.gadtry.ioc.IocFactory.create(IocFactory.java:55)
	at ideal.sylph.runner.flink.FlinkContainerFactory.<init>(FlinkContainerFactory.java:68)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at java.lang.Class.newInstance(Class.java:442)
	at ideal.sylph.main.service.RunnerManager.lambda$createRunner$1(RunnerManager.java:129)
	at com.github.harbby.gadtry.base.Throwables.noCatch(Throwables.java:92)
	at ideal.sylph.main.service.RunnerManager.createRunner(RunnerManager.java:129)
	at ideal.sylph.main.service.RunnerManager.lambda$loadRunners$0(RunnerManager.java:114)
	at com.github.harbby.gadtry.classloader.PluginLoader$Builder.loadModule(PluginLoader.java:201)
	at com.github.harbby.gadtry.classloader.PluginLoader$Builder.build(PluginLoader.java:185)
	at ideal.sylph.main.service.RunnerManager.loadRunners(RunnerManager.java:117)
	at ideal.sylph.main.SylphMaster.main(SylphMaster.java:70)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.conf.YarnConfiguration
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at com.github.harbby.gadtry.classloader.PluginClassLoader.loadClass(PluginClassLoader.java:83)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 17 more

windows submit job failure

Submitting a job from Windows uploads some jars to HDFS, but the program then cannot find certain classes, because the classpath separator on Windows is a semicolon while on Linux it is a colon.
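
A sketch of the usual portable fix: build classpath strings with java.io.File.pathSeparator rather than a hard-coded ':' (this helper class is illustrative, not part of Sylph):

import java.io.File;
import java.util.List;

public final class ClasspathUtil
{
    private ClasspathUtil() {}

    // Joins entries with ';' on Windows and ':' on Linux and macOS.
    public static String joinClasspath(List<String> entries)
    {
        return String.join(File.pathSeparator, entries);
    }
}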

flink 1.11.1: configuration-parameter error at startup

Caused by: org.apache.flink.configuration.IllegalConfigurationException: Either required fine-grained memory (jobmanager.memory.heap.size), or Total Flink Memory size (Key: 'jobmanager.memory.flink.size' , default: null (fallback keys: [])), or Total Process Memory size (Key: 'jobmanager.memory.process.size' , default: null (fallback keys: [])) need to be configured explicitly.
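
The message itself lists the accepted keys. A minimal sketch of a flink-conf.yaml fix, where 1600m is an arbitrary example value:

# flink-conf.yaml: set exactly one of the options named in the error
jobmanager.memory.process.size: 1600m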

add clickhouse-sink support

create sink table flink2ck(
    key varchar,
    message varchar,
    event_time date
) with (
    type = 'ideal.sylph.plugins.clickhouse.ClickHouseSink',
    userName = 'default',
    password = 'default',
    bulkSize = 20000,  -- default 20000
    url = 'jdbc:clickhouse://localhost:9000',
    query = 'insert into flink2ck values(?,?,?)'
);

When `tableEnv.registerDataStream` is used multiple times, specifying `proctime.proctime` or `rowtime.rowtime` will cause this error.

see:

Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
	at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
	at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
	at DataStreamCalcRule$15.processElement(Unknown Source)
	at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
	at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
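
For context, a sketch of the triggering pattern using the legacy (Flink 1.9-era) Table API; the table names and schemas here are illustrative, not taken from Sylph's code:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ProctimeRepro
{
    public static void main(String[] args)
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Registering several streams, each appending a time attribute; the
        // second registration is where the Timestamp-vs-Long cast error surfaced.
        tableEnv.registerDataStream("events1", env.fromElements("a", "b"), "message, proctime.proctime");
        tableEnv.registerDataStream("events2", env.fromElements("c", "d"), "message, proctime.proctime");
    }
}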

add connector redis

support redis <k,v> sink
support redis <rowkey, fieldKey, v> sink
support redis <k,v> join
support redis <rowkey, fieldKey, v> join (a hypothetical DDL sketch follows below)
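
A purely hypothetical DDL sketch, mirroring the kudu and clickhouse examples above; the type name and every property key are assumptions, since the connector does not exist yet:

create sink table redis_out(
    key varchar,
    value varchar
) with (
    type = 'redis',                  -- hypothetical connector name
    redis.hosts = 'localhost:6379',  -- hypothetical property
    redis.mode = 'SET'               -- hypothetical property
);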

Job proxy page may not open

Some clusters behave as follows:

19-01-30 14:52:17 INFO[DefaultRequestDirector.java:633]-Retrying connect to localhost/127.0.0.1->{}->http://hadoop7:44921
19-01-30 14:52:17 INFO[DefaultRequestDirector.java:625]-I/O exception (java.net.SocketException) caught when connecting to localhost/127.0.0.1->{}->http://hadoop7:44921: Invalid argument or cannot assign requested address

Cannot load user class

ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:369)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:360)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:360)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:360)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:360)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

When will Sylph move to Flink 1.10?

When will it be updated to Flink 1.10? Please also publish a detailed architecture document; otherwise the code is hard to study, and with one, everyone could help improve it.

sylph 0.3 Plan

1. Support simple batch joins (dimension-table joins)
2. Eliminate the front-end dependency vulnerabilities flagged in GitHub reports

The custom kafka data source does not support parsing JSON map/list fields

I use the SQL below to create a source table:

create source table topic1(
    uuid int,
    data varchar
) with (
    type = 'kafka',
    kafka_topic = 'flow_message_test',
    "auto.offset.reset" = latest,
    kafka_broker = '10.16.98.10:9092',
    kafka_group_id = 'test_json_parser',
    value_type = 'json'
);

The data looks like this (the data field is a list of JSON strings):

{
    "uuid": 10455,
    "data": ["{"applyid":"1590224987","base_info":"{\"city_id\":201}}"]
}

The SQL passes the grammar check, but at runtime it throws the exception below:
java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String
at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:33)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:96)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:47)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)

Error running under IDEA

I ran it after following the configuration instructions.

Relevant configuration:

Main Class: ideal.sylph.main.SylphMaster
VM Options: -Dconfig=etc/sylph/sylph.properties -Dlogging.config=etc/sylph/logback.xml
Program arguments: as when running from the command line, edit sylph-env.sh to bring in the Flink and other environment settings, separated by ";"
Working directory: the compiled sylph-dist/build directory, i.e. the full path <cloned sylph directory>/sylph-dist/build
Environment variables: same as Program arguments
Use classpath of module: sylph-main
JRE: 1.8

Error message:
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: com/github/harbby/gadtry/ioc/Bean

Connector api supports config injection

see:

    public static class MysqlConfig
            extends PluginConfig
    {
        @Name("url")
        @Description("this is mysql jdbc url")
        private String jdbcUrl;

        @Name("userName")
        @Description("this is mysql userName")
        private String user;

        @Name("password")
        @Description("this is mysql password")
        private String password;
    }

    @Name("mysql")
    @Description("this is mysql Sink; if the table does not exist it will be created")
    public class MysqlSink
            implements RealTimeSink
    {
        private final MysqlConfig config;

        // The framework injects the populated config through the constructor.
        public MysqlSink(MysqlConfig mysqlConfig)
        {
            this.config = mysqlConfig;
        }
    }

Error during compilation

Task :sylph-connectors:sylph-hdfs:compileJava FAILED

FAILURE: Build failed with an exception.

  • What went wrong:
    Execution failed for task ':sylph-connectors:sylph-hdfs:compileJava'.

Could not resolve all files for configuration ':sylph-connectors:sylph-hdfs:compileClasspath'.
Could not resolve com.hadoop.gplcompression:hadoop-lzo:0.4.20.
Required by:
project :sylph-connectors:sylph-hdfs
> Could not resolve com.hadoop.gplcompression:hadoop-lzo:0.4.20.
> Could not get resource 'https://maven.twttr.com/com/hadoop/gplcompression/hadoop-lzo/0.4.20/hadoop-lzo-0.4.20.pom'.
> Could not GET 'https://maven.twttr.com/com/hadoop/gplcompression/hadoop-lzo/0.4.20/hadoop-lzo-0.4.20.pom'.
> Connect to maven.twttr.com:443 [maven.twttr.com/31.13.85.8] failed: Connection refused (Connection refused)
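
A common workaround when maven.twttr.com is unreachable is to point Gradle at a mirror that hosts the artifact. A sketch for the affected module's build.gradle; the URL is a placeholder you would replace with a repository reachable from your network:

// build.gradle of the affected module
repositories {
    maven {
        // hypothetical mirror hosting com.hadoop.gplcompression:hadoop-lzo
        url 'https://your-mirror.example.com/repository/maven-public'
    }
}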
