Technology: CSV, AVRO, Hortonworks Schema Registry, Apache NiFi, Streaming Analytics Manager, Kafka, Hadoop HDFS
This is a simple example of reading CSV data, using its schema to convert it to Avro, and sending it via Kafka to SAM, which then reads it and stores it to HDFS. It is a simple flow, but a starting point for building flows of any complexity.
A number of questions have come up on how to set up this basic flow.
Gotchas: you must send Avro, beware of null values, define your schema, put your schema name somewhere the flow can find it, create your Kafka topic ahead of time, and make sure you have permissions on it.
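For the nulls gotcha: a common fix (standard Avro practice, not part of the original flow) is to declare any field that can be empty as a union with null and give it a null default, for example:

{ "name": "first", "type": [ "null", "string" ], "default": null }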
Building Sample CSV Data
curl "http://api.mockaroo.com/api/ANID?count=1000&key=AKEY" > "simple.csv"
Note: You need to sign up for Mockaroo and create your own schema, then substitute your schema ID (ANID) and API key (AKEY) into the link above.
Import this schema to your account
<strong>{ "id": 76671, "num_rows": 1000, "file_format": "csv", "name": "simple", "include_header": true, "columns": [ { "name": "first", "type": "First Name (Male)", "formula": "" }, { "name": "second", "type": "Row Number", "formula": "" } ] }</strong>
Create a Kafka Topic (simple)
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic simple
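To verify the topic was created and check its partition/replica assignment:

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic simple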
You can Test Consuming the Messages
./kafka-console-consumer.sh --zookeeper princeton10.field.hortonworks.com:2181 --topic simple

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{metadata.broker.list=princeton10.field.hortonworks.com:6667, request.timeout.ms=30000, client.id=console-consumer-7447, security.protocol=PLAINTEXT}
aabaabb
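As the warning says, the ZooKeeper-based consumer is deprecated; the new-consumer equivalent, using the broker address shown in the output above, is:

./kafka-console-consumer.sh --bootstrap-server princeton10.field.hortonworks.com:6667 --topic simple --from-beginning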
Apache NiFi 1.x Flow For CSV Ingest
Set the Schema Name via UpdateAttribute in NiFi
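In UpdateAttribute, add a property named schema.name (the default attribute the record readers and writers look up) with the schema's registry name as its value:

schema.name = simple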
PublishKafkaRecord Settings (you need a CSVReader and an AvroRecordSetWriter controller service)
CSV Reader (Make sure you use the same Schema Registry, Access Strategy and Schema Name)
Avro Writer (Make sure you use the same Schema Registry, Write Strategy and Schema Name)
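Roughly, the controller-service settings look like this (property names are from the NiFi 1.x record readers/writers; verify them against your NiFi version):

CSVReader:
  Schema Access Strategy = Use 'Schema Name' Property
  Schema Registry        = HortonworksSchemaRegistry
  Schema Name            = ${schema.name}

AvroRecordSetWriter:
  Schema Write Strategy  = HWX Content-Encoded Schema Reference
  Schema Access Strategy = Use 'Schema Name' Property
  Schema Registry        = HortonworksSchemaRegistry
  Schema Name            = ${schema.name}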
Add Your Simple Schema to the HSR
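If you'd rather script this than click through the registry UI, the Schema Registry REST API can do it; a rough sketch (port 7788 and the /api/v1/schemaregistry paths are the HDF defaults, REGISTRYHOST is a placeholder, and the schemaText is the schema from the Reference section below):

# create the schema metadata entry
curl -X POST "http://REGISTRYHOST:7788/api/v1/schemaregistry/schemas" \
  -H "Content-Type: application/json" \
  -d '{ "type": "avro", "schemaGroup": "Kafka", "name": "simple", "description": "simple CSV demo", "compatibility": "BACKWARD", "evolve": true }'

# add the first version of the schema
curl -X POST "http://REGISTRYHOST:7788/api/v1/schemaregistry/schemas/simple/versions" \
  -H "Content-Type: application/json" \
  -d '{ "description": "v1", "schemaText": "{ \"name\": \"simple\", \"type\": \"record\", \"fields\": [ { \"name\": \"first\", \"type\": \"string\" }, { \"name\": \"second\", \"type\": \"int\" } ] }" }'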
Simple Streaming Analytics Manager Store to HDFS
SAM Kafka Source:
SAM HDFS Sink: set a directory and choose which fields to output
Result File in HDFS
hdfs dfs -cat /simple/38-HDFS-2-0-1502904631885.txt
a,1
a,2
b,1
a,5
a,-1
b,-5
b,9
a,1
a,2
b,1
a,5
a,-1
b,-5
b,9
Confirmations via Data Provenance
Source file:
wc -l simple.csv
1000 simple.csv

PublishKafka provenance:
msg.count 1000
hdfs dfs -cat /simple/38-HDFS-2-3-1502906678558.txt | wc -l
472
hdfs dfs -cat /simple/38-HDFS-2-2-1502906677976.txt | wc -l
296
hdfs dfs -cat /simple/38-HDFS-2-1-1502906676961.txt | wc -l
232
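The three files together account for every source row (472 + 296 + 232 = 1000). To confirm the total in one command:

hdfs dfs -cat /simple/38-HDFS-2-1-1502906676961.txt /simple/38-HDFS-2-2-1502906677976.txt /simple/38-HDFS-2-3-1502906678558.txt | wc -l
1000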
Reference:
Templates (load greg.xml into NiFi via Templates; rename simplejson.txt to simple.json and load it into SAM)
Sample Data
Create a New Schema in the Registry from this Schema
{ "name": "simple", "type": "record", "fields": [ { "name": "first", "type": "string" }, { "name": "second", "type": "int" } ] }
I uploaded a sample of the data in case you don't want to generate your own with Mockaroo; it's in simplecsv.txt.
Drop this file into the GetFile input directory.
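For example, if GetFile is watching /tmp/simple-in (a hypothetical path; use whatever Input Directory you configured on the processor):

# rename to .csv and drop it where GetFile is watching
cp simplecsv.txt /tmp/simple-in/simple.csv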