Created 06-05-2020 02:59 AM
I have a use case where I need to read data from Cassandra with inputs supplied from outside NiFi (a Flink job). Also, instead of running the query at scheduled intervals, can we trigger the fetch whenever it is needed? Are these two use cases possible with the current NiFi system?
Created 06-05-2020 08:44 AM
Hi @VINODTV ,
I think you can use a notification-based trigger for the Cassandra query.
To receive the notification request that triggers your flow, add a HandleHttpRequest processor listening on a particular port. Once a request is received, take the data from the HTTP request body, build your Cassandra query from it, and execute the query (for example with the QueryCassandra processor).
From Jenkins or any other tool, you can notify that listener by invoking its URL with the data you want to pass to the query.
Jenkins Job [Invoke http://<hostname>:<port>/<optionalURI> with data] --> Request received at Listener [HandleHttpRequest] --> Prepare Query --> Execute Query.
The curl command below can be used to notify the listener from Jenkins:
curl -d 'my input data for cassandra query' http://<hostname>:<port>/<optionalURI>
For more detail on HTTP listener configuration, see https://community.cloudera.com/t5/Support-Questions/how-to-configure-listenerhttp-processor-in-NIFI/...
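If it helps, here is a minimal sketch of how a Jenkins shell step could wrap that curl call so the job fails when the listener rejects the request or is unreachable. The URL, port, and path are placeholders I made up, and the script only prints the command by default (dry run) so it is safe to paste and try.

```shell
#!/bin/sh
# Hypothetical listener address: replace with the host, port, and path
# that your own HandleHttpRequest processor listens on.
NIFI_URL="${NIFI_URL:-http://localhost:9090/cassandra-query}"

# Dry run by default: the command is printed, not sent.
# Set DRY_RUN= (empty) in the environment to really send the request.
DRY_RUN="${DRY_RUN-1}"

notify_nifi() {
    payload=$1
    # --fail makes curl exit non-zero on HTTP 4xx/5xx, so the Jenkins step fails too.
    # --max-time keeps the job from hanging if the listener does not answer.
    ${DRY_RUN:+echo} curl --fail --silent --show-error --max-time 30 \
        --data "$payload" "$NIFI_URL"
}

notify_nifi 'my input data for cassandra query'
```

Running it with `DRY_RUN=` set to empty sends the request for real; the function's exit status is then curl's exit status.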
Please ACCEPT the solution if it helps/resolves your problem.
Thanks
Mahendra
Created 06-08-2020 10:11 PM
Hi Mahendra, thanks for the suggestions. It seems I cannot use an input port to receive the request in NiFi. Also, can you help me with how to pass a parameter, for example "Emp No", into the WHERE clause of the Cassandra query in QueryCassandra?
Created 06-09-2020 12:31 AM
Hi @VINODTV ,
NiFi input/output ports are for connecting process groups, not for receiving external requests.
You can pass your parameter (emp id) from Jenkins either as an HTTP header or in the HTTP request body.
If the parameter value arrives in NiFi as a header, a single UpdateAttribute processor is enough to build your query from the header value.
Then use the attribute prepared there, 'cqlSelectQuery', as the 'CQL select query' property of the QueryCassandra processor.
If the parameter (emp id) arrives in NiFi in the request body as JSON (perhaps along with other parameters),
then you need an EvaluateJsonPath processor just before UpdateAttribute, so that the emp id value is pulled from the flow file content into an attribute that UpdateAttribute can use.
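To make the two options concrete, here is a rough sketch of both requests from the Jenkins side, with the matching NiFi settings noted in comments. The header and field name `empid`, the listener URL, and the `hr.employees` table are all made up for illustration. As far as I know HandleHttpRequest exposes request headers as attributes named `http.headers.<name>`, but please check the attribute names on a real flow file. The id is restricted to digits because it ends up inside the CQL text.

```shell
#!/bin/sh
# Hypothetical listener address; use the host, port, and path of your own HandleHttpRequest.
NIFI_URL="${NIFI_URL:-http://localhost:9090/cassandra-query}"
# Dry run by default: commands are printed, not sent. Set DRY_RUN= (empty) to send.
DRY_RUN="${DRY_RUN-1}"

# The id is substituted into the query text, so accept digits only.
require_numeric() {
    case $1 in
        ''|*[!0-9]*) echo "employee id must be numeric: $1" >&2; return 1 ;;
    esac
}

# Option 1: employee id as an HTTP header.
# UpdateAttribute could then set (keyspace, table, and column are assumptions):
#   cqlSelectQuery = SELECT * FROM hr.employees WHERE emp_id = ${http.headers.empid}
send_as_header() {
    require_numeric "$1" || return 1
    ${DRY_RUN:+echo} curl --fail --silent --show-error --request POST \
        --header "empid: $1" "$NIFI_URL"
}

# Option 2: employee id inside a JSON body.
# EvaluateJsonPath (Destination = flowfile-attribute) with a dynamic property
#   empid = $.empid
# copies the value into an attribute, and UpdateAttribute can then use ${empid}.
json_body() {
    printf '{"empid": %s}' "$1"
}

send_as_json() {
    require_numeric "$1" || return 1
    ${DRY_RUN:+echo} curl --fail --silent --show-error \
        --header 'Content-Type: application/json' \
        --data "$(json_body "$1")" "$NIFI_URL"
}

send_as_header 1001
send_as_json 1001
```

In both cases QueryCassandra would reference the prepared attribute as `${cqlSelectQuery}` in its 'CQL select query' property.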
Thanks
Mahendra
Created 06-09-2020 12:44 AM
Nice solution, this is what I was looking for, thanks. QueryCassandra is designed to run at scheduled intervals, right? Is there any way to disable that? I need to fetch the value from Cassandra whenever a request comes in from outside with the given employee id.
Created 06-09-2020 01:50 AM
Leave the scheduling configuration at its defaults. The processor runs whenever a flow file is sent to it, i.e. it is triggered whenever a request arrives at the HTTP listener.
Created 06-09-2020 01:55 AM
Great, thanks a lot