ververica / flink-training-exercises Goto Github PK
View Code? Open in Web Editor NEWLicense: Apache License 2.0
License: Apache License 2.0
when I do the example of CarEventSort.java , i failed to find the corresponding data set for this example, this example is very important, so how can i find them? many thx!
No longer able to download the datasets for the training-exercises the links are broken:
wget http://training.ververica.com/trainingData/nycTaxiRides.gz URL transformed to HTTPS due to an HSTS policy --2020-06-29 15:19:37-- https://training.ververica.com/trainingData/nycTaxiRides.gz Resolving training.ververica.com (training.ververica.com)... 151.101.65.195, 151.101.1.195 Connecting to training.ververica.com (training.ververica.com)|151.101.65.195|:443... connected. HTTP request sent, awaiting response... 404 Not Found 2020-06-29 15:19:37 ERROR 404: Not Found.
mvn clean install fails with a checkstyle error about unsused imports:
[INFO] There are 2 checkstyle errors.
[ERROR] TravelTimePrediction.java[30:8] Unused import - org.apache.flink.api.common.typeinfo.TypeHint.
[ERROR] EventTimeJoinFunction.java[25:8] Unused import - java.util.Iterator.
Modifed : I realised that tha Java version was mapped to 0.2 version of exercises but the scala was mapped to 0.3
I changed it to 0.2 and now it is working as expected.
Is there any thing strange ?
While trying a got an exception below.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.UnsupportedOperationException: Automatic-Timestamp sources cannot emit elements with a timestamp. See interface ManualTimestampSourceFunction if you want to manually assign timestamps to elements.
at org.apache.flink.streaming.api.operators.StreamSource$AutomaticWatermarkContext.collectWithTimestamp(StreamSource.java:226)
at com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource.generateUnorderedStream(TaxiRideSource.java:278)
at com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource.run(TaxiRideSource.java:130)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
"queueState.update(queue)" is missing at the end of the "onTimer" function.
Hi,
I am adding the following dependency to my pom.xml -
com.data-artisans flink-training-exercises 0.15.2Even after I build my Maven project, I'm getting compilation issues when I try to use the TaxiRide and TaxiRideSource classes:
DataStream rides = env.addSource(
new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelayInSeconds, servingSpeed));
Error:(20, 21) java: cannot find symbol
symbol: class TaxiRideSource
location: class Qualys.FimEventProcessor
The link of EventTimeJoinHelper.java in http://training.data-artisans.com/exercises/eventTimeJoin.html is broken. It should point to https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/process/EventTimeJoinHelper.java
The Scala MailTFIDF
exercise has a hard coded input path.
mvn clean install fails on JDK 1.7 with: javadoc: error - invalid flag: -Xdoclint:none
Seems that you used an invalid javadoc option for JDK 1.7 in your pom.xml on line 200.
In the following files, the package names have "datastream_scala" instead of "datastream_java" causing errors, changing which the errors are resolved.
Couldn't make a branch and push changes due to lack of permission to do so.
src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/basics/RideCleansingScalaTest.java
src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/ExpiringStateScalaTest.java
src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/LongRidesScalaTest.java
src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/state/RidesAndFaresScalaTest.java
src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/windows/HourlyTipsScalaTest.java
src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/windows/PopularPlacesScalaTest.java
I am using Flink DataStream API where there where racks are available & I want to calculate "average"of temperature group by rack IDs. My window duration is of 40 seconds & my window is sliding by 10 seconds...Following is my code where I am calculating sum of temperatures after 10 second for every rackID,but now I want to calculate average temperatures::
static Properties properties=new Properties();
public static Properties getProperties()
{
properties.setProperty("bootstrap.servers", "54.164.200.104:9092");
properties.setProperty("zookeeper.connect", "54.164.200.104:2181");
//properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder");
//properties.setProperty("group.id", "akshay");
properties.setProperty("auto.offset.reset", "earliest");
return properties;
}
@SuppressWarnings("rawtypes")
public static void main(String[] args) throws Exception
{
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties props=Program.getProperties();
DataStream<TemperatureEvent> dstream=env.addSource(new FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new TemperatureEventSchema(), props)).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
DataStream<TemperatureEvent> ds1=dstream.keyBy("rackId").timeWindow(Time.seconds(40), Time.seconds(10)).sum("temperature");
env.execute("Temperature Consumer");
}
How can I calculate average of temperatures for each rackId based on window duration ??
i'm stuck here for quite a bit time, cause there is no explanation for this example,so it is tough
I am using Flink Table API. I want to update the table in Flink through Pattern Detection..I am using 3 fields : routeno,source,distance,category
. Now I want to update the category based on the value of distance for every routeno...For ex : if routeno=1 and distance<=200 then category='daily' ..How can I update this using Flink's Table API??
Hi, I have some confusions on connect
operations in flink, wonder if I can ask here:
Thanks in advance!
Hi,
i try the following workflow:
here is a build log: https://gist.github.com/zavalit/1e78478ebdda827f3454 and when i run
java -jar target/flink-scala-project-0.1.jar
i get
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/scala/ExecutionEnvironment$ at org.apache.flink.quickstart.Job$.main(Job.scala:41) at org.apache.flink.quickstart.Job.main(Job.scala) Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.scala.ExecutionEnvironment$ at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 2 more
Hi, getting the following error while running mvn clean package
Any idea what am I doing wrong ? Scala version 2.11
bahadir@server:~/code/learn/flink/flink-training-exercises$ mvn clean package
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.dataartisans:flink-training-exercises:jar:0.2
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 142, column 12
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Flink Exercises 0.2
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ flink-training-exercises ---
[INFO] Deleting /home/bahadir/code/learn/flink/flink-training-exercises/target
[INFO]
[INFO] --- maven-checkstyle-plugin:2.12.1:check (validate) @ flink-training-exercises ---
[INFO]
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ flink-training-exercises ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/bahadir/code/learn/flink/flink-training-exercises/src/main/resources
[INFO]
[INFO] --- scala-maven-plugin:3.1.4:compile (scala-compile-first) @ flink-training-exercises ---
[WARNING] Expected all dependencies to require Scala version: 2.10.4
[WARNING] com.twitter:chill_2.10:0.5.2 requires scala version: 2.10.4
[WARNING] com.twitter:chill-avro_2.10:0.5.2 requires scala version: 2.10.4
[WARNING] com.twitter:chill-bijection_2.10:0.5.2 requires scala version: 2.10.4
[WARNING] com.twitter:bijection-core_2.10:0.7.2 requires scala version: 2.10.4
[WARNING] com.twitter:bijection-avro_2.10:0.7.2 requires scala version: 2.10.4
[WARNING] org.scala-lang:scala-reflect:2.10.4 requires scala version: 2.10.4
[WARNING] org.apache.flink:flink-scala:0.10.1 requires scala version: 2.10.4
[WARNING] org.apache.flink:flink-scala:0.10.1 requires scala version: 2.10.4
[WARNING] org.scala-lang:scala-compiler:2.10.4 requires scala version: 2.10.4
[WARNING] org.scalamacros:quasiquotes_2.10:2.0.1 requires scala version: 2.10.4
[WARNING] org.apache.flink:flink-runtime:0.10.1 requires scala version: 2.10.4
[WARNING] com.typesafe.akka:akka-actor_2.10:2.3.7 requires scala version: 2.10.4
[WARNING] com.typesafe.akka:akka-remote_2.10:2.3.7 requires scala version: 2.10.4
[WARNING] com.typesafe.akka:akka-slf4j_2.10:2.3.7 requires scala version: 2.10.4
[WARNING] org.clapper:grizzled-slf4j_2.10:1.0.2 requires scala version: 2.10.3
[WARNING] Multiple versions of scala libraries detected!
[INFO] /home/bahadir/code/learn/flink/flink-training-exercises/src/main/java:-1: info: compiling
[INFO] /home/bahadir/code/learn/flink/flink-training-exercises/src/main/scala:-1: info: compiling
[INFO] Compiling 25 source files to /home/bahadir/code/learn/flink/flink-training-exercises/target/classes at 1449867448989
[WARNING] /home/bahadir/code/learn/flink/flink-training-exercises/src/main/scala/com/dataartisans/flinktraining/exercises/table_scala/memberotm/MemberOTMonth.scala:74: warning: Type org.apache.flink.api.table.Row is no POJO, has immutable fields: value fields, value arity.
[WARNING] membersOTMonth.toDataSet[Row].print()
[WARNING] ^
[WARNING] one warning found
[INFO] prepare-compile in 0 s
[INFO] compile in 15 s
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ flink-training-exercises ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 15 source files to /home/bahadir/code/learn/flink/flink-training-exercises/target/classes
[INFO] -------------------------------------------------------------
[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /home/bahadir/code/learn/flink/flink-training-exercises/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/sources/TaxiRideSource.java:[204,68] no suitable constructor found for PriorityQueue(<anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>>)
constructor java.util.PriorityQueue.PriorityQueue(java.util.SortedSet<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
(actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to java.util.SortedSet<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>> by method invocation conversion)
constructor java.util.PriorityQueue.PriorityQueue(java.util.PriorityQueue<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
(actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to java.util.PriorityQueue<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>> by method invocation conversion)
constructor java.util.PriorityQueue.PriorityQueue(java.util.Collection<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
(actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to java.util.Collection<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>> by method invocation conversion)
constructor java.util.PriorityQueue.PriorityQueue(int,java.util.Comparator<? super org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
(actual and formal argument lists differ in length)
constructor java.util.PriorityQueue.PriorityQueue(int) is not applicable
(actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to int by method invocation conversion)
constructor java.util.PriorityQueue.PriorityQueue() is not applicable
(actual and formal argument lists differ in length)
[INFO] 1 error
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 23.164 s
[INFO] Finished at: 2015-12-11T21:57:46+01:00
[INFO] Final Memory: 41M/660M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink-training-exercises: Compilation failure
[ERROR] /home/bahadir/code/learn/flink/flink-training-exercises/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/sources/TaxiRideSource.java:[204,68] no suitable constructor found for PriorityQueue(<anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>>)
[ERROR] constructor java.util.PriorityQueue.PriorityQueue(java.util.SortedSet<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
[ERROR] (actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to java.util.SortedSet<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>> by method invocation conversion)
[ERROR] constructor java.util.PriorityQueue.PriorityQueue(java.util.PriorityQueue<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
[ERROR] (actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to java.util.PriorityQueue<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>> by method invocation conversion)
[ERROR] constructor java.util.PriorityQueue.PriorityQueue(java.util.Collection<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
[ERROR] (actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to java.util.Collection<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>> by method invocation conversion)
[ERROR] constructor java.util.PriorityQueue.PriorityQueue(int,java.util.Comparator<? super org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
[ERROR] (actual and formal argument lists differ in length)
[ERROR] constructor java.util.PriorityQueue.PriorityQueue(int) is not applicable
[ERROR] (actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to int by method invocation conversion)
[ERROR] constructor java.util.PriorityQueue.PriorityQueue() is not applicable
[ERROR] (actual and formal argument lists differ in length)
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
Description in the title,
reference solutions do not work with the older Kafka version.
After executing the example, an exception is thrown.
Exception in thread "main" java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:929)
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:908)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:81)
at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:627)
at com.likya.flink.lectures.PageRankWithEdgeWeights$.main(PageRankWithEdgeWeights.scala:87)
at com.likya.flink.lectures.PageRankWithEdgeWeights.main(PageRankWithEdgeWeights.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
There are lines comented may be related with the problem :
//Now run the Page Rank algorithm over the weighted graph
//val pageRanks = networkWithWeights.run[String](new PageRank[String, Double, Double, String](DAMPENING_FACTOR, maxIterations))
//pageRanks.writeAsCsv(outputPath, fieldDelimiter = "\t", lineDelimiter = "\n")
java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)
Information:java: Errors occurred while compiling module 'flink-training-exercises'
Information:javac 1.8.0_171 was used to compile java sources
Information:2018/9/4 14:35 - Compilation completed with 15 errors and 0 warnings in 10 s 522 ms
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\basics\RideCleansingScalaTest.java
Error:(19, 72) java: package com.dataartisans.flinktraining.exercises.datastream_scala.basics not exists
Error:(27, 112) java: package com.dataartisans.flinktraining.solutions.datastream_scala.basics not exists
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\windows\HourlyTipsScalaTest.java
Error:(19, 73) java: package com.dataartisans.flinktraining.exercises.datastream_scala.windows not exists
Error:(31, 113) java: package com.dataartisans.flinktraining.solutions.datastream_scala.windows not exists
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\process\ExpiringStateScalaTest.java
Error:(19, 73) java: package com.dataartisans.flinktraining.exercises.datastream_scala.process not exists
Error:(27, 113) java: package com.dataartisans.flinktraining.solutions.datastream_scala.process not exists
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\state\RidesAndFaresScalaTest.java
Error:(21, 71) java: package com.dataartisans.flinktraining.exercises.datastream_scala.state not exists
Error:(32, 111) java: package com.dataartisans.flinktraining.solutions.datastream_scala.state not exists
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\windows\PopularPlacesScalaTest.java
Error:(19, 73) java: package com.dataartisans.flinktraining.exercises.datastream_scala.windows not exists
Error:(30, 113) java: package com.dataartisans.flinktraining.solutions.datastream_scala.windows not exists
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\process\LongRidesScalaTest.java
Error:(26, 112) java: package com.dataartisans.flinktraining.exercises.datastream_scala.process not exists
Error:(27, 111) java: package com.dataartisans.flinktraining.exercises.datastream_scala.cep not exists
Error:(30, 113) java: package com.dataartisans.flinktraining.solutions.datastream_scala.process not exists
Error:(35, 112) java: package com.dataartisans.flinktraining.solutions.datastream_scala.cep not exists
Error:(40, 125) java: package com.dataartisans.flinktraining.solutions.datastream_scala.process not exists
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\testing\TestSource.java
Information:java: 某些输入文件使用了未经检查或不安全的操作。
Information:java: 有关详细信息, 请使用 -Xlint:unchecked 重新编译。
Hi,
i think i have found a small problem in the example.
This commit in my fork contains the fix.
The line
int direction = GeoUtils.getDirectionAngle(ride.endLon, ride.endLat, ride.startLon, ride.startLat);
should be
int direction = GeoUtils.getDirectionAngle(ride.startLon, ride.startLat, ride.endLon, ride.endLat);
because the GeoUtils API says
/**
* Returns the angle in degrees between the vector from the start to the destination
* and the x-axis on which the start is located.
*
* The angle describes in which direction the destination is located from the start, i.e.,
* 0° -> East, 90° -> South, 180° -> West, 270° -> North
*
* @param startLon longitude of start location
* @param startLat latitude of start location
* @param destLon longitude of destination
* @param destLat latitude of destination
* @return The direction from start to destination location
*/
public static int getDirectionAngle(
float startLon, float startLat, float destLon, float destLat)
Or maybe I am wrong? Looking forward for feedback, thanks in advance.
I am using the web Ui to submit a first test job
com.dataartisans.flinktraining.exercises.datastream_java.basics.RideCleansingExercise
-input /Users/etsang/dev/src/github.com/apache/flink/flink-dist/target/flink-1.9-SNAPSHOT-bin/flink-1.9-SNAPSHOT/examples/data/nycTaxiRides.gz
ls /Users/etsang/dev/src/github.com/apache/flink/flink-dist/target/flink-1.9-SNAPSHOT-bin/flink-1.9-SNAPSHOT/examples/data/nycTaxiRides.gz is able to find the file
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at java.io.FileInputStream.<init>(FileInputStream.java:93)
at com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource.run(TaxiRideSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)```
I am using this example Flink CEP where I am separating out the data as I have created one application which is Sending application to Kafka & another application reading from Kafka... I generated the producer for class TemperatureWarning i.e. in Kafka,I was sending data related to TemperatureWarning Following is my code which is consuming data from Kafka...
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(5000);
Properties properties=new Properties();
properties.setProperty("bootstrap.servers", "PUBLICDNS:9092");
properties.setProperty("zookeeper.connect", "PUBLICDNS:2181");
properties.setProperty("group.id", "test");
DataStream<TemperatureWarning> dstream=env.addSource(new FlinkKafkaConsumer09<TemperatureWarning>("MonitoringEvent", new MonitoringEventSchema(), properties));
Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("first")
.next("second")
.within(Time.seconds(20));
PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern(
dstream.keyBy("rackID"),
alertPattern);
DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect(
(Map<String, TemperatureWarning> pattern, Collector<TemperatureAlert> out) -> {
TemperatureWarning first = pattern.get("first");
TemperatureWarning second = pattern.get("second");
if (first.getAverageTemperature() < second.getAverageTemperature()) {
out.collect(new TemperatureAlert(second.getRackID(),second.getAverageTemperature(),second.getTimeStamp()));
}
});
dstream.print();
alerts.print();
env.execute("Flink Kafka Consumer");
But when I execute this application,it throws following Exception:
java.lang.NullPointerException at com.yash.source.MonitoringEventSchema.deserialize(MonitoringEventSchema.java:74) at com.yash.source.MonitoringEventSchema.deserialize(MonitoringEventSchema.java:1) at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:466)
Following is my code for TemperatureWarning
public class TemperatureWarning {
public int rackID;
public double averageTemperature;
public long timeStamp;
public TemperatureWarning(int rackID, double averageTemperature,long timeStamp) {
this.rackID = rackID;
this.averageTemperature = averageTemperature;
this.timeStamp=timeStamp;
}
public TemperatureWarning() {
this(-1, -1,-1);
}
public int getRackID() {
return rackID;
}
public void setRackID(int rackID) {
this.rackID = rackID;
}
public double getAverageTemperature() {
return averageTemperature;
}
public void setAverageTemperature(double averageTemperature) {
this.averageTemperature = averageTemperature;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof TemperatureWarning) {
TemperatureWarning other = (TemperatureWarning) obj;
return rackID == other.rackID && averageTemperature == other.averageTemperature;
} else {
return false;
}
}
@Override
public int hashCode() {
return 41 * rackID + Double.hashCode(averageTemperature);
}
@Override
public String toString() {
//return "TemperatureWarning(" + getRackID() + ", " + averageTemperature + ")";
String str=getRackID()+","+averageTemperature+","+getTimeStamp();
return str;
//return "TemperatureWarning(" + getRackID() +","+averageTemperature + ") "+ "," + getTimeStamp();
}
}
Following is my code for MonitoringEventSchema :
public class MonitoringEventSchema implements DeserializationSchema<TemperatureWarning>,SerializationSchema<TemperatureWarning>
{
//@Override
public TypeInformation<TemperatureWarning> getProducedType() {
// TODO Auto-generated method stub
return TypeExtractor.getForClass(TemperatureWarning.class);
}
//@Override
public byte[] serialize(TemperatureWarning element) {
// TODO Auto-generated method stub
return element.toString().getBytes();
}
//@Override
public TemperatureWarning deserialize(byte[] message) throws IOException {
// TODO Auto-generated method stub
TemperatureWarning warning=null;
System.out.println("Before Encoded:"+message);
if(message!=null)
{
String str=new String(message,"UTF-8");
System.out.println("Message:"+str);
String []value=str.split(",");
warning.setRackID(Integer.parseInt(value[0]));
warning.setAverageTemperature(Double.parseDouble(value[1]));
warning.setTimeStamp(Long.parseLong(value[2]));
return warning;
}
return null;
}
//@Override
public boolean isEndOfStream(TemperatureWarning nextElement) {
// TODO Auto-generated method stub
return false;
}
}
Can anyone please provide me the solution for this error ?? How can I solve this error ??
I trying to follow the TravelTimePrediction example to learn about prediction models in flink but I can find the class TravelTimePredictionModel
where the prediction code should be. Am I totally wrong or isn't it shipped with the project?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.