Support Questions
Find answers, ask questions, and share your expertise

NIFI Stored Procedure from Python Execute Script but getting error

Solved Go to solution

NIFI Stored Procedure from Python Execute Script but getting error

Explorer

Hi, I have written a simple Python script that executes an Oracle stored procedure. It works when I run it from Python IDLE, but when I integrate it into NiFi it gives an error.

ERROR : SyntaxError: no viable alternative at input '\n' in <script> at line number 1 at column number 15; rolling back session: {}

import sys
import getpass
import platform
import cx_Oracle

# Calls the Oracle stored procedure 'sp_hp_course_status_chk' with three input
# values and three output bind variables, then prints the three outputs.
#
# NOTE: cx_Oracle is a third-party module for the native Python interpreter;
# run this script with that interpreter.

# Connection string format used here: user/password@host:port/service.
my_connection = cx_Oracle.connect('USer/Password@server.entsvcs.net:1526/database')
try:
    # A single cursor is enough; the original created it twice.
    my_cursor = my_connection.cursor()

    # Input values passed to the procedure.
    in_track_id = '20585'
    in_username = 'A3843UP'
    in_course_no = '00009934'

    # Bind variables whose values are read back after the call.
    cur_var = my_cursor.var(cx_Oracle.CURSOR)
    cur_var1 = my_cursor.var(cx_Oracle.NUMBER)
    cur_var2 = my_cursor.var(cx_Oracle.STRING)

    my_cursor.callproc(
        'sp_hp_course_status_chk',
        [in_track_id, in_username, in_course_no, cur_var, cur_var1, cur_var2],
    )

    # Fetch every row of the returned cursor before the connection is closed.
    print (cur_var.getvalue().fetchall())
    print (int(cur_var1.getvalue()))
    print (cur_var2.getvalue())
finally:
    # Always release the database connection, even if the call fails.
    my_connection.close()

#This code works in Python IDLE but raises an error when added to NiFi

64430-configuration.jpg

64429-errornifi.jpg

1 ACCEPTED SOLUTION

Accepted Solutions

Re: NIFI Stored Procedure from Python Execute Script but getting error

Explorer

Solved using Groovy code. Hope it helps someone.

The stored procedure takes the following input and output parameters:

in_track_id IN NUMBER,
in_username IN VARCHAR2,
in_course_no IN VARCHAR2,
out_details OUT SYS_REFCURSOR,
out_status_code OUT NUMBER,
out_status_desc OUT VARCHAR2)

import org.apache.commons.io.IOUtils
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.*
import groovy.sql.OutParameter
import groovy.sql.Sql
import oracle.jdbc.OracleTypes
import java.sql.ResultSet




// Calls an Oracle stored procedure (3 IN parameters, 3 OUT parameters, one of
// them a cursor) using a connection taken from a NiFi controller service, and
// writes the results onto the flow file as attributes.

//DB Setting Start
//Hardcoded connection string (alternative to the connection pool lookup below)
//def sql = Sql.newInstance('jdbc:oracle:thin:@//ecuxxx.xxx.xxx.net:1526/dbSchema', 'UserID', 'UserPass$', 'oracle.jdbc.OracleDriver')

////Get the session values from NiFi flow Start
// Take the flow file BEFORE borrowing a connection, so that returning early
// (nothing to process) does not leave a borrowed connection unreleased.
def flowFile = session.get()
if (!flowFile) return
def in_track_id = flowFile.getAttribute('body_track_id')
def in_username = flowFile.getAttribute('body_username')
def in_course_no = flowFile.getAttribute('body_course_number')
////Get the session values from NiFi flow END

//Get the connection from the NiFi connection pool
def lookup = context.controllerServiceLookup
// NOTE(review): 'ConncationPool' is presumably a user-defined property on the
// processor holding the controller service name; the spelling must match the
// processor configuration, so it is left unchanged here -- confirm.
def dbServiceName = ConncationPool.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == dbServiceName
}
if (!dbcpServiceId) {
    // Fail with a clear message instead of passing a null id to the lookup.
    throw new IllegalStateException("No controller service found with name: " + dbServiceName)
}

def conn = lookup.getControllerService(dbcpServiceId).getConnection()
try {
    // Wrap the existing connection using the documented Sql(Connection) constructor.
    def sql = new Sql(conn)

    // special OutParameter for cursor type
    OutParameter CURSOR_PARAMETER = new OutParameter() {
        public int getType() {
            return OracleTypes.CURSOR
        }
    }

    def data = []
    // Stored proc has 3 inputs and 3 outputs; one output has the CURSOR datatype.
    // <StoredPrcoName> is a placeholder for the actual procedure name.
    String sqlString = """{call <StoredPrcoName>(?, ?, ?, ?, ?, ?)}"""

    def status_desc
    def status_code
    def status_data
    //def parametersList = ['20585', 'A3843UP', '00009934', CURSOR_PARAMETER, Sql.NUMERIC ,Sql.VARCHAR]; // testing purpose
    def parametersList = [in_track_id, in_username, in_course_no, CURSOR_PARAMETER, Sql.NUMERIC, Sql.VARCHAR]

    // The closure receives the three OUT values in declaration order.
    sql.call(sqlString, parametersList) { out_details, out_status_code, out_status_desc ->
        status_desc = out_status_desc
        status_code = out_status_code
        out_details.eachRow { row ->
            data << row.toRowResult()
            // Assigned inside the loop on purpose: status_data stays null
            // when the cursor returns no rows, as in the original script.
            status_data = data
        }
    }

    //Set the session values start
    def attrMap = [
        'status_desc': status_desc,
        'status_code': String.valueOf(status_code),
        'status_data': String.valueOf(status_data),
        'Conn'       : String.valueOf(conn)
    ]
    flowFile = session.putAllAttributes(flowFile, attrMap)
    session.transfer(flowFile, REL_SUCCESS)
} finally {
    // Always close the connection obtained above, on success or failure.
    conn.close()
}


View solution in original post

3 REPLIES 3

Re: NIFI Stored Procedure from Python Execute Script but getting error

Super Guru

The scripting processors (ExecuteScript, e.g.) offer Jython, not Python, as a scripting engine. Jython can't use compiled (CPython) modules, or modules whose dependencies include compiled modules. I suspect cx_Oracle or one of the other modules is (or depends on) compiled modules.

Since your script uses "print" rather than the NiFi API (see my ExecuteScript Cookbook for examples of the latter), you could use ExecuteProcess or ExecuteStreamCommand to run your script using your native Python interpreter from the command line. The output will become the content of the flow file, which should work for your use case.

Re: NIFI Stored Procedure from Python Execute Script but getting error

Explorer

Thanks @Matt Burgess. I checked, and the issue is not with "print"; the error is due to the "cx_Oracle" object. Could you guide me on how I can run my Python code with the ExecuteProcess or ExecuteStreamCommand processor? Do you have the configuration details?

Re: NIFI Stored Procedure from Python Execute Script but getting error

Explorer

Solved using Groovy code. Hope it helps someone.

The stored procedure takes the following input and output parameters:

in_track_id IN NUMBER,
in_username IN VARCHAR2,
in_course_no IN VARCHAR2,
out_details OUT SYS_REFCURSOR,
out_status_code OUT NUMBER,
out_status_desc OUT VARCHAR2)

import org.apache.commons.io.IOUtils
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.*
import groovy.sql.OutParameter
import groovy.sql.Sql
import oracle.jdbc.OracleTypes
import java.sql.ResultSet




// Calls an Oracle stored procedure (3 IN parameters, 3 OUT parameters, one of
// them a cursor) using a connection taken from a NiFi controller service, and
// writes the results onto the flow file as attributes.

//DB Setting Start
//Hardcoded connection string (alternative to the connection pool lookup below)
//def sql = Sql.newInstance('jdbc:oracle:thin:@//ecuxxx.xxx.xxx.net:1526/dbSchema', 'UserID', 'UserPass$', 'oracle.jdbc.OracleDriver')

////Get the session values from NiFi flow Start
// Take the flow file BEFORE borrowing a connection, so that returning early
// (nothing to process) does not leave a borrowed connection unreleased.
def flowFile = session.get()
if (!flowFile) return
def in_track_id = flowFile.getAttribute('body_track_id')
def in_username = flowFile.getAttribute('body_username')
def in_course_no = flowFile.getAttribute('body_course_number')
////Get the session values from NiFi flow END

//Get the connection from the NiFi connection pool
def lookup = context.controllerServiceLookup
// NOTE(review): 'ConncationPool' is presumably a user-defined property on the
// processor holding the controller service name; the spelling must match the
// processor configuration, so it is left unchanged here -- confirm.
def dbServiceName = ConncationPool.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == dbServiceName
}
if (!dbcpServiceId) {
    // Fail with a clear message instead of passing a null id to the lookup.
    throw new IllegalStateException("No controller service found with name: " + dbServiceName)
}

def conn = lookup.getControllerService(dbcpServiceId).getConnection()
try {
    // Wrap the existing connection using the documented Sql(Connection) constructor.
    def sql = new Sql(conn)

    // special OutParameter for cursor type
    OutParameter CURSOR_PARAMETER = new OutParameter() {
        public int getType() {
            return OracleTypes.CURSOR
        }
    }

    def data = []
    // Stored proc has 3 inputs and 3 outputs; one output has the CURSOR datatype.
    // <StoredPrcoName> is a placeholder for the actual procedure name.
    String sqlString = """{call <StoredPrcoName>(?, ?, ?, ?, ?, ?)}"""

    def status_desc
    def status_code
    def status_data
    //def parametersList = ['20585', 'A3843UP', '00009934', CURSOR_PARAMETER, Sql.NUMERIC ,Sql.VARCHAR]; // testing purpose
    def parametersList = [in_track_id, in_username, in_course_no, CURSOR_PARAMETER, Sql.NUMERIC, Sql.VARCHAR]

    // The closure receives the three OUT values in declaration order.
    sql.call(sqlString, parametersList) { out_details, out_status_code, out_status_desc ->
        status_desc = out_status_desc
        status_code = out_status_code
        out_details.eachRow { row ->
            data << row.toRowResult()
            // Assigned inside the loop on purpose: status_data stays null
            // when the cursor returns no rows, as in the original script.
            status_data = data
        }
    }

    //Set the session values start
    def attrMap = [
        'status_desc': status_desc,
        'status_code': String.valueOf(status_code),
        'status_data': String.valueOf(status_data),
        'Conn'       : String.valueOf(conn)
    ]
    flowFile = session.putAllAttributes(flowFile, attrMap)
    session.transfer(flowFile, REL_SUCCESS)
} finally {
    // Always close the connection obtained above, on success or failure.
    conn.close()
}


View solution in original post