How to pass a flowfile to a Python curl command?

Contributor

I have a simple NiFi flow that picks up files from a landing directory (GetFile processor) and places them on HDFS. To place the files on HDFS I am using WebHDFS over Knox. As NiFi does not have a WebHDFS processor, I am using a Python ExecuteScript processor to run the curl command for the WebHDFS request. But I am getting an error while running the code below.

Here is my Python script code:

import sys
import os
import traceback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback
from org.python.core.util import StringUtil

class readFirstLineCallback(InputStreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream):
        try:
            input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            os.system('touch /u/itlaked/process_outputs/' + input_text)
            print 'AD_TEST'
            os.system('echo '+input_text+' | curl -iku user:password -L --data-binary @- -x PUT https://gatewayhost:8443/gateway/default/webhdfs/v1/user/user/ad_test1020.txt?op=CREATE')
        except:
            traceback.print_exc(file=sys.stdout)
            raise

flowFile = session.get()
if flowFile != None:
    readCallback = readFirstLineCallback()
    session.read(flowFile, readCallback)
    session.remove(flowFile)
    session.commit()

ERROR Message:

2016-10-20 09:36:15,106 ERROR [NiFi logging handler] org.apache.nifi.StdErr /bin/sh: -c: line 1: syntax error near unexpected token `|'
2016-10-20 09:36:15,106 ERROR [NiFi logging handler] org.apache.nifi.StdErr /bin/sh: -c: line 1: ` | curl -iku user:password -L --data-binary @- -x PUT https://gatewayhost:8443/gateway/default/webhdfs/v1/user/user/ad_test1020.txt?op=CREATE'

Please let me know if anybody has any thoughts. I am very new to NiFi and chose ExecuteScript only because this curl command runs fine on the command line. I am not sure whether there are other ways to achieve this.
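
One possible reading of the error message, offered as a guess rather than a diagnosis: the flowfile content ends with a newline, so the concatenated string reaches /bin/sh as two lines and the second line starts with the pipe. The sketch below only builds the string to show that split and runs nothing. The sample text is made up, and it uses -X (request method) where the script has -x, which curl treats as the proxy option.

```python
URL = "https://gatewayhost:8443/gateway/default/webhdfs/v1/user/user/ad_test1020.txt?op=CREATE"


def build_shell_command(input_text):
    """Concatenate flowfile text into a shell pipeline, the way the script does."""
    return "echo " + input_text + " | curl -iku user:password -L --data-binary @- -X PUT " + URL


# Hypothetical flowfile content: a single line ending in a newline.
command = build_shell_command("first line of the file\n")

# The shell sees one command per line, so the pipe ends up starting a new line.
shell_lines = command.split("\n")
for number, line in enumerate(shell_lines):
    print(number, repr(line))
```

If that reading is right, any content with more than one line would break the command in the same way, which is one more reason to send the content over stdin instead of splicing it into the command string.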

1 ACCEPTED SOLUTION

@Aruna dadi

I have an example NiFi flow that may be helpful: https://github.com/zaratsian/nifi_python_executescript

It shows how to execute a python script within a NiFi flow using the ExecuteProcess processor that @Matt Burgess mentioned. There are ways to process both the flowfile and the attributes as part of your python program, but you need to add a java command to initialize the flowfile object within python.

Also as @Matt Burgess mentioned, you can use python to make an HTTP request directly. I prefer to use the "requests" package, such as:

import requests

r = requests.get("http://yourURL.com")

You can also use POST, PUT, DELETE, HEAD, OPTIONS...

7 REPLIES

Master Guru

If your Python script is just calling out to the operating system, consider using ExecuteStreamCommand or ExecuteProcess instead. Otherwise there are (hopefully!) some pure Python module(s) for doing HTTP requests, rather than using curl via os.system().
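
As a rough illustration of the pure-Python route, here is a minimal sketch that builds an HTTP PUT request with the standard library instead of shelling out to curl. It only constructs the request and sends nothing. The helper name build_put_request is hypothetical, the URL and credentials are copied from the question, and the Python 2 import branch is an untested assumption about Jython 2.7. WebHDFS documents CREATE as a two-step exchange (a first PUT answered by a redirect, then a PUT of the data to the redirect location), so a real upload would likely need to handle that as well.

```python
import base64

try:
    from urllib.request import Request  # Python 3
except ImportError:
    from urllib2 import Request  # Python 2 / Jython 2.7 (assumption)


def build_put_request(url, payload, user, password):
    """Build (but do not send) an HTTP PUT request with basic authentication."""
    credentials = (user + ":" + password).encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    request = Request(url, data=payload)
    request.add_header("Authorization", "Basic " + token)
    request.add_header("Content-Type", "application/octet-stream")
    # Override the default method (POST when data is present) with PUT.
    request.get_method = lambda: "PUT"
    return request


# Hypothetical values taken from the curl command in the question.
url = "https://gatewayhost:8443/gateway/default/webhdfs/v1/user/user/ad_test1020.txt?op=CREATE"
request = build_put_request(url, b"file content\n", "user", "password")
print(request.get_method(), request.get_full_url())
```

Passing the request to urlopen would send it. That step is left out here because it needs a reachable gateway.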

Contributor

@Matt Burgess

I did try those two processors, but they didn't work for me. I need to run the curl command on the incoming flowfile. ExecuteProcess: I don't see an option to specify the incoming flowfile. ExecuteStreamCommand: I tried running this by specifying the curl command with a static target filename for now, and @- to pass stdin to curl.

curl -iku user:password -L --data-binary @- -x PUT https://gatewayhost:8443/gateway/default/webhdfs/v1/user/user/ad_test1020.txt?op=CREATE

But it doesn't work for me and fails with the error below. It seems it doesn't understand the @-.

org.apache.nifi.processor.exception.ProcessException: java.io.IOException: Cannot run program " curl -iku user:password -L --data-binary @- -x PUT https://gatewayhost:8443/gateway/default/webhdfs/v1/user/user/ad_test1020.txt?op=CREATE": error=2, No such file or directory
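
The error=2 above looks more like the whole command line being treated as the name of the program than a problem with @-. As far as I know, ExecuteStreamCommand expects only the executable in Command Path and the arguments in Command Arguments, separated by the Argument Delimiter (";" by default). The sketch below reproduces the same failure mode in plain Python under that assumption. The helper name try_run is hypothetical, and the Python interpreter stands in for curl so nothing touches the network.

```python
import errno
import subprocess
import sys


def try_run(argv):
    """Return (ok, detail): program output on success, errno name on launch failure."""
    try:
        out = subprocess.check_output(argv)
        return True, out.decode("utf-8").strip()
    except OSError as exc:
        return False, errno.errorcode.get(exc.errno, str(exc.errno))


# Whole command line passed as one "program name": no file on disk has this name.
whole_line = [sys.executable + " -c print('ok')"]

# Executable and arguments passed as separate items.
split_args = [sys.executable, "-c", "print('ok')"]

print(try_run(whole_line))
print(try_run(split_args))
```

ENOENT is errno 2, "No such file or directory", the same code the NiFi exception reports.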

@Aruna dadi

Here's example Python code that you can run from within NiFi's ExecuteScript processor:

import json
import java.io
from org.apache.commons.io import IOUtils

# Get the next flowfile from the session (None if the queue is empty)
flowFile = session.get()

if flowFile is not None:
    # Open data.json file and parse json values
    filename = flowFile.getAttribute('filename')
    filepath = flowFile.getAttribute('absolute.path')

    data = json.loads(open(filepath + filename, "r").read())
    data_value1 = data["record"]["value1"]

    # Calculate arbitrary new value within python
    new_value = data_value1 * 100

    # Add/Put values to flowFile as new attributes
    flowFile = session.putAttribute(flowFile, "from_python_string", "python string example")
    flowFile = session.putAttribute(flowFile, "from_python_number", str(new_value))

    session.transfer(flowFile, REL_SUCCESS)

session.commit()

Contributor

Hi Dan, thank you for your sample code. I was able to get this working by using flowfile attributes and referring to the file on the local file system. I ended up using an os.system call because I ran into issues with the Python libraries (requests, urllib, etc.) for the WebHDFS op command options.

Issues with this approach:

I have to set the GetFile processor to keep the source files. If I keep the source files, then on subsequent iterations it complains that the file already exists.

If I don't keep the source file, GetFile removes it once it completes, and it is no longer available for the ExecuteScript processor to reference in the curl command.

Here is my new code:

import sys
import os
import java.io
from org.apache.commons.io import IOUtils

flowFile = session.get()
if flowFile is not None:
    filename = flowFile.getAttribute('filename')
    filepath = flowFile.getAttribute('absolute.path')
    file = filepath + filename
    print 'filename is ::' + file
    os.system('curl -ku user:password -L -T ' + file + ' -X PUT "https://gatewayhost:8443/gateway/default/webhdfs/v1/user/' + filename + '?op=CREATE"')
    session.remove(flowFile)
session.commit()
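
One way around the dependency on the source file staying on disk is to hand the content to the command over stdin instead of giving it a path. The following is a minimal sketch of that idea in plain Python, not a tested NiFi script. The helper name pipe_bytes_to_command and the sample payload are hypothetical, and a small Python one-liner stands in for curl so the example runs anywhere.

```python
import subprocess
import sys


def pipe_bytes_to_command(argv, payload):
    """Run argv without a shell, write payload to its stdin, return (code, out, err)."""
    proc = subprocess.Popen(
        argv,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    out, err = proc.communicate(payload)
    return proc.returncode, out, err


# Stand-in for curl: reads stdin and reports how many characters arrived.
# With curl, the argv might instead end in: "-T", "-", "-X", "PUT", url
# ("-T -" tells curl to read the upload body from stdin).
stand_in = [sys.executable, "-c",
            "import sys; data = sys.stdin.read(); sys.stdout.write(str(len(data)))"]

code, out, err = pipe_bytes_to_command(stand_in, b"line one\nline two\n")
print(code, out)
```

Because the bytes travel through the pipe, the command never needs the original file, so GetFile can remove it as usual.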

Hi @Dan Zaratsian, out of curiosity, how do you debug this stuff? I tried starting a Jython interpreter and loading some of the NiFi bits into the classpath, but it's still not really convenient. Surely you aren't just debugging in that little box that is the NiFi form, right?

Rising Star

I would suggest using built-in Jython HTTP functionality if possible:

Here's an example POST request.

If you still need to shell out, os.system should be replaced with subprocess.Popen where possible.

os.system is pretty much the same as subprocess.Popen with shell=True. This is bad because it opens the door to shell injection. If there are unexpected characters in your input_text variable, bad things could happen, either by accident or via input crafted by a malicious user.
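
To make the risk concrete, here is a small harmless sketch, assuming a POSIX shell is available. The hostile string is made up. The first call splices it into a shell command the way os.system would. The second passes it as a single argument with no shell involved.

```python
import subprocess
import sys

# Hypothetical input that happens to contain a shell command separator.
hostile = "report.txt; echo INJECTED"

# Shell version: the shell sees two commands and runs both.
shell_output = subprocess.check_output("echo " + hostile, shell=True).decode("utf-8")

# Argument-list version: the text reaches the program as one literal argument.
printer = "import sys; sys.stdout.write(sys.argv[1])"
argv_output = subprocess.check_output(
    [sys.executable, "-c", printer, hostile]
).decode("utf-8")

print(repr(shell_output))
print(repr(argv_output))
```

In the shell version the second command runs on its own line of output. In the argument-list version the semicolon is just another character in the text.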

It is much safer to use multiple Popen invocations connected together with Python's piping functionality.
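
A minimal sketch of that piping pattern, following the approach shown in the Python subprocess documentation: two Popen objects are connected so that the first process's stdout feeds the second process's stdin, with no shell in between. Both commands here are stand-ins (Python one-liners in place of echo and curl) so the example is self-contained.

```python
import subprocess
import sys

# Stand-in for the process that produces the data.
producer = subprocess.Popen(
    [sys.executable, "-c", "print('flowfile content')"],
    stdout=subprocess.PIPE,
)

# Stand-in for the process that consumes it (curl in the original question).
consumer = subprocess.Popen(
    [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read().upper())"],
    stdin=producer.stdout,
    stdout=subprocess.PIPE,
)

# Close our copy of the pipe so the producer is notified if the consumer exits early.
producer.stdout.close()

output, _ = consumer.communicate()
producer.wait()
print(output.decode("utf-8"))
```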