Created 03-02-2017 01:05 PM
I am trying to pull data from NiFi using the site-to-site mechanism.
The NiFi flow has the following components:
a GetFile processor, an Output Port, and a connection from GetFile to the Output Port.
On starting them all, files are queuing up but never reaching the Output Port, as I can't see them in the Output Port's Data Provenance.
I am trying to pull the data from NiFi through Spark.
# Site to Site properties
nifi.remote.input.host=localhost
nifi.remote.input.secure=false
nifi.remote.input.socket.port=10000
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec
nifi.web.http.port=8080
The port name is the same in the NiFi flow and the Spark config.
I am also using the NiFi receiver for Spark.
Please assist
Created 03-02-2017 01:23 PM
@nmaillard : Please assist. I have followed the steps given at https://blogs.apache.org/nifi/entry/stream_processing_nifi_and_spark
Created 03-02-2017 01:34 PM
Some suggestions...
- Check that you can telnet over ports 9090 (assuming the NiFi default) and 10000, from both servers to each other (see the connectivity sketch after this reply).
- Check that port 10000 is not already in use.
- Define nifi.remote.input.host using the FQDN of the partner NiFi server.
- What is showing up in /var/log/ni-fi?
- I usually set up a flow ending in a Remote Process Group on the source, and then use an Input Port on the target.
- The ports sometimes take some time to show up and become available.
- In the ports, check that you are defining the correct URL, i.e. FQDN:9090/nifi.
- Confirm that Enable Transmission has been set.
- Check there are no issues downstream (permissions, etc.); verify the logic works if you define and test a local source in the flow.
Graham
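As a rough illustration of the first check, here is a minimal sketch (assuming the asker's ports from nifi.properties: 8080 for the web UI and 10000 for site-to-site) that runs the same connectivity test from Java when telnet is not available. The PortCheck class and check() helper are hypothetical names, not part of the thread.
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // Hypothetical helper: tests whether a TCP port is reachable from this host
    static void check(String host, int port) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 5000); // 5 second timeout
            System.out.println(host + ":" + port + " is reachable");
        } catch (Exception e) {
            System.out.println(host + ":" + port + " is NOT reachable: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        check("localhost", 8080);   // nifi.web.http.port
        check("localhost", 10000);  // nifi.remote.input.socket.port
    }
}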
Created 03-02-2017 01:35 PM
That should read /var/log/nifi @Sunny Bhan
Created 03-02-2017 05:17 PM
@Graham Martin: No error logs, and nifi.remote.input.host=localhost.
I am trying to pull data from the Output Port through the Spark program (which pulls from the Output Port in the NiFi flow).
The success relationship of GetFile is connected to the Output Port.
Data Provenance for the Output Port doesn't show any records for flow files, although GetFile and the connecting queue do have flowfile records in Data Provenance.
Spark code:
SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
        .url("http://localhost:8080/nifi")
        .portName("SparkPort")
        .buildConfig();

SparkConf sparkConf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("SparkNifiConnectorApp");
nifi.web.http.port=8080
Created 03-02-2017 08:03 PM
If the Output Port in NiFi is started and running, and there is data in the queue in front of it, then it is available to be pulled and something is wrong in your Spark job.
What are the logs of your Spark job showing?
Is your Spark job really running on the same machine where NiFi is?
Just checking since you specified localhost:8080 for the location of NiFi.
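One way to isolate the problem is to pull from the Output Port directly with the site-to-site client, outside of Spark. Below is a minimal sketch (assuming the same URL http://localhost:8080/nifi and port name "SparkPort" as in the Spark config above; the DirectPortCheck class name is made up for illustration). If it receives flow files, the NiFi side is fine and the issue is in the Spark receiver wiring.
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.DataPacket;

public class DirectPortCheck {
    public static void main(String[] args) throws Exception {
        // Same URL and port name assumed as in the Spark config
        SiteToSiteClient client = new SiteToSiteClient.Builder()
                .url("http://localhost:8080/nifi")
                .portName("SparkPort")
                .build();

        // Open a receive transaction against the Output Port and drain whatever is queued
        Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
        DataPacket packet = transaction.receive();
        int count = 0;
        while (packet != null) {
            byte[] bytes = new byte[(int) packet.getSize()];
            InputStream in = packet.getData();
            int read = 0;
            int n;
            while (read < bytes.length && (n = in.read(bytes, read, bytes.length - read)) != -1) {
                read += n;
            }
            System.out.println("Received: " + new String(bytes, StandardCharsets.UTF_8));
            count++;
            packet = transaction.receive();
        }
        transaction.confirm();
        transaction.complete();
        client.close();
        System.out.println("Flow files received: " + count);
    }
}
If nothing is received here either, re-check the site-to-site properties in nifi.properties and remember that NiFi must be restarted for changes to that file to take effect.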
Created 03-03-2017 07:18 AM
I checked; the Spark program is running on the same machine as NiFi.
The queue before the Output Port has data, whereas the Output Port doesn't (nothing shows for it in the Output Port's Data Provenance page).
I am new to this tool.
I have attached screenshots of my flow: nifi-flow.png, output-port.png, output-port-data-provenance.png
Created 03-03-2017 02:45 PM
As I described already, the problem is not in your NiFi flow. Data will stay in the queue until something (the Spark job) pulls it from the port, and you won't see anything in provenance until that happens. So the problem is that your Spark job is not pulling the data.
Can you answer the question I asked before: what is shown in the logs of your Spark job?
Created 03-06-2017 07:40 AM
I am using embedded Spark, running the Spark application with it along with the NiFi receiver for Spark.
On debugging, I found that the objects are created but no data ever arrives.
# Site to Site properties
nifi.remote.input.host=localhost
nifi.remote.input.secure=false
nifi.remote.input.socket.port=10000
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec
nifi.web.http.port=8080
Below is the code:
SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
        .url("http://localhost:8080/nifi")
        .portName("SparkPort")
        .buildConfig();

SparkConf sparkConf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("SparkNifiConnectorApp");

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

// Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from the specified port.
// The stream is defined once, outside any loop.
JavaReceiverInputDStream<NiFiDataPacket> packetStream =
        ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY()));

// Map the data from NiFi to text, ignoring the attributes.
// Note: printing the DStream object itself only shows an object reference, not the data.
JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
    public String call(final NiFiDataPacket dataPacket) throws Exception {
        return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
    }
});

// Print the first elements of each batch to the console
text.print();

// The receiver only begins pulling data after the streaming context is started
ssc.start();
ssc.awaitTermination();