
Learn how to consume real-time data from the Satori RTM platform using NiFi.


Satori is a cloud-based live platform that provides a publish-subscribe messaging service called RTM, and also makes available a set of free real-time data feeds as part of its Open Data Channels initiative:

This article steps through how to consume from Satori's Open Data Channels in NiFi, using a custom NiFi processor.
Note: this article assumes you already have a working NiFi instance up and running.

Link to code on github:

Installing the custom processor

To create the required nar file, clone and build the following repo with Maven:

git clone
cd nifi-satori-bundle
mvn clean install

This will produce the following .nar file under the nifi-satori-bundle-nar/target/ directory:


Copy this file into the lib directory of your NiFi instance. If you are using HDF, this directory is located at:


Restart NiFi so that the nar is loaded.

Using the ConsumeSatoriRtm processor

The ConsumeSatoriRtm processor accepts the following configuration properties:


At a minimum, you just need to provide the following properties (which you can get directly from the Satori open data channels site):

  • Endpoint
  • Appkey
  • Channel
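As a sketch, the three required properties might look something like this — the endpoint and appkey below are placeholders, not working credentials; copy the real values from the channel's page on the Satori site:

```
Endpoint: wss://<endpoint-from-channel-page>
Appkey:   <your-appkey>
Channel:  big-rss
```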

In this example, I've chosen to consume from the 'big-rss' feed using the configurations provided here:

That's it! After starting the ConsumeSatoriRtm processor, you will see data flowing:


Additional Features

  • The processor also supports Satori's Streamview filters, which allow you to provide SQL-like queries to select, transform, or aggregate messages from a subscribed channel. In the 'big-rss' example above, the following filter configuration would limit the stream to messages whose feedURL contains the word "jobs":

    select * from `big-rss` where feedURL like '%jobs%'

  • The processor also supports batching multiple messages into a single FlowFile, which produces a newline-delimited list of messages in each file (based on a 'minimum batch size' configuration):
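You don't need a live Satori connection to reason about these two behaviours. The following Python sketch simulates them locally: a substring check standing in for the LIKE-style Streamview filter, and a batching function that emits newline-delimited payloads the way the processor fills a FlowFile. The function names and sample messages are illustrative, not the processor's actual implementation:

```python
import json

def matches_filter(message, substring):
    # Local stand-in for the Streamview filter above: keep messages whose
    # feedURL contains the given substring (like '%jobs%' in SQL LIKE terms).
    return substring in message.get("feedURL", "")

def batch_messages(messages, min_batch_size):
    # Mimic the processor's batching: accumulate messages, then emit one
    # newline-delimited payload once the minimum batch size is reached.
    batches, current = [], []
    for msg in messages:
        current.append(json.dumps(msg))
        if len(current) >= min_batch_size:
            batches.append("\n".join(current))
            current = []
    if current:  # flush whatever remains at the end of the stream
        batches.append("\n".join(current))
    return batches

feed = [
    {"feedURL": "http://example.com/jobs/1"},
    {"feedURL": "http://example.com/news/2"},
    {"feedURL": "http://example.com/jobs/3"},
    {"feedURL": "http://example.com/jobs/4"},
]
filtered = [m for m in feed if matches_filter(m, "jobs")]
flowfiles = batch_messages(filtered, 2)
print(len(flowfiles))            # 3 filtered messages batched by 2 -> 2 payloads
print(flowfiles[0].count("\n"))  # first payload holds 2 messages, so 1 newline
```

With a minimum batch size of 2, the three messages that pass the filter produce two FlowFile payloads: one with two messages and a final one with the leftover message.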