Member since 11-07-2016
70 Posts
40 Kudos Received
16 Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 3893 | 02-22-2018 09:20 PM |
 | 6850 | 01-30-2018 03:41 PM |
 | 1202 | 10-25-2017 06:09 PM |
 | 10910 | 08-15-2017 10:54 PM |
 | 3322 | 06-26-2017 05:05 PM |
01-16-2024
03:22 AM
Hi @eberezitsky, is there a way to extend ListSFTP with the same functionality, but allowing an incoming connection and Expression Language for configuration?
09-15-2023
02:06 AM
I use CDH 6.3.2 (Hive 2.1, Hadoop 3.0), Hive on Spark, YARN cluster, with hive.merge.sparkfiles=true and hive.merge.orcfile.stripe.level=true. This configuration merges the 1099 reducer output files into one file when the result is small, but the merged file then contains about 1099 stripes, and reading it is very slow. I tried hive.merge.orcfile.stripe.level=false and the result is what I want: one small file with one stripe that reads fast. Can anyone explain the difference between true and false, and why hive.merge.orcfile.stripe.level=true is the default?
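For what it's worth, one way to confirm how many stripes the merge produced is to dump the merged file's metadata. A rough sketch only, assuming the hive --orcfiledump utility is on the PATH and using a placeholder file path (neither comes from the post above):

import subprocess

# dump the ORC metadata of the merged file and count its "Stripe:" entries
out = subprocess.check_output(
    ["hive", "--orcfiledump", "/warehouse/path/to/merged_file.orc"]).decode("utf-8")
stripes = [line for line in out.splitlines() if line.strip().startswith("Stripe:")]
print("stripe count:", len(stripes))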
11-13-2019
06:48 AM
After we load over 100 million notes into HBase, I will be using NiFi to listen to a live HL7 feed to keep the data current. Some of these HL7 messages are delete messages, and the corresponding rows need to be removed from HBase.
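As a side note, a minimal sketch of the row removal itself, outside NiFi, assuming the happybase Python client and a hypothetical 'notes' table keyed by note ID (none of these names come from the post):

import happybase

# connect through the HBase Thrift gateway (placeholder host)
connection = happybase.Connection('hbase-thrift-host')
table = connection.table('notes')

# when an HL7 delete message arrives, remove the matching row by its key
table.delete('note-12345')

connection.close()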
11-11-2019
12:58 PM
Hi, even though we set the stripe size to a custom value ("orc.stripe.size"="248435456"), there are many files that are still only 5 MB or 9 MB. Any reason for this behavior?
04-11-2018
05:46 AM
Thanks for the solution, but since I am not familiar with the REST API, the solution by Matt looks easier to me. I will surely try yours too.
10-24-2017
05:41 PM
@xav webmaster Straight answer:

flowFile = session.get()
if flowFile != None:
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
# implicit return at the end

More info on the ExecuteScript processor: https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

In your particular case, you can scan the content in the callback function where you read from the input stream:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        self.topic_name = ''

    def get_topic_name(self):
        return self.topic_name

    def process(self, inputStream, outputStream):
        # read the flowfile content as UTF-8 text and split the comma-separated fields
        Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        Log2 = str(Log).split(',')
        Brand = Log2[0]
        Model = Log2[1]
        Color = Log2[5]
        # pick the Kafka topic based on brand/color and write the matching record back out
        if Brand == 'ford' and Color == 'gray':
            NewLog = str(Log2)
            self.topic_name = 'ford'
            outputStream.write(bytearray(NewLog.encode('utf-8')))
        if Brand == 'audi' and Color == 'black':
            NewLog = str(Log2)
            self.topic_name = 'audi'
            outputStream.write(bytearray(NewLog.encode('utf-8')))
        if Brand == 'bmw' and Color == 'white':
            NewLog = str(Log2)
            self.topic_name = 'bmw'
            outputStream.write(bytearray(NewLog.encode('utf-8')))
        # add exception handling if needed for empty flowfile content, etc.

flowFile = session.get()
if flowFile != None:
    caller = PyStreamCallback()
    flowFile = session.write(flowFile, caller)
    topic_name = caller.get_topic_name()
    flowFile = session.putAttribute(flowFile, 'kafka_topic', topic_name)
    session.transfer(flowFile, REL_SUCCESS)

Hope that will help.
06-26-2017
05:05 PM
1 Kudo
Alright, so I ended up with a simple script and one processor in NiFi. The modules to reload should be provided in the "modules_list" property of the processor (comma-delimited). Script body:

import sys, json

def class_reloader(modules_to_reload):
    reload_msg = ""
    all_module_names = sys.modules.keys()
    all_module_names.sort()
    for mn in all_module_names:
        m = sys.modules[mn]
        # -- find full match of names with given modules
        if mn in modules_to_reload:
            try:
                reload(m)
                reload_msg = reload_msg + mn + "|"
            except:
                return 1, reload_msg
            continue
        # -- find if mn is a submodule of any given one
        for mtr in modules_to_reload:
            if mn.startswith(mtr + '.'):
                try:
                    reload(m)
                    reload_msg = reload_msg + mn + "|"
                    break
                except:
                    return 1, reload_msg
    return 0, reload_msg

#-------------------------------#

flowFile = session.create()
if flowFile != None:
    modules_prop = modules_list.getValue()
    ml = []
    if modules_prop:
        ml = modules_prop.split(',')
    cr = class_reloader(ml)
    flowFile = session.putAttribute(flowFile, 'class_reload_result', str(cr[0]))
    flowFile = session.putAttribute(flowFile, 'class_reload_found', str(cr[1]))
    session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end

The code can be improved to route to the FAILURE relationship when the method returns a non-zero result code. It's not a perfect solution, but it will work in most cases.
If you have a better one, please share! 🙂
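For illustration, a minimal sketch of that FAILURE-routing improvement (relying on the standard REL_FAILURE relationship that ExecuteScript exposes; this part is not in the original script) would change the tail of the script to:

    cr = class_reloader(ml)
    flowFile = session.putAttribute(flowFile, 'class_reload_result', str(cr[0]))
    flowFile = session.putAttribute(flowFile, 'class_reload_found', str(cr[1]))
    # a non-zero result code means one of the reload() calls raised an exception
    if cr[0] == 0:
        session.transfer(flowFile, REL_SUCCESS)
    else:
        session.transfer(flowFile, REL_FAILURE)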
10-17-2018
06:30 PM
To me, this is not the simple approach; it's very limiting and no simpler than using the Jolt transformation suggested above.
04-28-2019
02:56 PM
1 Kudo
We can use the rank approach, which is faster than a max-based approach because max scans the table twice. Here the partition column is load_date:

select ld_dt.txnno,
       ld_dt.txndate,
       ld_dt.custno,
       ld_dt.amount,
       ld_dt.productno,
       ld_dt.spendby,
       ld_dt.load_date
from (select *,
             dense_rank() over (order by load_date desc) dt_rnk
      from datastore_s2.transactions) ld_dt
where ld_dt.dt_rnk = 1