Member since
Kudos Received
My Accepted Solutions
Title | Views | Posted |
33235 | 02-14-2019 02:53 PM | |
2534 | 01-04-2019 08:39 PM | |
10807 | 11-05-2018 03:38 PM | |
5079 | 09-27-2018 04:21 PM | |
2758 | 07-05-2018 02:56 PM |
03:40 PM
I changed the extensions on both .xlsx files to .zip. Unzipping them reveals the folder structure of those files. Going through the included XML files, there were some differences but nothing that stood out to cause these errors. Do you know how the vendor generates the Excel files? Is it possible these files are really .xls files but just have the .xlsx file extension? Do you know what version of Excel they use?
... View more
06:04 PM
Looking more closely at nifi-app.log, I see the following errors: 2017-09-21 13:58:36,314 ERROR [Timer-Driven Process Thread-9] o.a.n.p.poi.ConvertExcelToCSVProcessor ConvertExcelToCSVProcessor[id=a4cfc1b5-015e-1000-b59d-535f6969973d] Failed to process incoming Excel document: java.lang.UnsupportedOperationException: Only .xlsx Excel 2007 OOXML files are supported
java.lang.UnsupportedOperationException: Only .xlsx Excel 2007 OOXML files are supported
at org.apache.nifi.processors.poi.ConvertExcelToCSVProcessor$1.process(
at org.apache.nifi.processors.poi.ConvertExcelToCSVProcessor.onTrigger(
at org.apache.nifi.processor.AbstractProcessor.onTrigger(
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$
at java.util.concurrent.Executors$
at java.util.concurrent.FutureTask.runAndReset(
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(
at java.util.concurrent.ScheduledThreadPoolExecutor$
at java.util.concurrent.ThreadPoolExecutor.runWorker(
at java.util.concurrent.ThreadPoolExecutor$
Caused by: org.apache.poi.openxml4j.exceptions.InvalidFormatException: Package should contain a content type part [M1.13]
at org.apache.poi.openxml4j.opc.ZipPackage.getPartsImpl(
at org.apache.poi.openxml4j.opc.OPCPackage.getParts(
at org.apache.nifi.processors.poi.ConvertExcelToCSVProcessor$1.process(
... 15 common frames omitted
2017-09-21 13:58:36,430 ERROR [Timer-Driven Process Thread-9] o.a.n.p.poi.ConvertExcelToCSVProcessor ConvertExcelToCSVProcessor[id=a4cfc1b5-015e-1000-b59d-535f6969973d] Failed to process incoming Excel document: java.lang.NullPointerException
java.lang.NullPointerException: null
at org.apache.nifi.processors.poi.ConvertExcelToCSVProcessor.handleExcelSheet(
at org.apache.nifi.processors.poi.ConvertExcelToCSVProcessor.access$000(
at org.apache.nifi.processors.poi.ConvertExcelToCSVProcessor$1.process(
at org.apache.nifi.processors.poi.ConvertExcelToCSVProcessor.onTrigger(
at org.apache.nifi.processor.AbstractProcessor.onTrigger(
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$
at java.util.concurrent.Executors$
at java.util.concurrent.FutureTask.runAndReset(
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(
at java.util.concurrent.ScheduledThreadPoolExecutor$
at java.util.concurrent.ThreadPoolExecutor.runWorker(
at java.util.concurrent.ThreadPoolExecutor$
... View more
05:36 PM
I was able to reproduce the issue using the sample file you provided. If I save that .xlsx file (without making any modifications) using my Excel (Microsoft Excel for Mac Version 15.18) and use that file instead, the ConvertExcelToCSV processor has no errors. Please see attached file: Trying to determine what difference is causing the error.
... View more
04:11 PM
Hi @Lovelesh Chawla, It looks like someone has encountered a similar issue ( In /logs/nifi-app.log, can you provide the full stacktrace of the error? Have you confirmed that the data going into the ConvertExcelToCSV processor is the proper format (.xlsx (XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents).
... View more
02:36 PM
Hi @Sanaz Janbakhsh, PublishMQTT is a processor that requires an incoming relationship. Basically, something has to be coming in to it to publish to a broker. That is why you are getting the "upstream connections is invalid..." message. It sounds like you need to use the ConsumeMQTT processor which receives messages from an MQTT broker. If I am misunderstanding your use case, please post a screenshot of your flow if possible.
... View more
06:41 PM
3 Kudos
This tutorial demonstrates how to use the
PutElasticsearchHttpRecord processor to easily put data from a CSV file into Elasticsearch.
Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0. As such, the tutorial needs to be done running Version 1.2.0 or later. Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6 Apache NiFi 1.3.0 Elasticsearch 2.3.3 PublishElasticsearchHttpRecord (CSVReader) Demo Configuration Elasticsearch
For my environment, I had Elasticsearch 2.3.3 installed.
Start Elasticsearch and assign cluster and node names:
./elasticsearch elasticsearch hcc
I like to use
Elastic HQ to manage/monitor my cluster:
Initial Flow
One of the great things about the record-oriented functionality in NiFi is the ability to re-use Record Readers and Writers. In conjunction with the Record processors, it is quick and easy to change data formats and data destinations.
For example, let's assume you have the flow working from the article
"Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter)".
Note: The template for that flow can be found in that article as well as step-by-step instructions on how to configure it.
As currently configured, the flow:
1. Pulls a .zip file of movie data (titles, tags, ratings, etc.) from a website.
2. Unzips the file.
3. Sends only the movie title information on in the flow.
4. Adds Schema Name "movies" as an attribute to the flowfile.
5. Uses
PublishKafkaRecord_0_10 to convert the flowfile contents from CSV to JSON and publish to a Kafka topic.
Say instead of publishing that movie data to Kafka, you now want to put it in
Elasticsearch. The following steps will demonstrate how to do that quickly and simply by replacing the PublishKafkaRecord processor with a PutElasticsearchRecord processor and re-using a CSVReader. Elasticsearch Flow Setup
1. Delete the connection between the UpdateAttribute and PublishKafkaRecord_0_10 processors. Now delete the PublishKafkaRecord_0_10 processor or set it off to the side.
2. Add a
PutElasticsearchHttpRecord to the canvas.
3. Connect the UpdateAttribute processor to the PutElasticsearchHttpRecord processor:
4. Open the Configure dialog for the PutElasticsearchHttpRecord process. On the Settings tab, auto-terminate the "success" relationship and for the purposes of this demo, auto-terminate the "failure" relationship also.
5. On the canvas, make a "retry" relationship connection from the PutElasticsearchHttpRecord to itself.
6. On the Properties tab:
Add "" for the Elasticsearch URL property Add "movies" for the Index property Add "default" for the Type property Since it and its schema was already defined for the original PublishKafka flow, simply select "CSVReader" for the Record Reader property.
The flow is ready to run.
Flow Results
Start the flow.
(Note: If you had run the original PublishKafka flow previously, don't forget to clear the state of the GetHTTP processor so that the movie data zip is retrieved again.)
The movie data is now in Elasticsearch:
Helpful Links
Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:
Change Data Capture (CDC) with Apache NiF Convert CSV to JSON, Avro, XML using ConvertRecord Installing a local Hortonworks Registry to use with Apache NiFi Running SQL on FlowFiles using QueryRecord Processor Using PartitionRecord (GrokReader/JSONWriter) to Parse and Group Log Files
... View more
07:09 PM
2 Kudos
This tutorial walks you through a NiFi flow that utilizes the
PublishKafkaRecord_0_10 processor to easily convert a CVS file into JSON and then publish to Kafka. The tutorial is based on the blog "Integrating Apache Nifi with Apache Kafka", updated with the more recent record-oriented processors and controller services available in NiFi.
Note: The record-oriented processors and controller services were introduced in NiFi 1.2.0. As such, the tutorial needs to be done running Version 1.2.0 or later. Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache NiFi 1.3.0
Apache Kafka PublishKafkaRecord_0_10 (CSV to JSON) Support Files
Here is a template of the flow discussed in this tutorial:
publishkafkarecord.xml Demo Configuration Kafka Download & Install
The flow in this demo utilizes the PublishKafkaRecord_0_10 processor, which as the name implies, utilizes the Kafka 0.10.x Producer API. As a result, a 0.10.x version of Kafka is required for this tutorial. For my environment, I downloaded and installed
Kafka (Scala 2.11). Kafka Configuration & Startup
In the
bin directory of your Kafka install:
Start ZooKeeper:
./ ../config/
Start Kafka:
./ ../config/
Create Kafka Topic:
./ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Movies
Start Kafka Consumer:
./ --zookeeper localhost:2181 --topic Movies --from-beginning Import Template
Start NiFi. Import the provided template and add it to the canvas. You should see the following flow on your NiFi canvas:
Enable Controller Services
Select the gear icon from the Operate Palette:
This opens the NiFi Flow Configuration window. Select the Controller Services tab:
Enable AvroSchemaRegistry by selecting the lightning bolt icon/button. This will then allow you to enable the CSVReader and JSONRecordSetWriter controller services. Select the lightning bolt icons for both of these services. All the controller services should be enabled at this point:
The flow is now ready to run. Flow Overview
Here is a quick overview of the flow:
GetHTTP pulls a .zip file of movie data (titles, tags, ratings, etc.) from a website
UnpackContent unzips the file
RouteOnAttribute sends just the movie title information on in the flow
UpdateAttribute adds Schema Name "movies" as an attribute to the flowfile
Converts the flowfile contents from CSV to JSON
Publishes the JSON data to the Kafka topic "Movies" Flow Details
Let's look at each of the processors in the flow in detail:
Get Movie Data (GetHTTP Processor)
This processor pulls a zip file from the website
MovieLens, a movie recommendation service. The dataset ( contains 20,000,263 ratings and 465,564 tag applications across 27,278 movies.
Looking at the processor's configuration:
Start the processor to retrieve the file:
Note: On the Scheduling tab, the Runs Schedule is set to 10 minutes instead of the default 0 secs, so the processory only periodically checks to see if the data file has been updated instead of constantly:
Unzip (UnpackContent Processor)
The next processor is UnpackContent which unzips the "" file:
Running the processor unzips the file into 7 separate csv files (movies.csv, ratings.csv, tags.csv, links.csv, genome-scores.csv, genome-tags.csv, and README.txt):
RouteOnAttribute Processor
RouteOnAttribute is next. Looking at its configuration:
the processor routes the flowfiles to different connections depending on the file name (movies.csv, ratings.csv, tags.csv).
For the purposes of this demo, we are only interested in publishing the movie title data to Kafka. As such, we make the connection to the next processor (UpdateAttribute) using the "movies" relationship and auto-terminate the others:
Run the RouteOnAttributeProcessor to send only the movie title data:
Add Schema Name Attribute (UpdateAttribute Processor)
The next step in the flow is an UpdateAttribute processor which adds the attribute with the value of "movies" to the flowfile:
Start the processor, and view the attributes of the flowfile to confirm this:
You can also confirm the contents of the flowfile is in CSV format at this point in the flow:
Publish to "Movies" Topic (PublishKafkaRecord_0_10 Processor)
The final processor is PublishKafkaRecord_0_10. Looking at its configuration:
Kafka Brokers property is set to "localhost:9092" and Topic Name property is set to "Movies". Record Reader is set to "CSVReader" and Record Writer is set to "JsonRecordSetWriter". The "CSVReader" controller service parses the incoming CSV data and determines the data's schema. The "JsonRecordSetWriter" controller service determines the data's schema and writes that data into JSON.
CSVReader Controller Service
Select the arrow icon next to the "CSV Reader" which opens the Controller Services list in the NiFi Flow Configuration. "CSVReader" should be highlighted in the list. Select the View Details button ("i" icon) to see the properties:
With Schema Access Strategy property set to "Use 'Schema Name' Property", the reader specifies the schema expected in an attribute, which in this flow is The Schema Registry property is set to the
AvroSchemaRegistry Controller Service which defines the "movies" schema. Select the arrow icon next to "AvroSchemaRegistry" and select the View Details button ("i" icon) to see its properties:
The schema is defined as:
"type": "record",
"name": "MoviesRecord",
"fields" : [
{"name": "movieId", "type": "long"},
{"name": "title", "type": ["null", "string"]},
{"name": "genres", "type": ["null", "string"]}
JsonRecordSetWriter Controller Service
Close the window for the AvroSchemaRegistry. Select the View Details button ("i" icon) next to the "JsonRecordSetWriter" controller service to see its properties:
Schema Write Strategy is set to "Set '' Attribute", Schema Access Strategy property is set to "Use 'Schema Name' Property" and Schema Registry is set to AvroSchemaRegistry.
See JSON in Kafka
Start the PublishKafaRecord processor and you will see the JSON movie data in your Kafka Consumer window: Helpful Links
Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:
Change Data Capture (CDC) with Apache NiF
Convert CSV to JSON, Avro, XML using ConvertRecord
Installing a local Hortonworks Registry to use with Apache NiFi
Running SQL on FlowFiles using QueryRecord Processor
Using PartitionRecord (GrokReader/JSONWriter) to Parse and Group Log Files
... View more
06:27 PM
What version of NiFi are you using? If you are on 1.2.0 or later, you can make use of the new Record Reader/Writer capabilities. Check out the HCC article I wrote showing CSV to JSON to learn more: There are some URLs at the end for other articles about the Record Readers/Writers. In your scenario, you would use the AvroReader and the CSVRecordSetWriter in your ConvertRecord processor
... View more
07:40 PM
Hi @Anishkumar Valsalam I setup a Schema Registry using MySQL as part of another HCC article ( I believe you need to run the bootstrap script, which will create the necessary tables: ./bootstrap/
... View more
06:12 PM
3 Kudos
Apache NiFi provides the option of starting an embedded ZooKeeper server. However, NiFi can also be configured to run with an external ZooKeeper server. This article describes how to install and configure a 3 host ZooKeeper ensemble to work with a 2 node NiFi cluster. Environment
This tutorial was tested using the following environment and components:
Mac OS X 10.11.6
Apache ZooKeeper 3.4.6
Apache NiFi 1.3.0 ZooKeeper ZooKeeper Version
The version of ZooKeeper chosen for this tutorial is
Release 3.4.6.
Note: ZooKeeper 3.4.6 is the version supported by the latest and previous versions of Hortonworks HDF as shown in the "Component Availability In HDF" table of the HDF Release Notes. ZooKeeper Download
Go to to determine the best Apache mirror site to download a stable ZooKeeper distribution. From that mirror site, select the zookeeper-3.4.6 directory and download the zookeeper-3.4.6.tar.gz file.
Unzip the tar.gz file and create 3 copies of the distribution directory, one for each host in the ZooKeeper ensemble. For example:
Note: In this tutorial, we are running multiple servers on the same machine. ZooKeeper Configuration
"zoo.cfg" file
Next we need to create three config files. In the
conf directory of zookeeper-1, create a zoo.cfg file with the following contents:
Because we are running multiple ZooKeeper servers on a single machine, we specified the servername as localhost with unique quorum & leader election ports (i.e. 2888:3888, 2889:3889, 2890:3890) for each server.X.
Create similar
zoo.cfg files in the conf directories of zookeeper-2 and zookeeper-3 with modified values for dataDir and clientPort properties as separate dataDirs and distinct clientPorts are necessary.
"myid" file
Every machine that is part of the ZooKeeper ensemble needs to know about every other machine in the ensemble. As such, we need to attribute a server id to each machine by creating a file named
myid , one for each server, which resides in that server's data directory, as specified by the configuration file parameter dataDir .
For example, create a
myid file in /usr/local/zookeeper1 that consists of a single line with the text "1" and nothing else. Create the other myid files in the /usr/local/zookeeper2 and /usr/local/zookeeper3 directories with the contents of "2" and "3" respectively.
Note: More information about ZooKeper configuration settings can be found in the ZooKeeper Getting Started Guide. ZooKeeper Startup
Start up each ZooKeeper host, by navigating to the /bin directory of each and applying the following command:
./ start
NiFi NiFi Configuration
For a two node NiFi cluster, in each
conf directory modify the following properties in the file:
The first property configures NiFi to not use its embedded ZooKeeper. As a result, the and state-management.xml files in the conf directory are ignored. The second property must be specified to join the cluster as it lists all the ZooKeeper instances in the ensemble. NiFi Startup
You can now start up each NiFi node. When the UI is available, create or upload a flow that has processors that capture state information. For example, import and setup the flow from the
Change Data Capture (CDC) with Apache NiFi series:
In addition to the other setup steps from the CDC article, since this environment is a cluster, for the CaptureChangeMySQL processor, go to the Scheduling tab on the Configure Processor dialog. Change the Execution setting to "Primary node" from "All nodes":
Run the flow and select "View State" from the CaptureChangeMySQL and/or EnforceOrder processors to verify that state information is managed properly by the external ZooKeeper ensemble:
... View more