Member since
11-07-2016
70
Posts
40
Kudos Received
16
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
3936 | 02-22-2018 09:20 PM | |
6893 | 01-30-2018 03:41 PM | |
1216 | 10-25-2017 06:09 PM | |
10922 | 08-15-2017 10:54 PM | |
3333 | 06-26-2017 05:05 PM |
11-10-2018
12:23 AM
1 Kudo
@Krishna Sreenivas
there is no default value. Empty value (null) means - that first-time records (which aren't detected as dups yet) will stay in a cache forever (if you have cache service with persistence directory defined, otherwise - till restart). If you define, for example, 1 min, that means, that during 1 min, all dups will be detected, but after that time, another record won't be detected as a dup, as it will be aged off (and removed) from cache.
... View more
04-10-2018
01:11 PM
@Matt Clarke, @sri chaturvedi, Matt, this actually requires server restart (and if there is a cluster - removing or editing the file on the rest of the nodes). I think it can be done by creating a flow in NIFI :)))) 1. Read flow.xml.gz 2. Parse XML to find stopped processors and their IDs (as per above) 3. Use NiFi rest API to change a state to disabled.
... View more
02-22-2018
09:20 PM
1 Kudo
@Benjamin Newell So, the idea of ListSFTP is to provide list of files from SFTP based on filters, etc. This processor is stateful, meaning it will give you list of files that have been modified since last run. And it will maintain a state. That's a reason for not allowing incoming connections. Option 1. FetchSFTP has connection "not.found". You can use it to postpone processing of this file (add processor to penalize it and loop it back to FetchSFTP). And configure your log monitor to ignore all the messages matching "Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to not.found" Option 2. First Flow: Use ListSFTP to pull files and put them on landing zone (local or NAS). Second Flow: In your flow - replace FetchSFTP with FetchFile. FetchFile has property "Log level when file not found". Use "DEBUG", so it won't be printing errors into log file. Option 3. Create custom processor extending FetchSFTP and change onTrigger to NOT print error message in case file not found exception. Let me know if that helps.
... View more
02-01-2018
09:36 PM
@Carlton Patterson, There are multiple ways of doing that. One of them explained above by Sridhar. Another way: after you execute your script, all your files will be CSV formatted under '/user/hive/geography' on HDFS. So, you will need to run in command line: hdfs dfs -get /user/hive/geography /hadoop/hdfs/
... View more
01-30-2018
03:41 PM
@Hanu V To work around your issue, you can explicitly cast to string or int (or whatever your data type should be). I could reproduce your case: select greatest(month, billed) from sales limit 5;
Error: java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: Error evaluating greatest(month,billed) (state=,code=0) with workaround: select greatest(int(month), int(billed)) from sales limit 5;
+------+--+
| _c0 |
+------+--+
| 8 |
| 8 |
| 8 |
| 8 |
| 2 |
+------+--+ or select greatest(string(month), string(billed)) from sales limit 5;
+------+--+
| _c0 |
+------+--+
| 8 |
| 8 |
| 8 |
| 8 |
| 2 |
+------+--+
... View more
12-05-2017
06:33 PM
@Saikrishna Tarapareddy You don't use username and pswd, you need access key and secret key. Admin of the S3 account can generate it (refer to https://aws.amazon.com/blogs/security/wheres-my-secret-access-key/) Region and bucket are mandatory and you can see that when you access your S3 via web: under "Services" menu go to "Storage->S3":
... View more
10-25-2017
06:09 PM
@Bilel Boubakri, I've tested your use case with getSFTP processor, it worked as expected with given regex (skipped MATFIC dir with all the files in it). I cannot test specifically GetFTP, as we don't have FTP servers running, but I'm sure it should give the same result. Tested with NiFi 1.1.0.2.1.1.0-2. Some suggestions: 1. if you copy-pasted your regex - make sure you don't have new line character 2. make sure your regex is in Path Filter Regex and not in File Filter Regex property 3. If 1 and 2 don't help, try to dance with tambourine and call Cthulhu Hope, that will help!
... View more
10-24-2017
05:41 PM
@xav webmaster Straight answer: flowFile = session.get()
if (flowFile != None):
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
# implicit return at the end More info on executeScript processor: https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html In your particular case, in callback function where you read from input stream, you can scan from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
def __init__(self):
self.topic_name=''
pass
def get_topic_name(self):
return self.topic_name
def process(self, inputStream, outputStream):
Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
Log2 = str(Log).split(',')
Brand = Log2[0]
Color = Log2[5]
Model = Log2[1]
if Brand == 'ford' and Color == 'gray':
NewLog = str(Log2)
self.topic_name = 'ford'
outputStream.write(bytearray((NewLog).encode('utf-8')))
if Brand == 'audi' and Color == 'black':
NewLog = str(Log2)
self.topic_name = 'audi'
outputStream.write(bytearray((NewLog).encode('utf-8')))
if Brand == 'bmw' and Color == 'white':
NewLog = str(Log2)
self.topic_name = 'bmw'
outputStream.write(bytearray((NewLog).encode('utf-8')))
# add exception handling if needed for empty flowfile content, etc
if(flowFile != None):
caller = PyStreamCallback()
flowFile = session.write(flowFile, caller)
topic_name = caller.get_topic_name()
flowFile = session.putAttribute(flowFile, 'kafka_topic', topic_name)
Hope that will help.
... View more
08-15-2017
10:54 PM
1 Kudo
@Andres Urrego, What you are looking for (UPSERTS) aren't available in SQOOP-import. There are several approaches on how to actually update data in Hive. One of them is described here: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_data-access/content/incrementally-updating-hive-table-with-sqoop-and-ext-table.html Other approaches are also using side load and merge as post-sqoop or scheduled jobs/processes. You can also check Hive ACID transactions, or using Hive-Hbase integration package. Choosing right approach is not trivial and depends on: initial volume, incremental volumes, frequency or incremental jobs, probability of updates, ability to identify uniqueness of records, acceptable latency, etc...
... View more
06-26-2017
05:05 PM
1 Kudo
Alright, so I ended up with simple script and one processor in NiFi. Modules for reload should be provided in "modules_list" property of the processor (comma delimited). Script body: import sys, json
def class_reloader(modules_to_reload):
reload_msg = ""
all_module_names = sys.modules.keys()
all_module_names.sort()
for mn in all_module_names:
m = sys.modules[mn]
# -- find full match of names with given modules
if mn in modules_to_reload:
try:
reload(m)
reload_msg = reload_msg + mn + "|"
except:
return 1, reload_msg
continue
# -- find if mn is submodule of any given one
for mtr in modules_to_reload:
if mn.startswith(mtr+'.'):
try:
reload(m)
reload_msg = reload_msg + mn + "|"
break
except:
return 1, reload_msg
return 0, reload_msg
#-------------------------------#
flowFile = session.create()
if(flowFile != None):
modules_prop = modules_list.getValue()
ml = []
if modules_prop:
ml = modules_prop.split(',')
cr = class_reloader(ml)
flowFile = session.putAttribute(flowFile, 'class_reload_result', str(cr[0]))
flowFile = session.putAttribute(flowFile, 'class_reload_found', str(cr[1]))
session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end
The code can be improved to navigate to FAILURE relationship in case of non-zero response code from the method. It's not perfect solution, but will work in most cases.
If you have better one - please share! 🙂
... View more