Member since: 12-03-2017
Posts: 147
Kudos Received: 24
Solutions: 11
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 1162 | 11-03-2023 12:17 AM |
 | 2893 | 12-12-2022 09:16 PM |
 | 1100 | 07-14-2022 03:25 AM |
 | 1751 | 07-28-2021 04:42 AM |
 | 2022 | 06-23-2020 10:08 PM |
06-17-2020
11:43 PM
1 Kudo
Hi @SirV , I see two possible options:

1. Merge the two flow files on the common key ('FALLA_ID') using the MergeContent processor:
- Use EvaluateJsonPath first to extract the 'FALLA_ID' value into a flow file attribute.
- Use MergeContent to merge the master and detail flow files. Set the attribute extracted above as the 'Correlation Attribute Name' of MergeContent, so that it always merges flow files sharing the same FALLA_ID value and you get a single merged file per FALLA_ID.
- Use JoltTransformJSON to transform the merged JSON into your desired output format.

2. Cache the first flow file's content with the 'FALLA_ID' value as the key, and merge when the second flow file arrives:
- Use the NiFi DistributedMapCache (or any external cache such as Ignite) to cache the first flow file as a key-value pair: key = FALLA_ID, value = the whole flow file content. Before caching, check whether that key is already present; if it is, the first (master/detail) file has already arrived, so read it from the cache instead of caching the current flow file.
- You now have one file in the flow file content and the other in a flow file attribute (read from the cache). Use ExecuteScript with a simple script (Python or Groovy, your choice) to combine the content and the attribute into the desired output JSON, as sketched below.

Note: weigh this cache approach carefully against your file volumes and content sizes, or it may fill up your memory. Also, in a multi-node cluster the NiFi DistributedMapCache is independent on each node and does not interact with the other nodes, so if the master and detail files get picked up by different nodes the logic will fail!

Please ACCEPT if it helps/resolves your problem. Thanks Mahendra
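A minimal sketch of that ExecuteScript step for option 2, assuming the cached master record has already been pulled into a flow file attribute named 'master.json' (for example via FetchDistributedMapCache's 'Put Cache Value In Attribute' property). The attribute name and the nesting under a 'detail' key are illustrative assumptions, not the exact script from this flow:

# ExecuteScript body (Jython) -- a sketch under the assumptions above.
# Detail JSON is the flow file content; master JSON is in 'master.json'.
import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class CombineCallback(StreamCallback):
    def __init__(self, masterJson):
        self.masterJson = masterJson
    def process(self, inputStream, outputStream):
        # Parse the detail record from the current flow file content
        detail = json.loads(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
        master = json.loads(self.masterJson)
        master['detail'] = detail  # nest the detail record under the master
        outputStream.write(bytearray(json.dumps(master).encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    masterJson = flowFile.getAttribute('master.json')
    if masterJson is not None:
        flowFile = session.write(flowFile, CombineCallback(masterJson))
        session.transfer(flowFile, REL_SUCCESS)
    else:
        # Cached master not found; route to failure for inspection
        session.transfer(flowFile, REL_FAILURE)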
06-09-2020
01:50 AM
Leave the scheduling configuration at its defaults; the processor runs whenever a flow file is sent to it, i.e. whenever the HTTP listener receives a request, it will be triggered.
06-09-2020
12:31 AM
Hi @VINODTV , NiFi input/output ports are for connecting process groups. You can pass your attribute (emp id) from Jenkins as an HTTP header or in the request body. If the parameter reaches NiFi as a header, a single UpdateAttribute processor is enough to prepare the query from the received header value; then use the prepared attribute 'cqlSelectQuery' as the 'CQL select query' of the QueryCassandra processor, as sketched below. If the parameter (emp id) reaches NiFi in the request body as JSON (possibly along with other parameters), add an EvaluateJsonPath processor just before UpdateAttribute to pull the emp id value from the flow file content into an attribute, then use it in UpdateAttribute. Thanks Mahendra
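A minimal sketch of those two steps, assuming Jenkins sends the emp id in a header named 'empid' (HandleHttpRequest exposes request headers as 'http.headers.*' attributes); the header name and the keyspace/table/column names are assumptions for illustration:

UpdateAttribute -> add a dynamic property:
  cqlSelectQuery = select * from emp_keyspace.emp where emp_id = ${http.headers.empid}

QueryCassandra -> set:
  CQL select query = ${cqlSelectQuery}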
06-05-2020
08:44 AM
Hi @VINODTV , I think you can try a notification-based trigger for the Cassandra query. To receive the notification request (and trigger your flow), run a HandleHttpRequest processor listening on a particular port; once a request is received, take the data from the HTTP request body, build your Cassandra query from it, and execute it (for example with the QueryCassandra processor). From Jenkins or any other tool, notify that listener by invoking its URL with the data you want to pass to the query:

Jenkins Job [invoke http://<hostname>:<port>/<optionalURI> with data] --> request received at listener [HandleHttpRequest] --> prepare query --> execute query

The following curl command can be used to notify the listener from Jenkins:

curl -d 'my input data for cassandra query' http://<hostname>:<port>/<optionalURI>

For more detail on the HTTP listener configuration, see https://community.cloudera.com/t5/Support-Questions/how-to-configure-listenerhttp-processor-in-NIFI/m-p/297344#M218652

Please ACCEPT the solution if it helps/resolves your problem. Thanks Mahendra
06-05-2020
08:23 AM
@vikrant_kumar24 - Have you found a better way to solve this? If so, I would be curious to hear it!
06-04-2020
01:55 AM
1 Kudo
Hi @renuu , ListenHTTP works the same way; below is a sample configuration and how to invoke it using curl. ListenHTTP listens on a specific port (in my case 8889) and a base path ('/curlRequestListener'). With the following curl command you can post data from a terminal:

curl -d 'my request data' http://<nifihostname>:8889/curlRequestListener

'my request data' is the sample payload I am posting to the ListenHTTP processor; the same data is received by the processor and queued as the flow file content.
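A minimal sketch of the ListenHTTP properties behind that curl command (values taken from the example above; everything else left at its defaults):

ListenHTTP
  Listening Port : 8889
  Base Path      : curlRequestListener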
06-03-2020
09:20 AM
1 Kudo
Hi @renuu , You can use HandleHttpRequest to listen for incoming GET/POST requests on a specific port and respond back with the HandleHttpResponse processor. Add one HTTP Context Map controller service and use the same service in both processors to map each request to its response, that's it. Take a look at the sample configuration sketched below.

This HandleHttpRequest listens for HTTP requests on port 8888 and a specific path, /curlRequest. 'Allowed Paths' is optional; if you just want to listen for any request, you can remove that value. HandleHttpResponse responds to the client with a 201 response code and whatever content is in the flow file.

Instead of a curl command, I invoked the listener with another NiFi processor (which you can consider the client): InvokeHTTP, with the target endpoint set to the HandleHttpRequest host and port. I used localhost as the host because I was invoking from the same (local) NiFi. For a curl command from outside, use your NiFi hostname instead of 'localhost', i.e. http://<hostname>:8888/curlRequest

Please ACCEPT the solution if it helps/resolves your problem. Thanks Mahendra
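A minimal sketch of the processor properties described above (the controller service is the standard StandardHttpContextMap implementation; the port, path, and status code come from the example):

HandleHttpRequest
  Listening Port   : 8888
  Allowed Paths    : /curlRequest
  HTTP Context Map : StandardHttpContextMap (shared controller service)

HandleHttpResponse
  HTTP Status Code : 201
  HTTP Context Map : StandardHttpContextMap (same service as above)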
05-29-2020
08:50 AM
Hi, I feel ListS3 should work well even with massive buckets: it collects only metadata and creates a flow file per listed object. A few things you can try:

1) If the bucket has multiple subdirectories, set a filter for one specific directory in the ListS3 processor (if this helps, you can run multiple processors in parallel pointing at the same bucket but different subdirectories). In my example, "2019/Q1" is one specific directory inside the bucket, and ListS3 lists only the files of that directory; see the sketch below.

2) Duplicate (copy and paste) the ListS3 processor you are trying to run, and run the newly created copy. ListS3 internally tracks the files it has already listed and will not re-list them even if you stop and start it, so the easy way to re-test is to duplicate the processor and run the new one.

Thanks Mahendra
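A minimal sketch of that directory filter, assuming the standard ListS3 'Prefix' property is the filter being described (the bucket name here is hypothetical):

ListS3
  Bucket : my-massive-bucket
  Prefix : 2019/Q1/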
05-29-2020
01:03 AM
Hi, here is one option using a simple Python (Jython) script. (I am not a Python expert; there may be a better way to write the code below, but it works.) Just use an ExecuteScript processor and add the following as the script body:

# ExecuteScript body (Jython): reads the first two CSV lines (header +
# first data row) and builds a filename from the 'Name' and 'Country' columns.
import csv
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        self.filename = None
    def process(self, inputStream, outputStream):
        # Read the whole flow file content as text
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        lines = text.splitlines()
        reader = csv.reader(lines)
        headerList = list(next(reader))  # CSV header row
        dataList = list(next(reader))    # first data row
        # Build the target filename from the 'Name' and 'Country' columns
        self.filename = dataList[headerList.index('Name')] + "_" + dataList[headerList.index('Country')] + ".csv"
        # Pass the content through unchanged
        outputStream.write(bytearray(text.encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    callback = PyStreamCallback()
    flowFile = session.write(flowFile, callback)
    # Setting the 'filename' attribute renames the flow file for PutFile etc.
    flowFile = session.putAttribute(flowFile, "filename", callback.filename)
    session.transfer(flowFile, REL_SUCCESS)

Thanks Mahendra
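For example, a flow file whose content starts with the header line 'Name,Country' followed by the data row 'John,US' would leave this script with its 'filename' attribute set to 'John_US.csv', while the content itself passes through unchanged.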