Member since 06-13-2016, 76 posts, 13 kudos received, 6 solutions

My Accepted Solutions
Views | Posted
---|---
2095 | 08-09-2017 06:54 PM
3014 | 05-03-2017 02:25 PM
4213 | 03-28-2017 01:56 PM
4331 | 09-26-2016 09:05 PM
2942 | 09-22-2016 03:49 AM
08-05-2016
04:31 PM
1 Kudo
Have you tried downloading HDF instead? http://hortonworks.com/downloads/#dataflow (HDF™ 1.2.0.1: Hortonworks DataFlow). The install process is similar. I noticed a similar issue when I was using the sandbox and had downloaded NiFi instead of HDF.
08-04-2016
01:53 AM
1 Kudo
@INDRANIL ROY The command is below:

curl -i -X PUT -H 'Content-Type: application/json' -d '{"revision":{"version":??,"clientId":"test"},"processor":{"id":"20eb7bbd-2f68-4538-aa13-0f4fa33e63c4","state":"RUNNING"}}' http://localhost:9090/nifi-api/controller/process-groups/root/processors/20eb7bbd-2f68-4538-aa13-0f4fa33e63c4

Since your processor is not in a process group, use "root" as the process-group-id in the URL. You still need to follow the pattern /controller/process-groups/{process-group-id}/processors/{id}. You will also need to specify a version (shown as ?? above); this is how NiFi handles optimistic locking. To find the current version, use:

curl -i -X GET http://localhost:9090/nifi-api/controller/revision

Change the NiFi port accordingly.
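The same two calls can be prepared programmatically so the version from the revision call lands in the PUT body. A minimal Python sketch, assuming the NiFi 0.x endpoints shown above and assuming the revision response is shaped like {"revision": {"version": N, ...}}; the helper names are hypothetical.

```python
import json


def revision_version(revision_response: str) -> int:
    """Read the current revision version from GET /nifi-api/controller/revision.

    Assumes a body shaped like {"revision": {"version": 7, "clientId": "..."}}.
    """
    return int(json.loads(revision_response)["revision"]["version"])


def start_processor_request(base_url: str, processor_id: str, version: int,
                            group_id: str = "root", client_id: str = "test"):
    """Return (url, json_body) for the PUT that sets a processor to RUNNING.

    group_id defaults to "root" for processors that are not inside a process group.
    """
    url = (f"{base_url}/nifi-api/controller/process-groups/"
           f"{group_id}/processors/{processor_id}")
    body = {
        "revision": {"version": version, "clientId": client_id},
        "processor": {"id": processor_id, "state": "RUNNING"},
    }
    return url, json.dumps(body)


# Example: plug the version read from the revision call into the PUT body.
url, body = start_processor_request(
    "http://localhost:9090",
    "20eb7bbd-2f68-4538-aa13-0f4fa33e63c4",
    revision_version('{"revision": {"version": 3, "clientId": "test"}}'),
)
print(url)
print(body)
```

The printed body can be passed to curl with -d. If another client changes the flow between the GET and the PUT, the version is stale and the optimistic lock is meant to reject the update, so re-read the revision and retry.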
07-09-2016
11:39 PM
You could try:

curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS"

The client receives a response with a FileStatus JSON object:

HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{
"FileStatus":
{
"accessTime" : 0,
"blockSize" : 0,
"group" : "supergroup",
"length" : 0, //in bytes, zero for directories
"modificationTime": 1320173277227,
"owner" : "webuser",
"pathSuffix" : "",
"permission" : "777",
"replication" : 0,
"type" : "DIRECTORY" //enum {FILE, DIRECTORY}
}
}
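To use the response in a script rather than by eye, the body just needs to be parsed as JSON (the // annotations in the sample above are not valid JSON, so strip them from any copied sample first). A minimal Python sketch; the host, port, and helper names are placeholders.

```python
import json
from urllib.parse import quote


def file_status_url(host: str, port: int, path: str) -> str:
    """Build a WebHDFS GETFILESTATUS URL for an absolute HDFS path."""
    return f"http://{host}:{port}/webhdfs/v1/{quote(path.lstrip('/'))}?op=GETFILESTATUS"


def summarize_file_status(body: str) -> str:
    """Describe a GETFILESTATUS response body (plain JSON, no comments)."""
    status = json.loads(body)["FileStatus"]
    if status["type"] == "DIRECTORY":
        # length is always 0 for directories, so report ownership instead
        return f"directory {status['owner']}:{status['group']} mode {status['permission']}"
    return f"file of {status['length']} bytes, replication {status['replication']}"


sample = ('{"FileStatus": {"accessTime": 0, "blockSize": 0, "group": "supergroup", '
          '"length": 0, "modificationTime": 1320173277227, "owner": "webuser", '
          '"pathSuffix": "", "permission": "777", "replication": 0, "type": "DIRECTORY"}}')
print(summarize_file_status(sample))
```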
07-08-2016
05:58 PM
I'm still not sure why this is happening, but ExecuteProcess works 100% of the time, so my workaround right now is to use InvokeHTTP to get the version ID, stop the processor, update it, and start it again.
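The stop / update / start sequence has to thread the revision version through every call. A sketch, assuming the NiFi 0.x endpoints from the earlier answer, assuming each PUT response returns the new revision under "revision", and assuming properties are sent under "config": {"properties": ...}; send is a hypothetical stand-in for whatever actually issues the HTTP requests (InvokeHTTP, curl, urllib).

```python
from typing import Callable, Dict, List

# Hypothetical transport: send(method, url, json_body) -> parsed JSON response.
Send = Callable[[str, str, dict], dict]


def restart_with_update(send: Send, base_url: str, processor_id: str,
                        properties: Dict[str, str], group_id: str = "root",
                        client_id: str = "nifi-script") -> int:
    """Stop a processor, update its properties, then start it again.

    Every PUT carries the current revision version, and the version returned
    by each response is reused for the next call. Returns the final version.
    """
    revision_url = f"{base_url}/nifi-api/controller/revision"
    processor_url = (f"{base_url}/nifi-api/controller/process-groups/"
                     f"{group_id}/processors/{processor_id}")
    version = send("GET", revision_url, {})["revision"]["version"]
    steps: List[dict] = [
        {"id": processor_id, "state": "STOPPED"},
        {"id": processor_id, "config": {"properties": properties}},
        {"id": processor_id, "state": "RUNNING"},
    ]
    for processor in steps:
        body = {"revision": {"version": version, "clientId": client_id},
                "processor": processor}
        version = send("PUT", processor_url, body)["revision"]["version"]
    return version
```

Injecting send keeps the ordering logic testable without a live NiFi. If any PUT is rejected because the version is stale, re-read the revision and restart the sequence.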
07-08-2016
01:14 PM
@mclark Great suggestion, thanks! I'll definitely look at incorporating InvokeHTTP.
07-07-2016
11:12 PM
@Randy Gelhausen ExecuteStreamCommand is running a shell script that already contains the "hive -f query.sql", so I don't think the query is being piped to the processor. But maybe it's the output?
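One way to check the "maybe it's the output?" idea outside NiFi is to run the same script and measure how much it writes to stdout and stderr for the small query and for the large one. A Python sketch (3.7+); the script path and argument in the usage comment are hypothetical.

```python
import subprocess
import sys


def measure_output(cmd, timeout_s=3600):
    """Run cmd to completion; return (exit code, stdout bytes, stderr bytes).

    capture_output=True makes run() use communicate(), which reads both pipes
    while waiting, so a chatty child cannot stall on a full pipe buffer.
    """
    result = subprocess.run(cmd, capture_output=True, timeout=timeout_s)
    return result.returncode, len(result.stdout), len(result.stderr)


if __name__ == "__main__":
    # Stand-in child that writes 1 MB to stderr, far more than a typical pipe buffer.
    print(measure_output([sys.executable, "-c", "import sys; sys.stderr.write('x' * 1000000)"]))
    # Hypothetical real call: measure_output(["/bin/bash", "/path/to/script.sh", "2016-07-01"])
```

If the large query produces far more output on either stream than the small one, that is consistent with the hang being output-related, though it does not prove it.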
07-07-2016
11:00 PM
Hello, I have a flow that fetches a file, extracts text, and runs a shell script, which in turn runs a Hive script. (I am just pulling a date from the file and passing it as a -hivevar.) My shell script looks something like this:

#!/bin/bash
endDate=$1
CONNECT="jdbc:hive2://master2:10000/default"
beeline -u "$CONNECT" -n root -hivevar endDate="$endDate" -f /pathToScript/Hive_scipt.hql

And my NiFi flow looks like this: [screenshot not preserved]

The Hive script completes and the data is inserted into my table, but ExecuteStreamCommand stays running indefinitely (the active-thread count of 1 stays in the top corner) and I have to restart NiFi. (Is there a better way to handle this?) I've noticed a few things:
- If I reduce the size of the query (my Hive query is a number of UNIONs), ExecuteStreamCommand won't hang.
- When the job hangs, the AM in the Resource Manager stays RUNNING for quite some time, ~10 min (it waits for the AM timeout), much like when you create a Hive CLI Tez session. When I reduce the query size and the job doesn't hang, the AM goes to the FINISHED state right away.
- Looking through the application log of a job that hangs vs. one that doesn't, I see that once the DAG succeeds, the AM calls to unregister from the RM right away:

2016-07-08 09:49:43,828 [INFO] [Dispatcher thread {Central}] |app.DAGAppMaster|: Central Dispatcher queue size after DAG completion, before cleanup: 0
2016-07-08 09:49:43,828 [INFO] [Dispatcher thread {Central}] |app.DAGAppMaster|: Waiting for next DAG to be submitted.
2016-07-08 09:49:43,829 [INFO] [Dispatcher thread {Central}] |app.DAGAppMaster|: Cleaning up DAG: name=INSERT OVERWRITE TABLE extract.r...t3.Rank=1(Stage-1), with id=dag_1467841289379_0087_1
2016-07-08 09:49:43,831 [INFO] [Dispatcher thread {Central}] |app.DAGAppMaster|: Completed cleanup for DAG: name=INSERT OVERWRITE TABLE extract.r...t3.Rank=1(Stage-1), with id=dag_1467841289379_0087_1
2016-07-08 09:49:44,151 [INFO] [IPC Server handler 0 on 55823] |client.DAGClientHandler|: Received message to shutdown AM
2016-07-08 09:49:44,151 [INFO] [IPC Server handler 0 on 55823] |rm.TaskSchedulerEventHandler|: TaskScheduler notified that it should unregister from RM
2016-07-08 09:49:44,151 [INFO] [IPC Server handler 0 on 55823] |app.DAGAppMaster|: No current running DAG, shutting down the AM
2016-07-08 09:49:44,151 [INFO] [IPC Server handler 0 on 55823] |app.DAGAppMaster|: DAGAppMasterShutdownHandler invoked
2016-07-08 09:49:44,152 [INFO] [IPC Server handler 0 on 55823] |app.DAGAppMaster|: Handling DAGAppMaster shutdown
2016-07-08 09:49:44,153 [INFO] [AMShutdownThread] |app.DAGAppMaster|: Sleeping for 5 seconds before shutting down
2016-07-08 09:49:49,153 [INFO] [AMShutdownThread] |app.DAGAppMaster|: Calling stop for all the services
2016-07-08 09:49:49,153 [INFO] [AMShutdownThread] |rm.YarnTaskSchedulerService|: Initiating stop of YarnTaskScheduler
2016-07-08 09:49:49,153 [INFO] [AMShutdownThread] |rm.YarnTaskSchedulerService|: Releasing held containers
2016-07-08 09:49:49,154 [INFO] [AMShutdownThread] |rm.YarnTaskSchedulerService|: Removing all pending taskRequests
Whereas with the job that hangs, the AM waits for the session timeout rather than ending right away:

2016-07-07 20:46:34,575 [INFO] [Dispatcher thread {Central}] |app.DAGAppMaster|: Central Dispatcher queue size after DAG completion, before cleanup: 0
2016-07-07 20:46:34,576 [INFO] [Dispatcher thread {Central}] |app.DAGAppMaster|: Waiting for next DAG to be submitted.
2016-07-07 20:46:34,576 [INFO] [Dispatcher thread {Central}] |app.DAGAppMaster|: Cleaning up DAG: name=INSERT OVERWRITE TABLE extract.r...t3.Rank=1(Stage-1), with id=dag_1467841289379_0085_1
2016-07-07 20:46:34,578 [INFO] [Dispatcher thread {Central}] |app.DAGAppMaster|: Completed cleanup for DAG: name=INSERT OVERWRITE TABLE extract.r...t3.Rank=1(Stage-1), with id=dag_1467841289379_0085_1
2016-07-07 20:46:38,380 [INFO] [DelayedContainerManager] |rm.YarnTaskSchedulerService|: No taskRequests. Container's idle timeout delay expired or is new. Releasing container, containerId=container_e11_1467841289379_0085_01_000008, containerExpiryTime=1467938798314, idleTimeout=10000, taskRequestsCount=0, heldContainers=10, delayedContainers=9, isNew=false
2016-07-07 20:46:38,381 [INFO] [Dispatcher thread {Central}] |history.HistoryEventHandler|: [HISTORY][DAG:dag_1467841289379_0085_1][Event:CONTAINER_STOPPED]: containerId=container_e11_1467841289379_0085_01_000008, stoppedTime=1467938798381, exitStatus=0
2016-07-07 20:46:38,381 [INFO] [ContainerLauncher #9] |launcher.ContainerLauncherImpl|: Stopping container_e11_1467841289379_0085_01_000008
2016-07-07 20:46:38,382 [INFO] [ContainerLauncher #9] |impl.ContainerManagementProtocolProxy|: Opening proxy : tf-hdpdata02.greenfieldethanol.com:45454
...
2016-07-07 20:55:43,282 [INFO] [AMRM Callback Handler Thread] |rm.YarnTaskSchedulerService|: Allocated: <memory:0, vCores:0> Free: <memory:667648, vCores:1> pendingRequests: 0 delayedContainers: 0 heartbeats: 2501 lastPreemptionHeartbeat: 2500
2016-07-07 20:55:55,823 [INFO] [AMRM Callback Handler Thread] |rm.YarnTaskSchedulerService|: Allocated: <memory:0, vCores:0> Free: <memory:667648, vCores:1> pendingRequests: 0 delayedContainers: 0 heartbeats: 2551 lastPreemptionHeartbeat: 2550
2016-07-07 20:56:08,362 [INFO] [AMRM Callback Handler Thread] |rm.YarnTaskSchedulerService|: Allocated: <memory:0, vCores:0> Free: <memory:667648, vCores:1> pendingRequests: 0 delayedContainers: 0 heartbeats: 2601 lastPreemptionHeartbeat: 2600
2016-07-07 20:56:20,902 [INFO] [AMRM Callback Handler Thread] |rm.YarnTaskSchedulerService|: Allocated: <memory:0, vCores:0> Free: <memory:667648, vCores:1> pendingRequests: 0 delayedContainers: 0 heartbeats: 2651 lastPreemptionHeartbeat: 2650
2016-07-07 20:56:33,442 [INFO] [AMRM Callback Handler Thread] |rm.YarnTaskSchedulerService|: Allocated: <memory:0, vCores:0> Free: <memory:667648, vCores:1> pendingRequests: 0 delayedContainers: 0 heartbeats: 2701 lastPreemptionHeartbeat: 2700
2016-07-07 20:56:45,981 [INFO] [AMRM Callback Handler Thread] |rm.YarnTaskSchedulerService|: Allocated: <memory:0, vCores:0> Free: <memory:667648, vCores:1> pendingRequests: 0 delayedContainers: 0 heartbeats: 2751 lastPreemptionHeartbeat: 2750
2016-07-07 20:56:58,522 [INFO] [AMRM Callback Handler Thread] |rm.YarnTaskSchedulerService|: Allocated: <memory:0, vCores:0> Free: <memory:667648, vCores:1> pendingRequests: 0 delayedContainers: 0 heartbeats: 2801 lastPreemptionHeartbeat: 2800
2016-07-07 20:57:11,061 [INFO] [AMRM Callback Handler Thread] |rm.YarnTaskSchedulerService|: Allocated: <memory:0, vCores:0> Free: <memory:667648, vCores:1> pendingRequests: 0 delayedContainers: 0 heartbeats: 2851 lastPreemptionHeartbeat: 2850
2016-07-07 20:57:15,823 [INFO] [Timer-1] |app.DAGAppMaster|: Session timed out, lastDAGCompletionTime=1467938794575 ms, sessionTimeoutInterval=600000 ms
2016-07-07 20:57:15,823 [INFO] [Timer-1] |rm.TaskSchedulerEventHandler|: TaskScheduler notified that it should unregister from RM
2016-07-07 20:57:15,823 [INFO] [Timer-1] |app.DAGAppMaster|: No current running DAG, shutting down the AM
2016-07-07 20:57:15,823 [INFO] [Timer-1] |app.DAGAppMaster|: DAGAppMasterShutdownHandler invoked
2016-07-07 20:57:15,824 [INFO] [Timer-1] |app.DAGAppMaster|: Handling DAGAppMaster shutdown

Even after the AM closes (10 min later), the NiFi process still remains running. Running the full query or the script manually from the command line works fine. The behavior is not consistent: sometimes it won't hang and sometimes it will (most of the time it will). ExecuteProcess works fine running the same command. Any ideas? I couldn't find anything in app.log.
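The "Session timed out" line carries enough numbers to confirm the ~10 minute wait. A small Python sketch that parses that line and computes when the idle session became due to time out; the function name is hypothetical and the result is in UTC.

```python
import re
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def session_deadline(log_line: str):
    """Return (timeout in minutes, UTC time the idle session became due to time out)."""
    m = re.search(r"lastDAGCompletionTime=(\d+) ms, sessionTimeoutInterval=(\d+) ms", log_line)
    if m is None:
        raise ValueError("not a Tez session-timeout line")
    last_ms, interval_ms = int(m.group(1)), int(m.group(2))
    # Integer milliseconds keep the arithmetic exact (no float rounding).
    deadline = EPOCH + timedelta(milliseconds=last_ms + interval_ms)
    return interval_ms / 60000, deadline


line = ("2016-07-07 20:57:15,823 [INFO] [Timer-1] |app.DAGAppMaster|: Session timed out, "
        "lastDAGCompletionTime=1467938794575 ms, sessionTimeoutInterval=600000 ms")
print(session_deadline(line))
```

For this log the deadline works out to 2016-07-08 00:56:34.575 UTC, exactly 600000 ms (10 min) after the last DAG completed. The DAG-completion entry at 20:46:34,575 and the timeout entry at 20:57:15 fit that if the log's local time is four hours behind UTC and the timer fires shortly after the deadline.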
Labels: Apache NiFi
07-07-2016
06:41 PM
@mclark Thanks, added.
07-07-2016
04:13 PM
1 Kudo
Hello, is it possible to execute a MergeContent processor from a RouteOnAttribute?

For example, I have one flow that constantly pulls in data from a SQL Server, merges the files, and puts them into HDFS. The condition for the merge is 5 min or 100 MB:

ExecuteSQL -> MergeFiles -> PutHDFS

Additionally, I have another flow that loops through a list and, once it reaches a certain number, updates a file. This is done through RouteOnAttribute:

ExecuteSQL (same one as above) -> RouteOnAttribute -> IncrementCounter, OR UpdateFile if the condition is met

Here's a screenshot for a better understanding of the flow: [screenshot not preserved]

In the bottom corner you'll see the UpdateAttribute is merging the files and putting them into HDFS. At the same time, it's going to RouteOnAttribute. What I'm looking to do is have MergeContent and PutHDFS execute when RouteOnAttribute is matched. I can't put MergeContent after the "Matched" relationship of RouteOnAttribute, since I need to store the results of each ExecuteQuery. My temporary solution right now is to set the max files in MergeContent equal to the RouteOnAttribute "Matched" count, but I'm looking to see if there are other ways to do this. The reason I'm doing this is that I want to ensure all the files are in HDFS before moving on to the next cycle (RouteOnAttribute - Matched).

Thanks,
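The temporary solution amounts to adding an entry-count trigger alongside the size and age triggers. A toy Python model of that idea, for reasoning only; it is NOT NiFi's MergeContent implementation, the class and field names are hypothetical, and (unlike a timer-driven bin) it only checks the age when a new result arrives.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Bin:
    """Toy merge bin that flushes on total size, age, or entry count."""
    max_bytes: int = 100 * 1024 * 1024     # "100 MB"
    max_age_s: float = 300.0               # "5 min"
    max_entries: Optional[int] = None      # the "max files = Matched count" workaround
    opened_at: Optional[float] = None
    sizes: List[int] = field(default_factory=list)

    def add(self, size: int, now: float) -> Optional[List[int]]:
        """Add one result; return the merged batch if this add triggers a flush."""
        if self.opened_at is None:
            self.opened_at = now
        self.sizes.append(size)
        full = sum(self.sizes) >= self.max_bytes
        old = now - self.opened_at >= self.max_age_s
        counted = self.max_entries is not None and len(self.sizes) >= self.max_entries
        if full or old or counted:
            batch, self.sizes, self.opened_at = self.sizes, [], None
            return batch
        return None


# A cycle of 3 results with an entry limit of 3.
merge_bin = Bin(max_entries=3)
for t, size in enumerate([512, 2048, 1024]):
    batch = merge_bin.add(size, now=float(t))
print(batch)  # [512, 2048, 1024]
```

The count trigger only lines up with the "Matched" event if every cycle yields exactly that many results; a missed or extra result shifts every later batch, which is the fragility of this workaround.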
Labels: Apache NiFi
06-29-2016
06:23 PM
@mpayne Perfect, this is exactly what I was looking for. Yes, I maintain just one list of counts, so I want to update rather than append, and keep track of the last time each tag was run. Count and TagName always remain the same.

Just for some background (in case you know of a better way to handle this): I loop through each tag and pull the latest records from a SQL database, but there are some tags that I only need to pull every 3rd, 5th, or 10th loop. I use one other field I didn't mention (Priority), and I use RouteOnAttribute with a modulus to decide whether to run the tag in that cycle. If the tag is part of the cycle, I need to update its last run time so that in the next cycle I can reference from that point. I did look into QueryDatabaseTable, but I ran into some of the same issues other people were running into.

My new flow looks like:

FetchFile
-> ReplaceText (Search Value: (${retry.counter}\t${TagName},${Priority}),.* , Replace Value: $1,${EndRun}; I create the EndRun attribute earlier in the flow)
-> UpdateAttribute (for the file name)
-> PutFile

Do you think there will be any issues with constantly having to fetch, regex-replace, and put the file? My list has counts that go up to ~1200.

Thanks
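Both halves of that logic (the modulus check and the in-place line update) are easy to prototype outside NiFi. A Python sketch, assuming each line of the list file is laid out as counter<TAB>TagName,Priority,LastRun (inferred from the ReplaceText pattern above, not confirmed) and that a priority of N means "run every Nth loop"; the function names are hypothetical. The sketch escapes the values with re.escape so characters such as . or + in a tag name are matched literally.

```python
import re


def due_this_cycle(cycle: int, priority: int) -> bool:
    """A tag with priority N is pulled every Nth loop (priority 1 = every loop)."""
    return cycle % priority == 0


def update_last_run(listing: str, counter: str, tag: str, priority: str, end_run: str) -> str:
    """Rewrite the last-run field of one 'counter<TAB>tag,priority,lastRun' line.

    Same idea as the ReplaceText step: capture the fixed prefix as group 1 and
    replace everything after the following comma with the new run time.
    """
    prefix = re.escape(f"{counter}\t{tag},{priority}")
    pattern = re.compile(rf"^({prefix}),.*$", re.MULTILINE)
    # A function replacement avoids backslash handling in the replacement text.
    return pattern.sub(lambda m: f"{m.group(1)},{end_run}", listing, count=1)


print([c for c in range(1, 11) if due_this_cycle(c, 3)])  # [3, 6, 9]
```

With ~1200 short lines, rewriting the whole file each time is cheap; the bigger risk is two flowfiles fetching and putting the same file concurrently, in which case one update can overwrite the other.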