Support Questions

Find answers, ask questions, and share your expertise

Split Attributes and pass into different Kafka Topics

avatar
Contributor

I have a file with 36 numeric data fields. Like CPU usage, memory usage, date time, IP address etc. which looks like--
10.8.x.y, 151490..., 45.00, 95.00, 8979.09, 3984.90, ... (36 fields)

Now there is a stream (GetFile --> Publish_kafka) which is sent to a single kafka topic (Publish Kafka 0_11)

I want to be able to now split this stream into 4 and 30 field two streams and then into two different kafka topics.

That is:

to kafka topic 1;;;

10.8.x.y, 151490..., 45.00,95.00

And to kafka topic 2;;;

8979.09, 3984.90, ... etc. (30 fields)

How do I do that? Split text just sems to split into line count.

1 ACCEPTED SOLUTION

avatar
Master Guru
@Merlin Sundar

One way of doing this is by using publish kafka record processor which can read the incoming flowfile data and writes the required fileds into topic.

Fork the success relation from GetFile processor and then use two Publish Kafka Record processors to Publish messages into Kafka topic1 and kafka topic2.

Flow:-

68466-kafka.png

First PublishKafkaRecord processor configure the Record reader as CsvReader to read your incoming file(with 36 fields) and Record Writer as CsvSetWriter to write only to required four fields to KafkaTopic1.

Second PublishKafkaRecord processor configure the Record reader as Csv Reader same as above to read your 36 fields and Record Writer as CsvSetWriter to write only to required thirty fields to KafkaTopic2.

By using this way we are splitting the fields and publishing them to the desired kafka topics.

(or)

if you want to publish all the fields into one topic using publishkafka_0_11 processor then want to split into two, then

Use ConsumeKafka Processor to consume the 36 fileds message and use two PublishKafkaRecord_0_11 processors with Same CsvReader as record reader and different CSVSetWriters with 4fields and 30fileds.

Now we are Consuming the messages and writing them to two different kafka topics that are configured with different csvsetwriter controller services.

Flow:-

68468-publishkafka-approach1.png

(or)

if you want to publish all the fields into one topic using publishkafka_0_11 processor then want to split into two, then

Once you publish the messages to Kafka topic by using Publish Kafka 0_11 processor then use
Consume kafka processor to consume the published kafka messages from the topic.

Then use Two Convert record processors in parallel with same csv reader controller service and two different Csv Set Writer controller services because we need to write 4 fields to kafka1 topic and 30 fields to kafka2 topic.

then use two PublishKafka_0_11 processors in parallel to publish the prepared messages from the two convertrecord processors.

Flow:-

68467-publishkafka-approach2.png

As you can choose which is the best fit for your case from the above three approaches.

Please refer to below links to configure/use the Record Reader and Record writer properties

https://community.hortonworks.com/articles/115311/convert-csv-to-json-avro-xml-using-convertrecord-p...

View solution in original post

2 REPLIES 2

avatar
Master Guru
@Merlin Sundar

One way of doing this is by using publish kafka record processor which can read the incoming flowfile data and writes the required fileds into topic.

Fork the success relation from GetFile processor and then use two Publish Kafka Record processors to Publish messages into Kafka topic1 and kafka topic2.

Flow:-

68466-kafka.png

First PublishKafkaRecord processor configure the Record reader as CsvReader to read your incoming file(with 36 fields) and Record Writer as CsvSetWriter to write only to required four fields to KafkaTopic1.

Second PublishKafkaRecord processor configure the Record reader as Csv Reader same as above to read your 36 fields and Record Writer as CsvSetWriter to write only to required thirty fields to KafkaTopic2.

By using this way we are splitting the fields and publishing them to the desired kafka topics.

(or)

if you want to publish all the fields into one topic using publishkafka_0_11 processor then want to split into two, then

Use ConsumeKafka Processor to consume the 36 fileds message and use two PublishKafkaRecord_0_11 processors with Same CsvReader as record reader and different CSVSetWriters with 4fields and 30fileds.

Now we are Consuming the messages and writing them to two different kafka topics that are configured with different csvsetwriter controller services.

Flow:-

68468-publishkafka-approach1.png

(or)

if you want to publish all the fields into one topic using publishkafka_0_11 processor then want to split into two, then

Once you publish the messages to Kafka topic by using Publish Kafka 0_11 processor then use
Consume kafka processor to consume the published kafka messages from the topic.

Then use Two Convert record processors in parallel with same csv reader controller service and two different Csv Set Writer controller services because we need to write 4 fields to kafka1 topic and 30 fields to kafka2 topic.

then use two PublishKafka_0_11 processors in parallel to publish the prepared messages from the two convertrecord processors.

Flow:-

68467-publishkafka-approach2.png

As you can choose which is the best fit for your case from the above three approaches.

Please refer to below links to configure/use the Record Reader and Record writer properties

https://community.hortonworks.com/articles/115311/convert-csv-to-json-avro-xml-using-convertrecord-p...

avatar
Contributor

I too tried something in the meanwhile. Here is the screenshot of the flow. In this flow I simple split them using regular expression and then extracted what was needed using the success connectors. Will surely try one of the methods mentioned by you as well and get back here. @Shu

68478-screenshot-from-2018-04-16-101520.png