Member since
11-12-2016
71
Posts
11
Kudos Received
0
Solutions
10-28-2017
07:01 PM
One silly question plz.. where can I find the below import source files ? which folder if I need to check the content or add more imports to them ? import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql
... View more
10-27-2017
06:26 PM
Thanks @Shu and @Matt for your answers, @Shu, the controller service Id is different because I recreated it couple of times before posting the questions and comments 🙂 What I did is that I moved this part ( flowFile = session.get()
if(!flowFile) return ) before defining the connection, code is working since yesterday but Im not sure if this was the issue ! @Matt, for your question... I was not able to find any other logs, I got this behaviour only on Nifi 1.4, version 1.0 is working fine. import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql
flowFile = session.get()
if(!flowFile) return
def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId).getConnection()
def sql = new Sql(conn)
def sqlInsert = new Sql(conn)
... View more
10-26-2017
01:32 PM
1 Kudo
Thanks @Shu I tried your suggestion as below . However, I still get my connection disconnected... and I can't see in the log that "select CURRENT_TIMESTAMP()" is being executed ? is this normal ? Also I dunno why the ConnectionPool is not showing that ExecuteScript is refering to it ? see below Thanks a lot...
... View more
10-26-2017
07:21 AM
Dear Forum I have the below Nifi flow to fetch data from mysql table and then load it to another mysql table (data enrichment) I used executescript processor to connect to DB and manipulate the data as below (image 1.png) this is my code import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql
def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId).getConnection()
def sql = new Sql(conn)
def sqlInsert = new Sql(conn)
try {
flowFile = session.get()
if(!flowFile) return
//log.error("ExecuteScript: Line records Exception: " + dbServiceName )
flowFile = session.write(flowFile, {inputStream, outputStream ->
inputStream.eachLine { line ->
def String[] lineCellArr = ["", "", "", "", "", "", ""]
def sqlSelectArr = ""
if (line != null){
//log.error("ExecuteScript: Line records Exception: " + dbServiceName )
lineArr = line.split("\,", -1)
for(int i = 0; i < lineArr.length ; i++) {
lineCellArr[i] = lineArr[i];
}
sql.rows("SELECT * FROM Polygon.cells where cellId='" + lineCellArr[6] +"'").eachWithIndex { row, idx ->
sqlSelectArr = row.values().join(';').split("\;", -1)
outputStream.write((line.toString() + row.values().join(';') + ";" + sqlSelectArr[1]).getBytes())
def insertResult1 = sqlInsert.executeUpdate("INSERT IGNORE INTO Polygon.recharges (rechargeNumber, rechargeTime, serviceClass, rechargeValue, rechargeType, balanceAfter, rechargeLocation, towerName) VALUES ('" + lineCellArr[0] + "','" + lineCellArr[1] + "','"+ lineCellArr[2] + "','"+ lineCellArr[3] + "','"+ lineCellArr[4] + "','"+ lineCellArr[5] + "','"+ lineCellArr[6] + "','"+ sqlSelectArr[1] + "')")
def insertResult2 = sqlInsert.executeUpdate("INSERT INTO Polygon.rechargesSummary (rechargeDate, serviceClass, rechargeValue, rechargeCount, rechargeType, rechargeLocation, towerName, latitude , longitude, region, subRegion) VALUES ('" + lineCellArr[1] + "','"+ lineCellArr[2] + "','"+ lineCellArr[3] + "','1','"+ lineCellArr[4] + "','"+ lineCellArr[6] + "','" + sqlSelectArr[1] + "','"+ sqlSelectArr[2] + "','"+ sqlSelectArr[3] + "','"+ sqlSelectArr[4] + "','"+ sqlSelectArr[5] + "') on DUPLICATE KEY UPDATE rechargeValue = rechargeValue + '"+ lineCellArr[3] +"', rechargeCount = rechargeCount + 1")
//write results
outputStream.write((";" + insertResult1.toString() +insertResult2.toString() + " \n").getBytes())
}
attrMap = ['Type': "Vouchers", 'Company': "Zain"]
}
}
} as StreamCallback)
flowFile = session.putAllAttributes(flowFile, attrMap)
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
log.error('Scripting error', e)
session.transfer(flowFile, REL_FAILURE)
}
conn.close() Everything works fine, but after couple of hours I start getting the below error Caused by: javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:159)
at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:264)
at org.apache.nifi.script.impl.GroovyScriptEngineConfigurator.eval(GroovyScriptEngineConfigurator.java:54)
at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:220)
... 11 common frames omitted
Caused by: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:355)
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:153)
... 14 common frames omitted
Caused by: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:84)
at com.sun.proxy.$Proxy77.getConnection(Unknown Source)
at org.apache.nifi.dbcp.DBCPService$getConnection.call(Unknown Source)
at Script4.run(Script4.groovy:19)
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:352)
... 15 common frames omitted
2017-10-26 09:55:16,873 ERROR [Timer-Driven Process Thread-12] o.a.nifi.processors.script.ExecuteScript ExecuteScript[id=7e50130c-1000-115f-dab6-0ef65aa69de2] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled: {}
org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled
at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:230)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) When I restart the connectionpool, everything works fine for couple of hours till I get this error again, could you please advise Thanks a lot
... View more
Labels:
- Labels:
-
Apache NiFi
06-19-2017
03:26 PM
Hello Matt, You are right, I added "sesson.commit()" and now files are generated. Thanks 🙂 what is the method that is triggered/called when I stop the processor ? To stop the SMPP process when I stop the processor ! Thanks a lot
... View more
06-19-2017
03:25 PM
Hello Matt, You are right, I added "sesson.commit()" and now files are generated. Thanks 🙂 what is the method that is triggered/called when I stop the processor ? To stop the SMPP process when I stop the processor ! Thanks a lot
... View more
06-19-2017
12:21 AM
Hello forum,
I'm trying to create custom Nifi processor to run Java code inside (processor to act as server to generate file once a request is received). All is working fine so far except that processor is not generating any file to the next processor through MY_RELATIONSHIP
Myprocessor code
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hwx.processors.demo;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import jsmpp.SMPPServerSimulator;
import org.apache.log4j.BasicConfigurator;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.scheduling.SchedulingStrategy;
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({
@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({
@WritesAttribute(attribute = "", description = "")})
public class MyProcessor extends AbstractProcessor {
private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder().name("MY_PROPERTY")
.displayName("SMPP Server port")
.description("SMPP Server port")
.required(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
.name("SUCCESS_RELATIONSHIP")
.description("Success relationship")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(MY_PROPERTY);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(MY_RELATIONSHIP);
this.relationships = Collections.unmodifiableSet(relationships);
System.out.println("This is a custom processor that will receive flow file");
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@OnStopped
public void onStopped(final ProcessContext context) {
}
public SchedulingStrategy getSchedulingStrategy() {
return schedulingStrategy.TIMER_DRIVEN;
}
@Override
public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
final int port = context.getProperty(MY_PROPERTY).evaluateAttributeExpressions().asInteger();
BasicConfigurator.configure();
SMPPServerSimulator smppServerSim = new SMPPServerSimulator(8058, session, MY_RELATIONSHIP);
smppServerSim.run();
}
}
SMPPServerSimulator.java
public MessageId onAcceptSubmitSm(SubmitSm submitSm,
SMPPServerSession source) throws ProcessRequestException {
MessageId messageId = messageIDGenerator.newMessageId();
logger.debug("\nReceiving submit_sm {}, and return message id {}\n", new String(submitSm.getShortMessage()), messageId.getValue());
if (SMSCDeliveryReceipt.SUCCESS.containedIn(submitSm.getRegisteredDelivery()) || SMSCDeliveryReceipt.SUCCESS_FAILURE.containedIn(submitSm.getRegisteredDelivery())) {
execServiceDelReciept.execute(new DeliveryReceiptTask(source, submitSm, messageId));
}
flowfile = session.create();
flowfile = session.putAttribute(flowfile, "match", new String(submitSm.getShortMessage()));
System.out.println(session + " flowfile: " + flowfile + " Relation: " + SUCCESS_RELATIONSHIP);
session.transfer(flowfile, MY_RELATIONSHIP);
System.out.println("Message Recieved: " + new String(submitSm.getShortMessage()));
return messageId;
}
Below Logs below for the System.out.println command. flowfile is created but not sent
p.p1 {margin: 0.0px 0.0px 0.0px 0.0px; font: 11.0px Menlo; color: #000000; background-color: #ffffff}
span.s1 {font-variant-ligatures: no-common-ligatures}2017-06-18 17:31:46,449 INFO [NiFi logging handler] org.apache.nifi.StdOut StandardProcessSession[id=0] flowfile: StandardFlowFileRecord[uuid=ad3128b6-0032-491e-a5ba-ee73f99b8f0b,claim=,offset=0,name=50727827275499,size=0] Relation: SUCCESS_RELATIONSHIP
can you help please..
... View more
Labels:
- Labels:
-
Apache NiFi
04-16-2017
05:03 PM
Hello Am trying to extract fields from flowfile and put it as attributes so I send it to invokeHTTP processor as GET request, flowfile content +1234323432,200,Dynamic,Text ------- Attributes needed field1: +1234323432 field2: 200 ------ something like below
... View more
Labels:
- Labels:
-
Apache NiFi