Member since
08-07-2017
21
Posts
2
Kudos Received
0
Solutions
02-13-2018
05:24 AM
Hi Experts, I was reading about the "withBatchSize" parameter in Apache Storm at http://storm.apache.org/releases/1.1.1/storm-hive.html. It is described as "Max number of events written to Hive in a single Hive transaction". So below are my questions: 1. What is an event? Is it one record in the stream being ingested, or something else? 2. How can I verify that this number of events per batch is actually being written into HDFS? Thanks
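As far as I understand storm-hive, an "event" here is one tuple handed to the HiveBolt, i.e. one record produced by the HiveMapper, and withBatchSize caps how many of those records are committed in a single Hive transaction; the data lands in the table's ORC delta directories under its HDFS location rather than as one file per batch. A minimal sketch of where the option is set (metastore URI, database, table, and column names are made up for illustration):
import org.apache.storm.hive.bolt.HiveBolt;
import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.tuple.Fields;

public class HiveBoltFactory {
    // Builds a HiveBolt that commits at most 1000 tuples (events) per Hive transaction.
    public static HiveBolt build() {
        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
                .withColumnFields(new Fields("id", "value"));              // illustrative column names
        HiveOptions options = new HiveOptions("thrift://metastore-host:9083", // hypothetical metastore URI
                "default", "my_table", mapper)                              // hypothetical db and table
                .withTxnsPerBatch(10)   // transactions requested per transaction batch
                .withBatchSize(1000);   // max events (tuples) written in one transaction
        return new HiveBolt(options);
    }
}
One rough way to verify the behaviour is to watch the delta_* directories under the table's HDFS location, or to query the table's row count periodically; rows become visible as each transaction commits.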
... View more
Labels:
02-08-2018
04:27 AM
HDP and NiFi do not seem compatible. I used HDF.
... View more
01-01-2018
01:00 PM
tuple.getSourceComponent() and tuple.getSourceStreamId() always return the spout name and the default stream id. Please help.
... View more
12-29-2017
11:05 AM
Hi Experts, In my Storm-based application I need to query an Oracle table periodically, so I thought of using Storm's tick tuple. But it is not giving the correct result and the tick tuple is not being produced.
My Storm version is 1.0.1.2.5.3.0-37. I tried the following: 1. Added the getComponentConfiguration method in the bolt, but no tick tuple is generated. 2. So I changed the code and used the topology Config to generate tick tuples, but I got a tick tuple only once. Below is my tick tuple bolt code:
public class TickTupleBolt implements IRichBolt{
private OutputCollector collector = null;
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(TickTupleBolt.class);
public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple tuple) {
LOG.info("Start of TickTupleBolt.execute");
try {
if (isTickTuple(tuple)) {
//if(tuple.getSourceStreamId().equals("__tick")){
LOG.info("**got tick tuple");
}else{
LOG.info("not got tick tuple");
}
} catch (Exception e) {
LOG.error("Bolt execute error: {}", e);
collector.reportError(e);
}
LOG.info("End of TickTupleBolt.execute");
}
public void cleanup() {
// TODO Auto-generated method stub
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
So can anybody please let me know: 1. How do I implement tick tuples correctly in Storm? 2. Is there any other way (apart from tick tuples) to do a periodic job in Storm? Thanks.
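For reference, a minimal sketch of the usual tick-tuple pattern in Storm 1.x: the bolt requests ticks in its own getComponentConfiguration and recognises them by the system component id and tick stream id. The class name and the 60-second period are illustrative, not the code from the post:
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class PeriodicBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    // Ask Storm to send this bolt a tick tuple every 60 seconds.
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
        return conf;
    }

    // A tick tuple comes from the system component on the system tick stream.
    private static boolean isTickTuple(Tuple tuple) {
        return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }

    @Override
    public void execute(Tuple tuple) {
        if (isTickTuple(tuple)) {
            // periodic work goes here, e.g. query the Oracle table
        } else {
            // normal tuple processing
        }
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
The same per-component setting can also be supplied at wiring time, e.g. builder.setBolt("periodic", new PeriodicBolt()).addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60).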
... View more
Labels:
11-06-2017
09:43 AM
2 Kudos
I am on NiFi 1.4, using the ConsumeKafkaRecord_0_10 and PutHiveStreaming processors. I am receiving pipe-delimited messages, which are parsed with a CSVReader and written with an AvroRecordSetWriter. But now I am getting the error below.
2017-11-06 11:11:02,699 ERROR [Timer-Driven Process Thread-6] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=819a7fe2-015f-1000-7097-cd3f8614c9c2] Error connecting to Hive endpoint: table oneviewtest3 at thrift://parent.rolta.com:9083
2017-11-06 11:11:02,699 ERROR [Timer-Driven Process Thread-6] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=819a7fe2-015f-1000-7097-cd3f8614c9c2] Hive Streaming connect/write error, flow file will be penalized and routed to retry. org.apache.nifi.util.hive.HiveWriter$ConnectFailure: Failed connecting to EndPoint {metaStoreUri='thrift://parent.rolta.com:9083', database='default', table='oneviewtest3', partitionVals=[] }:
org.apache.nifi.processors.hive.PutHiveStreaming$ShouldRetryException: Hive Streaming connect/write error, flow file will be penalized and routed to retry. org.apache.nifi.util.hive.HiveWriter$ConnectFailure: Failed connecting to EndPoint {metaStoreUri='thrift://parent.rolta.com:9083', database='default', table='oneviewtest3', partitionVals=[] }
org.apache.nifi.processors.hive.PutHiveStreaming$ShouldRetryException: Hive Streaming connect/write error, flow file will be penalized and routed to retry. org.apache.nifi.util.hive.HiveWriter$ConnectFailure: Failed connecting to EndPoint {metaStoreUri='thrift://parent.rolta.com:9083', database='default', table='oneviewtest3', partitionVals=[] }
Below is my table create statement:
create table oneviewtest3 (TNT_KEY BIGINT, PLANT_KEY BIGINT, UNIT_KEY BIGINT, TAG_DATA_SRC STRING, TAG_DATA_RLVNC STRING, TAG_TYPE_ID STRING, DB_RQST_ID STRING, BATCH_KEY BIGINT, OPC_TAG_ID STRING, OPC_TAG_VAL_TS STRING, OPC_SRC_STM_CODE BIGINT, OPC_TAG_VAL STRING, OPC_QLTY_VAL BIGINT, REF_ID STRING, REF_TYPE STRING, ERROR_CODE STRING, OPC_TAG_VAL_AGG_TYPE STRING, OPC_TAG_RSMPL_INTRVL_VAL BIGINT, OPC_TAG_RSMPL_INTRVL_TYPE STRING, OPC_TAG_VAL_TS_PRC STRING, FLEX_FIELD1 STRING, FLEX_FIELD2 STRING, FLEX_FIELD3 STRING, FLEX_FIELD4 STRING, FLEX_FIELD5 STRING, ETL_BTCH_ID BIGINT) CLUSTERED BY (OPC_TAG_ID) into 4 buckets stored as orc tblproperties ("transactional"="true","orc.compress"="NONE");
I am already doing the following in the create statement: 1. ORC format for the table ("stored as orc"). 2. transactional = "true" is set. 3. Bucketed but not sorted ("clustered by (colName) into (n) buckets"). I have also set the following in Hive:
hive.support.concurrency = true
hive.enforce.bucketing = true
hive.exec.dynamic.partition.mode = nonstrict
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on = true
hive.compactor.worker.threads = 2
I am using HDP 2.5.3 and nifi-hive-nar-1.4.0.nar. In the Hive metastore log I see the error:
2017-11-06 12:02:53,436 ERROR [pool-8-thread-192]: server.TThreadPoolServer (TThreadPoolServer.java:run(297)) - Error occurred during processing of message. java.lang.IllegalStateException: Unexpected DataOperationType: UNSET agentInfo=Unknown txnid:424
Can anybody please help me with this?
... View more
Labels:
11-03-2017
01:34 PM
Hi,
I just started with NiFi 1.4. I am trying to send pipe-delimited messages via Kafka into Hive, so I am using the ConsumeKafkaRecord_0_10 and PutHiveStreaming processors. ConsumeKafkaRecord sends data on success to PutHiveStreaming and writes it in Avro format, but PutHiveStreaming gives the error "The incoming flow file can not be read as an Avro file: java.io.IOException: Not a data file."
Can anybody please help me with this error?
... View more
Labels:
11-02-2017
01:36 PM
Hi, I am using Kafka 0.10 and Flume 1.8. I am trying to find information on the points below, but could not, so can anybody please help me? 1. Is there any way to send an event to a particular Kafka topic partition?
2. And if so, can we read such events (coming to a specific partition) with Flume using the Hive sink?
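On question 1, the plain Kafka producer API can pin a record to a partition by passing the partition number explicitly in the ProducerRecord. A rough sketch with the 0.10 Java client; the broker address, topic, key, and payload are illustrative:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionedSender {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical broker address; replace with the cluster's broker list
        props.put("bootstrap.servers", "broker1:6667");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The explicit partition number (here 2) pins the record to that partition
            producer.send(new ProducerRecord<>("iot-streaming", 2, "some-key", "a|pipe|delimited|record"));
        }
    }
}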
... View more
Labels:
10-30-2017
11:00 AM
I am facing an issue with Kafka 0.10 and Flume 1.8. We are trying to ingest NRT data into Hive by streaming, but we often get errors like socket exceptions followed by java.lang.OutOfMemoryError. So I tried the following: 1. Exported an 8 GB Java heap from flume-env.sh, but it did not help. 2. Used multiple Hive sinks and channels (memory and file), but it did not help. We have a 4-node cluster with 16 GB of memory on each node. Is there anything else I can do in the Flume configuration, or would any alternative to Kafka-Flume help? Below are my Flume properties.
flume1.sources = kafka-source-1
flume1.channels = hive-channel-1 hive-channel-2 hive-channel-3 hive-channel-4 hive-channel-5 hive-channel-6 hive-channel-7 hive-channel-8 hive-channel-9
flume1.sinks = hive-sink-1 hive-sink-2 hive-sink-3 hive-sink-4 hive-sink-5 hive-sink-6 hive-sink-7 hive-sink-8 hive-sink-9
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = base1.rolta.com:2181
flume1.sources.kafka-source-1.topic = iot-streaming
flume1.sources.kafka-source-1.batchSize = 100
flume1.sources.kafka-source-1.channels = hive-channel-1
flume1.sources.kafka-source-1.batchSize = 100
flume1.sources.kafka-source-1.batchDurationMillis = 10000
flume1.channels.hive-channel-1.type = file
flume1.channels.hive-channel-1.checkpointDir = /home/labuser/flumedata/chkpoint/001
flume1.channels.hive-channel-1.dataDirs = /home/labuser/flumedata/data/001
flume1.channels.hive-channel-1.transactionCapacity = 100
flume1.channels.hive-channel-2.type = file
flume1.channels.hive-channel-2.checkpointDir = /home/labuser/flumedata/chkpoint/002
flume1.channels.hive-channel-2.dataDirs = /home/labuser/flumedata/data/002
flume1.channels.hive-channel-2.transactionCapacity = 100
flume1.channels.hive-channel-3.type = file
flume1.channels.hive-channel-3.checkpointDir = /home/labuser/flumedata/chkpoint/003
flume1.channels.hive-channel-3.dataDirs = /home/labuser/flumedata/data/003
flume1.channels.hive-channel-3.transactionCapacity = 100
flume1.channels.hive-channel-4.type = file
flume1.channels.hive-channel-4.checkpointDir = /home/labuser/flumedata/chkpoint/004
flume1.channels.hive-channel-4.dataDirs = /home/labuser/flumedata/data/004
flume1.channels.hive-channel-4.transactionCapacity = 100
flume1.channels.hive-channel-5.type = file
flume1.channels.hive-channel-5.checkpointDir = /home/labuser/flumedata/chkpoint/005
flume1.channels.hive-channel-5.dataDirs = /home/labuser/flumedata/data/005
flume1.channels.hive-channel-5.transactionCapacity = 100
flume1.channels.hive-channel-6.type = file
flume1.channels.hive-channel-6.checkpointDir = /home/labuser/flumedata/chkpoint/006
flume1.channels.hive-channel-6.dataDirs = /home/labuser/flumedata/data/006
flume1.channels.hive-channel-6.transactionCapacity = 100
flume1.channels.hive-channel-7.type = file
flume1.channels.hive-channel-7.checkpointDir = /home/labuser/flumedata/chkpoint/007
flume1.channels.hive-channel-7.dataDirs = /home/labuser/flumedata/data/007
flume1.channels.hive-channel-7.transactionCapacity = 100
flume1.channels.hive-channel-8.type = file
flume1.channels.hive-channel-8.checkpointDir = /home/labuser/flumedata/chkpoint/008
flume1.channels.hive-channel-8.dataDirs = /home/labuser/flumedata/data/008
flume1.channels.hive-channel-8.transactionCapacity = 100
flume1.channels.hive-channel-9.type = file
flume1.channels.hive-channel-9.checkpointDir = /home/labuser/flumedata/chkpoint/009
flume1.channels.hive-channel-9.dataDirs = /home/labuser/flumedata/data/009
flume1.channels.hive-channel-9.transactionCapacity = 100
flume1.sinks.hive-sink-1.channel = hive-channel-1
flume1.sinks.hive-sink-1.type = hive
flume1.sinks.hive-sink-1.hive.metastore = thrift://base1.rolta.com:9083
flume1.sinks.hive-sink-1.hive.database = default
flume1.sinks.hive-sink-1.hive.table = oneviewtest1
flume1.sinks.hive-sink-1.serializer = DELIMITED
flume1.sinks.hive-sink-1.serializer.delimiter = "\\|"
flume1.sinks.hive-sink-1.serializer.fieldnames = db_rqst_id,opc_tag_id,opc_qlty_val,opc_tag_val,opc_tag_val_ts,opc_src_stm_code,tag_data_src
flume1.sinks.hive-sink-1.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-1.batchSize = 100
flume1.sinks.hive-sink-1.hive.partition = %y-%m-%d
flume1.sinks.hive-sink-1.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-2.channel = hive-channel-2
flume1.sinks.hive-sink-2.type = hive
flume1.sinks.hive-sink-2.hive.metastore = thrift://base1.rolta.com:9083
flume1.sinks.hive-sink-2.hive.database = default
flume1.sinks.hive-sink-2.hive.table = oneviewtest1
flume1.sinks.hive-sink-2.serializer = DELIMITED
flume1.sinks.hive-sink-2.serializer.delimiter = "\\|"
flume1.sinks.hive-sink-2.serializer.fieldnames = db_rqst_id,opc_tag_id,opc_qlty_val,opc_tag_val,opc_tag_val_ts,opc_src_stm_code,tag_data_src
flume1.sinks.hive-sink-2.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-2.batchSize = 100
flume1.sinks.hive-sink-2.hive.partition = %y-%m-%d
flume1.sinks.hive-sink-2.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-3.channel = hive-channel-3
flume1.sinks.hive-sink-3.type = hive
flume1.sinks.hive-sink-3.hive.metastore = thrift://base1.rolta.com:9083
flume1.sinks.hive-sink-3.hive.database = default
flume1.sinks.hive-sink-3.hive.table = oneviewtest1
flume1.sinks.hive-sink-3.serializer = DELIMITED
flume1.sinks.hive-sink-3.serializer.delimiter = "\\|"
flume1.sinks.hive-sink-3.serializer.fieldnames = db_rqst_id,opc_tag_id,opc_qlty_val,opc_tag_val,opc_tag_val_ts,opc_src_stm_code,tag_data_src
flume1.sinks.hive-sink-3.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-3.batchSize = 100
flume1.sinks.hive-sink-3.hive.partition = %y-%m-%d
flume1.sinks.hive-sink-3.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-4.channel = hive-channel-4
flume1.sinks.hive-sink-4.type = hive
flume1.sinks.hive-sink-4.hive.metastore = thrift://base1.rolta.com:9083
flume1.sinks.hive-sink-4.hive.database = default
flume1.sinks.hive-sink-4.hive.table = oneviewtest1
flume1.sinks.hive-sink-4.serializer = DELIMITED
flume1.sinks.hive-sink-4.serializer.delimiter = "\\|"
flume1.sinks.hive-sink-4.serializer.fieldnames = db_rqst_id,opc_tag_id,opc_qlty_val,opc_tag_val,opc_tag_val_ts,opc_src_stm_code,tag_data_src
flume1.sinks.hive-sink-4.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-4.batchSize = 100
flume1.sinks.hive-sink-4.hive.partition = %y-%m-%d
flume1.sinks.hive-sink-4.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-5.channel = hive-channel-5
flume1.sinks.hive-sink-5.type = hive
flume1.sinks.hive-sink-5.hive.metastore = thrift://base1.rolta.com:9083
flume1.sinks.hive-sink-5.hive.database = default
flume1.sinks.hive-sink-5.hive.table = oneviewtest1
flume1.sinks.hive-sink-5.serializer = DELIMITED
flume1.sinks.hive-sink-5.serializer.delimiter = "\\|"
flume1.sinks.hive-sink-5.serializer.fieldnames = db_rqst_id,opc_tag_id,opc_qlty_val,opc_tag_val,opc_tag_val_ts,opc_src_stm_code,tag_data_src
flume1.sinks.hive-sink-5.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-5.batchSize = 100
flume1.sinks.hive-sink-5.hive.partition = %y-%m-%d
flume1.sinks.hive-sink-5.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-6.channel = hive-channel-6
flume1.sinks.hive-sink-6.type = hive
flume1.sinks.hive-sink-6.hive.metastore = thrift://base1.rolta.com:9083
flume1.sinks.hive-sink-6.hive.database = default
flume1.sinks.hive-sink-6.hive.table = oneviewtest1
flume1.sinks.hive-sink-6.serializer = DELIMITED
flume1.sinks.hive-sink-6.serializer.delimiter = "\\|"
flume1.sinks.hive-sink-6.serializer.fieldnames = db_rqst_id,opc_tag_id,opc_qlty_val,opc_tag_val,opc_tag_val_ts,opc_src_stm_code,tag_data_src
flume1.sinks.hive-sink-6.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-6.batchSize = 100
flume1.sinks.hive-sink-6.hive.partition = %y-%m-%d
flume1.sinks.hive-sink-6.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-7.channel = hive-channel-7
flume1.sinks.hive-sink-7.type = hive
flume1.sinks.hive-sink-7.hive.metastore = thrift://base1.rolta.com:9083
flume1.sinks.hive-sink-7.hive.database = default
flume1.sinks.hive-sink-7.hive.table = oneviewtest1
flume1.sinks.hive-sink-7.serializer = DELIMITED
flume1.sinks.hive-sink-7.serializer.delimiter = "\\|"
flume1.sinks.hive-sink-7.serializer.fieldnames = db_rqst_id,opc_tag_id,opc_qlty_val,opc_tag_val,opc_tag_val_ts,opc_src_stm_code,tag_data_src
flume1.sinks.hive-sink-7.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-7.batchSize = 100
flume1.sinks.hive-sink-7.hive.partition = %y-%m-%d
flume1.sinks.hive-sink-7.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-8.channel = hive-channel-8
flume1.sinks.hive-sink-8.type = hive
flume1.sinks.hive-sink-8.hive.metastore = thrift://base1.rolta.com:9083
flume1.sinks.hive-sink-8.hive.database = default
flume1.sinks.hive-sink-8.hive.table = oneviewtest1
flume1.sinks.hive-sink-8.serializer = DELIMITED
flume1.sinks.hive-sink-8.serializer.delimiter = "\\|"
flume1.sinks.hive-sink-8.serializer.fieldnames = db_rqst_id,opc_tag_id,opc_qlty_val,opc_tag_val,opc_tag_val_ts,opc_src_stm_code,tag_data_src
flume1.sinks.hive-sink-8.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-8.batchSize = 100
flume1.sinks.hive-sink-8.hive.partition = %y-%m-%d
flume1.sinks.hive-sink-8.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-9.channel = hive-channel-9
flume1.sinks.hive-sink-9.type = hive
flume1.sinks.hive-sink-9.hive.metastore = thrift://base1.rolta.com:9083
flume1.sinks.hive-sink-9.hive.database = default
flume1.sinks.hive-sink-9.hive.table = oneviewtest1
flume1.sinks.hive-sink-9.serializer = DELIMITED
flume1.sinks.hive-sink-9.serializer.delimiter = "\\|"
flume1.sinks.hive-sink-9.serializer.fieldnames = db_rqst_id,opc_tag_id,opc_qlty_val,opc_tag_val,opc_tag_val_ts,opc_src_stm_code,tag_data_src
flume1.sinks.hive-sink-9.hive.txnsPerBatchAsk = 2
flume1.sinks.hive-sink-9.batchSize = 100
flume1.sinks.hive-sink-9.hive.partition = %y-%m-%d
... View more
Labels:
10-13-2017
12:34 PM
Hi, I have done real-time streaming into Hive tables using Storm, but now it takes a long time to insert data into the Hive table, and the time lag keeps increasing. Below is my code:
public class HiveSinker {
private static final Logger LOG = LoggerFactory.getLogger(HiveSinker.class);
public HiveBolt buildHiveBolt(String metaStoreURI, String dbName, String tblName, String[] colNames) {
LOG.info("start of HiveSinker.buildHiveBolt");
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper().withColumnFields(new Fields(colNames)).withTimeAsPartitionField("YYYY/MM/DD");
//HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper).withTxnsPerBatch(10).withBatchSize(1000).withIdleTimeout(10).withCallTimeout(10000000);
HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper).withTxnsPerBatch(2).withBatchSize(100).withIdleTimeout(10).withCallTimeout(10000);
HiveBolt hiveBolt = new HiveBolt(hiveOptions);
LOG.info("end of HiveSinker.buildHiveBolt");
return hiveBolt;
}
}
I tried changing withTxnsPerBatch and withBatchSize, but with no effect. Can anybody point out the issue? Thanks.
... View more
Labels:
09-28-2017
08:35 AM
Thanks, it worked
... View more
09-25-2017
06:37 AM
What I have observed is that using DefaultFileNameFormat works fine for me. So can anybody please tell me what I should do in my custom HDFS file name format so that it also works without the "no lease on file" error? (A sketch of one option is included after the class below.)
public class CustomHDFSFileNameFormat implements FileNameFormat{
private static final long serialVersionUID = 1L;
private String filename=null;
private String extension=null;
private String path=null;
public CustomHDFSFileNameFormat(String path,String filename,String extension){
this.filename = filename;
this.extension = extension;
this.path = path;
}
public String getName(long rotation, long timeStamp) {
return (this.filename+"."+this.extension);
}
public String getPath() {
return this.path;
}
public void prepare(Map map, TopologyContext context) {
// TODO Auto-generated method stub
}
}
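For comparison, DefaultFileNameFormat builds names from the bolt's component id, task id, rotation number, and timestamp, so every writer task and every rotation gets its own HDFS file and no two writers fight over the same lease. A sketch of a custom format along the same lines; the class name is illustrative and not part of the original code:
import java.util.Map;

import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.task.TopologyContext;

public class TaskAwareFileNameFormat implements FileNameFormat {
    private static final long serialVersionUID = 1L;

    private final String path;
    private final String prefix;
    private final String extension;
    private String componentId;
    private int taskId;

    public TaskAwareFileNameFormat(String path, String prefix, String extension) {
        this.path = path;
        this.prefix = prefix;
        this.extension = extension;
    }

    public void prepare(Map conf, TopologyContext topologyContext) {
        // Capture identifiers that distinguish this writer from every other task
        this.componentId = topologyContext.getThisComponentId();
        this.taskId = topologyContext.getThisTaskId();
    }

    public String getName(long rotation, long timeStamp) {
        // component + task + rotation + timestamp makes each writer's file unique
        return this.prefix + "-" + this.componentId + "-" + this.taskId
                + "-" + rotation + "-" + timeStamp + "." + this.extension;
    }

    public String getPath() {
        return this.path;
    }
}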
... View more
09-25-2017
04:37 AM
Hi @ajoseph, thanks. But can you please tell me how I can set the overwrite flag?
... View more
09-22-2017
01:36 PM
I am getting the error below while working with Apache Storm and HDFS.
Caused by: org.apache.hadoop.ipc.RemoteException: No lease on /user/test/xxx/ccc/bbb/value_fct_archive.csv (inode 5425306): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_-247167534_42, pendingcreates: 1]
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3521)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3324)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3162)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3122)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:843)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:500)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1552) ~[stormjar.jar:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1496) ~[stormjar.jar:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1396) ~[stormjar.jar:?]
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233) ~[stormjar.jar:?]
at com.sun.proxy.$Proxy41.addBlock(Unknown Source) ~[?:?]
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:457) ~[stormjar.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_77]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_77]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_77]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_77]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:278) ~[stormjar.jar:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:194) ~[stormjar.jar:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:176) ~[stormjar.jar:?]
at com.sun.proxy.$Proxy43.addBlock(Unknown Source) ~[?:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1489) ~[stormjar.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1284) ~[stormjar.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463) ~[stormjar.jar:?]
I tried to resolve it in many ways, like increasing the ulimit or setting the parallelism hint to 1, but nothing worked for me. My topology code is below:
public class Topology {
private static final Logger LOG = LoggerFactory.getLogger(Topology.class);
public static StormTopology buildTopology() {
TopologyBuilder builder = new TopologyBuilder();
// read properties and send to MQTT spout
PropertyReader propReader = new PropertyReader();
Properties prop = propReader.getProperties();
String mqttService = (propReader.getValue(prop, "mqttservice")).trim();
String[] devicesTopics = ((propReader.getValue(prop, "devices_topics.MQTT")).trim()).split(",");
for(String devicesTopicsId:devicesTopics){
LOG.info("devicesTopics Id is "+devicesTopicsId);
String[] devices = devicesTopicsId.split(":");
String deviceId = devices[0];
LOG.info("device Id is "+deviceId);
String mqtttopic = devices[1];
LOG.info("Topic Name is "+mqtttopic);
//start topology logic
String mqttreader="mqttreader-"+deviceId;
LOG.info("Mqtt reader is "+mqttreader);
String stream="stream-"+deviceId;
LOG.info("Stream Id is "+stream);
builder.setSpout(mqttreader, new MQTTReader(mqttService,mqtttopic,deviceId,stream));
//set mqttprocessor bolt
String mqttprocesser="mqttprocesser-"+deviceId;
LOG.info("Mqtt processor is "+mqttprocesser);
builder.setBolt(mqttprocesser, new MQTTProcesser(stream)).shuffleGrouping(mqttreader,stream);
//set dateretriver bolt
String dateretriver="dateretriver-"+deviceId;
LOG.info("date retriver is "+dateretriver);
builder.setBolt(dateretriver, new DateRetriver(stream)).shuffleGrouping(mqttprocesser,stream);
//set archival bolt from dateretriver
String archive="archive-"+deviceId;
LOG.info("archival is "+archive);
builder.setBolt(archive, new Archival(stream)).shuffleGrouping(dateretriver,stream);
//get hive bolt
MQTTHiveSinker sinker = new MQTTHiveSinker();
String metaStoreURI = (propReader.getValue(prop, "hiveurl")).trim();
String dbName = (propReader.getValue(prop, "dbname.MQTT")).trim();
String tblName = (propReader.getValue(prop, "tblname.MQTT")).trim();
String[] colNames = ((propReader.getValue(prop, "colNames.MQTT")).trim()).split(",");
LOG.info("colName0 is "+colNames[0]);
LOG.info("colName1 is "+colNames[1]);
LOG.info("colName2 is "+colNames[2]);
LOG.info("colName3 is "+colNames[3]);
HiveBolt hiveBolt = sinker.buildHiveBolt(metaStoreURI, dbName, tblName, colNames);
//set hive bolt
String hivebolt="hivebolt-"+deviceId;
LOG.info("Hivbolt is "+hivebolt);
builder.setBolt(hivebolt, hiveBolt).shuffleGrouping(archive);
//set hdfs bolt
MQTTHDFSSinker hdfssinker = new MQTTHDFSSinker();
String hdfsboltId="hdfsbolt-"+deviceId;
LOG.info("hdfsbolt is "+hdfsboltId);
HdfsBolt hdfbolt = hdfssinker.makeHDFSBolt();
builder.setBolt(hdfsboltId,hdfbolt).shuffleGrouping(archive);
}
return builder.createTopology();
}
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
//conf.setMaxSpoutPending(1);
StormSubmitter.submitTopology(args[0], conf, buildTopology());
}
}
And the HDFS streaming code is below:
public class MQTTHDFSSinker {
private static final Logger LOG = LoggerFactory.getLogger(MQTTHDFSSinker.class);
public HdfsBolt makeHDFSBolt(){
LOG.info("start of MQTTHDFSSinker.makeHDFSBolt");
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
FileNameFormat fileNameFormat = new CustomHDFSFileNameFormat("/user/test/xxx/ccc/bbb/","value_fct_archive","csv");
HdfsBolt bolt = new HdfsBolt().withFsUrl("hdfs://namenode:8020").withFileNameFormat(fileNameFormat).withRecordFormat(format).withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy);
LOG.info("end of MQTTHDFSSinker.makeHDFSBolt");
return bolt;
}
}
Is this an issue of multiple threads trying to write to the same file? Can anybody please help me with this error?
... View more
Labels:
08-30-2017
02:13 PM
I am facing an issue with Storm and Hive streaming on HDP 2.5. I am getting the error below.
org.apache.storm.hive.common.HiveWriter$ConnectFailure: Failed connecting to EndPoint {metaStoreUri='thrift://base1.rolta.com:9083', database='default', table='table_mqtt', partitionVals=[2017/08/242] }
at org.apache.storm.hive.common.HiveWriter.<init>(HiveWriter.java:80) ~[stormjar.jar:?]
at org.apache.storm.hive.common.HiveUtils.makeHiveWriter(HiveUtils.java:50) ~[stormjar.jar:?]
at org.apache.storm.hive.bolt.HiveBolt.getOrCreateWriter(HiveBolt.java:271) ~[stormjar.jar:?]
at org.apache.storm.hive.bolt.HiveBolt.execute(HiveBolt.java:114) [stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__9364$tuple_action_fn__9366.invoke(executor.clj:734) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.daemon.executor$mk_task_receiver$fn__9285.invoke(executor.clj:466) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.disruptor$clojure_handler$reify__8798.onEvent(disruptor.clj:40) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.daemon.executor$fn__9364$fn__9377$fn__9430.invoke(executor.clj:853) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.util$async_loop$fn__656.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]
Caused by: org.apache.storm.hive.common.HiveWriter$TxnBatchFailure: Failed acquiring Transaction Batch from EndPoint: {metaStoreUri='thrift://base1.rolta.com:9083', database='default', table='table_mqtt', partitionVals=[2017/08/242] }
at org.apache.storm.hive.common.HiveWriter.nextTxnBatch(HiveWriter.java:264) ~[stormjar.jar:?]
at org.apache.storm.hive.common.HiveWriter.<init>(HiveWriter.java:72) ~[stormjar.jar:?]
... 13 more
Caused by: org.apache.hive.hcatalog.streaming.TransactionError: Unable to acquire lock on {metaStoreUri='thrift://base1.rolta.com:9083', database='default', table='table_mqtt', partitionVals=[2017/08/242] }
at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.beginNextTransactionImpl(HiveEndPoint.java:575) ~[stormjar.jar:?]
at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.beginNextTransaction(HiveEndPoint.java:544) ~[stormjar.jar:?]
at org.apache.storm.hive.common.HiveWriter.nextTxnBatch(HiveWriter.java:259) ~[stormjar.jar:?]
at org.apache.storm.hive.common.HiveWriter.<init>(HiveWriter.java:72) ~[stormjar.jar:?]
... 13 more
Caused by: org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) ~[stormjar.jar:?]
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84) ~[stormjar.jar:?]
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378) ~[stormjar.jar:?]
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297) ~[stormjar.jar:?]
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204) ~[stormjar.jar:?]
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69) ~[stormjar.jar:?]
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_lock(ThriftHiveMetastore.java:3781) ~[stormjar.jar:?]
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.lock(ThriftHiveMetastore.java:3768) ~[stormjar.jar:?]
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.lock(HiveMetaStoreClient.java:1736) ~[stormjar.jar:?]
at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.beginNextTransactionImpl(HiveEndPoint.java:570) ~[stormjar.jar:?]
at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.beginNextTransaction(HiveEndPoint.java:544) ~[stormjar.jar:?]
at org.apache.storm.hive.common.HiveWriter.nextTxnBatch(HiveWriter.java:259) ~[stormjar.jar:?]
at org.apache.storm.hive.common.HiveWriter.<init>(HiveWriter.java:72) ~[stormjar.jar:?]
... 13 more
I tried the following: 1. Wrote the code as per http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-hive.html. 2. Then changed hive-site.xml on all nodes as per http://www.openkb.info/2015/06/hive-transaction-feature-in-hive-10.html. But I am still getting the error. Below is my POM.
<dependencies>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.9</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hive</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
Can anybody please help me with suggestions.
... View more
- Tags:
- hive-streaming
- Storm
Labels:
08-30-2017
12:39 PM
So I updated hive-site.xml with a /tmp/mydir value for the scratchdir property on the namenode, but it kept showing me the error for /tmp/hive. Then I changed the configuration file on all datanodes and the error went away (though another error came). So: 1. Is changing the property on all nodes the correct approach? 2. Does this mean that whenever a Hive property changes, it has to be changed on all the nodes? Can anybody please let me know.
... View more
08-30-2017
09:49 AM
Hi, I am having an issue with Storm and Hive streaming. I did as described in the post https://community.hortonworks.com/questions/96995/storm-hdfs-javalangruntimeexception-error-preparin.html, but it did not help. I also looked at the post https://community.hortonworks.com/questions/111874/non-local-session-path-expected-to-be-non-null-try.html, but did not understand which jar should be included. Can anybody please help me? Below is my POM.
<dependencies>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.9</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hive</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
Below is my Maven Shade plugin configuration:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Maven shade plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- <artifactSet>
<excludes>
<exclude>org.slf4j:slf4j-log4j12:*</exclude>
<exclude>log4j:log4j:jar:</exclude>
<exclude>org.slf4j:slf4j-simple:jar</exclude>
<exclude>org.apache.storm:storm-core</exclude>
</excludes>
</artifactSet> -->
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Please see the error below.
java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:444) ~[stormjar.jar:?]
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:358) ~[stormjar.jar:?]
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276) ~[stormjar.jar:?]
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243) ~[stormjar.jar:?]
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180) ~[stormjar.jar:?]
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157) ~[stormjar.jar:?]
at org.apache.storm.hive.common.HiveWriter$5.call(HiveWriter.java:238) ~[stormjar.jar:?]
at org.apache.storm.hive.common.HiveWriter$5.call(HiveWriter.java:235) ~[stormjar.jar:?]
at org.apache.storm.hive.common.HiveWriter$9.call(HiveWriter.java:366) ~[stormjar.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_77]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:529) ~[stormjar.jar:?]
at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:478) ~[stormjar.jar:?]
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:430) ~[stormjar.jar:?]
... 12 more
2017-08-30 14:51:58.022 o.a.s.d.executor [INFO] BOLT fail TASK: 3 TIME: TUPLE: source: mqttprocesser:4, stream: default, id: {}, [My count is, 11]
2017-08-30 14:51:58.023 o.a.s.d.executor [INFO] Execute done TUPLE source: mqttprocesser:4, stream: default, id: {}, [My count is, 11] TASK: 3 DELTA:
2017-08-30 14:51:58.023 o.a.s.d.executor [INFO] Processing received message FOR 3 TUPLE: source: mqttprocesser:4, stream: default, id: {}, [My count is, 12]
2017-08-30 14:51:58.050 h.metastore [INFO] Trying to connect to metastore with URI thrift://base1.rolta.com:9083
2017-08-30 14:51:58.052 h.metastore [INFO] Connected to metastore.
2017-08-30 14:51:58.124 o.a.h.h.q.l.PerfLogger [INFO] <PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver>
2017-08-30 14:51:58.124 o.a.h.h.q.l.PerfLogger [INFO] <PERFLOG method=TimeToSubmit from=org.apache.hadoop.hive.ql.Driver>
2017-08-30 14:51:58.124 o.a.h.h.q.l.PerfLogger [INFO] <PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>
2017-08-30 14:51:58.202 STDIO [ERROR] FAILED: NullPointerException Non-local session path expected to be non-null
2017-08-30 14:51:58.202 o.a.h.h.q.Driver [ERROR] FAILED: NullPointerException Non-local session path expected to be non-null
java.lang.NullPointerException: Non-local session path expected to be non-null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:590)
at org.apache.hadoop.hive.ql.Context.<init>(Context.java:129)
at org.apache.hadoop.hive.ql.Context.<init>(Context.java:116)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:382)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:303)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1067)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1129)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1004)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:994)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.runDDL(HiveEndPoint.java:404)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:369)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
at org.apache.storm.hive.common.HiveWriter$5.call(HiveWriter.java:238)
at org.apache.storm.hive.common.HiveWriter$5.call(HiveWriter.java:235)
at org.apache.storm.hive.common.HiveWriter$9.call(HiveWriter.java:366)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
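The immediate complaint in this trace is that /tmp/hive on HDFS has permissions rwx------, i.e. it is writable only by its owner, while the Storm worker connects as a different user. The usual remedy is to widen the permissions on that directory (for example with hdfs dfs -chmod). An equivalent sketch using the HDFS Java API; the NameNode URI is hypothetical and 777 is shown only for illustration:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class ScratchDirCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:8020"); // hypothetical NameNode address
        try (FileSystem fs = FileSystem.get(conf)) {
            Path scratch = new Path("/tmp/hive");
            // Print the current permissions reported in the error
            System.out.println("Current permissions: " + fs.getFileStatus(scratch).getPermission());
            // Widen the permissions so other users (e.g. the Storm worker user) can write
            fs.setPermission(scratch, new FsPermission((short) 0777));
        }
    }
}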
... View more
Labels:
08-09-2017
11:45 AM
The use case here: I am receiving tweets in Storm and saving them in Solr after an NLP operation, which is sentence detection. But the same sentences are getting stored repeatedly. Can anybody please help me with this?
... View more
08-09-2017
09:01 AM
Hi All, I am able to save data in Solr using Apache Storm, but it seems the same value is getting saved multiple times. I am going through links on this, but can anybody please tell me the best way to save data to Solr from Apache Storm and avoid this issue? I am using shuffleGrouping. Can anybody please help?
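One common way to avoid duplicates is to make the Solr document id deterministic (for example the tweet id), so that a replayed or re-processed tuple overwrites the same document instead of adding a new one. A rough SolrJ sketch; the Solr URL, core name, and field names are made up:
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;

public class SolrWriter {
    // Hypothetical Solr URL and core
    private final HttpSolrClient solr =
            new HttpSolrClient.Builder("http://solr-host:8983/solr/tweets").build();

    public void save(long tweetId, String sentence) throws Exception {
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", String.valueOf(tweetId)); // uniqueKey field in the schema; same id overwrites
        doc.addField("sentence_txt", sentence);      // hypothetical field name
        solr.add(doc);
        solr.commit();
    }
}
Duplicates with shuffleGrouping usually come from tuple replays (failed or timed-out tuples being re-emitted), so acking reliably or writing idempotently as above are the usual remedies.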
... View more
Labels:
08-08-2017
09:39 AM
thanks 🙂 It worked. I am using twitter4j 4.0.4 now.
... View more
08-07-2017
11:57 AM
@Jay SenSharma Thanks Jay for your response. I will try your suggestion. Meanwhile, my POM is as below:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.rolta</groupId>
<artifactId>stormtwitter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.9</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
<version>4.0.6</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>4.0.6</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.opennlp</groupId>
<artifactId>opennlp-tools</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>6.6.0</version>
</dependency>
<dependency>
<groupId>com.jolira</groupId>
<artifactId>onejar-maven-plugin</artifactId>
<version>1.4.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.slf4j:slf4j-log4j12:*</exclude>
<exclude>log4j:log4j:jar:</exclude>
<exclude>org.slf4j:slf4j-simple:jar</exclude>
<exclude>org.apache.storm:storm-core</exclude>
</excludes>
</artifactSet>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.rolta.storm.topology.Topology</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
... View more
08-07-2017
11:13 AM
Hi, I am on HDP 2.5 with Storm core version storm-core-1.0.1.2.5.3.0-37.jar. My code was working with LocalCluster, but it fails when using StormSubmitter.submitTopology, giving me an InvalidClassException. The complete stack is below. One more observation: the jar gives the error the first time it is executed and runs afterwards. I searched and implemented the following, but nothing worked. Please help me.
java.lang.RuntimeException: java.io.InvalidClassException: twitter4j.MediaEntityJSONImpl; local class incompatible: stream classdesc serialVersionUID = 1571961225214439778, local class serialVersionUID = 3609683338035442290 at org.apache.storm.serialization.SerializableSerializer.read(SerializableSerializer.java:58) ~[storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
1. Added serialVersionUID with values 1571961225214439778L, 3609683338035442290L, and 1L, but nothing worked. 2. Implemented Serializable and Externalizable in my POJOs. 3. Since I am using storm-core-1.0.1.2.5.3.0-37.jar and HDP 2.5, I added the matching Maven dependency to the POM with scope provided, but it did not work. 4. Added no-arg constructors.
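The InvalidClassException means a twitter4j class serialized with one twitter4j version is being deserialized with a different one, which typically happens when twitter4j objects travel inside tuples and the worker classpath carries another twitter4j build. Besides aligning the twitter4j version in the topology jar with whatever is on the cluster classpath, one workaround is to never emit twitter4j objects at all and pass only plain fields. A small illustrative sketch, not the original spout code:
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.tuple.Values;

import twitter4j.Status;

public class TweetEmitter {
    private final SpoutOutputCollector collector;

    public TweetEmitter(SpoutOutputCollector collector) {
        this.collector = collector;
    }

    // Emit only primitive/String fields so Storm never has to Java-serialize twitter4j objects
    public void emit(Status status) {
        collector.emit(new Values(status.getId(), status.getText(), status.getUser().getScreenName()));
    }
}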
... View more
Labels: