How to get only unique data from flow files

Contributor

Hello Team,

I am trying to create a data flow for live streaming of Twitter data using NiFi. But when I run my flow, it gives me duplicates (the same tweet two or more times). I have attached screenshots and a template of the flow.

Can you please help me with any expression that I can put in the flow to remove the duplicates? I don't want to use the DetectDuplicate processor because it affects performance by taking time to create the cache.

(In the flow I am formatting the tweets, using ReplaceText to do the formatting.) @Matt Burgess

-Thank you

1.jpg 2.jpg

1 ACCEPTED SOLUTION

Super Mentor

@Yogesh Sharma

Is your NiFi a cluster or a standalone instance? If it is a cluster, that could explain why you are seeing duplicates, since the same GetTwitter processor would be running on every node.

Matt

9 REPLIES

Hello,

I am a bit surprised that you receive duplicate tweets. Do you know why? How is your GetTwitter processor configured?

Contributor

@mclark Can you please elaborate?

Super Mentor

With a NiFi cluster, every node in that cluster runs the exact same dataflow. Some data-ingest processors are not ideally suited for this, as they may compete for or pull the same data into each cluster node. In cases like this it is better to set the scheduling strategy on these processors to "On primary node" so that the processor only runs on one node (the primary node). You can then use dataflow design strategies like RPGs (NiFi Site-to-Site) to redistribute the received data across all your NiFi cluster nodes for processing.
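To make the difference concrete, here is a small Python toy model (not NiFi code, and not any NiFi API). The node names, the sample items, and the round-robin distribution are all assumptions for illustration; it only counts how many copies of each item end up in the cluster under the two setups described above.

```python
from collections import Counter

def ingest_on_every_node(nodes, source):
    """Every node runs the same ingest step, so every node pulls the full source."""
    return [(node, item) for node in nodes for item in source]

def ingest_on_primary_then_distribute(nodes, source):
    """Only the primary node (nodes[0] here) pulls the data; items are then
    spread round-robin over all nodes, loosely mimicking a redistribution step."""
    return [(nodes[i % len(nodes)], item) for i, item in enumerate(source)]

def copies_per_item(assignments):
    """Count how many times each item appears across the whole cluster."""
    return Counter(item for _node, item in assignments)

# Hypothetical cluster and data, for illustration only.
nodes = ["node-1", "node-2", "node-3"]
source = ["tweet-a", "tweet-b", "tweet-c", "tweet-d"]

everywhere = copies_per_item(ingest_on_every_node(nodes, source))
primary_only = copies_per_item(ingest_on_primary_then_distribute(nodes, source))

print(everywhere["tweet-a"], primary_only["tweet-a"])  # 3 1
```

In this model each item is ingested once per node when every node pulls, and exactly once when only the primary node pulls, while the processing work is still spread over all three nodes.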

Contributor

Thanks for the response. I tried this, but even on a single node I am getting duplicate data. Is there any expression that I can use to remove duplicate data?

Can you check the data flow which I attached?

Super Mentor

The attached images do not really show us your complete configuration. Can you generate a template of your flow through the NiFi UI and share that? You create a template by selecting all the components you want to include and then clicking the "create template" icon in the upper center of the UI. After the template has been created, you can export it from your NiFi using the template management UI icon (upper right corner of the UI). Then attach the exported XML template here.

Contributor

Thanks @mclark. I am attaching a template of a flow which extracts earthquake data from a US government site, but I am getting duplicate data as output.

eqdataus.xml

Super Mentor

@Yogesh Sharma

You are seeing duplicate data because the run schedule on your InvokeHTTP processor is set to 1 sec and the data you are pulling is not updated that often. You can build into your flow the ability to detect duplicates (even across a NiFi cluster). To do this, you will need the following things set up:

1. DistributedMapCacheServer (Add this controller service to "Cluster Manager" if clustered. If standalone, it still needs to be added. It is configured with a listening port.)

2. DistributedMapCacheClientService (Add this controller service to "Node" if clustered. If standalone, it still needs to be added. It is configured with the FQDN of the NCM running the above cache server.)

3. Start the above controller services.

4. Add HashContent and DetectDuplicate processors to your flow between your InvokeHTTP processor and the SplitJson processor. I have attached a modified version of your template.

eqdataus-detectduplicates.xml

If you still see duplicates, adjust the configured age off duration in the DetectDuplicate processor.
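As a rough illustration of the idea behind hashing the content and checking a cache with an age-off duration, here is a minimal Python sketch. It is a toy model under stated assumptions, not how NiFi implements these processors: the `DuplicateDetector` class, the `poll` helper, the choice of SHA-256, and the sample payloads are all hypothetical.

```python
import hashlib

class DuplicateDetector:
    """Toy stand-in for a content-hash lookup in a cache with an age-off duration."""

    def __init__(self, age_off_seconds):
        self.age_off_seconds = age_off_seconds
        self._seen = {}  # content hash -> time the hash was cached

    def is_duplicate(self, content, now):
        # Drop cache entries that are older than the age-off duration.
        self._seen = {h: t for h, t in self._seen.items()
                      if now - t < self.age_off_seconds}
        digest = hashlib.sha256(content).hexdigest()  # hash choice is an assumption
        if digest in self._seen:
            return True
        self._seen[digest] = now
        return False

def poll(responses, detector):
    """Keep only payloads whose hash is not already cached.
    `responses` is a list of (timestamp_seconds, payload_bytes) pairs."""
    return [payload for ts, payload in responses
            if not detector.is_duplicate(payload, ts)]

# Polling once per second while the (made-up) source only changes at t=3.
responses = [(0, b"quakes-v1"), (1, b"quakes-v1"), (2, b"quakes-v1"),
             (3, b"quakes-v2"), (4, b"quakes-v2")]

long_age_off = poll(responses, DuplicateDetector(age_off_seconds=60))
short_age_off = poll(responses, DuplicateDetector(age_off_seconds=2))

print(long_age_off)   # [b'quakes-v1', b'quakes-v2']
print(short_age_off)  # [b'quakes-v1', b'quakes-v1', b'quakes-v2']
```

With the longer age-off, each distinct payload passes once. With an age-off shorter than the interval between source updates, the cached hash expires and the same payload passes again, which is why the age-off duration is the setting to revisit if duplicates still appear.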

Thanks,

Matt

Contributor
@mclark

Thanks for the response; it is appreciated. Do I need to configure something on the back end as well, i.e. in nifi.properties or any other file on the cluster or node? I am facing the attached error.

6681-error1.jpg