Member since: 11-07-2016
Posts: 70
Kudos Received: 39
Solutions: 16

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1961 | 02-22-2018 09:20 PM |
| | 4871 | 01-30-2018 03:41 PM |
| | 618 | 10-25-2017 06:09 PM |
| | 10299 | 08-15-2017 10:54 PM |
| | 2059 | 06-26-2017 05:05 PM |
11-13-2019
06:48 AM
After we load over 100 million notes into HBase, I will be using NiFi to listen to a live HL7 feed to keep the data current. Some of these HL7 messages are delete messages, and the corresponding rows need to be removed from HBase.
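For reference, a minimal sketch of what removing one note row from HBase could look like outside NiFi, using the happybase client; the host, table name, and row-key format below are assumptions, not the actual schema:

    import happybase

    # Assumed Thrift gateway host and table name -- adjust to the real cluster.
    connection = happybase.Connection('hbase-thrift-host')
    table = connection.table('clinical_notes')

    # When an HL7 delete message arrives, remove the matching row by its key.
    def delete_note(row_key):
        table.delete(row_key.encode('utf-8'))  # deletes the whole row

    delete_note('MRN12345|NOTE67890')          # hypothetical row key
    connection.close()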
11-11-2019
12:58 PM
Hi, even though we modified the stripe size to a custom value ("orc.stripe.size"="248435456"), there are many files that are still only 5 MB or 9 MB. Any reason for this behavior?
09-26-2018
02:02 PM
Sorry, I missed that you need "greater than 10", so change the regex to: (?s)([^\n]*(\n)){9,}[^\n]* Note that it is "9" for ten records, because the last line won't have a \n (EOL).
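If it helps, a quick way to sanity-check that pattern outside NiFi with plain Python (the sample strings are made up):

    import re

    # {9,} newline-terminated lines plus one final line without \n => at least ten lines.
    pattern = re.compile(r'(?s)([^\n]*(\n)){9,}[^\n]*')

    ten_lines = "\n".join("record %d" % i for i in range(10))
    nine_lines = "\n".join("record %d" % i for i in range(9))

    print(bool(pattern.fullmatch(ten_lines)))   # True  -- ten lines match
    print(bool(pattern.fullmatch(nine_lines)))  # False -- nine lines do not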
04-11-2018
05:46 AM
Thanks for the solution, but since I am not familiar with the REST API, the solution by Matt looks easier to me. I will surely try yours too.
02-22-2018
11:58 PM
@Anishkumar Valsalam For server-level stats, you can use Grafana, which is part of the HDF stack. Here is what we have and how we track it: a list of system-level stats and a list of NiFi-related stats.
04-19-2018
01:38 PM
I agree it would be nice to allow incoming upstream connections to ListSFTP. However, in answer to the previous comment, you can also set variables in a Process Group by right-clicking on the canvas and selecting the Variables option. It is dynamic and much better for my needs than setting them in 'nifi.properties'.
12-12-2017
05:26 PM
Can you ping that IP from the NiFi node? Is port 10000 exposed to outside users? Perhaps there is a firewall or that port is exposed as a different port to the outside world.
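One hedged way to check that from the NiFi node itself is a plain Python socket test (the hostname below is a placeholder):

    import socket

    host, port = "hive-server.example.com", 10000   # placeholder host
    try:
        # Try to open a TCP connection the same way the NiFi processor would.
        with socket.create_connection((host, port), timeout=5):
            print("%s:%d is reachable" % (host, port))
    except OSError as exc:
        print("%s:%d is NOT reachable: %s" % (host, port, exc))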
08-25-2018
09:23 AM
I am facing a connectivity issue from Apache NiFi to Amazon SQS & S3. I have given the details at the link below: https://community.hortonworks.com/questions/214811/connection-issue-in-integrating-apache-nifi-with-a.html Thanks in advance. Please suggest.
10-24-2017
05:41 PM
@xav webmaster Straight answer:

    flowFile = session.get()
    if flowFile != None:
        flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
    # implicit return at the end

More info on the ExecuteScript processor: https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

In your particular case, in the callback function where you read from the input stream, you can scan the content:

    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback

    class PyStreamCallback(StreamCallback):
        def __init__(self):
            self.topic_name = ''

        def get_topic_name(self):
            return self.topic_name

        def process(self, inputStream, outputStream):
            Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            Log2 = str(Log).split(',')
            Brand = Log2[0]
            Model = Log2[1]
            Color = Log2[5]
            if Brand == 'ford' and Color == 'gray':
                NewLog = str(Log2)
                self.topic_name = 'ford'
                outputStream.write(bytearray(NewLog.encode('utf-8')))
            if Brand == 'audi' and Color == 'black':
                NewLog = str(Log2)
                self.topic_name = 'audi'
                outputStream.write(bytearray(NewLog.encode('utf-8')))
            if Brand == 'bmw' and Color == 'white':
                NewLog = str(Log2)
                self.topic_name = 'bmw'
                outputStream.write(bytearray(NewLog.encode('utf-8')))
            # add exception handling if needed for empty flowfile content, etc.

    flowFile = session.get()
    if flowFile != None:
        caller = PyStreamCallback()
        flowFile = session.write(flowFile, caller)
        topic_name = caller.get_topic_name()
        flowFile = session.putAttribute(flowFile, 'kafka_topic', topic_name)
        session.transfer(flowFile, REL_SUCCESS)

Hope that will help.
06-26-2017
05:05 PM
1 Kudo
Alright, so I ended up with a simple script and one processor in NiFi. The modules to reload should be provided in the processor's "modules_list" property (comma-delimited). Script body:

    import sys, json

    def class_reloader(modules_to_reload):
        reload_msg = ""
        all_module_names = sys.modules.keys()
        all_module_names.sort()
        for mn in all_module_names:
            m = sys.modules[mn]
            # -- find full match of names with given modules
            if mn in modules_to_reload:
                try:
                    reload(m)
                    reload_msg = reload_msg + mn + "|"
                except:
                    return 1, reload_msg
                continue
            # -- find if mn is a submodule of any given one
            for mtr in modules_to_reload:
                if mn.startswith(mtr + '.'):
                    try:
                        reload(m)
                        reload_msg = reload_msg + mn + "|"
                        break
                    except:
                        return 1, reload_msg
        return 0, reload_msg

    #-------------------------------#

    flowFile = session.create()
    if flowFile != None:
        modules_prop = modules_list.getValue()
        ml = []
        if modules_prop:
            ml = modules_prop.split(',')
        cr = class_reloader(ml)
        flowFile = session.putAttribute(flowFile, 'class_reload_result', str(cr[0]))
        flowFile = session.putAttribute(flowFile, 'class_reload_found', str(cr[1]))
        session.transfer(flowFile, REL_SUCCESS)
    # implicit return at the end

The code could be improved to route to the FAILURE relationship when the method returns a non-zero code (a sketch of that follows below). It's not a perfect solution, but it will work in most cases.
If you have a better one - please share! 🙂
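A minimal sketch of that FAILURE routing, assuming the same class_reloader() and ml as in the script above:

    cr = class_reloader(ml)
    flowFile = session.putAttribute(flowFile, 'class_reload_result', str(cr[0]))
    flowFile = session.putAttribute(flowFile, 'class_reload_found', str(cr[1]))
    # Route to FAILURE when the reload reported a non-zero code.
    if cr[0] == 0:
        session.transfer(flowFile, REL_SUCCESS)
    else:
        session.transfer(flowFile, REL_FAILURE)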
10-17-2018
06:30 PM
To me, this is not the simple approach; it's very limiting and no simpler than using the Jolt transformation suggested above.
09-22-2017
04:22 PM
It can connect to the brokers; the problem is that SASL protocols aren't supported in the Kafka producer.
04-28-2019
02:56 PM
1 Kudo
We can use the rank approach, which is faster than max; max scans the table twice. Here, the partition column is load_date:

    select ld_dt.txnno,
           ld_dt.txndate,
           ld_dt.custno,
           ld_dt.amount,
           ld_dt.productno,
           ld_dt.spendby,
           ld_dt.load_date
    from (select *,
                 dense_rank() over (order by load_date desc) dt_rnk
          from datastore_s2.transactions) ld_dt
    where ld_dt.dt_rnk = 1
12-20-2016
02:43 AM
Do you have Ranger audit enabled? If so, please provide what the log shows when NiFi tries to hit /tmp.