Support Questions

Find answers, ask questions, and share your expertise

NiFi's ConsumeAMQP RabbitMQ to HDFS

Contributor

Hortonworks post: Hi!

I wonder about the status of the ConsumeAMQP processor for NiFi since I am interested in consuming a live RabbitMQ stream.

The stream I am currently trying to consume is generated live by a remote server. I am able to consume it, for example, with a connector written as a Scala script. In the connector I configure a few parameters:

factory.setUsername
factory.setPassword
factory.setVirtualHost
factory.setHost
factory.setPort

followed by the exchange, queue and binding declarations:

channel.exchangeDeclare
channel.queueDeclare
channel.queueBind
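Put together, the connector setup looks roughly like this in Scala with the RabbitMQ Java client (host, credentials and exchange/queue names below are placeholders, not my real values, and running it requires a reachable broker):

```scala
import com.rabbitmq.client.ConnectionFactory

// Sketch of the connector setup; all values are placeholders.
val factory = new ConnectionFactory()
factory.setUsername("user")
factory.setPassword("secret")
factory.setVirtualHost("/")
factory.setHost("remote.example.com")
factory.setPort(5672)

val connection = factory.newConnection()
val channel = connection.createChannel()

// Declare the exchange and queue, then bind them together.
channel.exchangeDeclare("events", "topic", /* durable = */ false)
channel.queueDeclare("research_test", /* durable = */ false,
  /* exclusive = */ false, /* autoDelete = */ true, null)
channel.queueBind("research_test", "events", "#")
```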

What I want is to set up NiFi in my virtual HDP 2.4 environment to consume the data stream and push it into HDFS, for example. NiFi's processor seems easy to configure. Here is a screenshot of the ConsumeAMQP processor's configuration:

6480-nifirabbit.png

Once I enter the required parameters and start the processor, I get the following errors:

6481-nifirabbiterrors.png

6482-nifierror2rabbit.png

I am a little confused about how to configure the queue correctly, since I am not able to specify the channel nor bind it to the queue. The issue is not a blocked firewall, since I am using the same machine I use to connect to the Rabbit stream via a Scala script.

I have been looking around for a tutorial or documentation page covering Hortonworks and RabbitMQ, without success.

Any help, tips, pointers or comments would be greatly appreciated!

1 ACCEPTED SOLUTION

Contributor

This could be due to auto-delete settings for queues, bindings and exchanges. I'd suggest talking to your administrators, or installing a separate RabbitMQ instance that you can fully control.


15 REPLIES

Contributor

Is queue "research_test" defined? Remember that AMQP has a fundamentally different model than your typical messaging. A publisher publishes messages NOT to the queue but to an "exchange", from which they get routed to the queue(s) based on the bindings. So those have to be set up ahead of time.

So essentially I am trying to figure out what help you are looking for specifically. Setting up queues, exchanges and bindings is usually an administrative task in Rabbit. Is that the help you need? Getting to the right documentation?

Contributor

Hi @ozhurakousky, thanks for your reply! I don't have access to the publisher side; however, in my Scala connector I am able to declare the "exchange", "queue" and "binding" and then connect to the messaging stream, so I think it is safe to assume that they are set up.

I wonder how to pass the same kind of "exchange", "queue" and "binding" declarations as in my Scala connector, but within NiFi.


This should be quite straightforward. If your processor is configured with the same properties as in your script, there is no reason for it not to work. Could you share the stack traces you may have in the ./logs/nifi-app.log file?

Contributor

Assume nothing 😉 and validate instead. Let's take it one step at a time. This document https://www.rabbitmq.com/management-cli.html describes how you can list exchanges, bindings and queues:

$> rabbitmqadmin list queues

See if your queue shows up there, since the exception you are getting makes me believe that it does not.

Contributor

Trying it, @ozhurakousky ...

Contributor

@ozhurakousky, I am not able to run the Python script against the server (due to their security set-up). The exchange and queue are, however, defined: I ran a Scala script that invokes my Rabbit connector, where the exchange is declared together with the queue. Oddly enough, once the script was running I started the ConsumeAMQP processor in NiFi:

6485-status2.png

As you can see, there are no error messages; the processor is able to connect to the given queue while the Scala script is running. Still, there is no read/write activity: 6484-status1.png

I have also attached a "PutHDFS" processor connected to the ConsumeAMQP processor, but it makes no difference...


If there are no messages sent to the queue, the consuming processor won't get any messages. Is your script publishing messages to the queue?

The fact that the consuming processor only works when you run your script leads me to think that you have defined a queue that is not persistent (not durable). That means your queue does not exist unless a producer is connected to it. You should consider creating a persistent queue.
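If you control the declaration, making the queue persistent is a small change in the connector: declare it as durable and non-auto-delete so it survives after the declaring client disconnects. A sketch in Scala with the RabbitMQ Java client (the queue name is a placeholder, and `channel` is assumed to be an open channel as in the Scala connector):

```scala
// Durable, non-exclusive, non-auto-delete: the queue survives broker
// restarts and stays around after the declaring client disconnects.
channel.queueDeclare("research_test", /* durable = */ true,
  /* exclusive = */ false, /* autoDelete = */ false, null)
```

Note that durability applies to the queue itself; for the messages to survive a broker restart as well, the publisher must also mark them persistent.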

Contributor

This is still cracking my nuts 😛 ... I have been in contact with the admin on the publisher side. He has told me I can create my own queue when I bind to the exchange (like I do in my Scala script / connector). I can use any binding key, or just "#" to receive all events. If I have more than one process / instance I can use the same queue name (that is what I do in NiFi: when my Scala script runs simultaneously, NiFi gives no errors and runs tasks, but without reading a single byte), or I can leave the queue name empty and RabbitMQ automatically generates one for me... Still, the exchange is of the "topic" kind and non-durable. I just don't know how to create the queue and bind it to the exchange within NiFi; it looks like it should be easy but I just don't see how...
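For intuition on what a binding key like "#" does on a topic exchange: "#" matches zero or more dot-separated words, while "*" matches exactly one. A rough pure-Scala sketch of that matching rule (an illustration of the semantics, not RabbitMQ's actual implementation):

```scala
// Simplified topic-exchange matching: '*' = exactly one word, '#' = zero or more.
def topicMatches(bindingKey: String, routingKey: String): Boolean = {
  def go(pat: List[String], key: List[String]): Boolean = (pat, key) match {
    case (Nil, Nil)             => true
    case ("#" :: rest, _)       => go(rest, key) || (key.nonEmpty && go(pat, key.tail))
    case ("*" :: rest, _ :: ks) => go(rest, ks)
    case (p :: rest, k :: ks)   => p == k && go(rest, ks)
    case _                      => false
  }
  go(bindingKey.split('.').toList, routingKey.split('.').toList)
}
```

So binding with "#" really does receive every routing key published to the exchange, which is why it works as a catch-all filter.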


From my point of view, the situation is simple:

- your queue is not durable, so it exists if and only if your script is running (when running, the script creates the exchange, the queue, etc.).

- NiFi requires that your queue exists before it can connect to it (as indicated in the logs).

- if you run your script, the queue is created and there is no error in NiFi. But since your script does not send data to the queue, NiFi has nothing to consume and transfer.

What you want is something that sends data to a queue (preferably a durable one), with NiFi pulling data from that queue.
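For completeness, the producing side never sends directly to the queue: it publishes to the exchange with a routing key, and the binding routes the message into the queue. A sketch in Scala (exchange name, routing key and payload are placeholders; `channel` is assumed to be an open channel as in the Scala connector above):

```scala
// Publish to the topic exchange; the binding ("#") routes it to the queue.
val payload = """{"event": "test"}""".getBytes("UTF-8")
channel.basicPublish("events", "events.research.update", null, payload)
```

Once something publishes like this while a durable queue is bound to the exchange, the ConsumeAMQP processor should start showing read activity.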