Member since: 04-19-2018
Posts: 32
Kudos Received: 4
Solutions: 0
08-14-2019
11:40 AM
I want to transform a NiFi flow into a MiNiFi flow using the transform command of the MiNiFi Toolkit. I observe that property values are not copied into the YAML, and I have to add them manually (passwords, file paths, etc.). Is there any way to handle this in an automated fashion? Any help would be appreciated. TIA
Labels:
- Apache MiNiFi
- Apache NiFi
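The toolkit does not export sensitive or environment-specific values, so one workaround is to post-process the generated config.yml yourself. A minimal sketch, assuming you first put your own placeholder tokens (such as `${db.password}`, an illustrative name) into the blank fields and keep the real values in a separate map or properties file:

```java
import java.util.Map;

public class YamlFiller {
    // Replace ${key} tokens in the transformed config.yml text
    // with values from a properties map (token names are illustrative).
    static String fill(String yaml, Map<String, String> props) {
        for (Map.Entry<String, String> e : props.entrySet()) {
            yaml = yaml.replace("${" + e.getKey() + "}", e.getValue());
        }
        return yaml;
    }

    public static void main(String[] args) {
        String yaml = "Properties:\n  Password: ${db.password}\n";
        System.out.print(fill(yaml, Map.of("db.password", "secret")));
        // prints:
        // Properties:
        //   Password: secret
    }
}
```

In practice you would read config.yml with `Files.readString`, run the substitution, and write it back as part of your deployment script, keeping the real values out of version control.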
07-24-2018
09:34 AM
@Matt Clarke the above command gives the following error: "The specified resource could not be found. curl: (6) Could not resolve host: Content-Type; Name or service not known"
07-18-2018
06:13 AM
Is there any way to provide NiFi process group variables as parameters when executing the process group from the REST API?
Labels:
- Apache NiFi
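There is no direct "run with parameters" call; one option is to set the process group's variable registry over REST before starting the group. A sketch of the request body, assuming NiFi 1.x's `POST /nifi-api/process-groups/{id}/variable-registry/update-requests` endpoint (the entity field names below are my assumption and should be checked against your NiFi version's REST API docs):

```java
public class VariablePayload {
    // Build the JSON body for a variable-registry update request.
    // Field names assumed from NiFi 1.x entities; verify for your version.
    static String payload(String groupId, long revisionVersion, String name, String value) {
        return "{\"processGroupRevision\":{\"version\":" + revisionVersion + "},"
             + "\"variableRegistry\":{\"processGroupId\":\"" + groupId + "\","
             + "\"variables\":[{\"variable\":{\"name\":\"" + name
             + "\",\"value\":\"" + value + "\"}}]}}";
    }

    public static void main(String[] args) {
        System.out.println(payload("1234-abcd", 0, "input.dir", "/tmp/in"));
    }
}
```

You would POST this JSON (Content-Type: application/json) to the update-requests endpoint, poll the returned request until it completes, and then schedule the group via `PUT /nifi-api/flow/process-groups/{id}` with state RUNNING.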
06-20-2018
04:18 AM
@Matt Burgess I did check the code in the mentioned PR, NIFI-4836. I believe batching in QueryDatabaseTable is different from the SelectHiveQL processor, where the bottleneck is breaking the result set down into batches. Here is the code from NIFI-4836, which only releases the flow files once the batch size is reached. This is an interesting problem statement for tuning HiveQL processing:

    if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
        session.transfer(resultSetFlowFiles, REL_SUCCESS);
        session.commit();
        resultSetFlowFiles.clear();
    }
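The release-when-full pattern in that snippet can be isolated as a small generic sketch (the `Batcher` name and the `Consumer` standing in for `session.transfer` + `session.commit` are illustrative, not NiFi API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class Batcher<T> {
    private final int batchSize;
    private final List<T> buffer = new ArrayList<>();
    private final Consumer<List<T>> release; // stands in for transfer + commit

    Batcher(int batchSize, Consumer<List<T>> release) {
        this.batchSize = batchSize;
        this.release = release;
    }

    void add(T item) {
        buffer.add(item);
        // Mirrors NIFI-4836: only release once the batch size is reached.
        if (batchSize > 0 && buffer.size() >= batchSize) {
            flush();
        }
    }

    void flush() { // must also be called at end-of-input for the partial batch
        if (!buffer.isEmpty()) {
            release.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<List<String>> released = new ArrayList<>();
        Batcher<String> b = new Batcher<>(2, released::add);
        b.add("ff1"); b.add("ff2"); b.add("ff3");
        b.flush();
        System.out.println(released); // [[ff1, ff2], [ff3]]
    }
}
```

The key difference from SelectHiveQL is that here each item is already a finished unit of work; for HiveQL the hard part is carving the single result set into units in the first place.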
06-19-2018
11:15 AM
I am trying to fetch a big full table with 10M+ records from Hive using the SelectHiveQL processor, and found that the convertToCsvStream() method in the source code takes longer than fetching the result set itself. Observing the code: the result set is iterated row by row, and each row is then appended to the output stream. When the table is small the process completes in seconds, but as the data grows it takes much longer. Is there any way we can optimize the conversion? I have tried fetch sizes of 100000/1000/10000/1000.

    while (rs.next()) {
        if (callback != null) {
            callback.processRow(rs);
        }
        List<String> rowValues = new ArrayList<>(nrOfColumns);
        for (int i = 1; i <= nrOfColumns; i++) {
            final int javaSqlType = meta.getColumnType(i);
            final Object value = rs.getObject(i);
            switch (javaSqlType) {
                case CHAR:
                case LONGNVARCHAR:
                case LONGVARCHAR:
                case NCHAR:
                case NVARCHAR:
                case VARCHAR:
                    String valueString = rs.getString(i);
                    if (valueString != null) {
                        // Removed extra quotes as those are a part of the escapeCsv when required.
                        StringBuilder sb = new StringBuilder();
                        if (outputOptions.isQuote()) {
                            sb.append("\"");
                            if (outputOptions.isEscape()) {
                                sb.append(StringEscapeUtils.escapeCsv(valueString));
                            } else {
                                sb.append(valueString);
                            }
                            sb.append("\"");
                            rowValues.add(sb.toString());
                        } else {
                            if (outputOptions.isEscape()) {
                                rowValues.add(StringEscapeUtils.escapeCsv(valueString));
                            } else {
                                rowValues.add(valueString);
                            }
                        }
                    } else {
                        rowValues.add("");
                    }
                    break;
                case ARRAY:
                case STRUCT:
                case JAVA_OBJECT:
                    String complexValueString = rs.getString(i);
                    if (complexValueString != null) {
                        rowValues.add(StringEscapeUtils.escapeCsv(complexValueString));
                    } else {
                        rowValues.add("");
                    }
                    break;
                default:
                    if (value != null) {
                        rowValues.add(value.toString());
                    } else {
                        rowValues.add("");
                    }
            }
        }
        //rowBuffer.add(StringUtils.join(rowValues, outputOptions.getDelimiter()));
        //rowBuffer.add("\n");
        // Write row values
        outStream.write(StringUtils.join(rowValues, outputOptions.getDelimiter()).getBytes(StandardCharsets.UTF_8));
        outStream.write("\n".getBytes(StandardCharsets.UTF_8));
        nrOfRows++;
        /* if (rowBuffer.size() == BATCH_SIZE) {
            writeToOutStream(outStream, rowBuffer);
        } */
        if (maxRows > 0 && nrOfRows == maxRows) {
            break;
        }
    }
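A common first optimization, since each row currently issues two direct writes to the stream, is to buffer the output so writes hit the underlying stream in large chunks. A minimal sketch (not the actual NiFi code; `writeRows` is an illustrative name, and the 64 KiB buffer size is arbitrary):

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class CsvBufferSketch {
    // Join each row with the delimiter and write through a single
    // BufferedOutputStream instead of writing to the raw stream per row.
    static void writeRows(OutputStream raw, List<List<String>> rows, String delim)
            throws IOException {
        BufferedOutputStream out = new BufferedOutputStream(raw, 64 * 1024);
        for (List<String> row : rows) {
            out.write(String.join(delim, row).getBytes(StandardCharsets.UTF_8));
            out.write('\n');
        }
        out.flush(); // push the final partial buffer
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        writeRows(sink, List.of(List.of("a", "b"), List.of("1", "2")), ",");
        System.out.print(sink.toString(StandardCharsets.UTF_8));
        // prints:
        // a,b
        // 1,2
    }
}
```

Building each line in a reused StringBuilder instead of `StringUtils.join` would further cut per-row allocations, but buffering alone usually removes most of the per-row write overhead.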
Labels:
- Apache Hive
- Apache NiFi
06-11-2018
06:01 AM
You can try removing your .m2 directory and then retrying the build, or manually copy the JAR into the repository directory and try again.
06-07-2018
11:06 AM
1 Kudo
Appreciate your response! I did the same using the Advanced section of UpdateAttribute to create rules and actions, since I have to prepend leading 0s: I need the fragment index in three digits, like 001, 020, 100. I created three rules: if the fragment index is less than 10, set fragment.index = 00${fragment.index}; if it is more than 9 and less than 100, set fragment.index = 0${fragment.index}; otherwise fragment.index = ${fragment.index}.
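The three rules amount to fixed-width zero-padding, which a quick sanity check in plain Java confirms (recent NiFi releases may also offer a `padLeft` Expression Language function that collapses this into a single rule; verify it exists in your version before relying on it):

```java
public class PadDemo {
    // Equivalent of the three UpdateAttribute rules: pad to three digits.
    static String pad3(int index) {
        return String.format("%03d", index);
    }

    public static void main(String[] args) {
        System.out.println(pad3(7) + " " + pad3(20) + " " + pad3(100));
        // prints 007 020 100
    }
}
```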
06-07-2018
04:57 AM
Thanks! I also managed to do it the same way, but one thing needs to be tweaked. Since I am using the SelectHiveQL processor to fetch the data from Hive and then routing files to the split processor only if the file size is greater than 2 GB, all the unmatched files (i.e., files smaller than 2 GB) will not have the fragment.index attribute, as they do not pass through the SplitText processor. So you need to add the suffix (e.g., tableName_001) to the table name attribute with an UpdateAttribute processor, because ${fragment.index} is null for those files.
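One way to cover the unsplit files without a separate branch is to normalize the attribute first; the Expression Language's replaceNull function can supply the default in a single UpdateAttribute property (the attribute and default value here are illustrative):

```
# UpdateAttribute property (illustrative):
# filename  ->  ${tableName}_${fragment.index:replaceNull('001')}
```

Files that went through SplitText keep their real index; files that skipped it fall back to the default.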
06-06-2018
06:46 AM
I am doing the following in NiFi:
Fetching data from tables in Hive and then routing the flow files based on size: if a flow file is larger than 2 GB, split it into multiple flow files of 2 GB each.
I want to use UpdateAttribute to name those splits like TableName_001_001, TableName_001_002, TableName_001_003 for a particular flow file or table. When the next flow file comes into the split, it should also be named the same way. Is there any way to do this with the existing processors?
Labels:
- Apache NiFi