How to add a String value in NiFi custom processor code?

Hello,

I have created a custom processor similar to ExecuteSQL.
After the query executes, all of the data is in the ResultSet.
I need to add one external string value:
String queryID = "123456" (it may be static or dynamic).
Where should I add that value, in the ResultSet or in the FlowFile, and how do I add it?
After ConvertAvroToJSON, I want to extract that queryID with the EvaluateJsonPath processor and use it in further operations.
The code is similar to the code below.

Can someone point me to an example to get me going?


--------------------------------------------------------------------------------------------------------------------

fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
    @Override
    public void process(final OutputStream out) throws IOException {
        try {
            logger.debug("Executing query {}", new Object[]{selectQuery});
            // Run the query and stream the rows into the FlowFile content as Avro
            final ResultSet resultSet = st.executeQuery(selectQuery);
            final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
                    .convertNames(convertNamesForAvro)
                    .useLogicalTypes(useAvroLogicalTypes)
                    .defaultPrecision(defaultPrecision)
                    .defaultScale(defaultScale)
                    .build();
            nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
        } catch (final SQLException e) {
            throw new ProcessException(e);
        }
    }
});

long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);

// set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
fileToProcess = session.putAttribute(fileToProcess, RESULT_QUERY_DURATION, String.valueOf(duration));
fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);

logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", duration);
session.transfer(fileToProcess, REL_SUCCESS);
-----------------------------------------------------------------------------------------------------------------------------------------------

Accepted Solutions
Re: How to add a String value in NiFi custom processor code?

Wouldn't it make sense to add this as an attribute? See the calls to session.putAttribute() in the code above.
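
As a rough illustration of the attribute approach, the sketch below collects the extra values into a map that could be handed to session.putAllAttributes() next to the existing putAttribute() calls. The class name QueryAttributes and the attribute keys query_id, query_time and query_end_time are assumptions made for this example, not names defined by NiFi. The NiFi call itself appears only in a comment, so the class compiles without the NiFi API on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; the class name and attribute keys are illustrative only.
public class QueryAttributes {

    public static final String QUERY_ID = "query_id";
    public static final String QUERY_START = "query_time";
    public static final String QUERY_END = "query_end_time";

    // FlowFile attributes are String-to-String pairs, so every value is stringified.
    public static Map<String, String> build(String queryId, long startMillis, long endMillis) {
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put(QUERY_ID, queryId);
        attrs.put(QUERY_START, String.valueOf(startMillis));
        attrs.put(QUERY_END, String.valueOf(endMillis));
        return attrs;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // ... the query would run here ...
        long end = System.currentTimeMillis();

        Map<String, String> attrs = build("123456", start, end);

        // Inside the processor's onTrigger, the map would be applied like this:
        //   fileToProcess = session.putAllAttributes(fileToProcess, attrs);
        System.out.println(attrs);
    }
}
```

Attributes travel with the FlowFile rather than inside its Avro content, so downstream processors would read them with expression language such as ${query_id} instead of EvaluateJsonPath.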

Re: How to add a String value in NiFi custom processor code?

After executing the query I get a ResultSet. I now want to add some values to that ResultSet, such as the query ID, query start time and query end time, so that I can use them in the NiFi flow.

Re: How to add a String value in NiFi custom processor code?

With NiFi 1.3 you have the record-based processors, so you can forward the output of ExecuteSQL or your custom processor to QueryRecord. Set it up to read Avro, then add a query like this: SELECT col_a, col_b, ..., '${query_id}', '${query_time}', '${query_end_time}' FROM FLOWFILE. This adds the data you need to the query result set.
