Member since: 04-19-2018
Posts: 32
Kudos Received: 4
Solutions: 0
01-08-2020
03:53 AM
I am running a NiFi flow in MiNiFi in a containerized environment, using ExecuteSQLRecord to fetch data from BigQuery. It takes almost 3 hours to fetch 14 GB of data from the BigQuery table. Is there any way to reduce the fetch time? (See the fetch-size sketch after the environment details below.)
Environment:
Google Compute Engine
Docker container
MiNiFi 0.5.0
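Regarding the fetch time: ExecuteSQL/ExecuteSQLRecord typically expose a "Fetch Size" property, which corresponds to JDBC's Statement.setFetchSize, i.e. how many rows the driver pulls per round trip. Below is a minimal JDBC sketch of that knob outside NiFi; the BigQuery JDBC URL and table name are placeholders, and whether the BigQuery driver honours the hint is an assumption.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class FetchSizeSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical BigQuery JDBC connection string; substitute whatever driver/URL the flow actually uses.
        String url = "jdbc:bigquery://<host>;ProjectId=<project-id>";
        try (Connection conn = DriverManager.getConnection(url);
             PreparedStatement stmt = conn.prepareStatement("SELECT * FROM my_dataset.my_table")) {
            // This is the knob the processor's "Fetch Size" property maps to:
            // how many rows the driver requests from the server per round trip.
            stmt.setFetchSize(10_000);
            try (ResultSet rs = stmt.executeQuery()) {
                long rows = 0;
                while (rs.next()) {
                    rows++; // per-row handling elided
                }
                System.out.println("Fetched rows: " + rows);
            }
        }
    }
}

Larger fetch sizes generally mean fewer network round trips for the same result set, which is often the dominant cost for multi-GB extracts; the right value still has to be balanced against heap usage on the MiNiFi side.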
08-14-2019
11:40 AM
I want to transform a NiFi flow into a MiNiFi flow using the transform command of the MiNiFi Toolkit. I observe that property values such as passwords and file paths are not copied into the YAML, and I have to add them manually. Is there any way to handle this in an automated fashion? Any help would be appreciated. TIA
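As a possible stop-gap (a sketch only, not a MiNiFi Toolkit feature), the generated config.yml could be post-processed to inject the missing values. The SnakeYAML-based example below assumes the usual MiNiFi config.yml layout with a top-level Processors list whose entries carry a Properties map; the file path, processor name, property key, and environment variable are all placeholders.

import org.yaml.snakeyaml.Yaml;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.List;
import java.util.Map;

public class InjectMinifiProperties {
    public static void main(String[] args) throws Exception {
        Yaml yaml = new Yaml();
        Map<String, Object> config;
        try (FileReader in = new FileReader("config.yml")) {        // placeholder path
            config = yaml.load(in);
        }
        // Assumes the transformed YAML has a top-level "Processors" list,
        // each entry carrying a "name" and a "Properties" map.
        List<Map<String, Object>> processors = (List<Map<String, Object>>) config.get("Processors");
        for (Map<String, Object> processor : processors) {
            if ("PutSFTP".equals(processor.get("name"))) {           // hypothetical processor name
                Map<String, Object> props = (Map<String, Object>) processor.get("Properties");
                props.put("Password", System.getenv("SFTP_PASSWORD")); // value injected from the environment
            }
        }
        try (FileWriter out = new FileWriter("config.yml")) {
            yaml.dump(config, out);
        }
    }
}

Run as a step after the toolkit transform so the values never have to be typed into the YAML by hand.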
07-24-2018
09:34 AM
@Matt Clarke The above command gives the following error: "The specified resource could not be found." curl: (6) Could not resolve host: Content-Type; Name or service not known
07-18-2018
06:13 AM
Is there any way to provide NiFi process group variables as parameters when executing the process group from the REST API?
07-11-2018
09:50 AM
Hi Team, we have observed that the PutGCSObject processor in NiFi uploads the flow file in a single request, which lowers the upload speed for that file transfer. We tried using the gsutil utility to upload the same file with the parallel composite (resumable) upload option and it was 10 times faster, as it performs multiple parallel uploads, with the following command:
gsutil -D -o GSUtil:parallel_composite_upload_threshold=100M cp <source file path> gs://<destination cloud bucket name>/
I tried the ExecuteProcess processor to run the above command, but the processor is not able to parse the command and lands in a parsing error. Is anyone aware of this performance behaviour, or does anybody have an alternative approach for NiFi to upload the file in parts, in parallel, and then perform a compose operation as gsutil does?
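For what it's worth, the same "upload parts, then compose" pattern that gsutil uses can be sketched with the google-cloud-storage Java client. This is only an illustration of the compose approach, not how PutGCSObject works internally; the bucket, file path, object names, and part size are placeholders, and the whole file is read into memory purely for brevity (a real implementation would stream each part and upload the parts from multiple threads).

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ComposeUploadSketch {
    public static void main(String[] args) throws Exception {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        String bucket = "my-bucket";                      // placeholder
        Path source = Paths.get("/data/bigfile.csv");     // placeholder
        byte[] data = Files.readAllBytes(source);         // for brevity only; stream in real code
        int partSize = 100 * 1024 * 1024;                 // 100 MB parts, mirroring the gsutil threshold

        // 1) Upload the file as independent part objects (these could run in parallel threads).
        List<String> partNames = new ArrayList<>();
        for (int offset = 0, part = 0; offset < data.length; offset += partSize, part++) {
            int end = Math.min(offset + partSize, data.length);
            String partName = "bigfile.csv.part-" + part;
            storage.create(BlobInfo.newBuilder(bucket, partName).build(),
                    Arrays.copyOfRange(data, offset, end));
            partNames.add(partName);
        }

        // 2) Compose the parts into the final object, as gsutil's composite upload does.
        Storage.ComposeRequest.Builder composeBuilder = Storage.ComposeRequest.newBuilder()
                .setTarget(BlobInfo.newBuilder(BlobId.of(bucket, "bigfile.csv")).build());
        for (String partName : partNames) {
            composeBuilder.addSource(partName);
        }
        storage.compose(composeBuilder.build());

        // 3) Clean up the intermediate part objects.
        for (String partName : partNames) {
            storage.delete(BlobId.of(bucket, partName));
        }
    }
}

Note that a single GCS compose request accepts at most 32 source objects, so very large files need more than one compose pass (which is also how gsutil handles it).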
06-20-2018
04:18 AM
@Matt Burgess I checked the code in the mentioned PR, NIFI-4836. I believe batching in QueryDatabaseTable is different from the SelectHiveQL processor, where the bottleneck is breaking the result set down into batches. Here is the code from NIFI-4836, which only releases the flow files once the batch size is reached. This is an interesting problem statement for tuning HiveQL processing.
if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
    session.transfer(resultSetFlowFiles, REL_SUCCESS);
    session.commit();
    resultSetFlowFiles.clear();
}
06-19-2018
11:15 AM
I am trying to fetch a big full table with 10M+ records from Hive using the SelectHiveQL processor, and I found that the convertToCsvStream() method in the source code takes longer than fetching the result set itself. Observing the code: the result set is iterated row by row and each row is then added to the output stream. When the table is small it completes in seconds, but with large data it takes much longer. Is there any way we can optimize the conversion? I have tried fetch sizes of 100000/1000/10000/1000.
while (rs.next()) {
    //logger.info("+++++++++++++Inside the While loop+++++++++++++++++");
    if (callback != null) {
        callback.processRow(rs);
    }
    List<String> rowValues = new ArrayList<>(nrOfColumns);
    for (int i = 1; i <= nrOfColumns; i++) {
        final int javaSqlType = meta.getColumnType(i);
        final Object value = rs.getObject(i);
        //logger.info("+++++++++++++Entering the Switch at +++++++++++++++++");
        switch (javaSqlType) {
            case CHAR:
            case LONGNVARCHAR:
            case LONGVARCHAR:
            case NCHAR:
            case NVARCHAR:
            case VARCHAR:
                String valueString = rs.getString(i);
                if (valueString != null) {
                    // Removed extra quotes as those are a part of the escapeCsv when required.
                    StringBuilder sb = new StringBuilder();
                    if (outputOptions.isQuote()) {
                        sb.append("\"");
                        if (outputOptions.isEscape()) {
                            sb.append(StringEscapeUtils.escapeCsv(valueString));
                        } else {
                            sb.append(valueString);
                        }
                        sb.append("\"");
                        rowValues.add(sb.toString());
                    } else {
                        if (outputOptions.isEscape()) {
                            rowValues.add(StringEscapeUtils.escapeCsv(valueString));
                        } else {
                            rowValues.add(valueString);
                        }
                    }
                } else {
                    rowValues.add("");
                }
                break;
            case ARRAY:
            case STRUCT:
            case JAVA_OBJECT:
                String complexValueString = rs.getString(i);
                if (complexValueString != null) {
                    rowValues.add(StringEscapeUtils.escapeCsv(complexValueString));
                } else {
                    rowValues.add("");
                }
                break;
            default:
                if (value != null) {
                    rowValues.add(value.toString());
                } else {
                    rowValues.add("");
                }
        }
        //logger.info("+++++++++++++Exiting the Switch at +++++++++++++++++" + System.currentTimeMillis());
    }
    //List<String> rowValues1 = new ArrayList<>();
    //rowBuffer.add(StringUtils.join(rowValues, outputOptions.getDelimiter()));
    //rowBuffer.add("\n");
    // Write row values
    //logger.info("+++++++++++++Writing Row value at+++++++++++++++++" + System.currentTimeMillis());
    outStream.write(StringUtils.join(rowValues, outputOptions.getDelimiter()).getBytes(StandardCharsets.UTF_8));
    outStream.write("\n".getBytes(StandardCharsets.UTF_8));
    nrOfRows++;
    /* if (rowBuffer.size() == BATCH_SIZE) {
        writeToOutStream( outStream,rowBuffer );
    } */
    if (maxRows > 0 && nrOfRows == maxRows)
        break;
}
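One direction the commented-out rowBuffer/BATCH_SIZE code above hints at: buffer rows in memory and write them in larger chunks, instead of issuing two small OutputStream writes per row. A rough sketch of that idea follows; it is not the actual NiFi patch, the flush threshold is arbitrary, and buildRowValues() is a hypothetical helper standing in for the per-column switch logic shown above.

// Sketch of batching the per-row writes; threshold and helper name are illustrative only.
StringBuilder rowBuffer = new StringBuilder(1 << 20);   // accumulate ~1 MB of rows per write
final int FLUSH_THRESHOLD = 1 << 20;

while (rs.next()) {
    List<String> rowValues = buildRowValues(rs, meta, outputOptions); // same per-column logic as above (hypothetical helper)
    rowBuffer.append(StringUtils.join(rowValues, outputOptions.getDelimiter())).append('\n');
    nrOfRows++;

    if (rowBuffer.length() >= FLUSH_THRESHOLD) {
        outStream.write(rowBuffer.toString().getBytes(StandardCharsets.UTF_8));
        rowBuffer.setLength(0);
    }
    if (maxRows > 0 && nrOfRows == maxRows) {
        break;
    }
}
// Flush whatever is left in the buffer.
if (rowBuffer.length() > 0) {
    outStream.write(rowBuffer.toString().getBytes(StandardCharsets.UTF_8));
}

A simpler variant of the same idea is to wrap outStream in a java.io.BufferedOutputStream so the many small writes are coalesced without touching the loop at all.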
06-11-2018
06:01 AM
You can try removing your .m2 directory and then retrying the build, or manually copy the jar into the directory and try again.
06-07-2018
11:06 AM
1 Kudo
Appreciate your response! I did the same using the Advanced section of UpdateAttribute to create rules and actions, since I have to prepend leading 0's: I need the fragment index as three digits, like 001, 020, 100. I created three rules: if the fragment index is less than 10, set fragment.index = 00${fragment.index}; if it is more than 9 and less than 100, set fragment.index = 0${fragment.index}; otherwise fragment.index = ${fragment.index}.
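For reference, the three rules implement ordinary left-padding to three digits. The Java equivalent of that logic is the one-liner below; it is purely illustrative and not NiFi Expression Language.

// Left-pad an index to three digits, e.g. 1 -> "001", 20 -> "020", 100 -> "100".
String padded = String.format("%03d", fragmentIndex);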
06-07-2018
04:57 AM
Thanks! I also managed to do it the same way, but there is one thing that needs to be tweaked. Since I am using the SelectHiveQL processor to fetch the data from Hive and then routing files to the split processor only if the file size is greater than 2 GB, all the unmatched files, i.e. files smaller than 2 GB, will not have the fragment.index attribute, as they do not pass through the SplitText processor. So you need to add the suffix (e.g. tableName_001) to the table name attribute with an UpdateAttribute processor, since ${fragment.index} is null for those files.
06-06-2018
06:46 AM
I am doing the following in NiFi: fetching data from tables in Hive and then routing the flow files based on size. If a flow file is larger than 2 GB, I split it into multiple flow files of 2 GB each. I want to use UpdateAttribute to name those splits like TableName_001_001, TableName_001_002, TableName_001_003 for a particular flow file or table. When the next flow file comes into the split, it should be named the same way. Is there any way to do this with the existing processors?
05-25-2018
09:37 AM
@Matt Clarke @Matt Burgess @Shu I have created a process group in NiFi with the following requirements: fetch data from a Hive table >> encrypt content >> upload to Azure Blob Storage. Now I have 3000 tables for which this flow needs to be scheduled. Is there any way to use a single flow for all the tables instead of creating 3000 flows, one per table? Also, I want the Azure upload to execute only for some of the tables, not all. Is there any way to instruct the flow, based on a condition, that Table 1 should go only to Google Cloud and not to Azure, while Table 2 goes to both Azure and Google Cloud? Thanks in advance.
05-18-2018
11:39 AM
I have figured out the real cause behind it. The heap size was causing the big file to be split into smaller chunks of 128 MB each. I increased the heap size, and then the single 1.7 GB flow file was uploaded to blob storage.
05-17-2018
08:56 AM
@Bryan Bende @Matt Burgess @Timothy Spann @Abdelkrim Hadjidj Hi, I am trying to upload a 4 GB file to Azure Blob Storage, but I can see that the 4 GB file is being uploaded as smaller files of 128 MB and 64 MB. What property is splitting the 4 GB file into 128 MB or 64 MB pieces? I have seen properties like provenance.rollover.size=100mb but am not sure whether that splits the file. I want to understand why this big file is uploaded in blocks of 128 MB and not as a whole 4 GB in one file.
05-16-2018
09:33 AM
Is there any way I can capture the total time taken by the PutAzureBlobStorage processor to upload all the files to Azure Blob Storage?
05-15-2018
05:23 AM
I have the following settings in the Scheduling tab of the NiFi SelectHiveQL processor (see attached settings.png and schedule.png). I am using the query "select * from tablename" in my SelectHiveQL processor with a fetch size of 20000 and a maximum of 20000 rows per flow file. As soon as I schedule the SelectHiveQL processor and check app.log, I can see continuous parse statements being printed like below:
2018-05-15 06:29:54,919 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parsing command: select * from random
2018-05-15 06:29:54,919 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parse Completed
2018-05-15 06:29:55,063 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parsing command: select * from random
2018-05-15 06:29:55,063 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parse Completed
2018-05-15 06:29:55,213 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parsing command: select * from random
2018-05-15 06:29:55,213 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parse Completed
2018-05-15 06:29:55,356 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parsing command: select * from random
2018-05-15 06:29:55,356 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parse Completed
2018-05-15 06:29:55,501 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parsing command: select * from random
2018-05-15 06:29:55,502 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parse Completed
2018-05-15 06:29:55,634 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parsing command: select * from random
2018-05-15 06:29:55,634 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parse Completed
2018-05-15 06:29:55,764 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parsing command: select * from random
2018-05-15 06:29:55,764 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parse Completed
2018-05-15 06:29:55,914 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parsing command: select * from random
2018-05-15 06:29:55,914 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parse Completed
2018-05-15 06:29:56,046 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parsing command: select * from random
2018-05-15 06:29:56,046 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parse Completed
2018-05-15 06:29:56,195 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parsing command: select * from random
2018-05-15 06:29:56,195 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parse Completed
2018-05-15 06:29:56,334 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parsing command: select * from random
2018-05-15 06:29:56,335 INFO [Timer-Driven Process Thread-1] hive.ql.parse.ParseDriver Parse Completed
The query should be parsed only once; parsing it repeatedly might be using more CPU than expected. Can someone help me understand how to prevent this repeated parsing? @Matt Burgess @Bryan Bende
05-15-2018
04:22 AM
@Matt Burgess Thank you for the clarification! Cheers
05-14-2018
10:54 AM
1 Kudo
Can anybody explain the Fetch Size and Maximum Number of Fragments properties, with examples?
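A rough worked example of how these properties interact, assuming the usual SelectHiveQL/QueryDatabaseTable semantics: with Max Rows Per Flow File = 20,000, a 1,000,000-row result would normally be emitted as 50 flow files (fragments); setting Maximum Number of Fragments = 5 would cap the output at 5 × 20,000 = 100,000 rows, with 0 meaning no cap. Fetch Size, by contrast, only controls how many rows the driver pulls from the server per round trip and does not change how the output is split into flow files.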
05-14-2018
05:42 AM
@Shu Is there any way we can have run-time forking based on a flag? I need to build this type of flow for more than 2k tables in my DB.
05-11-2018
09:46 AM
1 Kudo
Thanks a lot for such great help! It works!!
05-11-2018
06:39 AM
@Timothy Spann Error log:
Caused by: java.io.IOException: Error getting access token for service account:
    at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:319)
    at com.google.auth.oauth2.OAuth2Credentials.refresh(OAuth2Credentials.java:149)
    at com.google.auth.oauth2.OAuth2Credentials.getRequestMetadata(OAuth2Credentials.java:135)
    at com.google.auth.http.HttpCredentialsAdapter.initialize(HttpCredentialsAdapter.java:96)
    at com.google.cloud.HttpServiceOptions$1.initialize(HttpServiceOptions.java:224)
    at com.google.api.client.http.HttpRequestFactory.buildRequest(HttpRequestFactory.java:93)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:423)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.storage.spi.DefaultStorageRpc.create(DefaultStorageRpc.java:245)
    ... 22 common frames omitted
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
    at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1375)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1403)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1387)
    at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
    at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1316)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1291)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:250
05-10-2018
09:39 AM
@Timothy Spann Any idea on this? I am also facing the same issue while accessing Google Cloud from the NiFi cluster; from my local instance it works fine.
05-10-2018
05:01 AM
1 Kudo
I want to create a single generic flow that runs for multiple tables and uploads the resulting flow files to Google Cloud and Azure Blob Storage. I have created the flow in NiFi, but I don't want the Azure upload for all the tables: the Azure upload should execute only for specific tables, while the upload to Google Cloud happens for all tables. Any idea how to achieve a conditional trigger based on a flag? Thanks
04-27-2018
05:43 AM
Thanks. I was not accessing the API with the correct syntax.
04-26-2018
10:41 AM
I am not able to access nifi-api on my local instance; it says the resource was not found.
04-25-2018
07:40 AM
@Abdelkrim Hadjidj, I have tried the suggested implementation, but I want to trigger the ListenHTTP processor from an external input, e.g. via a curl command that hits the mentioned NiFi processor. Is there any way we can automate this use case? The processor should start only when a curl command hits it.
04-24-2018
11:44 AM
@Abdelkrim Hadjidj Thanks for the suggested solution. I believe an HTTP call to wait can help with my problem. I will try it and post the outcome.
04-24-2018
11:26 AM
@Rohit Ravishankar, any suggested answers? I have read a thread where you were trying to schedule NiFi jobs from Control-M. Thanks in advance.