Member since 02-24-2016

87 Posts
18 Kudos Received
3 Solutions

        My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 5346 | 12-18-2017 11:47 AM |
|  | 13064 | 11-08-2017 01:54 PM |
|  | 69469 | 05-06-2016 11:48 AM |

01-23-2018 02:24 PM

I am running a Spark job on a Hortonworks cluster. The source is an Oracle DB: we fetch records from Oracle and analyse them. I am getting the error message below. I have gone through many of the suggested solutions and, accordingly, I am already using ojdbc7.jar. Please guide me on this.

18/01/22 07:04:09 ERROR Executor: Exception in task 0.0 in stage 15.0 (TID 1069)
java.sql.SQLException: Protocol violation: [ 6, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, 21, 7, ]
        at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:536)
        at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:257)
        at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:587)
        at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:225)
        at oracle.jdbc.driver.T4CPreparedStatement.fetch(T4CPreparedStatement.java:1066)
        at oracle.jdbc.driver.OracleStatement.fetchMoreRows(OracleStatement.java:3716)
        at oracle.jdbc.driver.InsensitiveScrollableResultSet.fetchMoreRows(InsensitiveScrollableResultSet.java:1015)
        at oracle.jdbc.driver.InsensitiveScrollableResultSet.absoluteInternal(InsensitiveScrollableResultSet.java:979)
        at oracle.jdbc.driver.InsensitiveScrollableResultSet.next(InsensitiveScrollableResultSet.java:579)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.getNext(JDBCRDD.scala:369)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.hasNext(JDBCRDD.scala:498)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin$$anonfun$2.apply(BroadcastNestedLoopJoin.scala:105)
        at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin$$anonfun$2.apply(BroadcastNestedLoopJoin.scala:96)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
End of LogType:stderr. This log file belongs to a running container (container_e13_1503300866021_880451_01_000002) and so may not be complete.
LogType:launch_container.sh
Log Upload Time:Tue Jan 23 02:09:02 -0500 2018
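
For reference, here is a minimal sketch of how the JDBC read side is typically wired up; it is not the original job, and the connection URL, table name, and credentials are placeholders. Protocol-violation errors from the Oracle driver are often a driver/database version mismatch, or the driver jar missing on the executors, so it is worth confirming that the ojdbc jar matching your Oracle server version is shipped with the job.

// Minimal sketch, not the original job. The stack trace looks like a Spark 1.6-style
// job, so the SQLContext API is used here; URL, table, and credentials are placeholders.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object OracleJdbcRead {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("oracle-jdbc-read"))
    val sqlContext = new SQLContext(sc)

    // Ship the Oracle driver to the driver and the executors, e.g.:
    //   spark-submit --jars /path/to/ojdbc7.jar --driver-class-path /path/to/ojdbc7.jar ...
    val df = sqlContext.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/service")   // placeholder URL
      .option("driver", "oracle.jdbc.OracleDriver")
      .option("dbtable", "SCHEMA.SOURCE_TABLE")                   // placeholder table
      .option("user", "db_user")                                  // placeholder credentials
      .option("password", "db_password")
      .load()

    println(df.count())   // forces the executors to actually fetch from Oracle
    sc.stop()
  }
}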
   
						
					
Labels: Apache Spark

12-18-2017 11:47 AM

Issue got resolved. Follow this checklist:

1. Check that ZooKeeper is running.
2. Check that a Kafka producer and consumer work fine from the console: create a topic and list it, to make sure Kafka itself is running fine.
3. Use the sbt dependency that matches your Kafka version (see the build.sbt sketch after this post). For Kafka 0.9:

   "org.apache.flink" %% "flink-connector-kafka-0.9" % flinkVersion % "provided"

   and import in the Scala program:

   import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer09, FlinkKafkaConsumer09}

   For Kafka 0.10:

   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion % "provided"

   and import in the Scala program:

   import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer010, FlinkKafkaConsumer010}
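
For illustration, here is a hedged build.sbt fragment showing what item 3 above can look like in full. The flinkVersion value and the extra Flink modules are assumptions; adjust them to the release you actually run, and keep only the connector line that matches your broker version.

// Hypothetical build.sbt fragment; flinkVersion is an assumption.
val flinkVersion = "1.4.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  // Pick exactly one connector, matching the Kafka broker version:
  "org.apache.flink" %% "flink-connector-kafka-0.9"  % flinkVersion,  // Kafka 0.9 brokers
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion   // Kafka 0.10 brokers
)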
						
					
12-18-2017 08:46 AM

@Fabian Hueske, please guide me here.
						
					
12-18-2017 08:44 AM

I am writing a Flink Kafka integration program, shown below, but I am getting a timeout error from Kafka:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import java.util.Properties

object StreamKafkaProducer {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("serializer.class", "kafka.serializer.StringEncoder")

    val stream: DataStream[String] = env.fromElements("Adam", "Sarah")

    val kafkaProducer = new FlinkKafkaProducer010[String](
      "localhost:9092",
      "output",
      new SimpleStringSchema
    )

    // write data into Kafka
    stream.addSink(kafkaProducer)
    env.execute("Flink kafka integration")
  }
}

From the terminal I can see that Kafka and ZooKeeper are running, but when I run the above program from IntelliJ it shows this error:

C:\Users\amdass\workspace\flink-project-master>sbt run
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; 
support was removed in 8.0
[info] Loading project definition from C:\Users\amdass\workspace\flink-
project-master\project
[info] Set current project to Flink Project (in build 
file:/C:/Users/amdass/workspace/flink-project-master/)
[info] Compiling 1 Scala source to C:\Users\amdass\workspace\flink-project-
master\target\scala-2.11\classes...
[info] Running org.example.StreamKafkaProducer
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-563113020] 
with leader session id 5a637740-5c73-4f69-a19e-c8ef7141efa1.
12/15/2017 14:41:49     Job execution switched to status RUNNING.
12/15/2017 14:41:49     Source: Collection Source(1/1) switched to SCHEDULED
12/15/2017 14:41:49     Sink: Unnamed(1/4) switched to SCHEDULED
12/15/2017 14:41:49     Sink: Unnamed(2/4) switched to SCHEDULED
12/15/2017 14:41:49     Sink: Unnamed(3/4) switched to SCHEDULED
12/15/2017 14:41:49     Sink: Unnamed(4/4) switched to SCHEDULED
12/15/2017 14:41:49     Source: Collection Source(1/1) switched to DEPLOYING
12/15/2017 14:41:49     Sink: Unnamed(1/4) switched to DEPLOYING
12/15/2017 14:41:49     Sink: Unnamed(2/4) switched to DEPLOYING
12/15/2017 14:41:49     Sink: Unnamed(3/4) switched to DEPLOYING
12/15/2017 14:41:49     Sink: Unnamed(4/4) switched to DEPLOYING
12/15/2017 14:41:50     Source: Collection Source(1/1) switched to RUNNING
12/15/2017 14:41:50     Sink: Unnamed(2/4) switched to RUNNING
12/15/2017 14:41:50     Sink: Unnamed(4/4) switched to RUNNING
12/15/2017 14:41:50     Sink: Unnamed(3/4) switched to RUNNING
12/15/2017 14:41:50     Sink: Unnamed(1/4) switched to RUNNING
12/15/2017 14:41:50     Source: Collection Source(1/1) switched to FINISHED
12/15/2017 14:41:50     Sink: Unnamed(3/4) switched to FINISHED
12/15/2017 14:41:50     Sink: Unnamed(4/4) switched to FINISHED
12/15/2017 14:42:50     Sink: Unnamed(1/4) switched to FAILED
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 60000 ms.
12/15/2017 14:42:50     Sink: Unnamed(2/4) switched to FAILED
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 60000 ms.
12/15/2017 14:42:50     Job execution switched to status FAILING.
 
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
12/15/2017 14:42:50     Job execution switched to status FAILED.
[error] (run-main-0) org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:933)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:876)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:876)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
[trace] Stack trace suppressed: run last *:run for the full output.
java.lang.RuntimeException: Nonzero exit code: 1
        at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 75 s, completed Dec 15, 2017 2:42:51 PM
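
For what it is worth, here is a hedged variant of the same program that hands the producer configuration to the connector explicitly; in the original, the Properties object is created but never passed to the sink. "Failed to update metadata" generally means the producer never received a metadata response, so it is also worth checking that the output topic exists and that the broker's advertised listener is reachable from the machine running IntelliJ. The topic name and broker address below are the same values used above.

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object StreamKafkaProducerWithProps {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Producer configuration handed to the Kafka connector itself.
    val producerProps = new Properties()
    producerProps.setProperty("bootstrap.servers", "localhost:9092") // must be reachable from this machine

    val stream: DataStream[String] = env.fromElements("Adam", "Sarah")

    // Properties-based constructor: topic, serialization schema, producer config.
    stream.addSink(new FlinkKafkaProducer010[String](
      "output",                    // create the topic beforehand if auto-creation is disabled
      new SimpleStringSchema(),
      producerProps))

    env.execute("Flink Kafka producer with explicit properties")
  }
}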
						
					
Labels: Apache Kafka

12-15-2017 01:09 PM

@Fabian Hueske, I need your help with the question below: https://stackoverflow.com/questions/47831484/flink-kafka-program-in-scala-giving-timeout-error-org-apache-kafka-common-errors
						
					
			
    
	
		
		
12-14-2017 04:21 PM

Adding this detail: in its current version (1.4.0, Dec. 2017), Flink does not provide a built-in TableSource to ingest data from a relational database.
						
					
12-14-2017 10:29 AM

Hi,

I am referring to https://flink.apache.org/news/2017/04/04/dynamic-tables.html, where the database configuration is not shown:

val sensorTable = ???  // can be a CSV file, Kafka topic, database, or ...
// register the table source
tEnv.registerTableSource("sensors", sensorTable)

I have to connect to a relational source. Does Flink have an API for databases, or do we need to use a JDBC approach? (Is there anything similar to Apache Spark's sqlContext or SparkSession object?)
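
Adding a hedged sketch of one possible route while there is no built-in relational TableSource: the flink-jdbc module's batch JDBCInputFormat can read a query result into a DataSet[Row], which you could then register with a BatchTableEnvironment (tEnv.registerDataSet) and query with the Table API. The driver class, URL, credentials, and query below are placeholders.

import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

object JdbcBatchRead {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Column types of the query result: an INT id and a STRING name (placeholders).
    val rowTypeInfo = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

    // Needs the flink-jdbc dependency plus the JDBC driver jar on the classpath.
    val jdbcInput = JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername("oracle.jdbc.OracleDriver")              // placeholder driver class
      .setDBUrl("jdbc:oracle:thin:@//dbhost:1521/service")    // placeholder URL
      .setUsername("db_user")                                 // placeholder credentials
      .setPassword("db_password")
      .setQuery("SELECT id, name FROM sensors")               // placeholder query
      .setRowTypeInfo(rowTypeInfo)
      .finish()

    implicit val rowTi: TypeInformation[Row] = rowTypeInfo    // tells the Scala API how to treat Row
    val sensors: DataSet[Row] = env.createInput(jdbcInput)

    sensors.first(10).print()
  }
}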
						
					
Labels:
- Apache Flink
- Apache Kafka
- Apache Spark

11-13-2017 03:35 PM

Do we know the default number of partitions and the replication factor for the internal topic __consumer_offsets? If we lose this topic by any chance, do we have anything else to recover from? From what you explained above, my understanding is that there are no longer separate current-offset and committed-offset concepts. Please correct me and add your knowledge if that is not the case.
						
					
11-08-2017 01:54 PM
1 Kudo

I got the answer to this question: the number of partitions for a topic can only be increased, never decreased. The reason is that decreasing the partition count would cause data loss. To achieve the same effect, we can delete the current topic and recreate a new one with the required number of partitions.
						
					