Member since
Kudos Received
My Accepted Solutions
Title | Views | Posted |
2024 | 04-25-2018 08:37 PM | |
5951 | 04-01-2018 09:37 PM | |
1639 | 03-29-2018 05:15 PM | |
6857 | 03-27-2018 07:22 PM | |
2078 | 03-27-2018 06:14 PM |
10:13 PM
Yes this is best approach, Inserting all in staging table and then using merge to insert/update on target table, Just need to truncate staging table for next round of inserts.
... View more
02:01 AM
any way to use query for filter only last minutes transactions
... View more
03:43 AM
Hello @priyanshu_soni You can skip the "Timestamp" part as the same is inserted by HBase implicitly. I tried the same Query as you, excluding the Timestamp & the same was Successful: hbase(main):018:0> put 'Table_X1','125','Cf1:CheckItem','{"ID" : "1334134","Name" : "Apparel Fabric","Path" : "Arts, Crafts & Sewing/Fabric/Apparel Fabric"}'
Took 0.0077 seconds
hbase(main):019:0> scan 'Table_X1'
125 column=Cf1:CheckItem, timestamp=1621593487680, value={"ID" : "1334134","Name" : "Apparel Fabric","Path" : "Arts, Crafts & Sewing/Fabric/Apparel Fabric"}
1 row(s)
Took 0.0057 seconds As you may see above, the "timestamp" field corresponds to the Epoch Timestamp of the Inserted Row Time of Operation. If you wish to explicitly specify the Timestamp, You can include a EpochTime as shared below: hbase(main):020:0> put 'Table_X1','126','Cf1:CheckItem','{"ID" : "1334134","Name" : "Apparel Fabric","Path" : "Arts, Crafts & Sewing/Fabric/Apparel Fabric"}',1621593487680
Took 0.0202 seconds
hbase(main):021:0> scan 'Table_X1'
125 column=Cf1:CheckItem, timestamp=1621593487680, value={"ID" : "1334134","Name" : "Apparel Fabric","Path" : "Arts, Crafts & Sewing/Fabric/Apparel Fabric"}
126 column=Cf1:CheckItem, timestamp=1621593487680, value={"ID" : "1334134","Name" : "Apparel Fabric","Path" : "Arts, Crafts & Sewing/Fabric/Apparel Fabric"}
2 row(s)
Took 0.0071 seconds Let us know if you have any issues with the Put Operation. - Smarak
... View more
01:35 AM
I am using Nifi Registry installed on Windows Server and Windows 10 It works good from WSL (Ubuntu, Debian) on Windows 10. Or on Windows Server 2016 I used cygwin (there is no WSL on this server version) 1) I added in .bash_profile in WSL or cygwin commands to export path of Java and start Nifi Registry on boot For example for cygwin: export JAVA_HOME=/cygdrive/c/'Program Files'/Java/jdk1.8.0_261
export PATH=$JAVA_HOME/bin:$PATH
/cygdrive/d/nifi-registry-0.7.0/bin/ start (cygwin use the same java folder as windows system) 2) And than saved a .vbs script to run cygwin on Windows boot: Set ws = CreateObject("Wscript.Shell") "c:\cygwin64\cygwin.bat", vbhide 3) And put this script to autorun folder in Windows: win + R -> shell:startup So now after Windows boot cygwin starting on background with Nifi Registry on board I also tried to find the way how to run cygwin with nssm but it was unsuccessfully
... View more
11:41 PM
@RahulSoni I think you're a bit quick to dismiss Spark + JDBC. There is actually a solution for the multithreading - Spark will extract the data to different partitions in parallel, just like when your read an HDFS file. You just have to specify the number of partitions in the extracted dataframe and optimize the parameters for your job (number of executors, cores per executor, memory per executor). While sqoop is easier to use out of the box, the fact that it is based on MapReduce will likely mean that Spark is superior in some scenarios, and it should be your go-to option when you want to save the data as Parquet or ORC (not supported by sqoop). I haven't tried yet, but here's someone who seems quite pleased with how Spark worked for them: Sqoop is supposed to support Avro but when I tried to output Avro files it failed with a low-level error in a Java library. I wasn't too impressed by the performance either, although that could be due to bandwidth problems.
... View more
06:28 PM
1 Kudo
In this article, I will discuss one of the most exciting new features coming with NiFi 1.7.0 is the possibility of terminating running threads from the NiFi UI. I can think of a couple of examples in the previous NiFi versions where we either had to wait for a running thread to end before being able to make any changes to the processor configs or, in worst case scenarios, restart the NiFi cluster because of some thread is in a deadlock condition. For example An ExecuteSQL processor is stuck since the source RDBMS is not able to handle the data pull and has yielded under pressure. Or the other processes are not able to use the RDBMS since resources are hogged by this complete database scan operation. Either we wait for a literally infinite period of time or if the problem is serious, stop the cluster all together. Some custom script/processor has a deadlock situation and the thread won't stop ever. The only option we have in this scenario was to restart the machine/cluster running that process. Thanks to NiFi 1.7.0, now we have a more elegant solution to these kinds of problems, Terminate the thread from the UI itself. Follows a quick example of how we can do it. So for my flow, I created a sample flow with a GenerateFlowFile processor which is running continuously on all the possible nodes, a single one, in this case, my Mac 🙂 I have made the thread to run for longer once initiated and hence, even if I stop the processor, the thread will still keep on running. Have a look into the snapshots below. When I stopped the processor, the number of thread increase from 1 to 2, since now the thread to stop the processor is waiting for actively running thread. But with this new version of NiFi, NiFi 1.7.0, we have this option of terminating the threads explicitly from the UI itself, see snapshot #2 for the Terminate option. When the Terminate option is chosen Interrupt for the thread will be issued. A new instance of the processor will be created. The old instance will be eventually shut down. So here we are! With the new power to interrupt the threads from the NiFi UI. But please be careful! With greater powers, come greater responsibilities! I will add more information on what can be the probable issues, if any, of stopping the threads in between. Please feel free to leave comments to let know about the flow and for questions and queries. Hope that helps!
... View more
02:30 AM
2 Kudos
In this article, I'm going to cover a simple solution to control the data processing in NiFi serially or based on an event trigger. This article use Wait and Notify processors and DistributedMapCache controller service to achieve this functionality. Follows the links to the Usage Guide of these processors and Controller Services. Wait Processor Notify Processor DistributedMapCache Server DistributedMapCache client service NiFi Flow The flow that I am going to present in this article has a very simple use case. Get the data from the source, at whatever rate, but process it one flow file at a time. Once a flow file is processed, it should trigger the processing of the next flow file. Follows a step by step explanation of what I have implemented to solve this problem. First, the pre-requisites to start the flow. We will be storing the DistributedMapCache to store and retrieve the "signals" to process the flow files and hence we would need the Server and Client Service for that. Follows a quick description. DistributedMapCacheServer We will be storing some information about the flow file which has been processed, which will help us to trigger the next flow file processing. To do so, we will be using the DistributedMapCacheServer controller service. It provides a map (key/value) cache that can be accessed over a socket. Interaction with this service is typically accomplished via a DistributedMapCacheClient service, which is discussed below. Follows a snapshot of this controller service. Nothing fancy about it and I have used the default settings for it. DistributedMapCacheClientService Now to access the DistributedMapCacheServer, hosted at port 4557 above, we need a client service. This will aid us in storing and retrieving the cache. I am keeping it simple and leaving the default settings again for simplicity. Now the NiFI flow details Follows the snapshot of the flow for a quick overview Data Generation For this use case, I am generating the data using a GenerateFlowFile processor. Flow file tagging This is an important part of the processing. Here I am using an UpdateAttribute processor. This processor assigns a token to each flow file, which is incremented by 1 every time a flow file passes through it. The important part is that we are storing the state of this token variable and hence are able to assign a unique and auto incremented value to each of our flow files. This token will help us process the data in a serial fashion. Follows a snapshot of this processor. Tagged? Now let's make them wait! Once the flow files are tagged, they are redirected to the Wait processor. This processor makes the flow files to wait and don't release them until a matching release signal is stored in Distributed Cache. Have a look at the configuration of the Wait processor. We are looking at the DistributedMapCache Server for a counter called tokenCounter and when the value of tokenCounter will equal the value of Release Signal Identifier, which is the token number of the flow file, in this case, it will release that flow file. So how does the DistributedMapCache get this tokenNumber? If you look at the NiFi flow, before Wait processor, we have the RouteonAttribute. This is just for to handle the very first flow file. It will redirect the flow file with token #1 to the Notify processor. The Notify processor picks the value from the token attribute and stores it in the DistributedMapCache for against the key tokenCounter. This will instruct the Wait processor to release the flow file with token #1 for further processing. What's next? Next, the desired processing can be done on the flow file and once done, simple one up the token attributed and feed it to the Notify processor to release the next flow file. For example, flow file with token #1, once processed, will be updated to increment the token # to 2 and then sent to Notify processor. This will trigger the release of the file with token #2 by the Wait processor and cycle will go on. So here we are! With our flow to control the processing of our data according to our need in NiFi. Please feel free to leave comments to let know about the flow and for questions and queries. Hope that helps!
... View more
07:56 PM
@Mahmoud Shash Just wanted to follow-up to see how things are progressing. Still seeing issue? Did you try any of my suggestions above? I see no reason for having your invokeHTTP processor scheduled to execute on primary node only since it is being triggered by incoming FlowFiles. If you switch it to "all nodes", do you still see issue? What do you see when you perform a "List queue" action on the connection feeding your invokeHTTP processor? Within the "Cluster" UI found under the global menu, who is currently elected as the primary node? Does the data listed when you ran "List queue" belong to that same node? - Thank you, Matt
... View more
05:15 PM
@Aishwarya Sudhakar Could you clarify which username under which you are running the spark under? Because of its distributed aspect, you should copy the dataset.csv to HDFS users directory which is accessible to that user running the spark job. According to your output above you file is HDFS directory /demo/demo/dataset.csv so your load should look like this load "hdfs:////demo/demo/dataset.csv" This is what you said. "The demo is the directory that is inside hadoop. And datset.csv is the file that contains data." Did you mean in HDFS? Does the command print anything $ hdfs dfs -cat /demo/demo/dataset.csv Please revert !
... View more
04:05 PM
@Aishwarya Sudhakar You need to understand the HDFS directory structure. This is the one which is causing issues to you. Follows some explanation. Let's say the username for these example commands is ash. So when ash tries to create a directory in HDFS with the following command /user/ashhadoop fs -mkdir demo
//This creates a directory inside HDFS directory
//The complete directory path shall be /user/ash/demo it is different than the command given below hadoop fs -mkdir /demo
//This creates a directory in the root directory.
//The complete directory path shall be /demo So a suggestion here is, whenever you try to access the directories, use the absolute path(s) to avoid the confusion. So in this case, when you create a directory using hadoop fs -mkdir demo and loads the file to HDFS using hadoop fs -copyFromLocal dataset.csv demo You file exists at /user/ash/demo/dataset.csv
//Not at /demo So your reference to your spark code for this file should be sc.textFile("hdfs://user/ash/demo/dataset.csv") Hope this helps!
... View more