Member since: 07-09-2016
Posts: 83
Kudos Received: 17
Solutions: 2
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 208 | 12-08-2016 06:46 AM
 | 530 | 12-08-2016 06:46 AM
03-06-2019
10:22 PM
Is there a way to apply the RowPrefixFilter search against the row key in NiFi? I can't find any HBase processor other than the one above that can be added as a downstream processor. Please advise.
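For reference, this is the kind of prefix scan I am trying to reproduce in NiFi, sketched with the plain HBase Java client (the table name and prefix are placeholders, not taken from our actual flow):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class RowPrefixScanSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("events"))) {   // placeholder table
            Scan scan = new Scan();
            // Restrict the scan to row keys that start with the given prefix.
            scan.setRowPrefixFilter(Bytes.toBytes("user123|"));            // placeholder prefix
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    System.out.println(Bytes.toString(result.getRow()));
                }
            }
        }
    }
}

What I am looking for is a way to express the same prefix restriction through the NiFi HBase processors.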
02-20-2018
03:44 PM
The issue is that the writer schema uses a different size than the reader schema, and that causes the compatibility error. Is there any workaround for this?
02-13-2018
03:09 PM
How do we evolve the schema when the size of a particular attribute changes?
V1 schema:
{ "name": "sid", "type": [ "null", { "type": "fixed", "name": "SID", "namespace": "com.int.datatype", "doc": "", "size": 64 } ], "doc": "", "default": null, "businessLogic": "" }
V2 schema:
{ "name": "sid", "type": [ "null", { "type": "fixed", "name": "SID", "namespace": "com.int.datatype", "doc": "", "size": 16 } ], "doc": "", "default": null, "businessLogic": "" }
By keeping the size as 64 (for the schema associated with the Hive table), we can query the Avro files corresponding to the v1 schema version but not v2:
hive> select sid from avro where dt='2017-12-02' limit 2;
OK
rojbg4ccmpwz
ilknjyclhplm
Time taken: 0.103 seconds, Fetched: 2 row(s)
hive> select sid from avro where dt='2018-02-11' limit 2;
OK
Failed with exception java.io.IOException:org.apache.avro.AvroTypeException: Found com.int.datatype.SID, expecting com.int.datatype.SID
Time taken: 0.106 seconds
By keeping the size as 16 (for the schema associated with the Hive table), we can query the Avro files corresponding to the v2 schema version but not v1:
hive> select sid from avro where dt='2018-02-11' limit 2;
OK
238d4a8bb307
xpj6nicoaxfl
Time taken: 0.205 seconds, Fetched: 2 row(s)
hive> select sid from avro where dt='2017-12-02' limit 2;
OK
Failed with exception java.io.IOException:org.apache.avro.AvroTypeException: Found com.int.datatype.SID, expecting com.int.datatype.SID
Time taken: 0.11 seconds
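For what it's worth, a minimal sketch that reproduces the incompatibility with Avro's SchemaCompatibility API (only the fixed portion of the schema is shown; the full record schemas are omitted for brevity):

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class SidCompatibilityCheck {
    public static void main(String[] args) {
        // The two versions of the fixed type, differing only in size.
        Schema v1 = new Schema.Parser().parse(
            "{\"type\":\"fixed\",\"name\":\"SID\",\"namespace\":\"com.int.datatype\",\"size\":64}");
        Schema v2 = new Schema.Parser().parse(
            "{\"type\":\"fixed\",\"name\":\"SID\",\"namespace\":\"com.int.datatype\",\"size\":16}");

        // Reader = v1 (size 64), writer = v2 (size 16).
        SchemaCompatibility.SchemaPairCompatibility result =
            SchemaCompatibility.checkReaderWriterCompatibility(v1, v2);
        System.out.println(result.getType()); // reports INCOMPATIBLE
    }
}

As far as I can tell, fixed types are only resolvable when the sizes match exactly, which is why switching the table schema between size 64 and size 16 just flips which set of partitions is readable.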
11-25-2017
08:51 AM
1) To send data to and from the same cluster of nodes, do we need to configure the URL with two different ports within the same cluster? For example, if the UI is accessible on port 9091, do we configure another port such as 9092?
2) What is the recommended transport protocol (RAW vs. HTTP)? Does each function differently?
3) I understand that the Input/Output Ports have to be created on the root canvas. Can a single Input/Output Port be used for multiple remote process groups (data flows), or should we configure separate Input/Output Ports corresponding to each data flow?
09-21-2017
08:39 AM
The above processor will stop execution once finishedQuery is set to true. I have the following questions:
1) This processor manages its state locally, so if we need to view the state value or clear it, what mechanism can be used? I understand we can use the controller service for the distributed cache when the state is stored on the cluster, is that right? (A rough REST sketch of what I have in mind follows below.)
2) Why does the processor have to stop execution once it has scrolled through all the pages of a given search result? What mechanism exists to start a new search? I understand that the state (the finishedQuery value) has to be cleared, but there is no documentation of how this can be achieved. Does a JIRA need to be opened to have this documented, so that this processor can be utilized and adopted?
3) Is there a sample flow that uses this processor and shows how a new search is submitted after each scroll completes?
scrollelasticsearchhttp.jpg
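For question 1, this is roughly the REST route I have in mind for viewing and clearing the processor's state (the host, port, and processor id are placeholders, and I have not verified these paths against our version, so please treat the sketch as an assumption):

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

public class ClearProcessorStateSketch {
    public static void main(String[] args) throws IOException {
        // Placeholder host, port, and processor id.
        String base = "http://nifi-host:8080/nifi-api/processors/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx";

        // View the component state of the processor.
        HttpURLConnection view = (HttpURLConnection) new URL(base + "/state").openConnection();
        view.setRequestMethod("GET");
        System.out.println("view state -> HTTP " + view.getResponseCode());

        // Ask NiFi to clear the component state (the processor presumably has to be stopped first).
        HttpURLConnection clear = (HttpURLConnection) new URL(base + "/state/clear-requests").openConnection();
        clear.setRequestMethod("POST");
        System.out.println("clear state -> HTTP " + clear.getResponseCode());
    }
}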
- Tags:
- Data Processing
- NiFi
08-26-2017
05:09 PM
What we observe is that converting to UTC on the fly doesn't help, since at write time it always uses EST (the system timezone) when writing into Parquet.
08-22-2017
07:55 AM
There is a flat text file whose attributes are separated by commas. One of the attributes (say "created") holds a timestamp value in string format, in the UTC timezone. The requirement is to convert from CSV to Parquet. This is achieved by having one table based on CSV and another based on the Parquet format, and the process runs on a server in the EST timezone. However, when the "created" timestamp is read by a Spark process (running on a server in the UTC timezone), it is incremented by +4 hours. Can someone shed some light on how this can be prevented in Hive while writing into Parquet?
08-02-2017
05:26 AM
A Hive table (using the AvroSerDe) is associated with a static schema file (.avsc). When there are data files with varying schemas, Hive query parsing fails.
Option 1: Whenever there is a change in schema, the current and the new schema can be compared and the schema file can be manually edited; for example, we can add a default value for a newly added column. However, this is a manual process. Is it feasible to perform this comparison automatically (with custom coding) and evolve the schema file?
Option 2: Since each Avro file contains both the schema and the data, can we use a custom SerDe that extends the Hive classes to read the data from HDFS, parse it, and relay the results back to Hive? Is this approach feasible?
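To illustrate the premise of Option 2, namely that the writer's schema travels inside each Avro data file, here is a minimal sketch using the Avro Java API (the file name and the "sid" field are placeholders for illustration):

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class EmbeddedSchemaSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder file name; for files sitting on HDFS the same reader can be fed
        // through org.apache.avro.mapred.FsInput instead of a local File.
        File avroFile = new File("part-00000.avro");

        try (DataFileReader<GenericRecord> reader =
                 new DataFileReader<>(avroFile, new GenericDatumReader<GenericRecord>())) {
            // The writer's schema is read from the file header, so no external .avsc is needed.
            Schema writerSchema = reader.getSchema();
            System.out.println(writerSchema.toString(true));

            while (reader.hasNext()) {
                GenericRecord record = reader.next();
                System.out.println(record.get("sid"));
            }
        }
    }
}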
05-04-2017
07:47 AM
package com.test.utility;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class EHPhoenix {

    // Phoenix JDBC driver class and the pieces of the secured connection URL.
    static final String JDBC_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    static final String ZOOKEEPER = "trnsyxsvr:2181";
    static final String SECURITYTYPE = "/hbase-secure:phtest@HDP_DEV.COM";
    static final String KEYTAB = "/home/phtest/phtest.headless.keytab";
    static final String DB_URL = "jdbc:phoenix:" + ZOOKEEPER + ":" + SECURITYTYPE + ":" + KEYTAB;

    public static void main(String[] args) {
        Connection conn = null;
        Statement st = null;
        System.out.println(DB_URL + ";");
        try {
            // Register the Phoenix driver and open the connection.
            Class.forName(JDBC_DRIVER);
            System.out.println("Connecting to database..");
            conn = DriverManager.getConnection(DB_URL);

            // Run a simple query and print the first column of every row.
            System.out.println("Creating statement...");
            st = conn.createStatement();
            String sql = "SELECT * FROM collector_status";
            ResultSet rs = st.executeQuery(sql);
            while (rs.next()) {
                String did = rs.getString(1);
                System.out.println("Did found: " + did);
            }
            rs.close();
            st.close();
            conn.close();
        } catch (SQLException se) {
            se.printStackTrace();
        } catch (Exception e) {
            // Handle errors for Class.forName.
            e.printStackTrace();
        } finally {
            // Close resources in case the happy path above did not.
            try {
                if (st != null) st.close();
            } catch (SQLException se2) {
                // Nothing we can do here.
            }
            try {
                if (conn != null) conn.close();
            } catch (SQLException se) {
                se.printStackTrace();
            }
        }
        System.out.println("Goodbye!");
    }
}
This gives the following error when running it from the Eclipse IDE. Can you please advise?
jdbc:phoenix:trnsyxsvr:2181:/hbase-secure:phtest@HDP_DEV.COM:/home/phtest/phtest.headless.keytab;
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connecting to database..
java.sql.SQLException: org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions:
Thu May 04 03:19:36 EDT 2017, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=73361: row 'SYSTEM:CATALOG,,' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=trnsyxsvr,16020,1493844289998, seqNum=0
at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:2465)
at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:2382)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:76)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:2382)
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:255)
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:149)
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:221)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:270)
at com.cox.util.EHPhoenix.main(EHPhoenix.java:27)
Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions:
Thu May 04 03:19:36 EDT 2017, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=73361: row 'SYSTEM:CATALOG,,' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=trnsyxsvr,16020,1493844289998, seqNum=0
at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.throwEnrichedException(RpcRetryingCallerWithReadReplicas.java:271)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:210)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:60)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:210)
at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:327)
at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:302)
at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:167)
at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:162)
at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:797)
at org.apache.hadoop.hbase.MetaTableAccessor.fullScan(MetaTableAccessor.java:602)
at org.apache.hadoop.hbase.MetaTableAccessor.tableExists(MetaTableAccessor.java:366)
at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:403)
at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:2410)
... 9 more
Caused by: java.net.SocketTimeoutException: callTimeout=60000, callDuration=73361: row 'SYSTEM:CATALOG,,' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=trnsyxsvr,16020,1493844289998, seqNum=0
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:169)
at org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:65)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.exceptions.ConnectionClosingException: Call to trnsyxsvr/210.220.3.139:16020 failed on local exception: org.apache.hadoop.hbase.exceptions.ConnectionClosingException: Connection to trnsyxsvr/210.220.3.139:16020 is closing. Call id=11, waitTime=189
at org.apache.hadoop.hbase.ipc.AbstractRpcClient.wrapException(AbstractRpcClient.java:281)
at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1238)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:223)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:328)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:32831)
at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:379)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:201)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:63)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:210)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:364)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:338)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:136)
... 4 more
Caused by: org.apache.hadoop.hbase.exceptions.ConnectionClosingException: Connection to trnsyxsvr/210.220.3.139:16020 is closing. Call id=11, waitTime=189
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.cleanupCalls(RpcClientImpl.java:1057)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.close(RpcClientImpl.java:856)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.run(RpcClientImpl.java:575)
Goodbye!
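A variation I am considering, sketched below as an assumption rather than a known fix, is to perform the Kerberos login explicitly from the keytab before requesting the connection, with the cluster's core-site.xml and hbase-site.xml on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class EHPhoenixExplicitLogin {
    public static void main(String[] args) throws Exception {
        // Picks up core-site.xml / hbase-site.xml from the classpath.
        Configuration conf = new Configuration();
        UserGroupInformation.setConfiguration(conf);

        // Log in from the keytab before the Phoenix driver needs the credentials.
        UserGroupInformation.loginUserFromKeytab(
                "phtest@HDP_DEV.COM", "/home/phtest/phtest.headless.keytab");

        // With an explicit login, only the quorum and the znode go into the URL.
        try (Connection conn =
                 DriverManager.getConnection("jdbc:phoenix:trnsyxsvr:2181:/hbase-secure")) {
            System.out.println("Connected: " + !conn.isClosed());
        }
    }
}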
04-27-2017
11:22 AM
1 Kudo
Hi, I am unable to visualize how multiple NiFi nodes in a cluster process a flowfile. In a NiFi cluster, the same dataflow runs on all the nodes; as a result, every component in the flow runs on every node (ref: https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html). Processors can then be scheduled to run on the primary node only, via an option on the Scheduling tab of the processor that is only available in a cluster (ref: https://community.hortonworks.com/articles/16120/how-do-i-distribute-data-across-a-nifi-cluster.html).
With that context, let's say the processor at the very top of the flow uses the "CRON driven" scheduling strategy and the rest are simply "Timer driven". Assume the flow doesn't involve any Kafka processors and there are 2 nodes in the cluster. If the top processor's schedule is set to 12:00 AM, will it start on both NiFi nodes? If so, should we set Execution to "Primary node" for the top processor, to prevent it from being executed on both NiFi nodes in parallel?
In the second scenario, if my top processor is ConsumeKafka_0_10 with a "Timer driven" scheduling strategy, the topic is configured with 6 partitions, and I set the concurrency to 3, would 3 processor instances run on one node and another 3 on the other node?
03-06-2017
06:57 AM
Assume updates, inserts, and deletes are replicated into the Hadoop file system as flat text files via Oracle GoldenGate, and there is a partition folder for each day. We have an external Hive table based on the ingested files that can be used to run queries. As far as getting up-to-date data is concerned, this method is fine. However, we have two issues with it:
1) The files keep growing, which increases storage utilization and can eventually cause capacity problems.
2) With more files to scan, Hive query performance will degrade over time, and the MapReduce jobs will consume additional resources.
This calls for compaction, which unfortunately is not addressed by GoldenGate. I see that Hive's internal compaction (minor/major) is supported only on the ORC format, and that external tables cannot be made ACID tables since changes to external tables are beyond the control of the compactor. It appears that a custom compaction process is our best option; a sketch of what I mean follows below. I do not want to use the GoldenGate-Kafka integration (with the log compaction feature), as it requires retrieving the data from the stream each time to build the latest image of the table and make it accessible via Hive. Can someone please advise on custom compaction, and whether there are better alternatives given the GoldenGate replication described above?
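By custom compaction I mean something along the lines of the sketch below, run periodically through the Hive JDBC driver. It is only a rough sketch: the table names, the key, and the GoldenGate columns (op_type, op_ts) are placeholders for whatever our feed actually contains.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class DailyCompactionSketch {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        // Placeholder HiveServer2 URL and credentials.
        String url = "jdbc:hive2://hiveserver2-host:10000/default";

        // Keep only the latest image of each key for the day and drop the deletes.
        String compact =
            "INSERT OVERWRITE TABLE orders_compacted PARTITION (dt='2017-03-05') " +
            "SELECT order_id, order_status, last_update FROM ( " +
            "  SELECT t.*, row_number() OVER (PARTITION BY order_id ORDER BY op_ts DESC) AS rn " +
            "  FROM orders_cdc t WHERE dt='2017-03-05' " +
            ") latest WHERE rn = 1 AND op_type <> 'D'";

        try (Connection conn = DriverManager.getConnection(url, "etl_user", "");
             Statement st = conn.createStatement()) {
            st.execute(compact);
        }
    }
}

The idea is simply to materialize the latest image per key for a partition, leaving the raw GoldenGate files eligible for cleanup afterwards.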
- Tags:
- compaction
- Data Processing
- HDFS
- Hive
12-29-2016
08:39 PM
1 Kudo
I am probably throwing this out a little early, without looking at the documentation in detail. I am wondering if anyone has used the REST APIs to identify the flowfiles active within a particular process group, or perhaps you could advise on the appropriate REST APIs that can be used for this. One additional question: our NiFi environment is Kerberized, so do the REST APIs require authentication, and do we need to maintain the token somewhere in a cache to make additional API calls? Objective: make sure that no flowfile is stuck in a particular process group.
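For context, the kind of call I have in mind is below (the host and process group id are placeholders; in our Kerberized setup some token or SPNEGO negotiation would presumably have to be layered on top, which is part of my question):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ProcessGroupQueueCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder host and process group id.
        URL url = new URL("http://nifi-host:8080/nifi-api/flow/process-groups/"
                + "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/status");

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        // The JSON response carries an aggregate snapshot with queue-related fields
        // (e.g. flowFilesQueued), which is what "stuck flowfiles" boils down to.
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}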
12-19-2016
11:43 PM
My use case requires writing in a delimited format, and INSERT OVERWRITE LOCAL fits perfectly for this. I wish there were a way to custom-delimit the content retrieved through the SelectHiveQL processor, so I couldn't opt for it. I agree that writing to HDFS instead is a good option. I will attempt to modify the process, but I still wonder why it doesn't work when writing to the local filesystem through NiFi.
12-19-2016
11:37 PM
What I understand is that Hive temporarily writes into /tmp on HDFS and then copies the result over to the local directory. So, in Ranger, recursive access to the /tmp HDFS folder has been granted. But the issue still persists with NiFi.
12-19-2016
11:33 PM
RouteOnContent with the regex (^\{\}) helped identify the empty JSON object.
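In plain Java the check amounts to the following (just to show what the expression matches; RouteOnContent applies it against the flowfile content):

import java.util.regex.Pattern;

public class EmptyJsonCheck {
    public static void main(String[] args) {
        // Matches content that starts with an empty JSON object.
        Pattern emptyObject = Pattern.compile("^\\{\\}");
        System.out.println(emptyObject.matcher("{}").find());                        // true, route away
        System.out.println(emptyObject.matcher("[{\"dt\":\"2016-12-14\"}]").find()); // false, continue to SplitJson
    }
}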
12-19-2016
10:29 PM
Yes, the incoming object is always an array when there is at least one row, for example: [{"dt": "2016-12-14"},{"dt": "2016-12-15"},{"dt": "2016-12-16"}]. When empty, it comes as a JSON object: {}
12-19-2016
09:18 AM
When the incoming flowfile contains an empty JSON object {}, it fails with the error below. Is there a way to skip flowfiles with such content? SplitJson[id=01581338-695c-1081-d7c3-a2b84d03b57d] The evaluated value {} of $ was not a JSON Array compatible type and cannot be split.
12-18-2016
11:41 PM
Sample file, NiFi flow, and the flowfile content after FetchFile are shown in the attached screenshots.
12-18-2016
09:36 AM
1 Kudo
When you retrieve the content of a file through the above processor, we can see a newline character appended at the end of the flowfile content. Is this expected behavior? Is there any reason for it, and how can it be avoided?
12-18-2016
07:25 AM
Ranger is specific to HDFS here. I am referring to the issue with Hive writing to the local file system; see the Hive statement in my summary above. And the /tmp directory is accessible by any user.
12-16-2016
10:12 PM
I cannot get the following query to run using the PutHiveQL processor; it fails with a permission denied exception. I see that the processor emulates the same behavior as Beeline. I have been able to run the query from the Hive CLI and it writes to a file as expected. So we know the Hive shell is an option, but can you let me know if there is any specific setting that causes this behavior in Beeline (HiveServer2) and prevents writing to the local filesystem?
insert overwrite local directory '/tmp' select current_date from dual
Error: Error while compiling statement: FAILED: HiveAccessControlException Permission denied: user [xxxxx] does not have [WRITE] privilege on [/tmp] (state=42000,code=40000)
12-16-2016
03:28 AM
A simple reflect function like the one below, which works in the Hive shell, cannot be executed through the PutHiveQL processor. Do you know if this is expected behavior?
SELECT reflect("java.util.UUID", "nameUUIDFromBytes", "test") from dual
12-15-2016
06:50 PM
I have a process group, and from the Input Port I just cannot create a connection to the ExecuteProcess processor. Can you please advise on any specific reason for that?
12-15-2016
06:47 PM
Thanks, that appears to be a relatively simple change. I will give it a try.
12-15-2016
06:46 PM
The only option available with Avro is to split it into JSON, but from there, any thoughts on how that can be emitted in a delimited format?
12-15-2016
12:47 AM
After querying from Hive, I want to write into a file with a custom delimiter, say |. Is there a way to achieve that? I cannot use ReplaceText to substitute the custom delimiter, since the column values may contain commas. One other option I see is to have the query produce a single string with the custom delimiter, but that encloses the string-type column values in double quotes, which voids the required file format.