Created on 08-09-2016 11:07 AM - edited 08-18-2019 03:45 AM
Hortonworks post: Hi!
I wonder about the status of the ConsumeAMQP processor for NiFi since I am interested in consuming a live RabbitMQ stream.
The stream I am trying to consume is a live stream generated by a remote server. I can already consume it with, for example, a connector written as a Scala script. In the connector I configure some connection parameters:
factory.setUsername
factory.setPassword
factory.setVirtualHost
factory.setHost
factory.setPort
These are followed by the exchange, queue and binding declarations:
channel.exchangeDeclare
channel.queueDeclare
channel.queueBind
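For reference, a minimal sketch of what such a connector typically looks like, assuming the RabbitMQ Java client (com.rabbitmq:amqp-client) is on the classpath. The object name, function names and all argument values are hypothetical; the "topic" exchange type is the one reported later in this thread, and the server-named queue is only one of the options mentioned there.

```scala
import com.rabbitmq.client.{Channel, ConnectionFactory}

object RabbitConnectorSketch {

  // Builds a connection factory from the five settings listed above.
  // Every value passed in is a placeholder for the real server details.
  def buildFactory(host: String, port: Int, vhost: String,
                   user: String, password: String): ConnectionFactory = {
    val factory = new ConnectionFactory()
    factory.setUsername(user)
    factory.setPassword(password)
    factory.setVirtualHost(vhost)
    factory.setHost(host)
    factory.setPort(port)
    factory
  }

  // Declares the exchange, lets the broker create a temporary
  // server-named queue, binds it, and returns the generated queue name.
  def declareAndBind(channel: Channel, exchange: String, bindingKey: String): String = {
    // Two-argument form: non-durable exchange. The declaration must match
    // the settings of the existing exchange or the broker rejects it.
    channel.exchangeDeclare(exchange, "topic")
    // No-argument form: exclusive, auto-delete, non-durable queue.
    val queue = channel.queueDeclare().getQueue
    channel.queueBind(queue, exchange, bindingKey)
    queue
  }
}
```

A channel for declareAndBind would come from buildFactory(...).newConnection().createChannel(). Note that a queue declared this way belongs to the declaring connection and disappears when that connection closes.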
What I want to do is set up NiFi in my virtual HDP 2.4 environment to consume the data stream and push it into HDFS, for example. NiFi's processor seems easy to configure. Here is a screenshot of the ConsumeAMQP processor's configuration:
Once I introduce the parameters needed and start the processor I get the following errors:
I am a little confused about how to configure the queue correctly, since I am not able to specify the channel or bind it to the queue. The issue is not a blocked firewall, since I am using the same terminal that I use to connect to the Rabbit stream via a Scala script.
I have been looking around for a tutorial or documentation page on Hortonworks and RabbitMQ without success.
Any help / tips / pointers and comments could be of great help!
Created 08-09-2016 12:23 PM
This could be due to auto-delete settings for queues, bindings and exchanges. I'd suggest talking to your administrators or installing a separate instance of Rabbit that you can fully control.
Created 08-09-2016 11:12 AM
Is queue "research_test_ defined? Remember that AMQP has a fundamentally different model than your typical messaging system. A publisher publishes a message NOT to a queue but to an "exchange", from which it gets routed to the queue(s) based on the bindings. So those have to be set up ahead of time.
So I am essentially trying to figure out what help you are looking for specifically. Setting up queues, exchanges and bindings is usually an administrative task in Rabbit. Is that the help you need? Getting to the right documentation?
Created 08-09-2016 11:24 AM
Hi @ozhurakousky, thanks for your reply! I don't have access to the publisher side. However, in my Scala connector I am able to declare the "exchange", "queue" and "binding" and then connect to the messaging stream, so I think it is safe to assume that they are set up.
I wonder how to pass the same kind of declarations for "exchange", "queue" and "binding" as in my Scala connector, but in this case within NiFi.
Created 08-09-2016 11:18 AM
This should be quite straightforward. If your processor is configured with the same properties as in your script, there is no reason it should not work. Could you share the stack traces you may have in the ./log/nifi-app.log file?
Created 08-09-2016 11:25 AM
Assume nothing 😉 and validate instead. So let's try one step at a time. This document https://www.rabbitmq.com/management-cli.html describes how you can list exchanges, bindings and queues.
$> rabbitmqadmin list queues
See if you can see your queue there since the exception you see makes me believe that it is not.
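If rabbitmqadmin cannot be run against the server, a passive declare from a client is another way to check. A minimal sketch in Scala, assuming the RabbitMQ Java client (com.rabbitmq:amqp-client) is on the classpath and an open Connection is available; the object and function names are hypothetical. A passive declare only asks the broker whether the queue exists and never creates it. When the queue is missing the broker closes the channel, so the sketch uses a throwaway channel.

```scala
import java.io.IOException
import com.rabbitmq.client.Connection

object QueueCheck {

  // Returns true if the broker reports that the named queue exists.
  def queueExists(connection: Connection, queue: String): Boolean = {
    // Use a dedicated channel: a failed passive declare closes it.
    val channel = connection.createChannel()
    try {
      channel.queueDeclarePassive(queue) // throws IOException if the queue is absent
      channel.close()
      true
    } catch {
      case _: IOException => false
    }
  }
}
```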
Created 08-09-2016 11:30 AM
Trying it, @ozhurakousky ...
Created on 08-09-2016 12:23 PM - edited 08-18-2019 03:45 AM
@ozhurakousky, I am not able to run the Python script against the server (due to their security set-up). The exchange and queue are however defined: I ran a Scala script that invokes my Rabbit connector, where the exchange is declared together with the queue. Weirdly enough, once the script was running I started the ConsumeAMQP processor in NiFi:
As you can see, there are no error messages: the processor is able to connect to the given queue while I am running the Scala script. Still, there is no read / write activity:
I have also attached a "PutHDFS" processor connected to the ConsumeAMQP processor, but it makes no difference...
Created 08-09-2016 12:26 PM
If there are no messages sent to the queue, the consuming processor won't get any messages. Is your script publishing messages to the queue?
The fact that the consume processor only works when you run your script makes me think that you have defined a queue that is not persistent (not durable). It means that your queue does not exist if you don't have a producer connected to it. You should consider creating a persistent queue.
Created 08-10-2016 01:26 PM
This is still cracking my nuts 😛 ... I have been in contact with the admin on the publisher side. He told me I can create my own queue when I bind to the exchange (like I do in my Scala script / connector). I can use any binding key, or just "#", to filter events. If I have more than one process / instance I can use the same queue name (that's what I do in NiFi when running my Scala script simultaneously: NiFi doesn't give errors and runs tasks, but without reading a single byte), or I can leave the queue name empty and RabbitMQ automatically creates the name for me... Still, the exchange is of the "topic" kind and non-durable. I just don't know how to create the queue and bind it to the exchange within NiFi. It looks like it should be easy but I just don't see how...
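For reference, a minimal sketch of how a topic exchange compares a binding key with a message's routing key: words are separated by dots, "*" stands for exactly one word and "#" for zero or more words, which is why binding with just "#" receives every event. This is a plain Scala model of the matching rule, not the broker's implementation; the object name and the routing keys in the usage note are hypothetical.

```scala
object TopicMatch {

  // Splits a key such as "sensor.temp" into its dot-separated words.
  private def words(key: String): List[String] =
    if (key.isEmpty) Nil else key.split('.').toList

  // True if a message with this routing key would be delivered to a
  // queue bound to a topic exchange with this binding key.
  def matches(bindingKey: String, routingKey: String): Boolean = {
    def go(pattern: List[String], rest: List[String]): Boolean =
      (pattern, rest) match {
        case (Nil, Nil) => true
        // "#" either matches nothing more, or swallows one word and stays.
        case ("#" :: ps, _) =>
          go(ps, rest) || (rest.nonEmpty && go(pattern, rest.tail))
        // "*" consumes exactly one word.
        case ("*" :: ps, _ :: ws) => go(ps, ws)
        // A literal word must match exactly.
        case (p :: ps, w :: ws) if p == w => go(ps, ws)
        case _ => false
      }
    go(words(bindingKey), words(routingKey))
  }
}
```

With these rules, TopicMatch.matches("#", "sensor.temp.room1") is true, while TopicMatch.matches("sensor.*", "sensor.temp.room1") is false because "*" covers only one word.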
Created 08-10-2016 01:36 PM
From my point of view the situation is simple.
- Your queue does not exist if your script is not running, because your queue is not durable: it exists if and only if your script is running (because when it runs it creates the exchange, the queue, etc.).
- NiFi requires that your queue exists in order to connect to it (as indicated in the logs).
- If you run your script, the queue is created, so there is no error in NiFi. But since your script does not send data to the queue, NiFi has nothing to consume and transfer.
What you want is something that sends data to a queue (preferably a durable one), and to use NiFi to pull data from this queue.
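A minimal sketch of declaring such a long-lived queue once from Scala, assuming the RabbitMQ Java client (com.rabbitmq:amqp-client) is on the classpath and the account is allowed to declare queues. The object name, function name and parameters are hypothetical. Strictly speaking, the durable flag only governs whether the queue survives a broker restart; it is leaving the exclusive and auto-delete flags off that keeps the queue in place after the declaring script disconnects.

```scala
import com.rabbitmq.client.Channel

object LongLivedQueueSketch {

  // Declares a named queue that outlives the declaring connection and
  // binds it to an exchange that already exists on the broker.
  def declareAndBind(channel: Channel, exchange: String,
                     queue: String, bindingKey: String): Unit = {
    // Passive declare: only checks that the exchange exists, so it cannot
    // conflict with the settings chosen on the publisher side.
    channel.exchangeDeclarePassive(exchange)
    // Arguments: name, durable = true, exclusive = false,
    // autoDelete = false, no extra arguments.
    channel.queueDeclare(queue, true, false, false, null)
    channel.queueBind(queue, exchange, bindingKey)
  }
}
```

Once this has run, the queue keeps receiving messages routed by the binding even with no script connected, so a consumer can attach to it later by name.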