Dynamic client id creation using NiFi expression language in ConnectWebSocket processor

New Contributor

Hi everyone!
Can somebody assist me, please?

I have a task to consume data from different WebSocket servers (hosted on different IPs). Each WebSocket server has a lot of sensors producing messages, and I need to subscribe and listen to each of the sensors. I need connections to stay alive.
This is a simplified communication diagram:

yar_bh_0-1680340880703.png

 

As I understand it, I can use a ConnectWebSocket processor, with JettyWebSocketClient under the hood, to create a connection to a particular WebSocket server. The problem I am facing is that I cannot dynamically create the client id in the ConnectWebSocket processor.

yar_bh_1-1680341220481.png

That leads to a problem. Let's say there are 50 sensors producing messages on each of the 3 WebSocket servers I have. I will receive the WebSocket server's IP address and the specific sensor id from upstream processors. I can dynamically change the IP address in JettyWebSocketClient so that I can connect to 3 different servers, but after that I cannot distinguish sensors within each server, because I cannot dynamically create the WebSocket client id in the ConnectWebSocket processor.
As I understand it, I could create 50 ConnectWebSocket processors with hardcoded sensor names, so that those names act as client ids. The problem is that sensor names can change over time and the number of sensors on each WebSocket server is unpredictable.
QUESTION
Did I understand the WebSocket processor's behavior in NiFi correctly?
Have I missed something? Is there some approach to handle all these connections to sensors on different WebSocket servers?
Does somebody know how to ask the NiFi dev team to add dynamic client id creation using NiFi expression language?
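
For illustration only, the kind of dynamic client id asked about above could be derived from the two values that arrive from upstream: the server IP and the sensor id. The sketch below is plain Python outside NiFi. The function name `make_client_id`, the `|` separator, and the example addresses are all hypothetical; none of them is part of the ConnectWebSocket processor or of NiFi expression language.

```python
def make_client_id(server_ip: str, sensor_id: str) -> str:
    """Combine server address and sensor id into one client id.

    Hypothetical helper: it only shows that the pair (server, sensor)
    is enough to tell sessions apart; it is not a NiFi API.
    """
    server_ip = server_ip.strip()
    sensor_id = sensor_id.strip()
    if not server_ip or not sensor_id:
        raise ValueError("both server_ip and sensor_id are required")
    # The separator is an arbitrary choice for this sketch.
    return f"{server_ip}|{sensor_id}"


# Made-up example: 3 servers with 50 sensors each, as in the scenario above.
servers = ["10.0.0.1", "10.0.0.2", "10.0.0.3"]
sensors = [f"sensor-{n}" for n in range(50)]
client_ids = {make_client_id(ip, s) for ip in servers for s in sensors}
print(len(client_ids))  # 150 distinct ids, one per (server, sensor) pair
```

Because the id is computed from the incoming values, renamed or newly added sensors would not need a new hardcoded processor, which is the behavior being requested.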

 

2 REPLIES

New Contributor

Can somebody advise on how to check or list all active connections in JettyWebSocketClient?
I need to figure out whether NiFi already has a live connection to a specific WebSocket server, so that I can avoid reconnecting.
As I understand it, a new incoming flowfile will terminate the previous sessions in the ConnectWebSocket processor.
Regards
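
To make the "check before reconnecting" idea concrete, here is a minimal sketch of a connection registry in plain Python. It is not NiFi or Jetty code, and it does not claim that JettyWebSocketClient exposes such a listing. `ConnectionRegistry`, `FakeConnection`, and the `connect` callback are hypothetical names introduced for illustration; a real connection object would wrap an actual WebSocket session.

```python
from typing import Callable, Dict, List, Tuple

Key = Tuple[str, str]  # (server_ip, sensor_id)


class FakeConnection:
    """Stand-in for a WebSocket session so the sketch runs without a network."""

    def __init__(self) -> None:
        self._open = True

    def is_open(self) -> bool:
        return self._open

    def close(self) -> None:
        self._open = False


class ConnectionRegistry:
    """Tracks one connection per (server, sensor) and reuses live ones."""

    def __init__(self, connect: Callable[[str, str], FakeConnection]) -> None:
        self._connect = connect
        self._connections: Dict[Key, FakeConnection] = {}

    def get_or_connect(self, server_ip: str, sensor_id: str) -> FakeConnection:
        key = (server_ip, sensor_id)
        conn = self._connections.get(key)
        if conn is not None and conn.is_open():
            return conn  # live session exists, so no reconnect
        # No session yet, or the old one was closed: open a new one.
        conn = self._connect(server_ip, sensor_id)
        self._connections[key] = conn
        return conn

    def active(self) -> List[Key]:
        """List the keys whose connection is currently open."""
        return sorted(k for k, c in self._connections.items() if c.is_open())


registry = ConnectionRegistry(lambda ip, sensor: FakeConnection())
first = registry.get_or_connect("10.0.0.1", "sensor-1")
again = registry.get_or_connect("10.0.0.1", "sensor-1")
print(first is again)     # True: the live connection was reused
print(registry.active())  # [('10.0.0.1', 'sensor-1')]
```

The design choice here is to key sessions by the (server, sensor) pair, so a second request for the same pair returns the existing session instead of terminating it.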

Community Manager

@yar_bh, welcome to our community! To help you get the best possible answer, I have tagged our NiFi experts @cotopaul @SAMSAL @MattWho @steven-matison @ckumar, who may be able to assist you further.

Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.



Regards,

Vidya Sargur,
Community Manager

