
Is it possible to store the Kafka message offset inside the JSON message as a key-value pair using the GetKafka processor?


Contributor

With the GetKafka processor (the Kafka 0.8 consumer), the attributes it writes are kafka.topic, kafka.offset and so on. I am planning to write the JSON events consumed from a Kafka topic to S3. I can save kafka.offset as part of the filename in S3, but I was wondering whether it's possible to add a new key-value pair to the JSON message itself before storing it in S3, for example key=offset, value=1111, i.e. offset=1111 inside the metadata of the Kafka message?

Thank you.

5 REPLIES

Re: Is it possible to Store kafka message offset inside json message as a key-value using getkafka processor?

The GetKafka processor is not specific to any type of data: someone could receive JSON, Avro, plain text, anything, so how to modify the message depends entirely on the data format. You would want to do this after GetKafka with some follow-on processors. Unfortunately, right now there isn't a simple way to add a field to a JSON document, i.e. something like an "AddAttributesToJson" processor.

A few ways to do this:

1) Use ExecuteScript right after GetKafka, and write a Groovy script that reads in the JSON, adds a new field, and writes it back out.

2) Use EvaluateJsonPath to extract all of the fields from the JSON coming out of GetKafka, and then use AttributesToJson to write a new JSON document that includes the offset attribute. This is a bit inefficient and can get tedious if the JSON documents coming from Kafka have a lot of fields.

3) Write a simple custom processor that reads in JSON and has a property listing the attribute names to add.
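For approaches 1 and 3 the core step is the same: read the content, add one member, write it back. Below is a minimal Java sketch of just that step on a plain string, assuming the content is a single well-formed JSON object. The class and method names are hypothetical; a real script or processor would more likely parse the content with a JSON library (Jackson, for example) and handle the flow file reading and writing. The value is written as a JSON string because NiFi attribute values are strings.

```java
public class AddOffsetToJson {

    // Encodes a Java string as a JSON string literal: escapes quotes and
    // backslashes, and writes other control characters as hex escapes.
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Appends "key": "value" as the last member of a top-level JSON object.
    // Assumes `json` holds exactly one well-formed JSON object; it does not
    // check whether `key` already exists (that would produce a duplicate key).
    static String addStringField(String json, String key, String value) {
        String trimmed = json.trim();
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            throw new IllegalArgumentException("content is not a JSON object");
        }
        String body = trimmed.substring(1, trimmed.length() - 1).trim();
        String member = jsonString(key) + ":" + jsonString(value);
        return body.isEmpty() ? "{" + member + "}" : "{" + body + "," + member + "}";
    }

    public static void main(String[] args) {
        String content = "{\"event\":\"click\",\"user\":\"abc\"}";
        String kafkaOffset = "1111"; // in NiFi this would come from the kafka.offset attribute
        System.out.println(addStringField(content, "offset", kafkaOffset));
        // {"event":"click","user":"abc","offset":"1111"}
    }
}
```

Appending before the closing brace avoids re-serializing the whole document, but it cannot notice an existing "offset" key; a parser-based version can.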


Re: Is it possible to Store kafka message offset inside json message as a key-value using getkafka processor?

Contributor

@Bryan Bende Thanks for the answer. Currently I am planning to add the Kafka offset to the JSON message content. kafka.offset is an output attribute of the GetKafka processor, along with kafka.topic.


Re: Is it possible to Store kafka message offset inside json message as a key-value using getkafka processor?

I think creating an AddAttributesToJson processor would be a good addition to the standard processors. We have AttributesToJson, which creates a new JSON document from attributes; AddAttributesToJson would instead require the incoming flow file to already be JSON, and it would have a property listing the attribute names to add to that JSON, which is where the user would specify kafka.offset. If you are interested in working on this and contributing it back, we would be happy to guide you through the process.


Re: Is it possible to Store kafka message offset inside json message as a key-value using getkafka processor?

Contributor

@Bryan Bende Sure, I will look into creating a custom processor. While looking at the code I found one of the comments misleading; please correct me if I am wrong. In AttributesToJson the comment says "If list of attributes specified get only those attributes. Otherwise write them all." If you hover over the ? in the processor, it says that if you leave the property empty, all the JSON attributes will be emitted. So I built GetKafka -> EvaluateJsonPath -> UpdateAttribute (where I added a new property, i.e. the Kafka offset) -> AttributesToJson. I expected the final JSON to contain the original message plus kafka.offset, but it only emitted kafka.offset, kafka.topic, kafka.partition, etc. Why didn't it emit the original JSON message along with the offset?

```java
/**
 * Builds the Map of attributes that should be included in the JSON that is emitted from this process.
 *
 * @return
 *     Map of values that are feed to a Jackson ObjectMapper
 */
protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, String atrList,
                                                            boolean includeCoreAttributes,
                                                            boolean nullValForEmptyString) {

    Map<String, String> atsToWrite = new HashMap<>();

    //If list of attributes specified get only those attributes. Otherwise write them all
    // ... (rest of the method not included in the post)
}
```
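To make that comment concrete, here is a simplified Java stand-in for the selection step it describes. It is not the NiFi source: the names are hypothetical, a plain Map replaces the FlowFile, and the core-attribute and null-value handling are left out. The point is that "write them all" refers to all flow file attributes; the content is never consulted.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AttributeSelection {

    // Empty or missing list: copy every attribute.
    // Non-empty list: keep only the named attributes that exist, in list order.
    static Map<String, String> selectAttributes(Map<String, String> attributes, String attributeList) {
        Map<String, String> toWrite = new LinkedHashMap<>();
        if (attributeList == null || attributeList.trim().isEmpty()) {
            toWrite.putAll(attributes);
            return toWrite;
        }
        for (String name : attributeList.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty() && attributes.containsKey(trimmed)) {
                toWrite.put(trimmed, attributes.get(trimmed));
            }
        }
        return toWrite;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put("kafka.topic", "foo");
        attrs.put("kafka.partition", "0");
        attrs.put("kafka.offset", "1111");
        System.out.println(selectAttributes(attrs, ""));
        // {kafka.topic=foo, kafka.partition=0, kafka.offset=1111}
        System.out.println(selectAttributes(attrs, "kafka.offset"));
        // {kafka.offset=1111}
    }
}
```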


Re: Is it possible to Store kafka message offset inside json message as a key-value using getkafka processor?

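AttributesToJson takes the flow file's attributes and creates a flat JSON document from them. With its Destination property set to flowfile-content it overwrites the content of the flow file with that JSON (the default, flowfile-attribute, puts the JSON in a JSONAttributes attribute instead). Either way it is not going to include the original JSON, because it doesn't even know the content was originally JSON; it could have been anything.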

If you have a flow file with attributes kafka.offset=1 and kafka.topic=foo and the content is { "field1" : "value1" }, then after AttributesToJson the content will be { "kafka.offset" : "1", "kafka.topic" : "foo" }.

If you wanted to include "field1", you would first have to use EvaluateJsonPath to get field1=value1 into a flow file attribute; then it would be available for AttributesToJson to include.
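The example above can be reproduced with a small Java sketch of what AttributesToJson does with the attributes. The class and method names are hypothetical, attribute values are assumed non-null, and core attributes such as uuid and filename, which the real processor can also include, are left out. The content is not an input to the method at all, which is why { "field1" : "value1" } only shows up once it has been copied into an attribute.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlatJsonFromAttributes {

    // Encodes a Java string as a JSON string literal (quotes, backslashes and
    // control characters escaped).
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Builds a flat JSON object with one string member per attribute, in map order.
    // Only the attributes are read: the flow file content is not a parameter.
    static String attributesToJson(Map<String, String> attributes) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : attributes.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        // Attributes from GetKafka; the content {"field1":"value1"} plays no part.
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put("kafka.offset", "1");
        attrs.put("kafka.topic", "foo");
        System.out.println(attributesToJson(attrs));
        // {"kafka.offset":"1","kafka.topic":"foo"}

        // After EvaluateJsonPath has copied $.field1 into an attribute named field1.
        attrs.put("field1", "value1");
        System.out.println(attributesToJson(attrs));
        // {"kafka.offset":"1","kafka.topic":"foo","field1":"value1"}
    }
}
```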

The Attributes List property is for cases where a flow file has tons of attributes and you want a JSON document with only a couple of them. If you leave it empty, every attribute (not the content) is written.
