Member since 10-09-2015 | 86 Posts | 179 Kudos Received | 8 Solutions
05-02-2017
02:41 PM
@Laurence Da Luz That might be a little outdated. I have an updated article on HDF-2.x and configuring LDAP via Ambari: https://community.hortonworks.com/articles/79627/hdf-20-ldap-user-authentication-with-ambari.html
03-14-2017
06:33 PM
1 Kudo
Hi @Sunile Manjee, since you are using NiFi to launch jobs, why not use NiFi itself to monitor them 😛 I tried using NiFi to monitor YARN jobs by querying the ResourceManager. I have documented it, and my flow XML is attached in the comments. Check it out: https://community.hortonworks.com/content/kbentry/42995/yarn-application-monitoring-with-nifi.html In the demo I used it to monitor failed and killed jobs only. You can change the query to ask for all the jobs that, say, user smanjee submitted, and be alerted as soon as one is completed, failed, or killed. Thanks, Jobin
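For readers who want to see what such a ResourceManager query looks like outside NiFi, here is a minimal Python sketch. It assumes the ResourceManager web service is reachable without authentication on port 8088 of the sandbox; the host name and the helper names are illustrative and are not taken from the linked article.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def build_apps_url(rm_base, states, user=None):
    """Build the ResourceManager 'cluster apps' URL for the given filters."""
    params = {"states": ",".join(states)}
    if user:
        params["user"] = user
    # safe="," keeps the state list readable instead of encoding it as %2C
    return rm_base.rstrip("/") + "/ws/v1/cluster/apps?" + urlencode(params, safe=",")


def summarize_apps(payload):
    """Return (id, name, state, finalStatus) tuples from an apps response."""
    # The "apps" value is null when no application matches the filter.
    apps = (payload.get("apps") or {}).get("app") or []
    return [(a.get("id"), a.get("name"), a.get("state"), a.get("finalStatus"))
            for a in apps]


def fetch_apps(rm_base, states, user=None, timeout=10):
    """Query a live ResourceManager (assumed unsecured) and summarize the result."""
    with urlopen(build_apps_url(rm_base, states, user), timeout=timeout) as resp:
        return summarize_apps(json.load(resp))
```

Calling fetch_apps("http://sandbox.hortonworks.com:8088", ["FAILED", "KILLED"], user="smanjee") would list that user's failed and killed applications, which is roughly the query the NiFi flow issues on a schedule.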
02-24-2017
09:46 PM
@Matt Clarke G1GC was set as the garbage collector, and the issue was fixed in the next version of HDF. This one went unnoticed; accepting the answer. Thanks, Jobin George
02-21-2017
06:13 PM
1 Kudo
Hi @Josh Elser, I was wondering why there would be an invalid version of the HBase jar on the classpath of a Sandbox; I haven't changed any configs. Also, what is the best approach to fix this? Thanks, Jobin
02-21-2017
06:33 AM
3 Kudos
Introduction

With NiFi, data can be exposed so that a receiver pulls it, by adding an Output Port to the root process group. For Spark we use the same mechanism: the Site-to-Site protocol pulls data from NiFi's Output Ports. In this tutorial we capture the NiFi app log from the Sandbox, parse it with Java regular expressions, and ingest it into Phoenix either via Spark or directly with NiFi's PutSQL processor.

Prerequisites

1) This assumes you already have the latest NiFi-1.x/HDF-2.x downloaded as an archive onto your HW Sandbox version 2.5 (HDF and HDP cannot be managed by Ambari on the same nodes as of now). If not, execute the commands below after connecting to the sandbox over ssh:
# mkdir /opt/HDF-2.1.1
# cd /opt/HDF-2.1.1
# wget http://public-repo-1.hortonworks.com/HDF/2.1.1.0/nifi-1.1.0.2.1.1.0-2-bin.tar.gz
# tar -xvf nifi-1.1.0.2.1.1.0-2-bin.tar.gz
2) Spark, Zeppelin, YARN and HDFS are installed on your VM and started.
3) HBase is installed with the Phoenix Query Server.
4) Download a compatible version [in our case 1.1.0] of "nifi-spark-receiver" and "nifi-site-to-site-client" to a specific location on the Sandbox:
# mkdir /opt/spark-receiver
# cd /opt/spark-receiver
# wget http://central.maven.org/maven2/org/apache/nifi/nifi-spark-receiver/1.1.0/nifi-spark-receiver-1.1.0.jar
# wget http://central.maven.org/maven2/org/apache/nifi/nifi-site-to-site-client/1.1.0/nifi-site-to-site-client-1.1.0.jar
5) Make sure Git is installed on the VM:
# yum install git -y

Configuring and Creating Table in HBase via Phoenix

1) Make sure the HBase components as well as the Phoenix Query Server are started.
2) Make sure HBase is up and running and out of maintenance mode, and that the properties below are set (if not, set them and restart the services):
- Enable Phoenix --> Enabled
- Enable Authorization --> Off
3) Connect to the Phoenix shell (or use Zeppelin) to create the Phoenix tables:
# /usr/hdp/current/phoenix-client/bin/sqlline.py sandbox.hortonworks.com:2181:/hbase-unsecure
4) Execute the statements below in the Phoenix shell to create the tables in HBase:
CREATE TABLE NIFI_LOG( UUID VARCHAR NOT NULL, EVENT_DATE VARCHAR, BULLETIN_LEVEL VARCHAR, EVENT_TYPE VARCHAR, CONTENT VARCHAR CONSTRAINT pk PRIMARY KEY(UUID));
CREATE TABLE NIFI_DIRECT( UUID VARCHAR NOT NULL, EVENT_DATE VARCHAR, BULLETIN_LEVEL VARCHAR, EVENT_TYPE VARCHAR, CONTENT VARCHAR CONSTRAINT pk PRIMARY KEY(UUID));

Configuring and Restarting Spark

1) Log in to the Ambari UI, navigate to Services --> Spark --> Configs --> Custom spark-defaults, and add the two properties below with the given values:
spark.driver.extraClassPath = /opt/HDF-2.1.1/nifi-1.1.0.2.1.1.0-2/lib/nifi-framework-api-1.1.0.2.1.1.0-2.jar:/opt/spark-receiver/nifi-site-to-site-client-1.1.0.jar:/opt/spark-receiver/nifi-spark-receiver-1.1.0.jar:/opt/HDF-2.1.1/nifi-1.1.0.2.1.1.0-2/lib/nifi-api-1.1.0.2.1.1.0-2.jar:/opt/HDF-2.1.1/nifi-1.1.0.2.1.1.0-2/lib/bootstrap/nifi-utils-1.1.0.2.1.1.0-2.jar:/opt/HDF-2.1.1/nifi-1.1.0.2.1.1.0-2/work/nar/framework/nifi-framework-nar-1.1.0.2.1.1.0-2.nar-unpacked/META-INF/bundled-dependencies/nifi-client-dto-1.1.0.2.1.1.0-2.jar:/opt/HDF-2.1.1/nifi-1.1.0.2.1.1.0-2/work/nar/framework/nifi-framework-nar-1.1.0.2.1.1.0-2.nar-unpacked/META-INF/bundled-dependencies/httpcore-nio-4.4.5.jar:/usr/hdp/current/phoenix-client/phoenix-client.jar
spark.driver.allowMultipleContexts = true
2) Once the properties are added, restart Spark.

Configuring and Starting NiFi

1) Open nifi.properties to update the configuration:
# vi /opt/HDF-2.1.1/nifi-1.1.0.2.1.1.0-2/conf/nifi.properties
2) Change the NiFi HTTP port to 9090, since the default 8080 conflicts with the Ambari web UI:
# web properties #
nifi.web.http.port=9090
3) Configure the NiFi instance for Site-to-Site by adding a port, say 8055, and setting "nifi.remote.input.secure" to "false":
# Site to Site properties #
nifi.remote.input.socket.port=8055
nifi.remote.input.secure=false
4) Start NiFi on your Sandbox [restart it if it is already running, so the configuration change takes effect]:
# /opt/HDF-2.1.1/nifi-1.1.0.2.1.1.0-2/bin/nifi.sh start
5) Make sure NiFi is up and running by connecting to its web UI from your browser:
http://your-vm-ip:9090/nifi/

Building a Flow in NiFi to fetch and parse nifi-app.log

1) Let us build a small flow on the NiFi canvas that reads the app log generated by NiFi itself and feeds it to Spark:
2) Drop a "TailFile" processor onto the canvas to read lines added to "/opt/HDF-2.1.1/nifi-1.1.0.2.1.1.0-2/logs/nifi-app.log". Auto-terminate the failure relationship.
3) Drop a "SplitText" processor onto the canvas to split the log file into separate lines. Auto-terminate the original and failure relationships for now. Connect the TailFile processor to the SplitText processor for the success relationship.
4) Drop an "ExtractText" processor onto the canvas to extract portions of the log content into attributes as below. Connect the SplitText processor to the ExtractText processor for the splits relationship.
- BULLETIN_LEVEL:([A-Z]{4,5})
- CONTENT:(^.*)
- EVENT_DATE:([^,]*)
- EVENT_TYPE:(?<=\[)(.*?)(?=\])
5) Drop an Output Port onto the canvas and name it "spark". Once it is added, connect "ExtractText" to the port for the matched relationship. The flow now runs TailFile --> SplitText --> ExtractText --> output port "spark".
6) Start the flow in NiFi and notice that data queues up in the connection before the output port "spark".

Building Spark application

1) To begin with, let's clone the git repo below:
# cd /opt/
# git clone https://github.com/jobinthompu/NiFi-Spark-Feeding-Data-to-Spark-Streaming.git
2) Feel free to inspect the Spark application code:
# vi /opt/NiFi-Spark-Feeding-Data-to-Spark-Streaming/src/main/Spark+NiFi+Phoenix.sh
3) Now submit the Spark application to YARN, or run it locally, via spark-shell:
# spark-shell --master yarn --deploy-mode client -i /opt/NiFi-Spark-Feeding-Data-to-Spark-Streaming/src/main/Spark+NiFi+Phoenix.sh
OR
# spark-shell -i /opt/NiFi-Spark-Feeding-Data-to-Spark-Streaming/src/main/Spark+NiFi+Phoenix.sh
4) Make sure the application is submitted and prints out statistics.
5) Verify that the application is submitted and started in YARN (you can drill down to the ApplicationMaster's Spark UI as well):
YARN UI: http://your-vm-ip:8088
Or, if you ran the application locally, verify it through the spark-shell application UI:
http://sandbox.hortonworks.com:4040/executors/
6) Go back to the NiFi web UI. If everything worked, the data that was pending at the port 'spark' will be gone, as it was consumed by Spark.
7) Now connect to Phoenix and check the data populated in the tables, using either the Phoenix sqlline command line or Zeppelin:
- via Phoenix sqlline
# /usr/hdp/current/phoenix-client/bin/sqlline.py localhost:2181:/hbase-unsecure
SELECT EVENT_DATE,EVENT_TYPE,BULLETIN_LEVEL FROM NIFI_LOG WHERE BULLETIN_LEVEL='INFO' ORDER BY EVENT_DATE LIMIT 20;
- via Zeppelin for better visualization
Zeppelin UI: http://your-vm-ip:9995/

Extending NiFi Flow to ingest data directly to Phoenix using PutSQL processor

1) Kill the Spark application by pressing Ctrl+C on the command line.
2) Log back in to the NiFi UI currently running the flow, and stop the entire flow.
3) Drop a RouteOnAttribute processor onto the canvas for the matched relationship from the ExtractText processor, configure it with the properties below, and auto-terminate the unmatched relationship.
DEBUG : ${BULLETIN_LEVEL:equals('DEBUG')}
ERROR : ${BULLETIN_LEVEL:equals('ERROR')}
INFO : ${BULLETIN_LEVEL:equals('INFO')}
WARN : ${BULLETIN_LEVEL:equals('WARN')}
4) Drop an AttributesToJSON processor onto the canvas with the configuration below and connect RouteOnAttribute's DEBUG, ERROR, INFO and WARN relationships to it.
Attributes List : uuid,EVENT_DATE,BULLETIN_LEVEL,EVENT_TYPE,CONTENT
Destination : flowfile-content
5) Create and enable a DBCPConnectionPool named "Phoenix-Spark" with the configuration below:
Database Connection URL : jdbc:phoenix:sandbox.hortonworks.com:2181:/hbase-unsecure
Database Driver Class Name : org.apache.phoenix.jdbc.PhoenixDriver
Database Driver Location(s) : /usr/hdp/current/phoenix-client/phoenix-client.jar
6) Drop a ConvertJSONToSQL processor onto the canvas, point it at the Phoenix-Spark controller service, connect AttributesToJSON's success relationship to it, and auto-terminate the failure relationship for now.
7) Drop a ReplaceText processor onto the canvas to change the INSERT statements to UPSERT for Phoenix, connect ConvertJSONToSQL's sql relationship to it, and auto-terminate the original and failure relationships.
8) Finally, add a PutSQL processor, configure its connection pool, connect it to ReplaceText's success relationship, and auto-terminate all of its relationships.
9) The final flow, covering both ingestion via Spark and direct ingestion to Phoenix using PutSQL, is now complete.
10) Start the flow to ingest data into both tables, via Spark and directly from NiFi.
11) Log back in to Zeppelin to see whether data is populated in the NIFI_DIRECT table:
%jdbc(phoenix)
SELECT EVENT_DATE,EVENT_TYPE,BULLETIN_LEVEL FROM NIFI_DIRECT WHERE BULLETIN_LEVEL='INFO' ORDER BY EVENT_DATE
Too lazy to create the flow? Download my flow template here
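The ReplaceText step exists because Phoenix writes rows with UPSERT rather than INSERT. The exact ReplaceText settings are not reproduced in this text, so the Python sketch below only illustrates the kind of rewrite involved. The regular expression and the sample statement are assumptions, not the author's configuration.

```python
import re

# Matches a leading "INSERT INTO", ignoring case and leading whitespace.
_INSERT_INTO = re.compile(r"^\s*INSERT\s+INTO\b", re.IGNORECASE)


def insert_to_upsert(sql):
    """Rewrite a leading INSERT INTO so the statement uses Phoenix's UPSERT INTO."""
    return _INSERT_INTO.sub("UPSERT INTO", sql, count=1)


# Illustrative parameterized statement of the shape a JSON-to-SQL step might emit.
generated = ("INSERT INTO NIFI_DIRECT (UUID, EVENT_DATE, BULLETIN_LEVEL, EVENT_TYPE, CONTENT) "
             "VALUES (?, ?, ?, ?, ?)")
print(insert_to_upsert(generated))
```

Anchoring the pattern at the start of the statement means that a log line whose CONTENT happens to contain the word INSERT is left alone.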
This completes the tutorial. You have successfully:
- Installed and configured HDF 2.1 on your HDP-2.5 Sandbox.
- Created a data flow to pull logs, parse them, and make them available on a Site-to-Site enabled NiFi port.
- Created a Spark application to consume data from NiFi via Site-to-Site and ingest it into HBase via Phoenix.
- Ingested data directly into Phoenix with the PutSQL processor in NiFi, without using Spark.
- Viewed the ingested data from the Phoenix command line and Zeppelin.

References:
Mark Payne's - NiFi-Spark
My GitHub Article

Thanks, Jobin George
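As a quick sanity check of the four ExtractText patterns used in the flow, they can be tried against a log line outside NiFi. This Python sketch assumes that the first match of each pattern is taken and that capture group 1 becomes the attribute value. The sample line is hypothetical, written in the usual nifi-app.log layout.

```python
import re

# Patterns copied from the ExtractText configuration in the tutorial.
PATTERNS = {
    "BULLETIN_LEVEL": r"([A-Z]{4,5})",
    "CONTENT": r"(^.*)",
    "EVENT_DATE": r"([^,]*)",
    "EVENT_TYPE": r"(?<=\[)(.*?)(?=\])",
}


def extract_attributes(line):
    """Apply each pattern with search() and keep capture group 1."""
    attrs = {}
    for name, pattern in PATTERNS.items():
        match = re.search(pattern, line)
        if match:
            attrs[name] = match.group(1)
    return attrs


# Hypothetical log line, not taken from a real run.
sample = ("2017-02-21 06:33:10,123 INFO [Timer-Driven Process Thread-4] "
          "o.a.n.processors.standard.TailFile sample message")
for key, value in extract_attributes(sample).items():
    print(key, "=", value)
```

Note that EVENT_DATE stops at the first comma, so the milliseconds are dropped, and BULLETIN_LEVEL picks up the first run of four or five capital letters on the line.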
02-19-2017
04:48 PM
2 Kudos
Hi, while trying to execute Phoenix queries via Zeppelin in the HDP-2.5 Sandbox, I am getting the NoSuchMethodError below. I enabled Phoenix and set authentication to simple in the HBase configuration. Please let me know if I am missing something here (Ranger is disabled and the cluster is not Kerberized):
04:50:05.006 [pool-3-thread-2] ERROR org.apache.zeppelin.jdbc.security.JDBCSecurityImpl - Invalid auth.type detected with value , defaulting auth.type to SIMPLE
org.apache.phoenix.exception.PhoenixIOException: java.lang.NoSuchMethodError: org.apache.hadoop.hbase.protobuf.generated.MasterProtos$ModifyTableRequest$Builder.setNonceGroup(J)Lorg/apache/hadoop/hbase/protobuf/generated/MasterProtos$ModifyTableRequest$Builder;
at org.apache.phoenix.util.ServerUtil.parseServerException(ServerUtil.java:111) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.query.ConnectionQueryServicesImpl.ensureTableCreated(ConnectionQueryServicesImpl.java:1063) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.query.ConnectionQueryServicesImpl.createTable(ConnectionQueryServicesImpl.java:1369) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.schema.MetaDataClient.createTableInternal(MetaDataClient.java:2116) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.schema.MetaDataClient.createTable(MetaDataClient.java:828) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.compile.CreateTableCompiler$2.execute(CreateTableCompiler.java:183) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:338) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:326) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:324) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.jdbc.PhoenixStatement.executeUpdate(PhoenixStatement.java:1326) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.query.ConnectionQueryServicesImpl$13.call(ConnectionQueryServicesImpl.java:2436) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.query.ConnectionQueryServicesImpl$13.call(ConnectionQueryServicesImpl.java:2248) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:78) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:2248) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:233) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:135) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:202) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at java.sql.DriverManager.getConnection(DriverManager.java:664) ~[?:1.8.0_111]
at java.sql.DriverManager.getConnection(DriverManager.java:208) ~[?:1.8.0_111]
at org.apache.zeppelin.jdbc.JDBCInterpreter.getConnection(JDBCInterpreter.java:257) ~[zeppelin-jdbc-0.6.0.2.5.0.0-1245.jar:0.6.0.2.5.0.0-1245]
at org.apache.zeppelin.jdbc.JDBCInterpreter.getStatement(JDBCInterpreter.java:275) ~[zeppelin-jdbc-0.6.0.2.5.0.0-1245.jar:0.6.0.2.5.0.0-1245]
at org.apache.zeppelin.jdbc.JDBCInterpreter.executeSql(JDBCInterpreter.java:336) [zeppelin-jdbc-0.6.0.2.5.0.0-1245.jar:0.6.0.2.5.0.0-1245]
at org.apache.zeppelin.jdbc.JDBCInterpreter.interpret(JDBCInterpreter.java:442) [zeppelin-jdbc-0.6.0.2.5.0.0-1245.jar:0.6.0.2.5.0.0-1245]
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:94) [zeppelin-interpreter-0.6.0.2.5.0.0-1245.jar:0.6.0.2.5.0.0-1245]
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:341) [zeppelin-interpreter-0.6.0.2.5.0.0-1245.jar:0.6.0.2.5.0.0-1245]
at org.apache.zeppelin.scheduler.Job.run(Job.java:176) [zeppelin-interpreter-0.6.0.2.5.0.0-1245.jar:0.6.0.2.5.0.0-1245]
at org.apache.zeppelin.scheduler.ParallelScheduler$JobRunner.run(ParallelScheduler.java:162) [zeppelin-interpreter-0.6.0.2.5.0.0-1245.jar:0.6.0.2.5.0.0-1245]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_111]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_111]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_111]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_111]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.NoSuchMethodError: org.apache.hadoop.hbase.protobuf.generated.MasterProtos$ModifyTableRequest$Builder.setNonceGroup(J)Lorg/apache/hadoop/hbase/protobuf/generated/MasterProtos$ModifyTableRequest$Builder;
at org.apache.hadoop.hbase.client.RpcRetryingCaller.translateException(RpcRetryingCaller.java:229) ~[hbase-client-1.1.3.jar:1.1.3]
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:140) ~[hbase-client-1.1.3.jar:1.1.3]
at org.apache.hadoop.hbase.client.HBaseAdmin.executeCallable(HBaseAdmin.java:4036) ~[hbase-client-1.1.3.jar:1.1.3]
at org.apache.hadoop.hbase.client.HBaseAdmin.modifyTable(HBaseAdmin.java:2548) ~[hbase-client-1.1.3.jar:1.1.3]
at org.apache.hadoop.hbase.client.HBaseAdmin.modifyTable(HBaseAdmin.java:2561) ~[hbase-client-1.1.3.jar:1.1.3]
at org.apache.phoenix.query.ConnectionQueryServicesImpl.modifyTable(ConnectionQueryServicesImpl.java:1086) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.query.ConnectionQueryServicesImpl.ensureTableCreated(ConnectionQueryServicesImpl.java:1058) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
... 33 more
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.hbase.protobuf.generated.MasterProtos$ModifyTableRequest$Builder.setNonceGroup(J)Lorg/apache/hadoop/hbase/protobuf/generated/MasterProtos$ModifyTableRequest$Builder;
at org.apache.hadoop.hbase.protobuf.RequestConverter.buildModifyTableRequest(RequestConverter.java:1298) ~[hbase-client-1.1.3.jar:1.1.3]
at org.apache.hadoop.hbase.client.HBaseAdmin$25.call(HBaseAdmin.java:2551) ~[hbase-client-1.1.3.jar:1.1.3]
at org.apache.hadoop.hbase.client.HBaseAdmin$25.call(HBaseAdmin.java:2548) ~[hbase-client-1.1.3.jar:1.1.3]
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:126) ~[hbase-client-1.1.3.jar:1.1.3]
at org.apache.hadoop.hbase.client.HBaseAdmin.executeCallable(HBaseAdmin.java:4036) ~[hbase-client-1.1.3.jar:1.1.3]
at org.apache.hadoop.hbase.client.HBaseAdmin.modifyTable(HBaseAdmin.java:2548) ~[hbase-client-1.1.3.jar:1.1.3]
at org.apache.hadoop.hbase.client.HBaseAdmin.modifyTable(HBaseAdmin.java:2561) ~[hbase-client-1.1.3.jar:1.1.3]
at org.apache.phoenix.query.ConnectionQueryServicesImpl.modifyTable(ConnectionQueryServicesImpl.java:1086) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
at org.apache.phoenix.query.ConnectionQueryServicesImpl.ensureTableCreated(ConnectionQueryServicesImpl.java:1058) ~[phoenix-core-4.7.0-HBase-1.1.jar:4.7.0-HBase-1.1]
... 33 more
Thanks, Jobin George
02-16-2017
08:06 PM
1 Kudo
Hi @Andrew Grande, thanks for the response. I have seen this when more than one thread is set for some processors, but so far these settings have worked exactly as expected when a funnel is not involved. That's why it is confusing.
02-16-2017
07:46 PM
3 Kudos
Hi, I have created a small flow to test prioritization, but while configuring it I noticed that the Back Pressure Object Threshold set on a connection is sometimes not honored when the flow has a funnel in it.
Below is the flow I created. Every connection except the final one has the default Back Pressure settings (even Concurrent Tasks is the default, 1); the last connection, out of the funnel, has its Object Threshold set to 10. When I start all the processors together, except the last LogAttribute processor in the flow, I see the Back Pressure threshold exceeded in at least 2 connections each time I test. Am I missing something, or is there a logical explanation?
I am attaching my flow, back-pressure.xml. Version: [HDF-2.0.1] [NiFi-1.0.0.2.0.1.0-12] Thanks, Jobin George
Labels: Apache NiFi, Cloudera DataFlow (CDF)
02-16-2017
03:54 AM
1 Kudo
Hi Harsh, I can't make out much from the screenshot, as the main cause of the error is not visible. At first glance, it looks more like an environment issue.
02-15-2017
07:29 AM
4 Kudos
Introduction

Recently a customer asked me how to change the destination of a connection that still contains data while the destination is stopped. The NiFi REST API lets us change the flow, and in this article I capture the steps to update the destination of a connection using it. The requirement behind this was to push incoming data to different flows at different times.

Prerequisites

1) To test this, make sure an HDF-2.x version of NiFi is up and running.
2) At least 3 processors are on the canvas, with a connection between two of them [in my case GenerateFlowFile --> PutFile, plus a 'PutNext' processor].
3) Note the destination's uuid [in my case the 'PutNext' processor's uuid].
4) Note the uuid of the connection that has to be re-pointed to the 'PutNext' processor.

'GET'ing the details of connection

1) Execute the command below in your terminal with the uuid of the connection:
curl -i -X GET http://localhost:8080/nifi-api/connections/dcbee9dd-0159-1000-45a7-8306c28f2786
2) Copy the result of the GET curl command and update the sections below with the uuid of the 'PutNext' processor:
"destination":{"id":
"destinationId":"
"destinationId":
3) Remove the JSON element below from the copied result:
"permissions":{"canRead":true,"canWrite":true},

Updating Destination with PUT REST calls

1) Once updated, run the PUT REST API call from the command line using curl as below:
curl -i -X PUT -H 'Content-Type: application/json' -d '{ **MY UPDATED JSON**}' http://localhost:8080/nifi-api/connections/dcbee9dd-0159-1000-45a7-8306c28f2786
My sample command is as below:
curl -i -X PUT -H 'Content-Type: application/json' -d '{
"revision": {
"clientId": "dd1c2f03-0159-1000-845b-d5c732a49869",
"version": 15
},
"id": "dcbee9dd-0159-1000-45a7-8306c28f2786",
"uri": "http://localhost:8080/nifi-api/connections/dcbee9dd-0159-1000-45a7-8306c28f2786",
"component": {
"id": "dcbee9dd-0159-1000-45a7-8306c28f2786",
"parentGroupId": "cbe6e53b-0158-1000-e36a-f9d26bb1b510",
"source": {
"id": "dcbea89f-0159-1000-278c-cc38bab689bf",
"type": "PROCESSOR",
"groupId": "cbe6e53b-0158-1000-e36a-f9d26bb1b510",
"name": "GenerateFlowFile",
"running": false,
"comments": ""
},
"destination": {
"id": "dcbebd86-0159-1000-7559-d77d1e05c910",
"type": "PROCESSOR",
"groupId": "cbe6e53b-0158-1000-e36a-f9d26bb1b510",
"name": "PutFile",
"running": false,
"comments": ""
},
"name": "",
"labelIndex": 1,
"zIndex": 0,
"selectedRelationships": ["success"],
"availableRelationships": ["success"],
"backPressureObjectThreshold": 10000,
"backPressureDataSizeThreshold": "1 GB",
"flowFileExpiration": "0 sec",
"prioritizers": [],
"bends": []
},
"status": {
"id": "dcbee9dd-0159-1000-45a7-8306c28f2786",
"groupId": "cbe6e53b-0158-1000-e36a-f9d26bb1b510",
"name": "success",
"statsLastRefreshed": "09:02:22 EST",
"sourceId": "dcbea89f-0159-1000-278c-cc38bab689bf",
"sourceName": "GenerateFlowFile",
"destinationId": "dcbebd86-0159-1000-7559-d77d1e05c910",
"destinationName": "PutFile",
"aggregateSnapshot": {
"id": "dcbee9dd-0159-1000-45a7-8306c28f2786",
"groupId": "cbe6e53b-0158-1000-e36a-f9d26bb1b510",
"name": "success",
"sourceName": "GenerateFlowFile",
"destinationName": "PutFile",
"flowFilesIn": 0,
"bytesIn": 0,
"input": "0 (0 bytes)",
"flowFilesOut": 0,
"bytesOut": 0,
"output": "0 (0 bytes)",
"flowFilesQueued": 18,
"bytesQueued": 18,
"queued": "18 (18 bytes)",
"queuedSize": "18 bytes",
"queuedCount": "18"
}
},
"bends": [],
"labelIndex": 1,
"zIndex": 0,
"sourceId": "dcbea89f-0159-1000-278c-cc38bab689bf",
"sourceGroupId": "cbe6e53b-0158-1000-e36a-f9d26bb1b510",
"sourceType": "PROCESSOR",
"destinationId": "dcbebd86-0159-1000-7559-d77d1e05c910",
"destinationGroupId": "cbe6e53b-0158-1000-e36a-f9d26bb1b510",
"destinationType": "PROCESSOR"
}' http://localhost:8080/nifi-api/connections/dcbee9dd-0159-1000-45a7-8306c28f2786
2) Once you execute the above, if the update is successful, you will get a result similar to the one below:
HTTP/1.1 200 OK
Date: Fri, 27 Jan 2017 14:59:37 GMT
Cache-Control: private, no-cache, no-store, no-transform
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(9.3.9.v20160517)
3) Now log back in to the NiFi UI and make sure the change is done.

References:
NiFi API
My github

Thanks, Jobin George
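The manual copy, edit and PUT cycle above can also be scripted. The following Python sketch is one way to do it, assuming an unsecured NiFi at a base URL such as http://localhost:8080. The function names are hypothetical, and only the three destination fields and the permissions element named in the steps are touched.

```python
import copy
import json
from urllib.request import Request, urlopen


def retarget_connection(entity, new_destination_id):
    """Return a copy of a connection entity that points at a new destination."""
    updated = copy.deepcopy(entity)
    updated.pop("permissions", None)  # the element the steps say to remove
    updated["component"]["destination"]["id"] = new_destination_id
    # "destinationId" appears at the top level and inside "status".
    for section in (updated, updated.get("status", {})):
        if "destinationId" in section:
            section["destinationId"] = new_destination_id
    return updated


def move_connection(base_url, connection_id, new_destination_id):
    """GET the connection, retarget it, and PUT it back (needs a live NiFi)."""
    url = f"{base_url}/nifi-api/connections/{connection_id}"
    with urlopen(url) as resp:
        entity = json.load(resp)
    body = json.dumps(retarget_connection(entity, new_destination_id)).encode("utf-8")
    request = Request(url, data=body, method="PUT",
                      headers={"Content-Type": "application/json"})
    with urlopen(request) as resp:
        return resp.status  # 200 when NiFi accepts the update
```

Because the entity is fetched immediately before the PUT, the revision block sent back is the current one, which avoids pasting a stale version number by hand.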