Member since: 07-07-2017
Posts: 16
Kudos Received: 0
Solutions: 0
09-22-2018
12:38 PM
I am trying to consume from the __consumer_offsets topic to obtain the offsets for each consumer group on a topic, in order to calculate total lag. I was told to obtain the log end offset from the JVM using JMX, and that the difference between this value and the committed offset would give me the total lag. I am having difficulty implementing this. I keep getting an error trying to read the __consumer_offsets topic, and in order to decode the content I opted for a different route, as the normal consumer I have does not decode the content. I am a little unfamiliar with formatting on this site, as each time I include a code box I cannot then post normal text beneath it, so please excuse the format here. Current code tried and error received:

try {
    ByteBuffer bb = ByteBuffer.allocate(1056);
    while (true) {
        ConsumerRecords<byte[], byte[]> records = kconsumer.poll(100);
        for (ConsumerRecord<byte[], byte[]> record : records) {
            if (record != null) {
                counter += 1;
                bb = ByteBuffer.wrap(record.key());
                short version = bb.getShort();
                String group = (String) Type.STRING.read(bb);
                String topic = (String) Type.STRING.read(bb);
                int partition = bb.getInt();
                long offset = record.offset();
The error occurs on the following line: String topic = (String) Type.STRING.read(bb);

java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Unknown Source)
    at java.nio.HeapByteBuffer.getShort(Unknown Source)
    at org.apache.kafka.common.protocol.types.Type$7.read(Type.java:269)
    at org.apache.kafka.common.protocol.types.Type$7.read(Type.java:257)
Can anyone suggest an easier way to go about this? I found a couple of tools created by others, but none were compatible with the latest Kafka version. I am looking into Burrow now, but would ideally be expected to create my own.
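For reference, here is a minimal sketch of the log-end-offset-minus-committed-offset calculation described above, using a plain Java consumer (0.10.1+) instead of reading __consumer_offsets directly; the broker address, group id and topic name are placeholders. (As far as I can tell, one likely cause of the BufferUnderflowException is that not every record on __consumer_offsets is an offset commit; group-metadata records carry a different key schema, so reading group/topic/partition from every key can underflow.)

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class LagCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");   // placeholder broker
        props.put("group.id", "my-consumer-group");      // group whose lag we want
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo pi : consumer.partitionsFor("my-topic")) {   // placeholder topic
                partitions.add(new TopicPartition(pi.topic(), pi.partition()));
            }
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            long totalLag = 0;
            for (TopicPartition tp : partitions) {
                OffsetAndMetadata committed = consumer.committed(tp);        // last committed offset for the group
                long committedOffset = (committed == null) ? 0L : committed.offset();
                totalLag += endOffsets.get(tp) - committedOffset;            // per-partition lag
            }
            System.out.println("Total lag: " + totalLag);
        }
    }
}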
Labels: Apache Kafka
08-08-2018
02:32 PM
I am trying to run an ExecuteScript processor that should take in a JSON message and parse its fields for further processing, but I cannot get this to work. I ran a tester for ExecuteScript created by this author: https://funnifi.blogspot.com/2016/06/testing-executescript-processor-scripts.html and it returned the following error:

cannot create an instance from the abstract interface org.apache.nifi.processor.io.StreamCallback

I do not see the error in the script, so could someone please assist with this:

flowFile = session.get();
if (flowFile != null) {
    var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
    var IOUtils = Java.type("org.apache.commons.io.IOUtils");
    var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
    var transformed_message = {};
    var error = false;
    var line = "ops_track";
    // Get attributes
    flowFile = session.write(flowFile, new StreamCallback(function (inputStream, outputStream) {
        // Read input FlowFile content
        var content = IOUtils.toString(inputStream, StandardCharsets.UTF_8); // message or content
        var message_content = {};
        try {
            message_content = JSON.parse(content);
            transformed_message.postID = (((message_content || {}).postID || "null"));
            transformed_message.contentType = (((message_content || {}).contentType || "null"));
            transformed_message.published = (((message_content || {}).published || "null"));
            transformed_message.crawled = (((message_content || {}).crawled || "null"));
            transformed_message.providerID = (((message_content || {}).providerID || "null"));
            line = line + " " + "postID=" + transformed_message.postID + ","
                + "contentType=" + transformed_message.contentType + ","
                + "published=" + transformed_message.published + ","
                + "crawled=" + transformed_message.crawled + ","
                + "providerID=" + transformed_message.providerID + ","
                + " value=" + "1" + " "
                + time * 1000000 + "\n"; // NOTE: 'time' is not defined in this snippet (assumed to be set earlier, e.g. Date.now())
            // Write output content
            if (transformed_message) {
                outputStream.write(line.getBytes(StandardCharsets.UTF_8));
            }
        } catch (e) {
            error = true;
            outputStream.write(content.getBytes(StandardCharsets.UTF_8));
        }
    }));
    if (error) {
        session.transfer(flowFile, REL_FAILURE)
    } else {
        session.transfer(flowFile, REL_SUCCESS)
    }
}
Labels: Apache NiFi
07-24-2018
05:40 PM
Sorry, I need to update this question. I resolved the issue. I am using ConsumeKafka_1_0 and the parsing issue got resolved, but now a Kafka-related error is being thrown.
07-23-2018
06:43 PM
I am trying to read data from a Kafka topic that consists of JSON messages with about 4 fields. The ExecuteScript processor has the following script, and it returns this error:

Failed to insert into influxDB due to {"error":"unable to parse 'ops_inbound, messageType=[object Object],

The script:

if (flowFile != null) {
    var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
    var IOUtils = Java.type("org.apache.commons.io.IOUtils");
    var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
    var transformed_message = {};
    var error = false;
    var line = "ops_inbound";
    // Get attributes
    flowFile = session.write(flowFile, new StreamCallback(function (inputStream, outputStream) {
        // Read input FlowFile content
        var content = IOUtils.toString(inputStream, StandardCharsets.UTF_8); // message or content
        var message_content = {};
        try {
            message_content = JSON.parse(content);
            transformed_message.type = (((message_content || {}).message_type || {})); // defaults to an empty object, which renders as "[object Object]" when concatenated
            transformed_message.post_state = (((message_content || {}).message_body || {}).type || "NULL");
            line = line + ",messageType=json" +
                ", postID=NULL" + ",contentType=NULL" + ",published=NULL" + ",crawled=NULL" + ",custID=NULL" +
                " value=" + "1"
                + " " + Date.now() * 1000000;
            // Write output content
            if (transformed_message) {
                outputStream.write(line.getBytes(StandardCharsets.UTF_8));
            }
        } catch (e) {
            error = true;
            outputStream.write(content.getBytes(StandardCharsets.UTF_8));
        }
    }));
    if (transformed_message.post_state) {
        flowFile = session.putAttribute(flowFile, "type", transformed_message.type);
    }
    if (error) {
        session.transfer(flowFile, REL_FAILURE)
    } else {
        session.transfer(flowFile, REL_SUCCESS)
    }
}
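For reference, the string the script builds should end up in InfluxDB line protocol shape, roughly as below: tag values must be plain strings, there is no space after the commas in the tag set, and the timestamp here is just an example value. The 'messageType=[object Object]' in the error suggests a JavaScript object rather than a string was concatenated into the tag value.

ops_inbound,messageType=json,postID=NULL,contentType=NULL,published=NULL,crawled=NULL,custID=NULL value=1 1532372598000000000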
Labels: Apache NiFi
07-10-2017
12:29 PM
I'm sorry to keep persisting with this, but I need clear confirmation: do I need to do the language bindings for the Thrift installed on the HBase server, if that is not already done, and then have a separate installation of Thrift on the client server as well? I got an error trying to run the thrift command on the HBase server: -bash: thrift: command not found. I then searched on this and found that I need to install a compiler for Thrift.
07-05-2017
04:49 PM
Yes, I know that the REST API can be used, but the team is convinced that they would get a better response using Thrift. REST and Thrift are both running on the same server as the HBase installation. I just wanted to make sure that this setup is fine. Thanks.
07-05-2017
03:43 PM
We have an HBase installation, and after working with both Java and the REST API successfully, I need to create a way for others to connect to HBase from other applications, one of them being in Python. I found the Cloudera tutorial but was not sure about the requirement of having a separate Thrift installation on the client machine running the Python scripts. I was able to start Thrift, as it must have been installed with the HBase installation. Do I really need to have a second instance running? 'For both Thrift and REST to work, another HBase daemon needs to be running to handle these requests. These daemons can be installed with the hbase-thrift and hbase-rest packages. The diagram below shows how Thrift and REST are placed in the cluster.'
Labels: Apache HBase
06-28-2017
06:23 AM
I think I still tend to look at HBase as a SQL database, and to clarify my understanding I was hoping someone could explain how a rowkey is found with such optimal return performance. For example, I have a table that stores requests to a REST API endpoint in the form of an IP address, URL and a few other columns. Using the IP address as the unique identifier (rowkey), how does the matching occur? Is it a direct string match lookup, left to right?
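To make the question concrete, here is roughly how the lookup is done from the Java client; the table name, column family and qualifier below are made up. From what I have read, the rowkey is stored as a plain byte[] and rows are kept sorted by a byte-wise (lexicographic) comparison of those arrays, so a Get should be a point lookup into the right region and store file rather than a scan over every key, but I would like confirmation.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class RowkeyLookup {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("api_requests"))) {  // made-up table
            // The rowkey is just the raw bytes of the IP string
            Get get = new Get(Bytes.toBytes("10.20.30.40"));
            Result result = table.get(get);
            byte[] url = result.getValue(Bytes.toBytes("r"), Bytes.toBytes("url"));   // made-up family/qualifier
            System.out.println(url == null ? "not found" : Bytes.toString(url));
        }
        // Bytes.compareTo shows the byte-by-byte ordering HBase uses for rowkeys
        System.out.println(Bytes.compareTo(Bytes.toBytes("10.20.30.4"),
                                           Bytes.toBytes("10.20.30.40"))); // negative: the shorter prefix sorts first
    }
}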
Labels: Apache HBase
06-08-2017
07:08 PM
Thanks Josh. I managed to figure out how to get this working, but realized that you cannot query multiple columns with a GET (at least I am not aware of a way to do it). The PUT so far does not work for me either. The Java library and API work well, so I am not sure why making requests through the REST API with Node.js is so difficult. But thanks again.
06-07-2017
08:37 PM
Thank you so much for responding. It's working now, and I hope you could answer one more question for me, please: to connect to HBase using Node.js, is there a client library that needs to be downloaded, or could I just create a script that makes a GET request for a specific record?
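For the record, this is roughly the kind of request I have in mind, shown in Java here only for consistency with the other snippets; the same GET can be issued from Node.js with http.get or any HTTP client. The host, port, table name and rowkey are placeholders, and with Accept: application/json the cell values come back base64-encoded.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class RestRowGet {
    public static void main(String[] args) throws Exception {
        // GET /<table>/<rowkey> against the HBase REST daemon (placeholder host/port)
        URL url = new URL("http://rest-host:8080/api_requests/10.20.30.40");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        conn.setRequestProperty("Accept", "application/json");

        System.out.println("HTTP " + conn.getResponseCode());
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        conn.disconnect();
    }
}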
06-07-2017
01:11 PM
We have HBase set up, with an application integrating using Java. I need to set up a way to have Node.js scripts interact with the tables, and I am finding it hard to get workable resources on how to do this. I found the REST/Thrift tutorials, but need to test a simple way first before getting any new software installed.

1 - REST API start and test fails:

hbase(main):005:0> bin/hbase-daemon.sh start rest

http://1xx.xxx.xxx.xx:8080/version
{
    "status": 404,
    "message": "Not Found"
}
How do I activate the REST API, and could someone also provide resources on how to integrate with HBase? My understanding from the tutorials is that Thrift is set up on the same HBase server, where it interacts with the REST API. The Node.js scripts, however, are run on the same machine, which does not help me. Thanks in advance for any suggestions.

EDIT: Tried the following based on another site:

# hbase rest start
2017-06-07 07:40:32,341 INFO [main] util.VersionInfo: HBase 1.1.2.2.3.4.0-3485
2017-06-07 07:40:32,342 INFO [main] util.VersionInfo: Source code repository ..... org.apache.hadoop.hbase.http.HttpServer.openListeners(HttpServer.java:1009)
... 3 more
Exception in thread "main" java.net.BindException: Port in use: 0.0.0.0:8085
at org.apache.hadoop.hbase.http.HttpServer.openListeners(HttpServer.java:1014)
at org.apache.hadoop.hbase.http.HttpServer.start(HttpServer.java:950)
at org.apache.hadoop.hbase.http.InfoServer.start(InfoServer.java:90)
at org.apache.hadoop.hbase.rest.RESTServer.main(RESTServer.java:248)
Caused by: java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:437)
at sun.nio.ch.Net.bind(Net.java:429)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at org.mortbay.jetty.nio.SelectChannelConnector.open(SelectChannelConnector.java:216)
at org.apache.hadoop.hbase.http.HttpServer.openListeners(HttpServer.java:1009)
... 3 more
2017-06-07 07:40:33,437 INFO [Shutdown] mortbay.log: Shutdown hook executing
2017-06-07 07:40:33,438 INFO [Shutdown] mortbay.log: Shutdown hook complete
Labels: Apache HBase