Member since: 12-03-2017
Posts: 55
Kudos Received: 6
Solutions: 7

My Accepted Solutions
Title | Views | Posted
---|---|---
 | 174 | 06-23-2020 10:08 PM
 | 440 | 06-17-2020 11:43 PM
 | 519 | 06-09-2020 01:50 AM
 | 530 | 06-09-2020 12:31 AM
 | 569 | 06-05-2020 08:44 AM
07-07-2020
09:23 AM
Use the date field extracted into an attribute while merging, so that merging happens only for files that belong to the same date-time-hour. In the MergeContent processor, set the 'Correlation Attribute Name' field to the attribute name populated above; the rest will be taken care of. You may need to increase the number of bins in the merge processor depending on your use case.
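The bucketing idea can be illustrated in plain Python (file names and the timestamp format here are made up; MergeContent does this internally per bin):

```python
from collections import defaultdict

def group_by_hour(files):
    """Group (name, timestamp) pairs by their date-time-hour key, the way
    MergeContent buckets flow files on a Correlation Attribute."""
    buckets = defaultdict(list)
    for name, ts in files:
        buckets[ts[:13]].append(name)  # "2020-07-07 09:23:10" -> "2020-07-07 09"
    return dict(buckets)

groups = group_by_hour([
    ("a.log", "2020-07-07 09:23:10"),
    ("b.log", "2020-07-07 09:59:59"),
    ("c.log", "2020-07-07 10:00:01"),
])
print(groups)  # a.log and b.log share one bucket; c.log gets its own
```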
06-23-2020
10:08 PM
2 Kudos
@AJ121 - Please try this : ${datestring:toDate("yyyyMMdd","GMT"):format("yyyy-MM-dd")} Please accept if it helped/solved your problem.
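For reference, the equivalent of that expression in plain Python, assuming a sample 'datestring' attribute value of 20200623:

```python
from datetime import datetime

datestring = "20200623"  # sample value of the 'datestring' attribute
formatted = datetime.strptime(datestring, "%Y%m%d").strftime("%Y-%m-%d")
print(formatted)  # 2020-06-23
```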
06-23-2020
09:59 PM
@Maria_pl - I think you can put that one flow file from the other flow into the DistributedMapCache, and then fetch the cached data in the flow where you want to use/merge it. Processors: PutDistributedMapCache, FetchDistributedMapCache. Thanks, Mahendra
06-23-2020
09:50 PM
@varun_rathinam - It sounds like batch mode rather than real streaming when you say 'first time and next time'. You can merge them in that batch fashion using minimum and maximum waiting-time configurations. Alternatively, if you can derive a common unique value for the files that need to be bunched together, you can use 'Correlation Attribute Name' in the MergeContent processor to achieve the same.
06-21-2020
09:55 PM
@varun_rathinam I don't think you can set that field dynamically, as that field does not support expression language. How do you decide that dynamic value? Can't you control your merge criteria by using max size and max bin age along with the count? Thanks, Mahendra
06-17-2020
11:43 PM
1 Kudo
Hi @SirV, I see two possible options:

1. Merge the two flow files based on the common key ('FALLA_ID') using the MergeContent processor:
- Use EvaluateJsonPath first to extract the 'FALLA_ID' value into a flow file attribute.
- Use MergeContent to merge the master/detail flow files. Set the attribute extracted above as the 'Correlation Attribute Name' of the MergeContent processor, so that it always merges flow files sharing the same FALLA_ID value and you get a single merged file per FALLA_ID.
- Use JoltTransformJSON to transform the merged JSON into the desired output format.

2. Cache the first flow file's content with the 'FALLA_ID' value as key, and merge when the second flow file arrives:
- Use the NiFi DistributedMapCache (or any other external cache, such as Ignite) to cache the first flow file. The cache stores key-value pairs, so use the FALLA_ID as key and the whole flow file content as value. Before caching, check whether that key is already present; if it is, the counterpart (master/detail) file has already arrived, so read it from the cache instead of caching the current flow file.
- At that point you have one file in the flow file content and one in a flow file attribute (read from the cache). Use ExecuteScript with a simple script of your choice (Python/Groovy) to combine the content and attribute into the desired output JSON.

Note: pick the cache approach carefully based on your file volumes and content sizes, or it may fill up your memory. Also, in a multi-node cluster the NiFi distributed cache is independent on each node and does not interact with the other nodes, so if the master and detail files get picked up by different nodes the logic will fail!

Please ACCEPT if it helps/resolves your problem. Thanks, Mahendra
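The caching option (2) can be sketched in plain Python, with a dict standing in for the DistributedMapCache (all names here are illustrative):

```python
# A plain dict stands in for the DistributedMapCache; keys are FALLA_ID values.
cache = {}

def on_flowfile(falla_id, content):
    """Cache the first half of a master/detail pair; return the merged
    record once the counterpart arrives."""
    if falla_id in cache:
        counterpart = cache.pop(falla_id)
        return {"FALLA_ID": falla_id, "parts": [counterpart, content]}
    cache[falla_id] = content  # first arrival: cache it and wait
    return None

first = on_flowfile("F1", "master-record")   # cached, nothing emitted yet
merged = on_flowfile("F1", "detail-record")  # counterpart arrived: merged
```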
06-09-2020
01:50 AM
Leave the scheduling configuration at its defaults; the processor runs whenever a flow file is sent to it, i.e., it is triggered whenever a request is received on the HTTP listener.
06-09-2020
12:31 AM
Hi @VINODTV, NiFi input/output ports are for connecting process groups. You can pass your attribute (emp id) from Jenkins as a header or in the HTTP request body. If the parameter value reaches NiFi as a header, a single UpdateAttribute can prepare your query from the received header. Then use the prepared attribute 'cqlSelectQuery' as the 'CQL select query' in the QueryCassandra processor. If the parameter (emp id) arrives in the request body as JSON (possibly along with other parameters), add an EvaluateJsonPath processor just before UpdateAttribute to pull the emp id value from the flow file content into an attribute, and then use it in UpdateAttribute. Thanks, Mahendra
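As an illustration, preparing the 'cqlSelectQuery' attribute amounts to simple string substitution; a Python sketch with a hypothetical table name and a sample emp id value:

```python
# Hypothetical table/column names; emp_id would come from the HTTP header.
emp_id = "E1001"
cql_select_query = "SELECT * FROM employees WHERE emp_id = '{}'".format(emp_id)
print(cql_select_query)  # SELECT * FROM employees WHERE emp_id = 'E1001'
```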
06-05-2020
08:44 AM
Hi @VINODTV, I think you can try a notification-based trigger for the Cassandra query. To receive the notification request (to trigger your flow), run a HandleHttpRequest processor listening on a particular port; once a request is received, take the data from the HTTP request body, build your Cassandra query from it, and execute it (for example using the QueryCassandra processor). From Jenkins or any other tool, notify that listener by invoking the specific URL with the data you want to pass to the query. Jenkins Job [Invoke http://<hostname>:<port>/<optionalURI> with data] --> Request received at listener [HandleHttpRequest] --> Prepare query --> Execute query. The curl command below can be used to notify the listener from Jenkins: curl -d 'my input data for cassandra query' http://<hostname>:<port>/<optionalURI> For more detail on the HTTP listener configuration, see https://community.cloudera.com/t5/Support-Questions/how-to-configure-listenerhttp-processor-in-NIFI/m-p/297344#M218652 Please ACCEPT the solution if it helps/resolves your problem. Thanks, Mahendra
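For reference, the same notification can be sent from any scripting environment, not just curl; a Python sketch that builds the equivalent POST request with urllib (host, port, and path are placeholders, not values from the thread):

```python
import urllib.request

# Host, port and path are placeholders for your listener's configuration.
req = urllib.request.Request(
    "http://nifi-host:8080/trigger",
    data=b"my input data for cassandra query",
)
print(req.get_method())  # POST -- urllib infers POST once a body is attached
```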
06-05-2020
08:23 AM
@vikrant_kumar24 - Have you found a better way to solve this? If so, I would be curious to hear it!
06-04-2020
01:55 AM
Hi @renuu, ListenHTTP also works the same way. Below is a sample configuration and how to invoke it using curl. ListenHTTP listens on a specific port (in my case 8889) and a base path ('/curlRequestListener'). Using the curl command below you can post data from a terminal: curl -d 'my request data' http://<nifihostname>:8889/curlRequestListener 'my request data' is the sample payload I am posting to the ListenHTTP processor; the same data is received by the processor and queued as a flow file. [Screenshot: received request flow file content]
06-03-2020
09:20 AM
Hi @renuu, You can use HandleHttpRequest to listen for incoming GET/POST requests on a specific port and respond using the HandleHttpResponse processor. Add one StandardHttpContextMap controller service and use the same one in both processors to map requests to responses; that's it. Take a look at the sample configurations below. This HandleHttpRequest listens for HTTP requests on port 8888 and the specific path /curlRequest. 'Allowed Paths' is optional; if you just want to listen for any request, remove that value. [Screenshot: HandleHttpRequest processor] HandleHttpResponse responds to the client with a 201 response code and whatever content is in the flow file. [Screenshot: HandleHttpResponse processor] Instead of a curl command I invoked the listener above with another NiFi processor (which you can consider the client): InvokeHTTP with the target endpoint set to the HandleHttpRequest host and port. I used localhost as the host because I am invoking from the same (local) NiFi. For a curl command from outside, use your NiFi host name instead of 'localhost', i.e. http://<hostname>:8888/curlRequest [Screenshot: the whole flow together] Please ACCEPT the solution if it helps/resolves your problem. Thanks, Mahendra
05-29-2020
08:50 AM
Hi, I feel ListS3 should work well even with very large buckets; it collects only metadata and creates flow files. A few things you can try: 1) If you have multiple subdirectories inside the bucket, set a filter for a specific directory in the ListS3 processor (if this helps, you can run multiple processors in parallel pointing at the same bucket but different subdirectories). In the attached screenshot, "2019/Q1" is one specific directory inside the bucket, and ListS3 will list only the files of that directory. 2) Duplicate (copy and paste) the ListS3 processor you are trying to run, and run the newly created copy. ListS3 tracks internally which files it has already listed and will not re-list them even if you stop and start it, so the easy way to test is to duplicate the processor and run the new one. Thanks, Mahendra
05-29-2020
01:03 AM
Hi, here is one option using a simple Python (Jython) script. (I am not a Python expert; there may be a better way to write the code below, but it works.) Just use an ExecuteScript processor and add the code below as the script body:

```python
import csv
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

global filename

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the whole flow file, then parse the header and first data row
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        lines = text.splitlines()
        reader = csv.reader(lines)
        headerList = list(next(reader))
        dataList = list(next(reader))
        global filename
        # Build the target filename from the 'Name' and 'Country' columns
        filename = dataList[headerList.index('Name')] + "_" + dataList[headerList.index('Country')] + ".csv"
        # Write the original content back unchanged
        outputStream.write(bytearray(text.encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    flowFile = session.putAttribute(flowFile, "filename", filename)
    session.transfer(flowFile, REL_SUCCESS)
```

Thanks, Mahendra
05-29-2020
12:54 AM
Hey, here is one option using a Python (Jython) script. Just use an ExecuteScript processor with the script below as the script body. (I am not a Python expert; there may be a better way to write the code below, but it works.)

```python
import csv
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

global filename

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the whole flow file, then parse the header and first data row
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        lines = text.splitlines()
        reader = csv.reader(lines)
        headerList = list(next(reader))
        dataList = list(next(reader))
        global filename
        # Build the target filename from the 'Name' and 'Country' columns
        filename = dataList[headerList.index('Name')] + "_" + dataList[headerList.index('Country')] + ".csv"
        # Write the original content back unchanged
        outputStream.write(bytearray(text.encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    flowFile = session.putAttribute(flowFile, "filename", filename)
    session.transfer(flowFile, REL_SUCCESS)
```

Thanks, Mahendra
05-24-2020
07:36 AM
I could not succeed with multipart upload using the InvokeHTTP processor, so I used a simple Java program to upload the file with 'multipart/form-data', invoked from a NiFi SpringContextProcessor. As my file was large, I wrote the file content to disk before invoking the SpringContextProcessor, and the Java code read the file content from disk and uploaded it to the target. You can also transfer the flow file content directly to the Java code (SpringContextProcessor). Thanks, Mahendra
12-01-2019
09:47 PM
Hi @ankita_pise, Were you able to achieve this using the InvokeHTTP processor? I am also looking for a solution. Thanks, Mahendra
12-01-2019
09:27 PM
Hello,
I am trying to use the Apache NiFi InvokeHTTP processor to send/upload a file and JSON to a REST API.
I am able to send the file using Postman (screenshot attached below).
But in NiFi I could not find any way to attach a file to the request, so I tried forming the request body manually with the JSON data and file content, and I am getting the error below -
Request header :-
Content-Type:multipart/form-data; boundary=12345
Request body :-
```
--12345\r\n
Content-Disposition: form-data; name="dataFields"\r\n
Content-Type: application/json; charset=utf-8\r\n
{"subtaskId": "task123", "requestRefId": "request456", "firstName": "Mahendra", "lastName": "Hegde" } \r\n
--12345\r\n
Content-Disposition: form-data; name="file"; filename="file1.csv"\r\n
Content-Type: text/csv\r\n
\r\n
user,dev,name,lastname,age
abcd,1122,Mahendra,Hegde,32
abcd,1122,Mahendra,Hegde,32
\r\n
\r\n
--12345--\r\n
```
Error response :-
{"errors":[{"title":"Error extracting message data [missing file object].","detail":"Error extracting message data [missing file object].","code":"ERROR_INTEGRATION-MANAGER_DECODE_ERROR"}]}
Does anyone know how to use InvokeHTTP for multipart?
Thanks
Mahendra
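One thing worth double-checking in a hand-built body like the one above: in multipart/form-data, each part's headers must be followed by a blank CRLF line before the part's content begins. A Python sketch of a well-formed body with the same boundary (sample data shortened; this illustrates the required byte layout, not a confirmed fix):

```python
boundary = "12345"
json_part = '{"subtaskId": "task123"}'   # shortened sample JSON
file_part = "user,dev\nabcd,1122\n"      # shortened sample CSV

body = (
    f"--{boundary}\r\n"
    'Content-Disposition: form-data; name="dataFields"\r\n'
    "Content-Type: application/json; charset=utf-8\r\n"
    "\r\n"                # blank line separates part headers from part body
    f"{json_part}\r\n"
    f"--{boundary}\r\n"
    'Content-Disposition: form-data; name="file"; filename="file1.csv"\r\n'
    "Content-Type: text/csv\r\n"
    "\r\n"
    f"{file_part}\r\n"
    f"--{boundary}--\r\n"  # closing boundary ends the message
)
```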
04-03-2019
04:54 AM
1 Kudo
Hello, I am observing quite strange behaviour of the anyDelineatedValue expression when I use it with an 'OR' operation. With an 'AND' operation there is no issue. Input attribute value: org.entitlement.list=AA,CC,DD,FF Expression used in a RouteOnAttribute/UpdateAttribute processor: ${anyDelineatedValue("${org.entitlement.list}", ","):equals('BB'):or(${anyDelineatedValue("${org.entitlement.list}", ","):contains('EE')})} The expression above throws an ArrayIndexOutOfBoundsException for the OR operation with the given input (where neither of the values the expression looks for is present). With an 'AND' operation the expression works fine. What is the difference here? If I pass BB or EE in the input, the expression does not throw the exception! This looks like a defect; can someone please point out if anything is wrong here? Thanks, Mahendra
03-13-2019
06:12 AM
Hello, I have a NiFi SpringContextProcessor which establishes a connection to Cassandra. On processor START it establishes the connection, but on processor STOP it does not release the connection consistently (sometimes it releases, sometimes it does not). Any idea how I can handle this connection? Below is the code used for the connection:

```java
session = DseCluster.builder()
        .addContactPoint(cassandraUrl)
        .withPort(cassandraPort)
        .withSpeculativeExecutionPolicy(
                new ConstantSpeculativeExecutionPolicy(
                        500, // delay before a new execution is launched
                        2    // maximum number of executions
                ))
        .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
        .build()
        .register(QueryLogger.builder()
                .withConstantThreshold(100)
                .withMaxQueryStringLength(20)
                .build())
        .connect(cassandraKeySpace);
```

Thanks, Mahendra
12-05-2018
03:57 PM
Hello, I need to query a Cassandra table from NiFi to get a small result set. I am using the QueryCassandra processor, and it works fine with username and password authentication. But for some reason I can't go ahead with username and password; instead I should use a Kerberos keytab to authenticate to Cassandra. The QueryCassandra processor has no option to provide a keytab and principal for authentication. Is there any way to connect using Kerberos? Thanks, Mahendra
12-05-2018
11:34 AM
Thank you, it's working with ${anyDelineatedValue("${your_attribute}", ","):equals("US")}
12-05-2018
10:24 AM
Hello, I have a scenario where I need to check whether a string contains a word, but only as an exact match among comma-separated values. Example string: example = "US,UK,IN,NOT_APPLICABLE" I want to check whether the string above contains a specific word like 'US' or 'LE'. Case 1: example.contains('US') - expecting TRUE. Case 2: example.contains('LE') - expecting FALSE. With a plain :contains(), case 2 also returns TRUE, since 'LE' is part of 'NOT_APPLICABLE'. But I want to check containment strictly against the comma-separated values. Is there an easy way to achieve this? Thanks, Mahendra
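For comparison, the exact-match semantics being asked for look like this in plain Python:

```python
example = "US,UK,IN,NOT_APPLICABLE"

def contains_exact(csv_string, word):
    """True only when `word` matches a whole comma-separated token."""
    return word in csv_string.split(",")

print(contains_exact(example, "US"))  # True
print(contains_exact(example, "LE"))  # False: 'LE' is only a substring
```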
11-25-2018
03:40 PM
Hello, I have a scenario where a flow file of up to 5 MB is flowing through NiFi. During the flow I need to get some information from a Cassandra master table and use it for the flow file validation. If I go with QueryCassandra, it fetches the result into the flow file content, so I would lose my original source flow file content! The Cassandra table result will be only 2-3 rows (small in size), so is there any way to get this result into an attribute and retain my original flow file content? One option I have is the SpringContext processor; is there any better approach? Any suggestion will be appreciated. Thanks, Mahendra
11-23-2018
05:54 AM
Thanks @Shu! It worked; I was using 'Return Type' as auto instead of json in EvaluateJsonPath. Thanks for the reply 🙂
11-22-2018
10:53 AM
Hello, I have the JSON below as flow file content -

```json
{
  "organization_id": "life-360",
  "message_type_id": "MSG01",
  "consent_required": "1",
  "country_list": [
    "GB",
    "IT",
    "BG"
  ]
}
```

I want the following as output - GB, IT, BG I am able to get "country_list": [ "GB", "IT", "BG" ] into the flow file using JOLT, but not the comma-separated country list. Could anyone please suggest the best way? Thanks, Mahendra
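For reference, the transformation being asked for is a simple join; a Python sketch (e.g. as an ExecuteScript alternative to JOLT):

```python
import json

flowfile_content = """{
  "organization_id": "life-360",
  "message_type_id": "MSG01",
  "consent_required": "1",
  "country_list": ["GB", "IT", "BG"]
}"""

# Parse the flow file JSON and join the list into a comma-separated string
countries = ", ".join(json.loads(flowfile_content)["country_list"])
print(countries)  # GB, IT, BG
```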
11-20-2018
07:29 AM
Hello, I have a scenario where I need to run a processor on NiFi restart. Is there any way to trigger a NiFi processor whenever NiFi restarts? Thanks in advance, Mahendra
10-30-2018
03:42 PM
Thanks @Matt Burgess for the clear information. Do you suggest any other approach? I am open to any kind of storage or caching, but the end goal is to be able to use some master data for every flow file's validation without impacting performance. (The master data is a combination of company-country-datatype; this is what I want to use for every flow file validation.) Thanks for your response 🙂 Mahendra
10-30-2018
11:27 AM
Hello All, I have a requirement to cache some information in NiFi that will be used for every flow file's validation (more than 1000 per second). I am thinking of storing the information in a Cassandra table and updating the NiFi cache from it every 24 hours or so (the NiFi cache update done with the PutDistributedMapCache processor). But the problem is how to hold/reload the cache value when NiFi restarts for various reasons. I would also like to understand where these cached entries are actually stored in a multi-node NiFi cluster. Thanks in advance. Thanks, Mahendra
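The refresh pattern described above can be sketched in plain Python (the TTL, key names, and loader here are illustrative; in NiFi the loader role would be played by the Cassandra query feeding PutDistributedMapCache):

```python
import time

class RefreshingCache:
    """Reload from the source whenever the cached copy is older than ttl;
    this also repopulates the cache on first access after a restart."""
    def __init__(self, loader, ttl_seconds):
        self.loader = loader
        self.ttl = ttl_seconds
        self.data = None
        self.loaded_at = 0.0

    def get(self):
        if self.data is None or time.time() - self.loaded_at > self.ttl:
            self.data = self.loader()     # e.g. re-query the Cassandra table
            self.loaded_at = time.time()
        return self.data

master_data = RefreshingCache(
    loader=lambda: {"company-country-datatype": "valid"},  # stand-in for Cassandra
    ttl_seconds=24 * 3600,
)
print(master_data.get())
```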
09-04-2018
03:22 PM
Hello, I am frequently seeing the error below in the NiFi Kafka consumer - "org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member" We are using NiFi 1.5. Could anyone please let me know the solution for this? Thanks, Mahendra