Member since: 08-07-2017
Posts: 21
Kudos Received: 2
Solutions: 0
09-28-2017 08:35 AM
Thanks, it worked
09-25-2017 06:37 AM
What I have observed is that using DefaultFileNameFormat works fine for me. So can anybody please tell me what I should change in my custom HDFS file name format so that it also works without the "No lease" error?

import java.util.Map;

import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.task.TopologyContext;

public class CustomHDFSFileNameFormat implements FileNameFormat {

    private static final long serialVersionUID = 1L;

    private String filename = null;
    private String extension = null;
    private String path = null;

    public CustomHDFSFileNameFormat(String path, String filename, String extension) {
        this.filename = filename;
        this.extension = extension;
        this.path = path;
    }

    public String getName(long rotation, long timeStamp) {
        // every task and every rotation produces the same name
        return (this.filename + "." + this.extension);
    }

    public String getPath() {
        return this.path;
    }

    public void prepare(Map map, TopologyContext context) {
        // nothing is captured from the topology context
    }
}
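For reference, my understanding of why DefaultFileNameFormat works is that it bakes the component id, task id, rotation number, and timestamp into every file name, so no two bolt tasks ever hold a lease on the same HDFS file. A sketch along those lines (the class name TaskAwareFileNameFormat and its details are mine, not the library source):

// Sketch, modeled on what DefaultFileNameFormat does: capture the task
// identity in prepare() and include it in the name, so concurrent tasks
// never open the same file.
import java.util.Map;

import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.task.TopologyContext;

public class TaskAwareFileNameFormat implements FileNameFormat {
    private static final long serialVersionUID = 1L;
    private final String path;
    private final String filename;
    private final String extension;
    private String componentId;
    private int taskId;

    public TaskAwareFileNameFormat(String path, String filename, String extension) {
        this.path = path;
        this.filename = filename;
        this.extension = extension;
    }

    public void prepare(Map conf, TopologyContext context) {
        // the per-task identity is only available here, not in the constructor
        this.componentId = context.getThisComponentId();
        this.taskId = context.getThisTaskId();
    }

    public String getName(long rotation, long timeStamp) {
        // unique per task and per rotation
        return filename + "-" + componentId + "-" + taskId + "-" + rotation + "-" + timeStamp + "." + extension;
    }

    public String getPath() {
        return path;
    }
}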
09-25-2017 04:37 AM
Hi @ajoseph, thanks. But can you please tell me how I can set the overwrite flag?
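To be concrete about what I mean: I am assuming the flag in question is the boolean second argument of Hadoop's FileSystem.create(). A minimal, untested sketch (the class OverwriteSketch and the /tmp path are just illustrative):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OverwriteSketch {
    public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        // the second argument is the overwrite flag: true replaces an existing file
        try (FSDataOutputStream out = fs.create(new Path("/tmp/example.csv"), true)) {
            out.writeBytes("hello\n");
        }
    }
}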
09-22-2017 01:36 PM
I am getting the error below while working with Apache Storm and HDFS.

Caused by: org.apache.hadoop.ipc.RemoteException: No lease on /user/test/xxx/ccc/bbb/value_fct_archive.csv (inode 5425306): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_-247167534_42, pendingcreates: 1]
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3521)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3324)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3162)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3122)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:843)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:500)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1552) ~[stormjar.jar:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1496) ~[stormjar.jar:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1396) ~[stormjar.jar:?]
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233) ~[stormjar.jar:?]
at com.sun.proxy.$Proxy41.addBlock(Unknown Source) ~[?:?]
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:457) ~[stormjar.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_77]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_77]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_77]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_77]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:278) ~[stormjar.jar:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:194) ~[stormjar.jar:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:176) ~[stormjar.jar:?]
at com.sun.proxy.$Proxy43.addBlock(Unknown Source) ~[?:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1489) ~[stormjar.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1284) ~[stormjar.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463) ~[stormjar.jar:?]

I tried to resolve it in many ways, like increasing the ulimit or setting the parallelism hint to 1, but nothing worked for me. My topology code is below:

public class Topology {
    private static final Logger LOG = LoggerFactory.getLogger(Topology.class);

    public static StormTopology buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        // read properties and send to MQTT spout
        PropertyReader propReader = new PropertyReader();
        Properties prop = propReader.getProperties();
        String mqttService = (propReader.getValue(prop, "mqttservice")).trim();
        String[] devicesTopics = ((propReader.getValue(prop, "devices_topics.MQTT")).trim()).split(",");
        for (String devicesTopicsId : devicesTopics) {
            LOG.info("devicesTopics Id is " + devicesTopicsId);
            String[] devices = devicesTopicsId.split(":");
            String deviceId = devices[0];
            LOG.info("device Id is " + deviceId);
            String mqtttopic = devices[1];
            LOG.info("Topic Name is " + mqtttopic);
            // start topology logic
            String mqttreader = "mqttreader-" + deviceId;
            LOG.info("Mqtt reader is " + mqttreader);
            String stream = "stream-" + deviceId;
            LOG.info("Stream Id is " + stream);
            builder.setSpout(mqttreader, new MQTTReader(mqttService, mqtttopic, deviceId, stream));
            // set mqttprocessor bolt
            String mqttprocesser = "mqttprocesser-" + deviceId;
            LOG.info("Mqtt processor is " + mqttprocesser);
            builder.setBolt(mqttprocesser, new MQTTProcesser(stream)).shuffleGrouping(mqttreader, stream);
            // set dateretriver bolt
            String dateretriver = "dateretriver-" + deviceId;
            LOG.info("date retriver is " + dateretriver);
            builder.setBolt(dateretriver, new DateRetriver(stream)).shuffleGrouping(mqttprocesser, stream);
            // set archival bolt from dateretriver
            String archive = "archive-" + deviceId;
            LOG.info("archival is " + archive);
            builder.setBolt(archive, new Archival(stream)).shuffleGrouping(dateretriver, stream);
            // get hive bolt
            MQTTHiveSinker sinker = new MQTTHiveSinker();
            String metaStoreURI = (propReader.getValue(prop, "hiveurl")).trim();
            String dbName = (propReader.getValue(prop, "dbname.MQTT")).trim();
            String tblName = (propReader.getValue(prop, "tblname.MQTT")).trim();
            String[] colNames = ((propReader.getValue(prop, "colNames.MQTT")).trim()).split(",");
            LOG.info("colName0 is " + colNames[0]);
            LOG.info("colName1 is " + colNames[1]);
            LOG.info("colName2 is " + colNames[2]);
            LOG.info("colName3 is " + colNames[3]);
            HiveBolt hiveBolt = sinker.buildHiveBolt(metaStoreURI, dbName, tblName, colNames);
            // set hive bolt
            String hivebolt = "hivebolt-" + deviceId;
            LOG.info("Hivbolt is " + hivebolt);
            builder.setBolt(hivebolt, hiveBolt).shuffleGrouping(archive);
            // set hdfs bolt
            MQTTHDFSSinker hdfssinker = new MQTTHDFSSinker();
            String hdfsboltId = "hdfsbolt-" + deviceId;
            LOG.info("hdfsbolt is " + hdfsboltId);
            HdfsBolt hdfbolt = hdfssinker.makeHDFSBolt();
            builder.setBolt(hdfsboltId, hdfbolt).shuffleGrouping(archive);
        }
        return builder.createTopology();
    }
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(2);
        //conf.setMaxSpoutPending(1);
        StormSubmitter.submitTopology(args[0], conf, buildTopology());
    }
}

And my HDFS streaming code is below:

public class MQTTHDFSSinker {
    private static final Logger LOG = LoggerFactory.getLogger(MQTTHDFSSinker.class);

    public HdfsBolt makeHDFSBolt() {
        LOG.info("start of MQTTHDFSSinker.makeHDFSBolt");
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");
        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
        // rotate files when they reach 5MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
        FileNameFormat fileNameFormat = new CustomHDFSFileNameFormat("/user/test/xxx/ccc/bbb/", "value_fct_archive", "csv");
        HdfsBolt bolt = new HdfsBolt()
                .withFsUrl("hdfs://namenode:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
        LOG.info("end of MQTTHDFSSinker.makeHDFSBolt");
        return bolt;
    }
}

Is this an issue of multiple threads trying to write to the same file? Can anybody please help me with this error?
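For comparison, the stock DefaultFileNameFormat does not show this error for me. A minimal sketch of swapping it in, assuming the same path and extension (the wrapper class NameFormatSketch is just illustrative):

import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;

public class NameFormatSketch {
    // DefaultFileNameFormat appends component id, task id, rotation number,
    // and timestamp to the prefix, so concurrent tasks write distinct files.
    public static FileNameFormat makeFormat() {
        return new DefaultFileNameFormat()
                .withPath("/user/test/xxx/ccc/bbb/")
                .withPrefix("value_fct_archive")
                .withExtension(".csv");
    }
}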
Labels: Apache Storm
08-30-2017 12:39 PM
So I updated hive-site.xml with the value /tmp/mydir for the scratchdir property on the namenode, but it kept showing me an error for /tmp/hive. I then changed the configuration file on all datanodes and the error went away (though another error appeared). So: 1. Is changing the property on all nodes the correct approach? 2. Does that mean that whenever a Hive property changes, it has to be changed on all the nodes? Can anybody please let me know?
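For reference, the hive-site.xml entry I changed looks like the following sketch (/tmp/mydir is the value mentioned above; hive.exec.scratchdir is the full property name):

<property>
  <name>hive.exec.scratchdir</name>
  <value>/tmp/mydir</value>
</property>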
08-30-2017 09:49 AM
Hi, I am having an issue with Storm and Hive streaming. I followed the post https://community.hortonworks.com/questions/96995/storm-hdfs-javalangruntimeexception-error-preparin.html but it did not help. I also looked at the post https://community.hortonworks.com/questions/111874/non-local-session-path-expected-to-be-non-null-try.html, but I did not understand which jar needs to be included. Can anybody please help me? Below is my POM:

<dependencies>
  <dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.9.9</version>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hive</artifactId>
    <version>1.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.3.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.2.0</version>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
</dependencies>

Below is my Maven shade plugin:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-source-plugin</artifactId>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
    <!-- Maven shade plugin -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.3</version>
      <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
      </configuration>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <!--
            <artifactSet>
              <excludes>
                <exclude>org.slf4j:slf4j-log4j12:*</exclude>
                <exclude>log4j:log4j:jar:</exclude>
                <exclude>org.slf4j:slf4j-simple:jar</exclude>
                <exclude>org.apache.storm:storm-core</exclude>
              </excludes>
            </artifactSet>
            -->
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass></mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Please see the error below.

java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:444) ~[stormjar.jar:?]
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:358) ~[stormjar.jar:?]
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276) ~[stormjar.jar:?]
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243) ~[stormjar.jar:?]
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180) ~[stormjar.jar:?]
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157) ~[stormjar.jar:?]
at org.apache.storm.hive.common.HiveWriter$5.call(HiveWriter.java:238) ~[stormjar.jar:?]
at org.apache.storm.hive.common.HiveWriter$5.call(HiveWriter.java:235) ~[stormjar.jar:?]
at org.apache.storm.hive.common.HiveWriter$9.call(HiveWriter.java:366) ~[stormjar.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_77]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:529) ~[stormjar.jar:?]
at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:478) ~[stormjar.jar:?]
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:430) ~[stormjar.jar:?]
... 12 more
2017-08-30 14:51:58.022 o.a.s.d.executor [INFO] BOLT fail TASK: 3 TIME: TUPLE: source: mqttprocesser:4, stream: default, id: {}, [My count is, 11]
2017-08-30 14:51:58.023 o.a.s.d.executor [INFO] Execute done TUPLE source: mqttprocesser:4, stream: default, id: {}, [My count is, 11] TASK: 3 DELTA:
2017-08-30 14:51:58.023 o.a.s.d.executor [INFO] Processing received message FOR 3 TUPLE: source: mqttprocesser:4, stream: default, id: {}, [My count is, 12]
2017-08-30 14:51:58.050 h.metastore [INFO] Trying to connect to metastore with URI thrift://base1.rolta.com:9083
2017-08-30 14:51:58.052 h.metastore [INFO] Connected to metastore.
2017-08-30 14:51:58.124 o.a.h.h.q.l.PerfLogger [INFO] <PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver>
2017-08-30 14:51:58.124 o.a.h.h.q.l.PerfLogger [INFO] <PERFLOG method=TimeToSubmit from=org.apache.hadoop.hive.ql.Driver>
2017-08-30 14:51:58.124 o.a.h.h.q.l.PerfLogger [INFO] <PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>
2017-08-30 14:51:58.202 STDIO [ERROR] FAILED: NullPointerException Non-local session path expected to be non-null
2017-08-30 14:51:58.202 o.a.h.h.q.Driver [ERROR] FAILED: NullPointerException Non-local session path expected to be non-null
java.lang.NullPointerException: Non-local session path expected to be non-null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:590)
at org.apache.hadoop.hive.ql.Context.<init>(Context.java:129)
at org.apache.hadoop.hive.ql.Context.<init>(Context.java:116)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:382)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:303)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1067)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1129)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1004)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:994)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.runDDL(HiveEndPoint.java:404)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:369)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
at org.apache.storm.hive.common.HiveWriter$5.call(HiveWriter.java:238)
at org.apache.storm.hive.common.HiveWriter$5.call(HiveWriter.java:235)
at org.apache.storm.hive.common.HiveWriter$9.call(HiveWriter.java:366)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
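A note on the first error above: the message shows /tmp/hive as rwx------, and a commonly suggested fix is to widen those permissions in HDFS (sketch only, assuming you can run as the hdfs superuser):

hdfs dfs -chmod -R 777 /tmp/hive   # open up the scratch root flagged in the error
hdfs dfs -ls /tmp                  # verify: /tmp/hive should now show rwxrwxrwx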
Labels: Apache Hive, Apache Storm
08-08-2017 09:39 AM
Thanks 🙂 It worked. I am using twitter4j 4.0.4 now.
08-07-2017 11:57 AM
@Jay SenSharma Thanks, Jay, for your response. I will try your suggestion. Meanwhile, my POM is as below:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.rolta</groupId>
  <artifactId>stormtwitter</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.7</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.7</version>
    </dependency>
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.9.9</version>
    </dependency>
    <dependency>
      <groupId>org.twitter4j</groupId>
      <artifactId>twitter4j-core</artifactId>
      <version>4.0.6</version>
    </dependency>
    <dependency>
      <groupId>org.twitter4j</groupId>
      <artifactId>twitter4j-stream</artifactId>
      <version>4.0.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.0.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.opennlp</groupId>
      <artifactId>opennlp-tools</artifactId>
      <version>1.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.solr</groupId>
      <artifactId>solr-solrj</artifactId>
      <version>6.6.0</version>
    </dependency>
    <dependency>
      <groupId>com.jolira</groupId>
      <artifactId>onejar-maven-plugin</artifactId>
      <version>1.4.4</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-source-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.slf4j:slf4j-log4j12:*</exclude>
                  <exclude>log4j:log4j:jar:</exclude>
                  <exclude>org.slf4j:slf4j-simple:jar</exclude>
                  <exclude>org.apache.storm:storm-core</exclude>
                </excludes>
              </artifactSet>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.rolta.storm.topology.Topology</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
08-07-2017 11:13 AM
Hi, I am on HDP 2.5 with storm-core version storm-core-1.0.1.2.5.3.0-37.jar. My code was working with LocalCluster, but it fails when using StormSubmitter.submitTopology, giving me an InvalidClassException. The stack is below. One more observation: the jar gives the error the first time it is executed but runs afterwards. I searched and tried the following, but nothing worked. Please help me.

java.lang.RuntimeException: java.io.InvalidClassException: twitter4j.MediaEntityJSONImpl; local class incompatible: stream classdesc serialVersionUID = 1571961225214439778, local class serialVersionUID = 3609683338035442290
at org.apache.storm.serialization.SerializableSerializer.read(SerializableSerializer.java:58) ~[storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]

1. Added serialVersionUID with the values 1571961225214439778L, 3609683338035442290L, and 1L, but nothing worked.
2. Implemented Serializable and Externalizable in my POJOs.
3. Since I am using storm-core-1.0.1.2.5.3.0-37.jar and HDP 2.5, I added a matching Maven dependency in the POM with scope provided, but it did not work.
4. Added no-arg constructors.
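From the serialVersionUID mismatch, the twitter4j classes on the cluster appear to differ from the ones bundled in my jar. The later reply above ("I am using twitter4j 4.0.4 now") suggests the fix is pinning the twitter4j dependencies to the version the cluster actually ships; a sketch of the POM change, assuming 4.0.4 is that version:

<dependency>
  <groupId>org.twitter4j</groupId>
  <artifactId>twitter4j-core</artifactId>
  <version>4.0.4</version>
</dependency>
<dependency>
  <groupId>org.twitter4j</groupId>
  <artifactId>twitter4j-stream</artifactId>
  <version>4.0.4</version>
</dependency>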
Labels: Apache Storm