Member since 09-28-2015 | 16 Posts | 26 Kudos Received | 0 Solutions
11-06-2020
12:33 AM
Hello, I set up the type and entity according to your steps, but I can't see the entities in the Atlas UI. I can see the entity when I look it up by GUID through the API. What's the problem?
10-04-2018
04:46 PM
3 Kudos
Objective
The objective of this article is to present a workflow to capture data from Kafka topics and republish it. Note that NiFi is NOT a Kafka replication/backup tool. This is just a workflow-based approach to capture Kafka data and publish it back to a fresh topic if necessary.
Although storing Kafka data is not mandatory in several cases, it may be essential in cases such as:
- Archival: dump Kafka data in HDFS.
- Corrections/Inspections: in cases where an invalid event may disrupt a downstream system, to correct and restore the messages.
Start by creating two processor groups: one for the backup workflow and a second for the restore workflow.
Backup Workflow
Backup is a long-running workflow that archives data to HDFS. A file in HDFS may contain a batch of events or a single event, depending on how frequently messages are published to the source Kafka topic. Hence, the user does not have to stop/start this backup workflow frequently.
Restore Workflow
Restore is a manually triggered job/workflow in NiFi, and it has to be modeled based on the topic being restored.
Some considerations:
- Restore data to a temporary topic. A restore to the original topic may duplicate the data in the original topic.
- If you want to restore data to the original topic, do that only after cleaning the original topic (re-create the original topic).
- Stop the backup workflow while re-publishing to the original topic, to avoid duplication of backed-up data in HDFS by NiFi. An alternate approach would be to move or rename the HDFS backup directory for the topic.
Steps:-
1. Stop the ConsumeKafka processor in the Backup Workflow.
2. Edit the ListHDFS processor in the Restore Workflow to point the HDFS directory at the topic that needs to be restored.
3. Edit the PublishKafka processor in the Restore Workflow to set the topic that needs to be restored.
4. Delete and recreate the restore topic (if it already exists).
5. Start the Restore Workflow.
After the restore is complete and verified:
1. Stop the restore workflow.
2. Move the backup directory (which has the original backup files).
3. Start the backup process (this should take all the data to a new directory to avoid duplication).
Backup Workflow
Some technical considerations for the Backup Workflow:-
1. Backup location is HDFS.
2. The Kafka key is to be saved along with the Kafka payload.
3. Message metadata, such as partition info and message offset, should be stored along with the message.
4. Message format is either text or JSON.
Variables Used in the Backup Workflow
The following variables have been used in the processors to avoid hardcoding values.
kafka.brokers
kafka.kerberos-service-name
kerberos.principal
kerberos.keytab
hadoop.configuration.resources
Destination Directory: /edl_data/backup/kafka/${kafka.topic}
Data Format
Each file thus created may have a batch of Kafka messages. Each message has the following format. Note that if a key or payload is missing, that part is simply left empty.
[topicname-partition-offset]<@>[message key]<@>[message payload]
Example:-
topic1-0-231<@> {"ID":"535"}<@> {"vwcustomer":{"id":"13631918","name":"Mikel","age":51,"date":"2018-10-04T15:16:06Z"}}
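As a rough illustration of the record layout above, here is a minimal Python sketch that builds and parses one backup line. The function names (parse_backup_record, format_backup_record) are hypothetical and not part of the NiFi flow. It assumes one record per line, that whitespace around the key and payload is not significant, and that the topic name may itself contain hyphens.

```python
DELIMITER = "<@>"  # must match the delimiter configured in ReplaceText / ExtractText


def format_backup_record(topic, partition, offset, key="", payload=""):
    """Build one backup line: topic-partition-offset<@>key<@>payload."""
    return f"{topic}-{partition}-{offset}{DELIMITER}{key}{DELIMITER}{payload}"


def parse_backup_record(line):
    """Split one backup line into metadata, key and payload.

    Assumption: whitespace around key and payload is not significant.
    A missing key or payload comes back as an empty string.
    """
    # Split on the first two delimiters only, so a payload that happens
    # to contain "<@>" is kept intact.
    metadata, key, payload = line.rstrip("\n").split(DELIMITER, 2)
    # Split from the right, because topic names may contain hyphens.
    topic, partition, offset = metadata.rsplit("-", 2)
    return {
        "topic": topic,
        "partition": int(partition),
        "offset": int(offset),
        "key": key.strip(),
        "payload": payload.strip(),
    }
```

Splitting the metadata from the right is a deliberate choice here: partition and offset are always the last two hyphen-separated fields, whatever the topic is called.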
ConsumeKafka
This processor has a list of topic(s) whose messages should be consumed and eventually backed up in HDFS.
- Topic Names: a list of topics to be consumed by this processor.
- Group ID: the NiFi Kafka consumer group ID. If you want to consume all the messages, reset this to a new consumer group for a new backup.
- Max Poll Records: why is this set to 1? To get the key for each Kafka message. If we poll a batch of Kafka messages, the message key is lost and not stored as an attribute of the flow file. To save the Kafka key, Max Poll Records should be set to 1. This way each message is sent to its own flow file with a kafka.key attribute.
UpdateAttribute
This processor captures the Kafka metadata that is used later to store this information along with the message key and value.
ReplaceText
This processor is used to prepend the kafka.topic.metadata and the kafka.key to the beginning of the message.
Note that the delimiter used to separate metadata, key and value is "<@>". If this delimiter is changed, make sure to update it in the ExtractText processor of the Restore Workflow as well.
MergeContent
If we do NOT use this processor, then each Kafka message (metadata<@>key<@>value) would be written to a separate HDFS file. Instead, we want to merge multiple Kafka messages into a single file in HDFS where possible. We use the MergeContent processor to do this. Note that we are merging content based on the attribute kafka.topic; hence messages with the same topic should end up together.
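The grouping idea can be pictured with a small Python sketch. This is only a simplified model of merging by a correlation attribute, not NiFi's actual binning algorithm: the function name merge_by_topic and the max_entries parameter are hypothetical, flow files are modeled as (attributes, content) pairs, and time-based bin expiry is ignored.

```python
from collections import defaultdict


def merge_by_topic(flowfiles, max_entries=1000):
    """Bundle message contents per kafka.topic into newline-joined batches.

    flowfiles: iterable of (attributes, content) pairs, where attributes
    is a dict holding at least "kafka.topic".
    Returns a list of (topic, merged_content) tuples.
    """
    bins = defaultdict(list)
    merged = []
    for attributes, content in flowfiles:
        topic = attributes["kafka.topic"]
        bins[topic].append(content)
        # Emit the bin as soon as it is full, then start a fresh one.
        if len(bins[topic]) >= max_entries:
            merged.append((topic, "\n".join(bins.pop(topic))))
    # Flush whatever is left in partially filled bins.
    for topic, contents in bins.items():
        merged.append((topic, "\n".join(contents)))
    return merged
```

Because every bundle carries a single topic, each merged file can be written under that topic's own backup directory.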
For information on how batching occurs and on other properties, see:
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.MergeContent/index.html
PutHDFS
Finally, we back up the files to HDFS. Note that if the destination directory for the kafka.topic does not exist, PutHDFS will create it.
Restore Kafka Messages
Some technical considerations for the Restore Workflow:-
1. Restore messages from the HDFS backup location to a Kafka topic.
2. Restore key and payload.
3. Order of restored messages: an attempt has been made to preserve order while restoring Kafka messages. Kafka maintains order only within a partition. Hence, this Restore workflow is for topics with 1 partition.
4. If the order of restored messages is not important, then this workflow can be made significantly simpler.
5. As per the backup workflow, this is for messages in text or JSON format.
ENSURE THAT BACKUP IS STOPPED BEFORE RESTORE IS STARTED.
This prevents a backup/restore loop that would duplicate the messages.
ListHDFS and FetchHDFS
These processors fetch the HDFS files from the directory configured in ListHDFS. The files are not deleted from the source directory.
SplitText and ExtractText
These processors are used to extract the Kafka offset from the metadata section stored in the backed-up HDFS files.
The extracted offset attribute is used to sequence the messages using the "priority" attribute and the PriorityAttributePrioritizer.
UpdateAttribute, EnforceOrder and MergeContent
UpdateAttribute creates a new 'priority' attribute in every flowfile (Kafka message) and assigns the offset value to it.
This 'priority' attribute is used in the following relationships to prioritize processing of the messages.
EnforceOrder is used to order flow files based on the attribute "kafka.offset".
The wait timeout is set to 10 seconds. This means that if there is a missing offset (order attribute) in the sequence, the processor waits for a 10-second window. If the missing event arrives within that window, processing continues and the message is routed to the success relationship. If the event does not arrive within the window, the message is routed to the overtook relationship.
Refer:-
https://issues.apache.org/jira/browse/NIFI-3414
https://gist.github.com/ijokarumawak/88fc30a2300845b3c27a79113fc72d41
MergeContent batches the messages back into 1 flow file.
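To make the ordering behaviour easier to picture, here is a minimal Python sketch of the same idea applied to a batch that has already been read. It is an approximation and not how EnforceOrder is implemented: the function name enforce_order is hypothetical, there is no real wait timeout (a gap is treated as if the timeout had already expired), and records are modeled as (offset, message) pairs.

```python
def enforce_order(records, initial_offset=None):
    """Sort records by offset and classify them the way the flow routes them.

    records: iterable of (offset, message) pairs.
    Returns a dict with three lists:
      success  - records that arrived at the expected offset
      overtook - records that followed a gap (a missing offset)
      skipped  - records whose offset was already passed (e.g. duplicates)
    """
    ordered = sorted(records, key=lambda record: record[0])
    routed = {"success": [], "overtook": [], "skipped": []}
    if not ordered:
        return routed
    expected = ordered[0][0] if initial_offset is None else initial_offset
    for offset, message in ordered:
        if offset < expected:
            routed["skipped"].append((offset, message))
            continue
        if offset == expected:
            routed["success"].append((offset, message))
        else:
            # A gap: treat it as if the wait timeout had expired.
            routed["overtook"].append((offset, message))
        expected = offset + 1
    return routed
```

With offsets 0, 1, 3 and 4, the record at offset 3 lands in overtook because offset 2 never arrived, and ordering resumes normally from offset 4.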
ExtractText, UpdateAttribute, PublishKafka
Finally, we extract the metadata, key and value, and publish the Kafka key and value using the PublishKafka processor.
Note that the Kafka topic name should be updated in the PublishKafka processor.
We use the UpdateAttribute processor yet again to order messages before publishing them.
10-04-2018
04:50 AM
4 Kudos
Objective
- To store multiple row versions in HBase and evaluate the performance impact of reading all versions vs. getting the latest version. To put this differently: would storing multiple versions affect performance when querying the latest version?
- To use NiFi to quickly ingest millions of rows into HBase.
Warning
- Do not store more than a few versions in HBase. This can have negative impacts. HBase is NOT designed to store more than a few versions of a cell.
Step 1: Create Sample Workflow using NiFi to ingest data into HBase table
- Create HBase Table
Dataset: https://www.citibikenyc.com/system-data
create 'venkataw:nycstations','nycstationfam'
0 row(s) in 1.3070 seconds
hbase(main):014:0> desc 'venkataw:nycstations'
Table venkataw.nycstations is ENABLED
venkataw.nycstations
COLUMN FAMILIES DESCRIPTION
{NAME => 'nycstationfam', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
1 row(s) in 0.1870 seconds
put 'venkataw:nycstations', 1224, 'nycstationfam:name', 'citiWunnava'
put 'venkataw:nycstations',1224,'nycstationfam:short_name','citiW'
put 'venkataw:nycstations',1224,'nycstationfam:lat','-90.12'
put 'venkataw:nycstations',1224,'nycstationfam:lon','.92'
put 'venkataw:nycstations',1224,'nycstationfam:region_id','9192'
put 'venkataw:nycstations',1224,'nycstationfam:capacity','100202'
put 'venkataw:nycstations',1224,'nycstationfam:rental_url','http://www.google.com/'
hbase(main):016:0> scan 'venkataw:nycstations'
ROW COLUMN+CELL
1224 column=nycstationfam:capacity, timestamp=1538594876306, value=100202
1224 column=nycstationfam:lat, timestamp=1538594875626, value=-90.12
1224 column=nycstationfam:lon, timestamp=1538594875643, value=.92
1224 column=nycstationfam:name, timestamp=1538594875555, value=citiWunnava
1224 column=nycstationfam:region_id, timestamp=1538594875660, value=9192
1224 column=nycstationfam:rental_url, timestamp=1538594902755, value=http://www.google.com/
1224 column=nycstationfam:short_name, timestamp=1538594875606, value=citiW
alter 'venkataw:nycstations', NAME=>'nycstationfam',VERSIONS => 10000
Step 2: NiFi Workflow to publish data to HBase table
The NiFi workflow consumes messages from a web server and publishes them to HBase. Configuration for the processors is as follows:
- The GetHTTP processor reads the REST endpoint every 5 seconds.
- We extract the stations object using the SplitJson processor.
- Finally, we use the PutHBaseJson processor to ingest the data into the destination HBase table created above. Notice that I am randomly assigning the row identifier so that I eventually get multiple row versions for the same identifier.
- The PutHBaseJson processor uses the HBase Client Controller Service to connect to HBase using Kerberos credentials.
Step 3: Run queries to read the latest version and all available versions
I tried querying all versions vs. the latest version in HBase with the following queries:
hbase(main):002:0> get 'venkataw:nycstations', 99886 , {COLUMN=> ['nycstationfam:station_id','nycstationfam:name','nycstationfam:short_name','nycstationfam:lat','nycstationfam:lon','nycstationfam:region_id','nycstationfam:capacity','nycstationfam:rental_url']}
COLUMN CELL
nycstationfam:capacity timestamp=1507322481470, value=31
nycstationfam:lat timestamp=1507322481470, value=40.71286844
nycstationfam:lon timestamp=1507322481470, value=-73.95698119
nycstationfam:name timestamp=1507322481470, value=Grand St & Havemeyer St
nycstationfam:region_id timestamp=1507322481470, value=71
nycstationfam:rental_url timestamp=1507322481470, value=http://app.citibikenyc.com/S6Lr/IBV092JufD?station_id=471
nycstationfam:short_name timestamp=1507322481470, value=5267.08
nycstationfam:station_id timestamp=1507322481470, value=471
8 row(s) in 0.0600 seconds
get 'venkataw:nycstations', 99828 , {COLUMN=> ['nycstationfam:station_id','nycstationfam:name','nycstationfam:short_name','nycstationfam:lat','nycstationfam:lon','nycstationfam:region_id','nycstationfam:capacity','nycstationfam:rental_url'],VERSIONS => 100}
{done for diff. rowids}
24 row(s) in 0.0200 seconds
16 row(s) in 0.0300 seconds
8 row(s) in 0.0310 seconds
232 row(s) in 0.1850 seconds
8 row(s) in 0.0570 seconds
152 row(s) in 0.0380 seconds
184 row(s) in 0.0420 seconds
208 row(s) in 0.1550 seconds
1 row:-
8 row(s) in 0.0050 seconds
8 row(s) in 0.0040 seconds
8 row(s) in 0.0060 seconds
all versions:-
14765 row(s) in 2.4350 seconds
14351 row(s) in 1.1620 seconds
14572 row(s) in 2.4210 seconds
In the above results:
* The green rows are the latest-version reads
* The yellow rows are the all-version reads
Notice how latest-version reads are fairly consistent and have smaller response times. Also notice that as the number of versions (rows) increases, the response times for all-version reads keep increasing.
So, based on this observation, as expected, a query to get the latest version consistently performs well compared to a query that returns 'n' versions.
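For anyone who wants to compare the two groups of timings numerically, here is a small Python sketch that parses the "N row(s) in X seconds" lines printed by the HBase shell and averages them. The helper name summarize is hypothetical, and the sample lines are the ones listed above under "1 row" and "all versions".

```python
import re
from statistics import mean

# Matches HBase shell footers such as "8 row(s) in 0.0050 seconds".
TIMING = re.compile(r"(\d+) row\(s\) in ([\d.]+) seconds")


def summarize(lines):
    """Return the sample count, mean row count and mean elapsed seconds."""
    samples = [
        (int(match.group(1)), float(match.group(2)))
        for match in map(TIMING.search, lines)
        if match
    ]
    return {
        "reads": len(samples),
        "mean_rows": mean(rows for rows, _ in samples),
        "mean_seconds": mean(seconds for _, seconds in samples),
    }


latest_version = [
    "8 row(s) in 0.0050 seconds",
    "8 row(s) in 0.0040 seconds",
    "8 row(s) in 0.0060 seconds",
]
all_versions = [
    "14765 row(s) in 2.4350 seconds",
    "14351 row(s) in 1.1620 seconds",
    "14572 row(s) in 2.4210 seconds",
]

if __name__ == "__main__":
    print("latest version:", summarize(latest_version))
    print("all versions:  ", summarize(all_versions))
```

On these samples the mean works out to roughly 0.005 seconds for latest-version reads versus roughly 2.0 seconds for all-version reads. Three samples per group is too few to treat as a benchmark, so this is only a convenience for reading the numbers.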
10-01-2018
04:40 PM
It is failing with the error below. What is the reason?
Exception in thread "main" java.sql.SQLException: Could not open client transport with JDBC Uri: jdbc:hive2://<<HOST>>:10000/default;principal=<<Principle_NAME>>;ssl=true;transportMode=http;httpPath=gateway/default/hive: Could not create http connection to jdbc:hive2://<<HOST>>:10000/default;principal=hive/_HOST@DEVAD.DOMAIN.COM;ssl=true;transportMode=http;httpPath=gateway/default/hive. org.apache.http.client.ClientProtocolException
    at org.apache.hive.jdbc.HiveConnection.openTransport(HiveConnection.java:210)
    at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:156)
    at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:105)
    at java.sql.DriverManager.getConnection(DriverManager.java:664)
    at java.sql.DriverManager.getConnection(DriverManager.java:247)
    at com.hwx.hive.HiveJDBCKnox.main(HiveJDBCKnox.java:80)
Caused by: org.apache.thrift.transport.TTransportException: Could not create http connection to jdbc:hive2://<<HOST>>:10000/default;principal=hive/_HOST@DEVAD.DOMAIN.COM;ssl=true;transportMode=http;httpPath=gateway/default/hive. org.apache.http.client.ClientProtocolException
    at org.apache.hive.jdbc.HiveConnection.createHttpTransport(HiveConnection.java:255)
    at org.apache.hive.jdbc.HiveConnection.openTransport(HiveConnection.java:183)
    ... 5 more
Caused by: org.apache.thrift.transport.TTransportException: org.apache.http.client.ClientProtocolException
    at org.apache.thrift.transport.THttpClient.flushUsingHttpClient(THttpClient.java:297)
    at org.apache.thrift.transport.THttpClient.flush(THttpClient.java:313)
    at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:65)
    at org.apache.hive.service.cli.thrift.TCLIService$Client.send_OpenSession(TCLIService.java:154)
    at org.apache.hive.service.cli.thrift.TCLIService$Client.OpenSession(TCLIService.java:146)
    at org.apache.hive.jdbc.HiveConnection.createHttpTransport(HiveConnection.java:244)
    ... 6 more
Caused by: org.apache.http.client.ClientProtocolException
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:186)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:117)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
    at org.apache.thrift.transport.THttpClient.flushUsingHttpClient(THttpClient.java:251)
    ... 11 more
Caused by: org.apache.http.HttpException
    at org.apache.hive.jdbc.HttpRequestInterceptorBase.process(HttpRequestInterceptorBase.java:86)
    at org.apache.http.protocol.ImmutableHttpProcessor.process(ImmutableHttpProcessor.java:132)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:182)
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
    at org.apache.http.impl.execchain.ServiceUnavailableRetryExec.execute(ServiceUnavailableRetryExec.java:84)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
    ... 14 more
Caused by: org.apache.http.HttpException
    at org.apache.hive.jdbc.HttpKerberosRequestInterceptor.addHttpAuthHeader(HttpKerberosRequestInterceptor.java:68)
    at org.apache.hive.jdbc.HttpRequestInterceptorBase.process(HttpRequestInterceptorBase.java:74)
    ... 20 more
Caused by: java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
    at org.apache.hive.service.auth.HttpAuthUtils.getKerberosServiceTicket(HttpAuthUtils.java:83)
    at org.apache.hive.jdbc.HttpKerberosRequestInterceptor.addHttpAuthHeader(HttpKerberosRequestInterceptor.java:62)
    ... 21 more
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
    at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
    at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:122)
    at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
    at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:224)
    at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
    at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
    at org.apache.hive.service.auth.HttpAuthUtils$HttpKerberosClientAction.run(HttpAuthUtils.java:183)
    at org.apache.hive.service.auth.HttpAuthUtils$HttpKerberosClientAction.run(HttpAuthUtils.java:151)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    ... 23 more
02-16-2017
10:51 AM
Some comments in 2017 :-) The process is correct most of the time, but I have some more information on this:
1. Kerberos cache mechanisms: depending on the default_ccache_name line in your [libdefaults] section, you may see various odd behaviors, such as not seeing your Kerberos ticket in the Kerberos Ticket Manager Windows client. You need to check which cache behavior suits your environment.
default_ccache_name = FILE:%TEMP%\krb5cc
default_ccache_name = DIR:%TEMP%\krb5cc
default_ccache_name = API:krb5cc
default_ccache_name = MEMORY:krb5cc
default_ccache_name = MSLSA:krb5cc
2. Choosing the "workable" Kerberos Windows client: version 4.01 as of now demands its records from a DNS server, if I understood that correctly. Since most lab environments don't have a full-fledged DNS server with the Kerberos SRV entries the client is looking for, switching to the older version 3.22 seems appropriate. Version 3.22 also seems to have a lot more functions than the stripped-down 4.01 client. Please check out the versions for yourself.
3. Choosing the right Firefox browser with the right gssapi library: when modifying your Firefox network.* directives, you need to make sure that your Firefox version (32-bit/64-bit) EXACTLY matches the gssapi library used -> gssapi32.dll / gssapi64.dll
network.negotiate-auth.using-native-gsslib: false
network.negotiate-auth.gsslib;C:\09_Tools\MIT_Kerberos\bin\gssapi64.dll
It won't work otherwise, and no one will tell you 😉
Additionally, when Hortonworks speaks about kerberizing the Hadoop cluster, they forgot to mention that it's really important to kerberize the client too. It does not magically happen. The number of questions like "Kerberos - View doesn't work anymore, can't access my site anymore", etc. speaks for itself.
@Hortonworks: Lots of room for more decent and, more importantly, end-to-end documentation.
Best Regards, Normen
10-04-2015
11:35 AM
1 Kudo
Removed customer name.