Member since
01-14-2016
20
Posts
7
Kudos Received
0
Solutions
05-08-2018
01:22 PM
I just deployed HDP on CentOS through Ambari (2.6.0.0) and then enabled Kerberos. All seemed to be set until I started defining Ranger policies for users. Before Kerberos was enabled, Ranger was able to sync users with the Unix users, and I could see the list of users when creating a new policy. But after Kerberos was set up, only the old users are shown in the list (no new Linux users). Here is the error I see in /var/log/ranger/usersync/usersync.log:
08 May 2018 08:33:19 ERROR PolicyMgrUserGroupBuilder [UnixUserSyncThread] - Failed to add portal user
08 May 2018 08:33:19 ERROR UnixUserGroupBuilder [UnixUserSyncThread] - sink.addOrUpdateUser failed with exception: Failed to add portal user, for user: mapred, groups: [hadoop]
08 May 2018 08:33:19 ERROR PolicyMgrUserGroupBuilder [UnixUserSyncThread] - Failed to add User :
com.sun.jersey.api.client.UniformInterfaceException: POST http://192.168.99.101:6080/service/users/default returned a response status of 401 Unauthorized
at com.sun.jersey.api.client.WebResource.handle(WebResource.java:686)
at com.sun.jersey.api.client.WebResource.access$200(WebResource.java:74)
at com.sun.jersey.api.client.WebResource$Builder.post(WebResource.java:568)
at org.apache.ranger.unixusersync.process.PolicyMgrUserGroupBuilder.getMUser(PolicyMgrUserGroupBuilder.java:963)
at org.apache.ranger.unixusersync.process.PolicyMgrUserGroupBuilder.access$800(PolicyMgrUserGroupBuilder.java:74)
at org.apache.ranger.unixusersync.process.PolicyMgrUserGroupBuilder$5.run(PolicyMgrUserGroupBuilder.java:936)
at org.apache.ranger.unixusersync.process.PolicyMgrUserGroupBuilder$5.run(PolicyMgrUserGroupBuilder.java:932)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:360)
at org.apache.ranger.unixusersync.process.PolicyMgrUserGroupBuilder.addMUser(PolicyMgrUserGroupBuilder.java:932)
at org.apache.ranger.unixusersync.process.PolicyMgrUserGroupBuilder.addOrUpdateUser(PolicyMgrUserGroupBuilder.java:329)
at org.apache.ranger.unixusersync.process.UnixUserGroupBuilder.updateSink(UnixUserGroupBuilder.java:153)
at org.apache.ranger.usergroupsync.UserGroupSync.syncUserGroup(UserGroupSync.java:114)
at org.apache.ranger.usergroupsync.UserGroupSync.run(UserGroupSync.java:87)
at java.lang.Thread.run(Thread.java:745)
08 May 2018 08:33:19 ERROR PolicyMgrUserGroupBuilder [UnixUserSyncThread] - Failed to add portal user
08 May 2018 08:33:19 ERROR UnixUserGroupBuilder [UnixUserSyncThread] - sink.addOrUpdateUser failed with exception: Failed to add portal user, for user: knox, groups: [hadoop]
08 May 2018 08:33:19 INFO UserGroupSync [UnixUserSyncThread] - End: update user/group from source==>sink
Now I have a question. After setting up Kerberos and understanding how it works, I am confused about the relationship between Kerberos and Unix users. Yes, I can get tickets from Kerberos using kinit and can verify that ticket with klist, but how does that play into Hadoop? Do I just need a Kerberos user, or do I need both Kerberos and Unix users to access resources in Ranger? I thought that after enabling Kerberos only Kerberos users would be able to access resources in HDP. Note that I am not using AD (Active Directory), just experimenting with Kerberos and Unix.
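The way these identities fit together, as far as I understand it: you authenticate with a Kerberos principal (kinit/klist), and Hadoop then maps that principal to a short Unix-style user name via the hadoop.security.auth_to_local rules in core-site.xml; group membership and Ranger policies are then evaluated against that short name, which is why local (or LDAP) Unix users and groups still matter after Kerberizing. The 401 from usersync looks like a separate issue: after Kerberizing, usersync has to authenticate to the Ranger Admin REST API, so its credential/keytab configuration needs checking. A minimal sketch of the default mapping behaviour (strip the realm), using a made-up realm:

```shell
# Default auth_to_local behaviour for a principal in the local realm is
# roughly "drop the @REALM part". EXAMPLE.COM is a hypothetical realm
# used only for illustration; real rules in core-site.xml can rewrite
# names, not just strip the realm.
principal='mapred@EXAMPLE.COM'
short_name=${principal%%@*}   # shell suffix-strip stands in for the rule
echo "$short_name"
```

So the `mapred` service principal ends up acting as the Unix user `mapred`, and Ranger policies written for `mapred` apply to it.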
02-02-2016
04:13 PM
1 Kudo
I am using Hortonworks HDP and trying to run a Storm topology, but I am getting this error. It looks like ZooKeeper is having some trouble; I spent the whole day trying to figure out this problem. Note that I compile and build the jar on Windows and run it in the Hortonworks sandbox. I copied the whole code from a sample. Shahzad
19232 [Thread-6] INFO backtype.storm.config - SET worker-user 2626e3be-568c-4d9e-97fb-26b3fd07f8a1
20052 [Thread-10-kafkaSpout] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.zookeeper.server.quorum.flexible.QuorumMaj.<init>(Ljava/util/Map;)V
at org.apache.curator.framework.imps.EnsembleTracker.<init>(EnsembleTracker.java:54) ~[curator-framework-3.0.0.jar:na]
at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:156) ~[curator-framework-3.0.0.jar:na]
at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:136) ~[curator-framework-3.0.0.jar:na]
at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:107) ~[curator-framework-3.0.0.jar:na]
at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$fn__4949$fn__4964.invoke(executor.clj:542) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:463) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
20067 [Thread-10-kafkaSpout] ERROR backtype.storm.daemon.executor -
java.lang.NoSuchMethodError: org.apache.zookeeper.server.quorum.flexible.QuorumMaj.<init>(Ljava/util/Map;)V
at org.apache.curator.framework.imps.EnsembleTracker.<init>(EnsembleTracker.java:54) ~[curator-framework-3.0.0.jar:na]
at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:156) ~[curator-framework-3.0.0.jar:na]
at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:136) ~[curator-framework-3.0.0.jar:na]
at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:107) ~[curator-framework-3.0.0.jar:na]
at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$fn__4949$fn__4964.invoke(executor.clj:542) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:463) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
20120 [Thread-10-kafkaSpout] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:322) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__5425$fn__5426.invoke(worker.clj:491) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at backtype.storm.daemon.executor$mk_executor_data$fn__4850$fn__4851.invoke(executor.clj:245) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:475) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
01-21-2016
03:12 PM
I guess I am using the same code as mentioned in the kafka spout link.
01-21-2016
02:56 PM
I did make the changes and ran the code again, but I get the same error. Can you please add me on Skype (shahzad.aslam100) and have a look at my code via screen sharing? I promise I will not disturb you a lot. Actually, this is an assignment from a potential new employer and I need to get this working by the end of this weekend. Shahzad
01-21-2016
02:37 PM
I don't get it. Are you saying I should use hadoop-client instead of hadoop-core?
01-21-2016
02:36 PM
Artem, I have some experience working with topologies, and the architecture is not alien to me; I have developed and deployed some simple topologies. The only thing alien to me is the Kafka spout.
01-21-2016
01:31 PM
1 Kudo
And here is my POM:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.Testing</groupId>
<artifactId>Testing</artifactId>
<version>0.0.1-SNAPSHOT</version>
<repositories>
<repository>
<releases>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>warn</checksumPolicy>
</releases>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
<checksumPolicy>fail</checksumPolicy>
</snapshots>
<id>HDPReleases</id>
<name>HDP Releases</name>
<url>http://repo.hortonworks.com/content/repositories/releases/</url>
<layout>default</layout>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.0.23</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3.2.2.5.1-3</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>0.98.0.2.1.3.7-24-hadoop2</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.7.0_05</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2.3.0.0-2557</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.2-incubating</version>
</dependency>
</dependencies>
</project>
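For what it's worth, the NoSuchMethodError on QuorumMaj.&lt;init&gt;(Map) in the log usually points to a classpath mix: Curator 3.x is built against ZooKeeper 3.5 (where that constructor exists), while this HDP stack and storm-kafka run against ZooKeeper 3.4.x, which does not have it. A sketch of how one might hunt for the offending transitive dependency with `mvn dependency:tree`; the tree excerpt hard-coded below is hypothetical, and the real output from this POM will differ:

```shell
# Hypothetical excerpt of `mvn dependency:tree` output. The goal is to
# find which dependency drags in a Curator 3.x jar, since Curator 3.x
# calls ZooKeeper 3.5's QuorumMaj(Map) constructor, which ZooKeeper
# 3.4.x lacks -- hence the NoSuchMethodError at runtime.
tree='
[INFO] +- org.apache.storm:storm-kafka:jar:0.9.2-incubating:compile
[INFO] |  \- org.apache.curator:curator-framework:jar:3.0.0:compile
[INFO] +- org.apache.zookeeper:zookeeper:jar:3.4.6:compile
'
# Pull out the Curator coordinate and its version.
culprit=$(printf '%s\n' "$tree" | grep -o 'curator-framework:jar:[0-9.]*')
echo "$culprit"
```

If a Curator 3.x jar shows up in the real tree, excluding it (or pinning a 2.x Curator that matches ZooKeeper 3.4) in the POM would be the usual fix.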
01-21-2016
01:30 PM
1 Kudo
I am trying to connect my Storm spout to Kafka and am getting an error. Here is the code, and the next section is the error I get when I run this topology. I am running on HDP 2.2.4.2-2 and compiling my code on Windows 7 with Eclipse.
package com.storm;
import java.util.UUID;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
public class storm123
{
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException
{
// create an instance of TopologyBuilder class
TopologyBuilder builder = new TopologyBuilder();
BrokerHosts hosts = new ZkHosts("sandbox.hortonworks.com:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "testing2", "/testing2", UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("myKafkaSpout", kafkaSpout, 1);
// set the spout class
//builder.setSpout("LearningStormSpout", new stormspout(), 2);
// set the bolt class
builder.setBolt("LearningStormBolt", new stormbolt(), 4).shuffleGrouping ("myKafkaSpout");
Config conf = new Config();
conf.setDebug(true);
// create an instance of LocalCluster class for
// executing topology in local mode.
LocalCluster cluster = new LocalCluster();
// LearningStormTopolgy is the name of submitted topology
cluster.submitTopology("LearningStormToplogy", conf, builder.createTopology());
try {
Thread.sleep(70000);
} catch (Exception exception) {
System.out.println("Thread interrupted exception : " + exception);
}
// kill the LearningStormTopology
cluster.killTopology("LearningStormToplogy");
// shutdown the storm test cluster
cluster.shutdown();
}
}
And here is the error I get at the end when I run this code:
20848 [Thread-16-__metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink] INFO backtype.storm.daemon.executor - Prepared bolt __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:(6)
20854 [Thread-18-myKafkaSpout] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.zookeeper.server.quorum.flexible.QuorumMaj.<init>(Ljava/util/Map;)V
at org.apache.curator.framework.imps.EnsembleTracker.<init>(EnsembleTracker.java:54) ~[curator-framework-3.0.0.jar:na]
at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:156) ~[curator-framework-3.0.0.jar:na]
at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:136) ~[curator-framework-3.0.0.jar:na]
at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:107) ~[curator-framework-3.0.0.jar:na]
at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$fn__4949$fn__4964.invoke(executor.clj:542) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:463) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
20855 [Thread-18-myKafkaSpout] ERROR backtype.storm.daemon.executor -
java.lang.NoSuchMethodError: org.apache.zookeeper.server.quorum.flexible.QuorumMaj.<init>(Ljava/util/Map;)V
at org.apache.curator.framework.imps.EnsembleTracker.<init>(EnsembleTracker.java:54) ~[curator-framework-3.0.0.jar:na]
at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:156) ~[curator-framework-3.0.0.jar:na]
at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:136) ~[curator-framework-3.0.0.jar:na]
at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:107) ~[curator-framework-3.0.0.jar:na]
at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$fn__4949$fn__4964.invoke(executor.clj:542) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:463) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
20900 [Thread-18-myKafkaSpout] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:322) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__5425$fn__5426.invoke(worker.clj:491) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at backtype.storm.daemon.executor$mk_executor_data$fn__4850$fn__4851.invoke(executor.clj:245) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:475) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
01-20-2016
03:45 PM
Huh... OK, that means I need to upgrade my HDP or convert all dependencies to matching versions.
01-20-2016
03:13 PM
I am using HDP 2.2.4.2-2, and the Spark version installed on this HDP is 1.2.1. Please suggest changes to the build file. Also, I am always confused about which version to use with what; what are the general guidelines for choosing these? I am an MS guy, and Java compatibility and dependencies always bite me hard. In SBT I found a workaround where I compile a fat jar with sbt assembly and then cp it 🙂 but this time that trick is not working 😞
01-20-2016
02:56 PM
I am compiling this Scala project with SBT from the command prompt. Here is the SBT build file:
javacOptions ++= Seq("-source", "1.7", "-target", "1.7", "-Xlint")
lazy val root = (project in file(".")).
settings(
name := "helo",
version := "1",
scalaVersion := "2.10.4"
)
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.2.0"
libraryDependencies += "org.scala-lang" % "scala-library" % "2.10.4" % "test"
libraryDependencies += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
// META-INF discarding
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
{
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
}
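One thing stands out in this build file: the Spark artifacts are not on a single version (spark-core and spark-streaming at 1.2.0, but spark-streaming-kafka at 1.4.1). WriteAheadLogUtils was only introduced in Spark 1.3, so a 1.4.x streaming-kafka jar compiled against it will throw NoClassDefFoundError on a 1.2.x runtime, which matches the error in the other post. A quick sketch for spotting such mismatches; the build-file excerpt is hard-coded below purely for illustration:

```shell
# Hypothetical excerpt of an sbt build file; a real check would read the
# actual build.sbt. The idea: list the distinct Spark versions in use --
# more than one distinct version means a mismatch.
build='
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1"
'
versions=$(printf '%s\n' "$build" | grep -o '"[0-9][0-9.]*"$' | tr -d '"' | sort -u)
echo "$versions"
```

Aligning everything on the version the cluster actually runs (1.2.x here) is the safe choice; sbt assembly cannot paper over a class that simply does not exist in the runtime jars.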
01-20-2016
02:20 PM
1 Kudo
I am completely blank on this error; I have been struggling with it for two days. My Kafka producer is working fine, emitting a ClickEvent every second, and I have confirmed that events are being posted to the Kafka topic in Avro format. Now I want to read them back in Spark Streaming (the code is at the end). Notice I get this error: "java.lang.NoClassDefFoundError: org/apache/spark/streaming/util/WriteAheadLogUtils$". I tried to find WriteAheadLogUtils everywhere and found nothing. Please help me resolve this issue.
16/01/20 14:17:37 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@sandbox.hortonworks.com:38373/user/HeartbeatReceiver
16/01/20 14:17:37 INFO NettyBlockTransferService: Server created on 60697
16/01/20 14:17:37 INFO BlockManagerMaster: Trying to register BlockManager
16/01/20 14:17:37 INFO BlockManagerMasterActor: Registering block manager localhost:60697 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 60697)
16/01/20 14:17:37 INFO BlockManagerMaster: Registered BlockManager
16/01/20 14:17:39 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Error: application failed with exception
java.lang.NoClassDefFoundError: org/apache/spark/streaming/util/WriteAheadLogUtils$
at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:84)
at streamingAvroConsumer$delayedInit$body.apply(streamingAvroConsumer.scala:44)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at streamingAvroConsumer$.main(streamingAvroConsumer.scala:18)
at streamingAvroConsumer.main(streamingAvroConsumer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:367)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:77)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.util.WriteAheadLogUtils$
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 18 more
The code is below:
import events.avro.ClickEvent
import kafka.serializer.DefaultDecoder
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object streamingAvroConsumer extends App
{
println("Initializing App")
val numThreads = "1"
val topics = "testing2"
val sparkConf = new SparkConf().setAppName("WindowClickCount").setMaster("local[2]")
// Slide duration of ReduceWindowedDStream must be multiple of the parent DStream, and we chose 2 seconds for the reduced
// window stream
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Because we're using .reduceByKeyAndWindow, we need to persist it to disk
ssc.checkpoint("./checkpointDir")
val kafkaConf = Map(
"metadata.broker.list" -> "sandbox.hortonworks.com:6667",
"zookeeper.connect" -> "sandbox.hortonworks.com:2181",
"group.id" -> "kafka-spark-streaming-example",
"zookeeper.connection.timeout.ms" -> "1000")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
// Create a new stream which can decode byte arrays. For this exercise, the incoming stream only contain user and product Ids
val lines = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER).map(_._2)
val mappedUserName = lines.transform
{ rdd =>
rdd.map { bytes => AvroUtil.clickEventDecode(bytes) }.map
{ clickEvent =>
println(clickEvent.time);
}
}
ssc.start()
ssc.awaitTermination()
}
object AvroUtil
{
val reader = new SpecificDatumReader[ClickEvent](ClickEvent.getClassSchema)
def clickEventDecode(bytes: Array[Byte]): ClickEvent = {
val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
reader.read(null, decoder)
}
}
01-19-2016
12:24 PM
Hi, I have a simple Kafka producer program in Scala that randomly creates click events and sends them to a Kafka topic; the complete code is below. Note that in the loop there is a one-second delay, but when I run this program the loop runs VERY, VERY slowly: a "Click!" message is printed when a message is sent, but a single click takes around 5 minutes. Also, kafka-console-consumer.sh running in another window shows no activity, even while clicks are being produced slowly (every 3-4 minutes, I guess) in the first window. I have NO IDEA what is happening.
Note that I have tested that Kafka works and I can send and receive messages to topics (Hortonworks sandbox). Note that I am producing Avro records for each click; this code is tested, and I have verified that records are being encoded and decoded. Only the code in the WHILE loop runs very slowly, and there is no activity in the consumer. Please suggest what I should look into or configure.
object StreamingAvroProducer extends App {
val brokers = "localhost:6667"
val topic = "testing"
val random = new Random()
val props = new util.HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer") // Kafka avro message stream comes in as a byte array
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
val producer = new KafkaProducer[String, Array[Byte]](props)
while(true)
{
val clickBytes = serializeClickEvent(newRandomClickEvent) // Avro schema serialization as a byte array
val message = new ProducerRecord[String, Array[Byte]](topic, null, clickBytes) // Create a new producer record to send the message in
producer.send(message)
println("Click!")
Thread.sleep(1000)
}
// Generate a random click event
def newRandomClickEvent: ClickEvent = {
val userId = Random.nextInt(5)
val time = Random.nextInt(5).toString()
val action = Random.nextInt(5).toString()
val destination = Random.nextInt(5).toString()
val hotel = Random.nextInt(5).toString()
new ClickEvent(userId, time, action, destination, hotel)
}
def serializeClickEvent(clickEvent: ClickEvent): Array[Byte] = {
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
val writer = new SpecificDatumWriter[ClickEvent](ClickEvent.getClassSchema)
writer.write(clickEvent, encoder)
encoder.flush
out.close
out.toByteArray
}
}
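A guess at the slowness, for what it's worth: a Kafka 0.8-era broker advertises a hostname in ZooKeeper, and if that name is not resolvable from where the producer runs, every send blocks on metadata fetches until they time out, which would explain multi-minute "Click!" intervals. A sketch of pulling the advertised host out of a broker registration; the JSON payload below is hypothetical (a real one would come from the ZooKeeper znode /brokers/ids/0):

```shell
# Hypothetical broker registration payload as stored in ZooKeeper; the
# producer is redirected to this host/port by the metadata response,
# regardless of what bootstrap address it was started with.
reg='{"jmx_port":-1,"timestamp":"0","host":"sandbox.hortonworks.com","port":6667}'
host=$(printf '%s' "$reg" | sed -n 's/.*"host":"\([^"]*\)".*/\1/p')
port=$(printf '%s' "$reg" | sed -n 's/.*"port":\([0-9]*\).*/\1/p')
echo "$host:$port"
```

If that host does not resolve to the sandbox from the producer's machine, an /etc/hosts entry (or fixing the broker's advertised.host.name setting) is the usual remedy.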
01-15-2016
02:03 PM
Yes, that is correct, but I want to find variations on this pattern and figure out which one is the fastest. What is the usual practice with this pattern: is the Storm topology programmed to push the latest updates every X seconds or minutes to an external system, or is there a direct method to get that data from the running topology?
01-15-2016
11:49 AM
I am doing exactly the same, except in the producer the port is 6667, which is the Hortonworks default Kafka port.
01-15-2016
11:22 AM
3 Kudos
Hi, I have a topology that receives website click events from a stream. It keeps track of events in the last 20, 30 and 40 minutes for different topics. Now, how do I consume this information on my website? I want to show the number of clicks at the top of the page for a specific topic, which means I somehow need that real-time information out of the running topology. My best guess is to call an update service every, say, 30 seconds, or to produce a JSON file and save it to a location, but that is not exact real-time information. Is there a way to call a topology service and get this real-time, up-to-the-moment information? Regards, Shahzad Aslam
01-15-2016
10:59 AM
Same results; I started the consumer first and then the producer. How do I pass the ZooKeeper quorum when starting the producer?
01-15-2016
10:22 AM
Hi, I am trying to run a Kafka consumer and producer with the Hortonworks defaults. The topic is created, and I guess the consumer is also connected. When I connect the producer nothing happens at first; then I type something and get an error. Here are the commands I used.
Create the topic test:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
The following command shows the test topic was created:
bin/kafka-topics.sh --list --zookeeper localhost:2181
Started the consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Started the producer:
bin/kafka-console-producer.sh --broker-list localhost:6667 --topic test
When I type something in the producer, that's where the errors come up, like:
[2016-01-15 09:55:59,181] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:losandbox.hortwonworks.com,port:9092)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:losandbox.hortwonworks.com,port:9092)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
... 9 more
[2016-01-15 09:57:07,249] ERROR Producer connection to losandbox.hortwonworks.com:9092 unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$getPartitionListForTopic(DefaultEventHandler.scala:186)
at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2016-01-15 09:57:07,250] WARN Fetching topic metadata with correlation id 14 for topics [Set(test)] from broker [id:0,host:losandbox.hortwonworks.com,port:9092] failed (kafka.client.ClientUtils$)
java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$getPartitionListForTopic(DefaultEventHandler.scala:186)
at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2016-01-15 09:57:07,251] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:losandbox.hortwonworks.com,port:9092)] failed (kafka.producer.async.DefaultEventHandler)
[2016-01-15 09:58:15,437] ERROR Producer connection to losandbox.hortwonworks.com:9092 unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2016-01-15 09:58:15,438] WARN Fetching topic metadata with correlation id 15 for topics [Set(test)] from broker [id:0,host:losandbox.hortwonworks.com,port:9092] failed (kafka.client.ClientUtils$)
java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2016-01-15 09:58:15,439] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:losandbox.hortwonworks.com,port:9092)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:losandbox.hortwonworks.com,port:9092)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
... 9 more
[2016-01-15 09:59:23,724] ERROR Producer connection to losandbox.hortwonworks.com:9092 unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$getPartitionListForTopic(DefaultEventHandler.scala:186)
at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2016-01-15 09:59:23,725] WARN Fetching topic metadata with correlation id 16 for topics [Set(test)] from broker [id:0,host:losandbox.hortwonworks.com,port:9092] failed (kafka.client.ClientUtils$)
java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$getPartitionListForTopic(DefaultEventHandler.scala:186)
at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2016-01-15 09:59:23,726] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:losandbox.hortwonworks.com,port:9092)] failed (kafka.producer.async.DefaultEventHandler)
[2016-01-15 10:00:32,088] ERROR Producer connection to losandbox.hortwonworks.com:9092 unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2016-01-15 10:00:32,090] WARN Fetching topic metadata with correlation id 17 for topics [Set(test)] from broker [id:0,host:losandbox.hortwonworks.com,port:9092] failed (kafka.client.ClientUtils$)
java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2016-01-15 10:00:32,093] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:losandbox.hortwonworks.com,port:9092)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:losandbox.hortwonworks.com,port:9092)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
... 9 more
[2016-01-15 10:00:32,106] ERROR Failed to send requests for topics test with correlation ids in [9,17] (kafka.producer.async.DefaultEventHandler)
[2016-01-15 10:00:32,108] ERROR Error in handling batch of 3 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
kafka.producer.ProducerClosedException: producer already closed
at kafka.producer.Producer.send(Producer.scala:73)
at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:170)
at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
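Every failure above is the same `java.net.ConnectException: Connection timed out` against a single broker address, so the first thing worth confirming is which host:port the producer is actually trying to reach and whether that endpoint is reachable from the producer machine. A minimal sketch (assuming the console-producer output was captured to a file named `producer.log`; the hostname is taken from the trace):

```shell
# Extract the unique broker host:port pairs the producer is contacting.
grep -o 'host:[^,]*,port:[0-9]*' producer.log | sort -u

# Then, from the same machine, check DNS resolution and port reachability,
# for example:
#   nc -vz losandbox.hortwonworks.com 9092
```

If the hostname does not resolve (note the spelling in the trace) or the port is blocked, the producer will time out exactly as shown, regardless of Kafka configuration.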
01-14-2016 04:16 PM
This is the same comment I posted accidentally above. I have absolutely no idea about this; it is part of a challenge I got from a potential employer, and I don't want to ask him how to retrieve the files, since working that out appears to be part of the challenge itself. It looks like I need to restore the data into a new Hadoop installation. Administration is not my strong suit, so steps to do so would be highly appreciated. Shahzad
01-14-2016 03:42 PM
Hi, I got a zip that contains a dfs directory with the following subdirectories: data, name, and namesecondary. The name and namesecondary directories each contain a current directory with files like "edits_0000000000000000001-0000000000000000004", "edits_inprogress_0000000000000000005", "fsimage_0000000000000000000", "VERSION", etc. The data directory contains a current directory with more files and directories inside it. It looks like this is a dump of some Hadoop directories. Now the question is how to retrieve the files from it; I guess this is some sort of Hadoop archive that I need to restore. Please help me out. Thanks, Shahzad
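For context, the layout described (fsimage, edits, and VERSION files under a current directory) matches the on-disk storage directories of an HDFS NameNode (name), SecondaryNameNode (namesecondary), and DataNode (data). One possible approach, sketched below, is to point a fresh single-node Hadoop install at the extracted directories via hdfs-site.xml; /path/to/dfs is an illustrative placeholder, and the Hadoop version must be compatible with the fsimage layout in the dump:

```xml
<!-- hdfs-site.xml fragment: reuse the extracted dfs directories.
     /path/to/dfs is a placeholder for wherever the zip was unpacked. -->
<property>
  <name>dfs.namenode.name.dir</name>
  <value>/path/to/dfs/name</value>
</property>
<property>
  <name>dfs.datanode.data.dir</name>
  <value>/path/to/dfs/data</value>
</property>
```

After starting the NameNode and DataNode with this configuration, `hdfs dfs -ls /` should list the restored filesystem, from which files can be copied out with `hdfs dfs -copyToLocal`. This is a sketch under the stated assumptions, not a verified recovery procedure.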