
Fetch records from Cassandra on need basis with filter parameters.

Explorer

I have a use case to read data from Cassandra where the inputs are provided from outside NiFi (a Flink job). Also, instead of querying the data at scheduled time intervals, can we initiate the fetch whenever it is needed? Are these two use cases possible with the current NiFi system?

6 REPLIES

Master Collaborator

Hi @VINODTV,

You can use a notification-based trigger for the Cassandra query.

To receive the notification request that triggers your flow, have a HandleHttpRequest processor listen on a particular port. Once a request is received, take the data from the HTTP request body, build your Cassandra query from it, and execute it (for example, with the QueryCassandra processor).

From Jenkins or any other tool, you can notify that listener by invoking its URL with the data you want to pass to the query.

Jenkins Job [Invoke http://<hostname>:<port>/<optionalURI> with data] --> Request received at Listener [HandleHttpRequest] --> Prepare Query --> Execute Query.

The following curl command can be used to notify the listener from Jenkins:

curl -d 'my input data for cassandra query' http://<hostname>:<port>/<optionalURI>
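
If the caller is a program rather than a shell step, the same notification can be sent from code. The following is only a minimal Python sketch of what the curl command does, using the standard library. The host name, port 8011, path and payload are made-up placeholders, and `build_trigger_request` and `send_trigger` are hypothetical helper names, not part of NiFi or Jenkins.

```python
import urllib.request


def build_trigger_request(host: str, port: int, path: str, payload: str) -> urllib.request.Request:
    """Build a POST request equivalent to: curl -d '<payload>' http://<host>:<port>/<path>."""
    url = f"http://{host}:{port}/{path.lstrip('/')}"
    return urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        method="POST",
        # curl -d sends this content type by default.
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )


def send_trigger(request: urllib.request.Request, timeout: float = 10.0) -> int:
    """Send the request to the listener and return the HTTP status code."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


if __name__ == "__main__":
    # Placeholder values; replace them with the listener's real host, port and path.
    req = build_trigger_request("nifi.example.com", 8011, "cassandra/query", "1001")
    print(req.get_method(), req.full_url)
    # send_trigger(req)  # uncomment once the NiFi listener is actually running
```

Keep in mind that a flow starting with HandleHttpRequest normally needs a HandleHttpResponse processor further down, otherwise the caller waits until the request times out.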


For more detail on HTTP listener configuration, see https://community.cloudera.com/t5/Support-Questions/how-to-configure-listenerhttp-processor-in-NIFI/...

Please ACCEPT the solution if it helps/resolves your problem.

Thanks

Mahendra

Explorer

Hi Mahendra, thanks for the suggestions. It seems I cannot use an input port to receive data in NiFi. Also, can you help me with how to pass a parameter, for example "Emp No", into the WHERE clause of the Cassandra query in QueryCassandra?

Master Collaborator

Hi @VINODTV,

NiFi input/output ports are for connecting process groups.

You can pass your attribute (emp id) from Jenkins as a header or in the HTTP request body.

If NiFi receives the parameter value as a header, a single UpdateAttribute processor is enough to prepare your query from that header value.

Screenshot 2020-06-09 at 12.52.08 PM.png

Then use the prepared attribute 'cqlSelectQuery' as the 'CQL select query' in the QueryCassandra processor.

Screenshot 2020-06-09 at 12.50.02 PM.png

If NiFi receives the parameter (emp id) in the request body as JSON (possibly along with other parameters), use an EvaluateJsonPath processor just before UpdateAttribute to pull the emp id value from the flowfile content into an attribute, and then use that attribute in UpdateAttribute.
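
To make the JSON case concrete, here is a small Python sketch of the logic those two processors carry out: read the emp id out of the request body, then substitute it into the select query. It is an illustration only, not NiFi configuration. The field name `emp_id`, the table `hr.employee` and both function names are assumptions made up for the example. In NiFi itself this would roughly correspond to an EvaluateJsonPath property such as `emp_id` = `$.emp_id` with Destination set to `flowfile-attribute`, followed by an UpdateAttribute property `cqlSelectQuery` that references `${emp_id}`.

```python
import json


def extract_emp_id(body: str) -> int:
    """Pull the employee id out of a JSON request body (the EvaluateJsonPath step)."""
    payload = json.loads(body)
    emp_id = payload["emp_id"]  # hypothetical field name
    # Accept integers only, so arbitrary text can never end up inside the query.
    if isinstance(emp_id, bool) or not isinstance(emp_id, int):
        raise ValueError(f"emp_id must be an integer, got {emp_id!r}")
    return emp_id


def build_select_query(emp_id: int, table: str = "hr.employee") -> str:
    """Prepare the CQL select statement (the UpdateAttribute step)."""
    # The table and column names are placeholders for illustration.
    return f"SELECT * FROM {table} WHERE emp_id = {emp_id}"


if __name__ == "__main__":
    body = '{"emp_id": 1001, "dept": "ops"}'
    print(build_select_query(extract_emp_id(body)))
```

The integer check is a design choice of this sketch: since the value arrives from outside NiFi and is placed directly into the query text, it is worth validating it before the query is built.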

Thanks

Mahendra

Explorer

Nice solution, this is what I was looking for, thanks. QueryCassandra is designed to run at scheduled time intervals, right? Is there any way to disable that? I need to fetch the value from Cassandra whenever a request comes in from outside with the given employee id.

Master Collaborator

Leave the scheduling configuration at its defaults. The processor runs whenever a flowfile is sent to it, i.e. it is triggered whenever a request arrives at the HTTP listener:
Screenshot 2020-06-09 at 2.19.00 PM.png

Explorer

Great, thanks a lot