Member since
11-07-2016
70
Posts
39
Kudos Received
16
Solutions
My Accepted Solutions
Views | Posted
---|---
1961 | 02-22-2018 09:20 PM
4871 | 01-30-2018 03:41 PM
618 | 10-25-2017 06:09 PM
10299 | 08-15-2017 10:54 PM
2058 | 06-26-2017 05:05 PM
11-28-2018
10:29 PM
@Tim Onyschak Have you tried EvaluateXPath? If your record starts in the middle of the XML and the rest of the XML document isn't needed, use EvaluateXPath to trim the content down to the relevant piece. After that you can use record-based processors with a matching schema.
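A minimal sketch of what that could look like, assuming a hypothetical wrapper document with the records nested under /envelope/payload/order (the element names and XPath are illustrative, not from this thread):

    EvaluateXPath
        Destination              : flowfile-content
        Return Type              : nodeset
        order (dynamic property) : /envelope/payload/order

With Destination set to flowfile-content, the matched node replaces the flowfile content, so the downstream record reader only sees the trimmed XML.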
11-10-2018
12:23 AM
1 Kudo
@Krishna Sreenivas
There is no default value. An empty value (null) means that first-time records (which aren't detected as duplicates yet) will stay in the cache forever (if you have a cache service with a persistence directory defined; otherwise, until restart). If you define, for example, 1 min, then during that 1 min all duplicates will be detected, but after that time another matching record won't be detected as a duplicate, because the original entry will have been aged off (removed) from the cache.
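A hedged configuration sketch of how the age-off interacts with the cache (the identifier expression and service name are placeholders; the property names are from the DetectDuplicate processor):

    DetectDuplicate
        Cache Entry Identifier    : ${record.key}                      # hypothetical attribute
        Distributed Cache Service : DistributedMapCacheClientService   # placeholder service
        Age Off Duration          : 1 min                              # empty = entries never age off

With Age Off Duration empty, every identifier ever seen stays in the cache (subject only to the cache service's own persistence and eviction settings); with 1 min, duplicates are only detected within a one-minute window.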
10-01-2018
10:26 PM
@Manoj Can you share the content of the flowfile when you add "ADD JAR...." into the statement? And also the error that you are getting. I'm using various "set conf=value" statements without any issue, so it could be related to how you use it.
09-26-2018
02:02 PM
Sorry, I missed that you need "greater than 10", so change the regex to: (?s)([^\n]*(\n)){9,}[^\n]* Note that it is "9" for ten records, because the last line won't have a \n (EOL).
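A quick way to sanity-check the pattern outside NiFi (plain Python 3 here, standing in for the processor's regex evaluation; whether your processor matches the entire content or just searches within it depends on its configuration):

    import re

    # Same pattern as above: at least 9 newline-terminated lines plus a final line
    pattern = re.compile(r'(?s)([^\n]*(\n)){9,}[^\n]*')

    ten_lines = '\n'.join('line %d' % i for i in range(1, 11))    # 10 lines, 9 newlines
    nine_lines = '\n'.join('line %d' % i for i in range(1, 10))   # 9 lines, 8 newlines

    print(bool(pattern.fullmatch(ten_lines)))   # True  - 10 or more lines
    print(bool(pattern.fullmatch(nine_lines)))  # False - fewer than 10 lines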
09-26-2018
01:50 PM
@Pepelu Rico If for some reason you don't like records (see the answer from @Shu), here is an easier solution (having fun with regex). Note, you'll need to set the buffer size appropriately, and this will work well if your files are really around 10 lines and the lines aren't huge.
04-10-2018
01:11 PM
@Matt Clarke, @sri chaturvedi, Matt, this actually requires a server restart (and if there is a cluster, removing or editing the file on the rest of the nodes). I think it can be done by creating a flow in NiFi :))))
1. Read flow.xml.gz
2. Parse the XML to find stopped processors and their IDs (as per above)
3. Use the NiFi REST API to change the state to disabled (sketched below).
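A rough sketch of step 3, assuming an unsecured NiFi on localhost:8080 and a version recent enough to expose the run-status endpoint (older versions update the processor entity instead); the processor ID is hypothetical and the payload shape is from memory, so verify it against your NiFi's REST API docs:

    import json
    import urllib2  # Jython/Python 2 style, matching the ExecuteScript examples elsewhere

    NIFI = 'http://localhost:8080/nifi-api'                      # assumption: unsecured, single node
    processor_id = '016a1b2c-0175-1000-abcd-1234567890ab'        # hypothetical ID parsed from flow.xml.gz

    # 1. Fetch the current revision of the processor
    entity = json.load(urllib2.urlopen('%s/processors/%s' % (NIFI, processor_id)))

    # 2. Ask NiFi to disable it via the run-status endpoint
    payload = json.dumps({'revision': entity['revision'], 'state': 'DISABLED'})
    req = urllib2.Request('%s/processors/%s/run-status' % (NIFI, processor_id), payload,
                          {'Content-Type': 'application/json'})
    req.get_method = lambda: 'PUT'
    print(urllib2.urlopen(req).getcode())

On a secured cluster you would also need authentication (token or certificates) and would repeat the call against each node or via the cluster coordinator.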
02-22-2018
11:58 PM
@Anishkumar Valsalam For server-level stats, you can use Grafana, which is part of the HDF stack. Here is what we have and how we track it: a list of system-level stats and a list of NiFi-related stats.
02-22-2018
09:20 PM
1 Kudo
@Benjamin Newell So, the idea of ListSFTP is to provide a list of files from SFTP based on filters, etc. This processor is stateful, meaning it will give you the list of files that have been modified since the last run, and it will maintain that state. That's the reason it doesn't allow incoming connections.
Option 1. FetchSFTP has a "not.found" relationship. You can use it to postpone processing of the file (add a processor to penalize it and loop it back to FetchSFTP). Then configure your log monitor to ignore all messages matching "Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to not.found".
Option 2. First flow: use ListSFTP to pull files and put them on a landing zone (local or NAS). Second flow: replace FetchSFTP with FetchFile. FetchFile has a property "Log level when file not found"; use "DEBUG" so it won't print errors into the log file (see the sketch below).
Option 3. Create a custom processor extending FetchSFTP and change onTrigger to NOT print an error message in case of a file-not-found exception.
Let me know if that helps.
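A minimal configuration sketch for Option 2 (the File to Fetch expression and path are assumptions and depend on how the first flow names and lands the files):

    FetchFile
        File to Fetch                 : /landing/zone/${filename}   # hypothetical landing-zone path
        Log level when file not found : DEBUG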
02-01-2018
09:36 PM
@Carlton Patterson, There are multiple ways of doing that. One of them is explained above by Sridhar. Another way: after you execute your script, all your files will be CSV-formatted under '/user/hive/geography' on HDFS. So you will need to run on the command line:

    hdfs dfs -get /user/hive/geography /hadoop/hdfs/
01-30-2018
03:41 PM
@Hanu V To work around your issue, you can explicitly cast to string or int (or whatever your data type should be). I could reproduce your case:

    select greatest(month, billed) from sales limit 5;
    Error: java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: Error evaluating greatest(month,billed) (state=,code=0)

With the workaround:

    select greatest(int(month), int(billed)) from sales limit 5;
    +------+--+
    | _c0  |
    +------+--+
    | 8    |
    | 8    |
    | 8    |
    | 8    |
    | 2    |
    +------+--+

or

    select greatest(string(month), string(billed)) from sales limit 5;
    +------+--+
    | _c0  |
    +------+--+
    | 8    |
    | 8    |
    | 8    |
    | 8    |
    | 2    |
    +------+--+
12-12-2017
05:16 PM
@Umer Umer can you start beeline (or any SQL tool like Squirrel) with the same connection string?
12-12-2017
05:05 PM
@Zarier Behardien can you please run "show create table test.tmp_sales_target" and show the result?
12-11-2017
03:16 PM
@Roger Young Parse the output of your processor and assign values to attributes. Then in the Python code your attributes will be available on the flowfile. If this doesn't work for you, please provide more details.
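A tiny ExecuteScript (Jython) sketch of reading such an attribute inside the Python code (the attribute name is hypothetical):

    flowFile = session.get()
    if flowFile is not None:
        my_value = flowFile.getAttribute('myAttr')   # hypothetical attribute set upstream
        # ... use my_value in your logic ...
        session.transfer(flowFile, REL_SUCCESS)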
12-05-2017
06:33 PM
@Saikrishna Tarapareddy You don't use a username and password; you need an access key and secret key. The admin of the S3 account can generate them (refer to https://aws.amazon.com/blogs/security/wheres-my-secret-access-key/). Region and bucket are mandatory, and you can see them when you access your S3 via the web console: under the "Services" menu go to "Storage -> S3".
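A hedged sketch of the relevant PutS3Object properties (the bucket, key, and region values are placeholders, and exact property labels can differ slightly between NiFi versions):

    PutS3Object
        Bucket     : my-example-bucket        # placeholder
        Object Key : ${filename}
        Region     : us-east-1                # use the region shown in the S3 console
        Access Key : <access key id>
        Secret Key : <secret access key>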
10-25-2017
06:09 PM
@Bilel Boubakri, I've tested your use case with the GetSFTP processor, and it worked as expected with the given regex (it skipped the MATFIC dir with all the files in it). I cannot test GetFTP specifically, as we don't have FTP servers running, but I'm sure it should give the same result. Tested with NiFi 1.1.0.2.1.1.0-2. Some suggestions:
1. If you copy-pasted your regex, make sure you don't have a newline character in it.
2. Make sure your regex is in the Path Filter Regex property and not in the File Filter Regex property.
3. If 1 and 2 don't help, try to dance with a tambourine and call Cthulhu.
Hope that will help!
10-24-2017
05:41 PM
@xav webmaster Straight answer:

    flowFile = session.get()
    if (flowFile != None):
        flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
    # implicit return at the end

More info on the ExecuteScript processor: https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

In your particular case, in the callback function where you read from the input stream, you can scan the content:

    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback

    class PyStreamCallback(StreamCallback):
        def __init__(self):
            self.topic_name = ''
            pass
        def get_topic_name(self):
            return self.topic_name
        def process(self, inputStream, outputStream):
            Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            Log2 = str(Log).split(',')
            Brand = Log2[0]
            Color = Log2[5]
            Model = Log2[1]
            if Brand == 'ford' and Color == 'gray':
                NewLog = str(Log2)
                self.topic_name = 'ford'
                outputStream.write(bytearray((NewLog).encode('utf-8')))
            if Brand == 'audi' and Color == 'black':
                NewLog = str(Log2)
                self.topic_name = 'audi'
                outputStream.write(bytearray((NewLog).encode('utf-8')))
            if Brand == 'bmw' and Color == 'white':
                NewLog = str(Log2)
                self.topic_name = 'bmw'
                outputStream.write(bytearray((NewLog).encode('utf-8')))

    # add exception handling if needed for empty flowfile content, etc
    flowFile = session.get()                  # grab the incoming flowfile before writing to it
    if (flowFile != None):
        caller = PyStreamCallback()
        flowFile = session.write(flowFile, caller)
        topic_name = caller.get_topic_name()
        flowFile = session.putAttribute(flowFile, 'kafka_topic', topic_name)
        session.transfer(flowFile, REL_SUCCESS)   # route the flowfile onward

Hope that will help.
09-22-2017
04:22 PM
It can connect to the brokers; the problem is that SASL protocols aren't supported in the Kafka producer.
09-22-2017
04:20 PM
PLAINTEXTSASL is not supported in pyspark on HDP 2.5 (Kafka 0.10), so I did a workaround with HDF instead of Spark on the HDP side.
08-30-2017
01:54 PM
1 Kudo
@Gayathri Devi, just to add to Dinesh's comment: you could take the Hive UDF for percentile and adapt it for Impala. Hive percentile UDF: https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/UDAFPercentile.java
08-28-2017
04:58 PM
1 Kudo
@Gayathri Devi As previously said: providing more technical details would help. Meanwhile, I believe the issue you have is related to the way you create the Hive table and its columns' types. Take a look at this page: https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration

    CREATE TABLE hbase_table_1 (key int, value string, foobar double)
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES (
        "hbase.columns.mapping" = ":key,cf:val#s,cf:foo",
        "hbase.table.default.storage.type" = "binary"
    );

If your default storage type is string, then:

    CREATE TABLE hbase_table_1 (key int, value string, foobar double)
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES (
        "hbase.columns.mapping" = ":key#b,cf:val,cf:foo#b"
    );

"#b" means the field is a binary field; that will help with numbers.
08-15-2017
10:54 PM
1 Kudo
@Andres Urrego, What you are looking for (UPSERTs) isn't available in sqoop import. There are several approaches to actually updating data in Hive. One of them is described here: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_data-access/content/incrementally-updating-hive-table-with-sqoop-and-ext-table.html Other approaches also use a side load and merge as post-sqoop or scheduled jobs/processes. You can also check Hive ACID transactions, or use the Hive-HBase integration package. Choosing the right approach is not trivial and depends on: initial volume, incremental volumes, frequency of incremental jobs, probability of updates, ability to identify uniqueness of records, acceptable latency, etc.
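For reference, a hedged sketch of the incremental-import half of that pattern (the connection string, table, columns, and paths are placeholders; the merge/compaction into the final Hive table still happens as a separate step, as in the linked doc):

    sqoop import \
      --connect jdbc:mysql://db-host/sales_db \
      --table orders \
      --incremental lastmodified \
      --check-column updated_at \
      --last-value '2017-08-01 00:00:00' \
      --merge-key order_id \
      --target-dir /data/staging/orders_incr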
06-26-2017
05:05 PM
1 Kudo
Alright, so I ended up with a simple script and one processor in NiFi. The modules to reload should be provided in the "modules_list" property of the processor (comma-delimited). Script body:

    import sys, json

    def class_reloader(modules_to_reload):
        reload_msg = ""
        all_module_names = sys.modules.keys()
        all_module_names.sort()
        for mn in all_module_names:
            m = sys.modules[mn]
            # -- find full match of names with given modules
            if mn in modules_to_reload:
                try:
                    reload(m)
                    reload_msg = reload_msg + mn + "|"
                except:
                    return 1, reload_msg
                continue
            # -- find if mn is submodule of any given one
            for mtr in modules_to_reload:
                if mn.startswith(mtr+'.'):
                    try:
                        reload(m)
                        reload_msg = reload_msg + mn + "|"
                        break
                    except:
                        return 1, reload_msg
        return 0, reload_msg
    #-------------------------------#
    flowFile = session.create()
    if(flowFile != None):
        modules_prop = modules_list.getValue()
        ml = []
        if modules_prop:
            ml = modules_prop.split(',')
        cr = class_reloader(ml)
        flowFile = session.putAttribute(flowFile, 'class_reload_result', str(cr[0]))
        flowFile = session.putAttribute(flowFile, 'class_reload_found', str(cr[1]))
        session.transfer(flowFile, REL_SUCCESS)
    # implicit return at the end

The code can be improved to route to the FAILURE relationship in case of a non-zero response code from the method. It's not a perfect solution, but it will work in most cases.
If you have a better one, please share! 🙂
06-22-2017
07:48 PM
Hi Matt, thanks for your response. The solution at your link won't work for Jython in NiFi (actually, it created a lot of issues and I had to restart the NiFi services). But it gave me some ideas on what I can do and how. Once I complete all the tests, I'll post an answer with recommendations for others. As for a permanent solution, I think the performance impact would be too big to check whether the file has changed every time the processor is triggered by an incoming flow file; instead it could be done on "Start" only. But this still won't resolve issues with classes (modules) having the same name but deployed under different locations (paths), which makes environment sharing or versioning impossible (dev and qa, for example, or different builds/versions during dev stages). I would suggest having a custom class loader (with modules defined) at the processor level instead of a global one.
06-20-2017
05:30 PM
Hi All, We use the ExecuteScript processor to run some Python code. The Python file used in the processor is just a wrapper, which invokes the actual Python process. My problem is that when I change a file with Python code on the file system, it isn't reloaded in the NiFi flows until I fully restart the NiFi cluster. I understand that happens because the classes are loaded into the JVM's classloader after first use (since it is actually Jython). Question: is there a workaround to reload classes after the Python code is changed on the file system, instead of restarting the NiFi cluster? Thanks!
Labels:
- Apache NiFi
06-08-2017
07:56 PM
Thanks for your comment. Yup, agreed. Gave my answer as an alternative for @Timo Burmeister
06-08-2017
07:39 PM
@Matt Burgess, Love the JOLT transformation solution; we use it a lot with dynamic JSONs, transpose, etc. But in this case I would go simple and just replace text. Let me know if I missed something (check my alternative).
06-08-2017
07:35 PM
1 Kudo
@Timo Burmeister, Keep it simple: use the ReplaceText processor with this configuration:

    Search Value         : [{]
    Replacement Value    : { "ctime":"${now()}",
    Replacement Strategy : Regex Replace
    Evaluation Mode      : Entire text
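To illustrate the effect, with a hypothetical flat JSON input like the one below, the opening "{" is rewritten so a ctime field is injected at the start of the object (the exact timestamp text depends on how ${now()} is rendered at run time):

    input:  {"city":"Berlin","temp":21}
    output: { "ctime":"<timestamp>","city":"Berlin","temp":21}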
06-01-2017
01:31 AM
@Nara g, There aren't enough details to suggest an "ideal" solution. You can play with different numbers of threads for the HTTP connection that pulls the data from the web, and for executing your Python script. It depends on the resources required to execute each script, and the resources available on your NiFi edge node or cluster (in case it is pyspark jobs submitted in YARN mode). But I would suggest:
1. Get all JSON docs from the web and store them in HDFS (no dependency on processing, and it will allow you to reprocess if a bug is found and the data needs to be reprocessed).
2. Execute a single Python/Spark job on all of them in YARN mode.
05-25-2017
01:27 PM
@Dhanya Kumar Heballi Shivamurthy, please accept the answer to close the thread.
05-23-2017
05:29 PM
1 Kudo
@Dhanya Kumar Heballi Shivamurthy, assuming you have both arrays with the same size (per record):

    select
        c1,
        c21,
        c31,
        c4
    from (
        select 100 c1, split('Delta|Alpha|Beta','\\|') c2, split('Source|Varied|Volume','\\|') c3, 'AppData' c4
    ) foo
    LATERAL VIEW posexplode(c2) n1 as c22, c21
    LATERAL VIEW posexplode(c3) n2 as c32, c31
    where c22=c32;

If the array lengths can be different, then you need to add more conditions:

    select c1, c222 c2, c333 c3, c4
    from (
        select
            c1,
            c22, c32, -- keep indices
            case when c32 < size(c2) then c21 else null end c222,
            case when c22 < size(c3) then c31 else null end c333,
            c4
        from (
            select 100 c1, split('Delta|Alpha|Beta','\\|') c2, split('Source|Varied|Volume|Owner','\\|') c3, 'AppData' c4
        ) foo
        LATERAL VIEW posexplode(c2) n1 as c22, c21
        LATERAL VIEW posexplode(c3) n2 as c32, c31
    ) bar
    where c22=c32 or (c222 is null and c22=0) or (c333 is null and c32=0);

Result:

    +------+--------+---------+----------+--+
    |  c1  |   c2   |   c3    |    c4    |
    +------+--------+---------+----------+--+
    | 100  | Delta  | Source  | AppData  |
    | 100  | Alpha  | Varied  | AppData  |
    | 100  | Beta   | Volume  | AppData  |
    | 100  | NULL   | Owner   | AppData  |
    +------+--------+---------+----------+--+

(The extra element "Owner" comes from the longer array c3, so it lands in the c3 column with NULL in c2.)