Community Articles

Find and share helpful community-sourced technical articles.
Labels (2)
avatar
Master Guru

Technology: CSV, AVRO, Hortonworks Schema Registry, Apache NiFi, Streaming Analytics Manager, Kafka, Hadoop HDFS

This is a simple example of reading CSV data, using it's schema to convert it to AVRO, sending it via Kafka to SAM. SAM then reads it and stores it to HDFS. This is a simple flow, but a start to setting up any level of complex flow.

A number of questions have come up on how to setup this basic flow.

Gotchas: Must send AVRO, beware of nulls, set your schema, put your schema name somewhere, create your kafka topic and make sure you have permissions.

Building Sample CSV Data

curl "http://api.mockaroo.com/api/ANID?count=1000&key=AKEY" > "simple.csv"

****You need to sign up for mockaroo and create your own schema. You will then need to add the key and account id to the above link.


Import this schema to your account

<strong>{
  "id": 76671,
  "num_rows": 1000,
  "file_format": "csv",
  "name": "simple",
  "include_header": true,
  "columns": [
    {
      "name": "first",
      "type": "First Name (Male)",
      "formula": ""
    },
    {
      "name": "second",
      "type": "Row Number",
      "formula": ""
    }
  ]
}</strong>


Create a Kafka Topic (simple)

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic simple

You can Test Consuming the Messages

./kafka-console-consumer.sh --zookeeper princeton10.field.hortonworks.com:2181 --topic simple

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].{metadata.broker.list=princeton10.field.hortonworks.com:6667, request.timeout.ms=30000, client.id=console-consumer-7447, security.protocol=PLAINTEXT}aabaabb

Apache NiFi 1.x Flow For CSV Ingest

27494-simplenifi.png

Set the Schema Name via Update Properties in NiFi

27500-updateattribue.png

Publish Kafka Record Settings (need CSV Reader and AVRO Record Set Writer)

27502-publickafkarecord-0-10.png


CSV Reader (Make sure your use the same Schema Registry, Access Strategy and Schema Name)27503-csvreader.png


Avro Writer (Make sure you use the same Schema Registry, Write Strategy and Schema Name)

27504-avrorecordsetwriter.png

Add Your Simple Schema to the HSR

27501-schema.png


Simple Streaming Analytics Manager Store to HDFS

27496-samoverview.png

27497-simplesamrunning.png

SAM Kafka Source:

27499-simplesamkafka.png

SAM HDFS Sink: Set a directory and what fields you want to output

27498-simplesamhdfs2.png

Result File in HDFS

hdfs dfs -cat /simple/38-HDFS-2-0-1502904631885.txt
a,1
a,2
b,1
a,5
a,-1
b,-5
b,9
a,1
a,2
b,1
a,5
a,-1
b,-5
b,9


Confirmations via Data Provenance

Source File 
wc -l simple.csv
1000 simple.csv

PublishKafka 
msg.count 1000
hdfs dfs -cat /simple/38-HDFS-2-3-1502906678558.txt  | wc -l
472
hdfs dfs -cat /simple/38-HDFS-2-2-1502906677976.txt | wc -l
296
hdfs dfs -cat /simple/38-HDFS-2-1-1502906676961.txt | wc -l
232


Reference:


Templates (Load greg.xml into NiFi via templates. Change simplejson.txt to simple.json and load into SAM)

greg.xml

simplejson.txt

Sample Data

simplecsv.txt

Create a New Schema in the Registry from this Schema

{
 "name": "simple",
 "type": "record",
 "fields": [
  {
   "name": "first",
   "type": "string"
  },
  {
   "name": "second",
   "type": "int"
  }
 ]
}
4,621 Views
Comments
avatar
Master Guru

I upload a sample of the data incase you don't want to generate your own with mockaroo. It's in simplecsv.txt.

Drop this file in the GetFile directory