Created 10-20-2016 02:50 PM
I have a simple NiFi flow that picks up files from a landing directory (GetFile processor) and places them on HDFS. To place the files on HDFS I am using WebHDFS over Knox. As NiFi does not have a WebHDFS processor, I am using a Python ExecuteScript processor to run the curl command for the WebHDFS request. But I am getting an error when running the code below.
Here is my Python script:
import sys
import os
import traceback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback
from org.python.core.util import StringUtil

class readFirstLineCallback(InputStreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream):
        try:
            input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            os.system('touch /u/itlaked/process_outputs/' + input_text)
            print 'AD_TEST'
            os.system('echo '+input_text+' | curl -iku user:password -L --data-binary @- -x PUT https://gatewayhost:8443/gateway/default/webhdfs/v1/user/user/ad_test1020.txt?op=CREATE')
        except:
            traceback.print_exc(file=sys.stdout)
            raise

flowFile = session.get()
if flowFile != None:
    readCallback = readFirstLineCallback()
    session.read(flowFile, readCallback)
    session.remove(flowFile)
    session.commit()

ERROR Message:
2016-10-20 09:36:15,106 ERROR [NiFi logging handler] org.apache.nifi.StdErr /bin/sh: -c: line 1: syntax error near unexpected token `|'
2016-10-20 09:36:15,106 ERROR [NiFi logging handler] org.apache.nifi.StdErr /bin/sh: -c: line 1: ` | curl -iku user:password -L --data-binary @- -x PUT https://gatewayhost:8443/gateway/default/webhdfs/v1/user/user/ad_test1020.txt?op=CREATE'
Please let me know if anybody has any thoughts. I am very new to NiFi, and I chose ExecuteScript only because this curl command runs fine on the command line. I am not sure whether there are other ways to achieve this.
Created 10-20-2016 03:08 PM
I have an example NiFi flow that may be helpful: https://github.com/zaratsian/nifi_python_executescript
It shows how to execute a Python script within a NiFi flow using the ExecuteScript processor. There are ways to process both the flowfile and its attributes as part of your Python program, but you need to add a Java command to initialize the flowfile object within Python.
Also, as @Matt Burgess mentioned, you can use Python to make an HTTP request directly. I prefer to use the "requests" package, such as:
import requests
r = requests.get("http://yourURL.com")
You can also use POST, PUT, DELETE, HEAD, OPTIONS, and so on.
Created 10-20-2016 02:53 PM
If your Python script is just calling out to the operating system, consider using ExecuteStreamCommand or ExecuteProcess instead. Otherwise there are (hopefully!) some pure Python module(s) for doing HTTP requests, rather than using curl via os.system().
Created 10-20-2016 03:32 PM
@Matt Burgess
I did try those two processors, but they didn't work for me. I need to run the curl command on the incoming flowfile. ExecuteProcess: I don't see an option to specify the incoming flowfile. ExecuteStreamCommand: I tried this by specifying the curl command with a static target filename for now, and @- so that curl reads the data from stdin.
curl -iku user:password -L --data-binary @- -x PUT https://gatewayhost:8443/gateway/default/webhdfs/v1/user/user/ad_test1020.txt?op=CREATE
But it fails with the error below. It seems it doesn't understand the @-.
org.apache.nifi.processor.exception.ProcessException: java.io.IOException: Cannot run program " curl -iku user:password -L --data-binary @- -x PUT https://gatewayhost:8443/gateway/default/webhdfs/v1/user/user/ad_test1020.txt?op=CREATE": error=2, No such file or directory
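The "Cannot run program ... error=2" message above looks less like a problem with @- and more like the whole command line being treated as the name of a single executable. Here is a minimal sketch in plain Python, assuming a POSIX system, that reproduces the same kind of failure and shows the command split into a program plus arguments. The function names are made up for illustration, and the command string is copied verbatim from the post.

```python
import errno
import shlex
import subprocess

# Command line copied verbatim from the post; user, password and host are
# the post's own placeholders.
COMMAND = ("curl -iku user:password -L --data-binary @- -x PUT "
           "https://gatewayhost:8443/gateway/default/webhdfs/v1/user/user/"
           "ad_test1020.txt?op=CREATE")


def try_as_single_program(command):
    """Try to launch the whole string as if it were one executable name.

    Without a shell, POSIX looks for a program literally named
    "curl -iku user:password ...", which does not exist.
    Returns the errno of the failure, or None if the launch succeeded.
    """
    try:
        subprocess.Popen(command).wait()
    except OSError as exc:
        return exc.errno
    return None


def split_command(command):
    """Split the command line into the program and its separate arguments."""
    return shlex.split(command)


if __name__ == "__main__":
    # errno 2 is ENOENT, "No such file or directory", as in the NiFi error.
    print(try_as_single_program(COMMAND) == errno.ENOENT)
    print(split_command(COMMAND))
```

If the same thing is happening in ExecuteStreamCommand, the likely fix is to put only curl in the Command Path property and the remaining pieces in Command Arguments, separated by the configured argument delimiter (a semicolon by default, as far as I know).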
Created 10-20-2016 03:21 PM
Here's example Python code that you can run from within NiFi's ExecuteScript processor:
import json
import java.io
from org.apache.commons.io import IOUtils

# Get the incoming flowfile from the session
flowFile = session.get()

# Only touch the flowfile if one was actually available
if flowFile is not None:
    # Open data.json file and parse json values
    filename = flowFile.getAttribute('filename')
    filepath = flowFile.getAttribute('absolute.path')
    data = json.loads(open(filepath + filename, "r").read())
    data_value1 = data["record"]["value1"]

    # Calculate arbitrary new value within python
    new_value = data_value1 * 100

    # Add/Put values to flowFile as new attributes
    flowFile = session.putAttribute(flowFile, "from_python_string", "python string example")
    flowFile = session.putAttribute(flowFile, "from_python_number", str(new_value))
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
Created 10-21-2016 03:44 PM
Hi Dan, thank you for your sample code. I was able to get this working by using the flowfile attributes and pointing curl at the file on the local file system. I ended up using an os.system call because I ran into issues with the Python libraries (requests, urllib, etc.) for the WebHDFS op command options.
Issues with this approach:
I have to configure my GetFile processor to keep the source files. If I keep the source files, then on subsequent iterations it complains that the file already exists.
If I don't keep the source file, the GetFile processor removes it once it completes, and the file is no longer available for the ExecuteScript processor to reference in the curl command.
Here is my new code:
import sys
import os
import java.io
from org.apache.commons.io import IOUtils

flowFile = session.get()
# Read the attributes only after confirming a flowfile was received
if (flowFile != None):
    filename = flowFile.getAttribute('filename')
    filepath = flowFile.getAttribute('absolute.path')
    file = filepath + filename
    print 'filename is ::' + file
    os.system('curl -ku user:password -L -T '+file+' -X PUT "https://gatewayhost:8443/gateway/default/webhdfs/v1/user/'+filename+'?op=CREATE"')
    session.remove(flowFile)
    session.commit()
Created 10-22-2016 08:02 PM
Hi @Dan Zaratsian, out of curiosity, how do you debug this stuff? I tried starting a Jython interpreter and loading some of the NiFi bits into the classpath, but it's still not really convenient. Surely you aren't just debugging in that little box that is the NiFi form, right?
Created 10-20-2016 03:10 PM
I would suggest using the built-in Jython HTTP functionality if possible:
Here's an example POST request.
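The example this reply pointed to is not reproduced in the thread. As a stand-in, here is a minimal sketch of building and sending a POST with urllib2 from the Python 2.7 standard library (which Jython 2.7 should also provide), with a fallback import so it runs on Python 3 as well. The URL, payload, and function names are hypothetical, and TLS settings and Knox authentication are deliberately left out.

```python
try:
    # Python 2 / Jython 2.7
    from urllib2 import Request, urlopen
except ImportError:
    # Python 3
    from urllib.request import Request, urlopen


def build_post(url, payload, content_type="application/octet-stream"):
    """Build (but do not send) a request with `payload` as its body.

    Supplying a body is what makes the request a POST.
    """
    request = Request(url, data=payload)
    request.add_header("Content-Type", content_type)
    return request


def send(request, timeout=30):
    """Send the request and return (status code, response body)."""
    response = urlopen(request, timeout=timeout)
    try:
        return response.getcode(), response.read()
    finally:
        response.close()


if __name__ == "__main__":
    # Hypothetical endpoint; nothing is sent here, the request is only built.
    req = build_post("http://localhost:8080/upload", b"hello")
    print(req.get_method())
```

For a PUT, as WebHDFS CREATE expects, one option that works with both import paths is to override the method on the built request, for example req.get_method = lambda: "PUT", before passing it to send().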
If you still need to shell out, os.system should be replaced with subprocess.Popen where possible.
os.system is pretty much the same as subprocess.Popen with shell=True. This is bad because it opens the door to shell injection. If there are unexpected characters in your input_text variable, bad things could happen, either by accident or via input crafted by a malicious user.
It is much safer to use multiple Popen invocations connected together with Python's piping functionality.
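Here is a minimal sketch of that idea, assuming a POSIX system with cat, echo and tr available. The function names are made up for illustration. The data goes to the child process through stdin, so newlines, pipes and semicolons in it are never parsed by a shell. The syntax error in the original post is consistent with input_text containing a newline, which would leave "| curl ..." at the start of a new shell line.

```python
import subprocess


def run_with_stdin(argv, data):
    """Run argv (a list, no shell involved) and feed `data` (bytes) to its stdin.

    Returns (exit code, stdout bytes).
    """
    proc = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    out, _ = proc.communicate(data)
    return proc.returncode, out


def pipe(first_argv, second_argv):
    """Equivalent of `first | second` without a shell; returns second's stdout."""
    first = subprocess.Popen(first_argv, stdout=subprocess.PIPE)
    second = subprocess.Popen(second_argv, stdin=first.stdout,
                              stdout=subprocess.PIPE)
    # Close our copy so `first` gets SIGPIPE if `second` exits early.
    first.stdout.close()
    out, _ = second.communicate()
    first.wait()
    return out


if __name__ == "__main__":
    # Content that would break or hijack an os.system('echo ' + text + ' | ...') call.
    tricky = b"line one\nline two; echo injected | cat\n"
    code, out = run_with_stdin(["cat"], tricky)
    print(code == 0 and out == tricky)
    print(pipe(["echo", "hello | world"], ["tr", "a-z", "A-Z"]))
```

For the upload in this thread, argv would be the curl command from the post written as a list, for example ["curl", "-iku", "user:password", "-L", "--data-binary", "@-", "-X", "PUT", url], with the flowfile content passed as data. Note that the original post used a lowercase -x, which in curl sets a proxy, so -X was probably intended.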