Member since
10-17-2022
10
Posts
0
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
13311 | 10-25-2022 01:16 PM |
11-06-2022
05:46 AM
@Matt It would be great if you could share some guides or documents to generate the certificate and do the above configuration
... View more
10-27-2022
06:13 AM
@steven-matison thanks for your answer it is working fine with my local environment, but I have to deploy this into production and it is 4 node cluster, when running this processor in the cluster we are getting below logs and suddenly node going out of the cluster xxx.xx.xx:xxxx- warning - Response time from yyy.yy.yy:yyyy was slow for each last 3 requests made.
xxx.xx.xx:xxxx- warning - Response time from yyy.yy.yy:yyyy was slow for each last 3 requests made.
xxx.xx.xx:xxxx- warning - Response time from yyy.yy.yy:yyyy was slow for each last 3 requests made.
xxx - primary node yyy - node which custom processor is running please guide me on this.
... View more
10-25-2022
01:37 PM
@steven-matison can you please explain more about this
... View more
10-25-2022
01:16 PM
@steven-matison in my scenario there are 2 steps each recursion call I create a flow file and query results write into that flow file in the end, I need to create another new flow file also and put 2 attributes and I need to transfer these 2 files into 2 relationships (status and success), as per your answer, I put session.commit at the end of every session.transfer line. but now I'm getting an error called relationship is not specified but earlier it works well. new development is as below FlowFile stateFlowFile = session.create();
session.putAttribute(stateFlowFile, "processing_status", "processing");
session.putAttribute(stateFlowFile, "record_count", mainI.toString());
session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS); // state relationship
session.commit()
session.transfer(flowFile, GPReaderProcessorUtils.SUCCESS); // preprocessed flow files returns session.commit()
... View more
10-25-2022
08:46 AM
when the NIFI processor is running (ExecuteSQL) we can see the size of the content_repository location is increasing. while 3 execute SQL processors are running I executed du -sh ./content_repository/* | grep 'G' 2 times and its output as below 1st time 5G ./content_repository/1000 4G ./content_repository/1009 6G ./content_repository/824 2nd time 8G ./content_repository/1000 6G ./content_repository/1009 8G ./content_repository/824 my concern is, is there any way to identify the specific content_repository location for each processor?
... View more
Labels:
- Labels:
-
Apache NiFi
10-25-2022
08:34 AM
I have to do some custom preprocessing tasks on a huge data file (~200GB). currently, its works as below way. select * from table preprocessing line by line return a new single flow file so I decided to convert the above approach to the below way. get the row count from the user (let's assume the user gives 1000) execute select * query as resultSet read the results line by line (rs.next()) when the line count reaches 1000 return the flow file and continues to other lines So my approach is as below onTrigger public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
logger = getLogger();
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
try {
final Long rowLimit = context.getProperty(ProcessorUtils.MAX_RECORD).evaluateAttributeExpressions(flowFile).asLong();
Connection conn = DriverManager.getConnection(
// db connection properties
);
Statement stm = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = stm.executeQuery("sql query");
Map<String, String> flowFileAttributes = flowFile.getAttributes();
process(
rs,
session,
flowFileAttributes,
rowLimit,
);
FlowFile stateFlowFile = session.create();
session.putAttribute(stateFlowFile, "processing_status", "end");
session.putAttribute(stateFlowFile, "record_count", "0");
session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS); // working line
} catch (Exception e) {
logger.warn(" conn " + e);
session.transfer(flowFile, GPReaderProcessorUtils.FAILURE);
}
} Recursion Approach for termination based on line count private void process(ResultSet rs, ProcessSession session, Map<String, String> flowFileAttributes, Long rowLimit) throws SQLException {
try{
logger.info("-> start processing with row limit = " + rowLimit);
AtomicInteger mainI = new AtomicInteger(0);
FlowFile flowFile =
session.write(session.putAllAttributes(session.create(), flowFileAttributes), (OutputStream out) -> {
int i = 0;
Map<String, String> preProcessResults = null;
try {
String res = "";
while (i < rowLimit && rs.next()) {
//preprocessing happens here
i++;
mainI.set(i);
out.write(preprocess results.toString().getBytes(StandardCharsets.UTF_8));
}
}catch (SQLException e) {
e.printStackTrace();
}
}
logger.info("gp-log ->"+ (String.valueOf(i)));
out.close();
});
FlowFile stateFlowFile = session.create();
session.putAttribute(stateFlowFile, "processing_status", "processing");
session.putAttribute(stateFlowFile, "record_count", mainI.toString());
session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS); // state relationship
session.transfer(flowFile, GPReaderProcessorUtils.SUCCESS); // preprocessed flow files returns
if(!rs.isAfterLast() && mainI != 0 && !rs.isLast()){ // recurrsion call
logger.info("gp-log -> recursion call" );
process(rs, session,flowFileAttributes,column,rowLimit);
}
}catch (Exception e){
logger.info(e.getMessage());
logger.error(e.getMessage());
session.transfer(session.putAllAttributes(session.create(),flowFileAttributes), GPReaderProcessorUtils.FAILURE);
}
} Expected Behaviour -> while processing this one return completed rows as flow files Current Behaviour -> after finishing all return all flow files (generated in recursion) once. please advise on this.
... View more
Labels:
- Labels:
-
Apache NiFi
10-23-2022
10:32 PM
I have NIFI single Node and for my learning purpose I'm trying to implement SiteToSiteBulletinReporyingTask (with assuming this will work for single node) I configured StandardRestrictedSSLContextService as below Note - same keystore and truststore used inside nifi-properties I configured SiteToSiteBulletinReportingTask as below Note - the destination URL is the same URL that nifi is currently running. Now I'm facing below issues - I'm not receiving any data to the bulletin port. I'm getting the below warning message in nifi-app.log (couldn't find resource to fix this) The issue is I'm getting the below warning and, to be honest, I don't have a clear idea about this warning, and also 2022-10-23 17:57:09,618 WARN [NiFi Site-to-Site Connection Pool Maintenance] o.apache.nifi.remote.client.PeerSelector Unable to refresh remote group peers due to: Certificate for <xxx.xxx.xx.xxx> doesn't match any of the subject alternative names: [localhost]
2022-10-23 17:57:09,618 WARN [NiFi Site-to-Site Connection Pool Maintenance] o.a.n.r.SiteToSiteBulletinReportingTask SiteToSiteBulletinReportingTask[id=105311a0-1473-1059-2fb8-ea483b8d9fa8] Unable to refresh remote group peers due to: Certificate for <xxx.xxx.xx.xxx> doesn't match any of the subject alternative names: [localhost]
... View more
Labels:
- Labels:
-
Apache NiFi
10-17-2022
08:37 AM
we have set up 4 node cluster and all other 3 nodes start without any exceptions. and in NIFI ui it shows our down node is in connecting state. but our down NIFI node is not running. its logs as below. 2022-10-17 19:04:51,860 INFO [main] org.apache.nifi.io.socket.SocketListener Now listening for connections from nodes on port 11443 2022-10-17 19:04:51,950 WARN [main] o.a.n.c.l.e.CuratorLeaderElectionManager Failed to close Leader Selector for Cluster Coordinator java.lang.IllegalStateException: Already closed or has not been started at com.google.common.base.Preconditions.checkState(Preconditions.java:173) at org.apache.curator.framework.recipes.leader.LeaderSelector.close(LeaderSelector.java:270) at org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager.stop(CuratorLeaderElectionManager.java:165) at org.apache.nifi.controller.FlowController.shutdown(FlowController.java:1120) at org.apache.nifi.controller.StandardFlowService.stop(StandardFlowService.java:349) at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:1016) at org.apache.nifi.NiFi.<init>(NiFi.java:158) at org.apache.nifi.NiFi.<init>(NiFi.java:72) at org.apache.nifi.NiFi.main(NiFi.java:297) java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientTask.run(NioAsyncLoadBalanceClientTask.java:91) at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 2022-10-17 19:04:51,957 ERROR [Load-Balanced Client Thread-58] o.a.n.c.q.c.c.a.n.NioAsyncLoadBalanceClientTask Failed to communicate with peer while trying to load balance data across the cluster java.lang.InterruptedException: sleep interrupted 2022-10-17 19:04:52,495 INFO [main] o.apache.nifi.controller.FlowController Controller has been terminated successfully. 2022-10-17 19:04:52,501 INFO [main] org.apache.nifi.io.socket.SocketListener Socket Listener has been terminated successfully. 2022-10-17 19:04:52,502 WARN [Cluster Socket Listener] org.apache.nifi.io.socket.SocketListener Failed to communicate with Unknown Host due to java.net.SocketException: Socket closed java.net.SocketException: Socket closed at java.net.PlainSocketImpl.socketAccept(Native Method) at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409) at java.net.ServerSocket.implAccept(ServerSocket.java:545) at sun.security.ssl.SSLServerSocketImpl.accept(SSLServerSocketImpl.java:348) at org.apache.nifi.io.socket.SocketListener$2.run(SocketListener.java:112) at java.lang.Thread.run(Thread.java:748) 2022-10-17 19:04:52,502 ERROR [main] org.apache.nifi.web.server.JettyServer Unable to load flow due to: java.util.zip.ZipException: invalid stored block lengths java.util.zip.ZipException: invalid stored block lengths at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:164) at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117) at java.io.FilterInputStream.read(FilterInputStream.java:107) at org.apache.nifi.util.file.FileUtils.copy(FileUtils.java:303) at org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO.load(StandardXMLFlowConfigurationDAO.java:110) at org.apache.nifi.controller.StandardFlowService.createDataFlow(StandardFlowService.java:608) at org.apache.nifi.controller.StandardFlowService.load(StandardFlowService.java:467) at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:1009) at org.apache.nifi.NiFi.<init>(NiFi.java:158) at org.apache.nifi.NiFi.<init>(NiFi.java:72) at org.apache.nifi.NiFi.main(NiFi.java:297) 2022-10-17 19:04:52,503 WARN [main] org.apache.nifi.web.server.JettyServer Failed to start web server... shutting down. java.lang.Exception: Unable to load flow due to: java.util.zip.ZipException: invalid stored block lengths at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:1019) at org.apache.nifi.NiFi.<init>(NiFi.java:158) at org.apache.nifi.NiFi.<init>(NiFi.java:72) at org.apache.nifi.NiFi.main(NiFi.java:297) 2022-10-17 19:04:52,504 INFO [Thread-1] org.apache.nifi.NiFi Initiating shutdown of Jetty web server... 2022-10-17 19:04:52,524 INFO [Thread-1] o.eclipse.jetty.server.AbstractConnector Stopped ServerConnector@4e3931{SSL,[ssl, http/1.1]}{bdadataf15.ncell.com.np:9443} 2022-10-17 19:04:52,525 INFO [Thread-1] org.eclipse.jetty.server.session node0 Stopped scavenging we didn't do any changes in properties and we deployed one custom processor since its threw exceptions while processing we revert it back also.
... View more
Labels:
- Labels:
-
Apache NiFi