Created on 12-21-2016 02:28 PM - edited 08-17-2019 07:10 AM
Business Need
I needed to extract links from web pages using JSoup. I originally wrote a microservice to do this for my NiFi MP3 Jukebox, then built a custom NiFi processor instead. It's a pretty simple process, and there are a ton of great articles on how to do it referenced below.
Custom NiFi Processor Development Process
One thing I found useful was having at least one JUnit test that runs your processor. Deploying takes a while, especially if you need to deploy to a cluster of servers. I found a lot of great NiFi custom processor unit and integration tests online (see reference area). It's really easy to develop custom processors in Java.

In your tests you can feed in real files and get real files out. Keep a saved copy of what the valid output file should be, and compare the processor's output against it. Your JUnit tests can be triggered from Jenkins or other build tools, so NiFi custom processors fit into your standard development process (TDD, CI, CD, autodeploy).

To deploy the NAR, you just need to SCP your NAR file to the lib directory on all the NiFi nodes and restart NiFi. If you want to deploy your flow templates, you can do so with this tool.
Test
package com.dataflowdeveloper.processors.process;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;

public class LinkProcessorTest {

    private TestRunner testRunner;

    @Before
    public void init() {
        testRunner = TestRunners.newTestRunner(LinkProcessor.class);
    }

    @Test
    public void testProcessor() {
        testRunner.setProperty("url", "http://sparkdeveloper.com");
        try {
            testRunner.enqueue(new FileInputStream(new File("src/test/resources/test.csv")));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

        testRunner.run();
        testRunner.assertValid();

        List<MockFlowFile> successFiles = testRunner.getFlowFilesForRelationship(LinkProcessor.REL_SUCCESS);
        for (MockFlowFile mockFile : successFiles) {
            try {
                System.out.println("FILE:" + new String(mockFile.toByteArray(), "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    }
}
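The test above only prints each output FlowFile. A minimal sketch of the comparison against a saved copy, as described earlier, might look like the following. `GoldenFileCheck` and its methods are hypothetical names, not part of the linkextractor project or of NiFi; the sketch uses only the JDK and assumes the output bytes come from `mockFile.toByteArray()` as in the test.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

/** Hypothetical helper for comparing processor output with a saved expected file. */
final class GoldenFileCheck {

    private GoldenFileCheck() {
    }

    /** Reads the whole expected file, rethrowing I/O problems as unchecked. */
    private static byte[] read(Path expectedFile) {
        try {
            return Files.readAllBytes(expectedFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true when the produced bytes are identical to the saved file. */
    static boolean matches(byte[] actual, Path expectedFile) {
        return Arrays.equals(actual, read(expectedFile));
    }

    /** Throws AssertionError showing both contents when the output differs. */
    static void assertMatches(byte[] actual, Path expectedFile) {
        byte[] expected = read(expectedFile);
        if (!Arrays.equals(actual, expected)) {
            throw new AssertionError("Output differs from " + expectedFile
                    + "\nexpected: " + new String(expected, StandardCharsets.UTF_8)
                    + "\nactual:   " + new String(actual, StandardCharsets.UTF_8));
        }
    }
}
```

Inside the loop of `testProcessor`, a call such as `GoldenFileCheck.assertMatches(mockFile.toByteArray(), Paths.get("src/test/resources/expected.json"))` would fail the build when the output drifts; `expected.json` is an assumed file name. A byte-for-byte check is only stable when the input is fixed, so a processor that fetches a live page may need a local fixture instead.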
Step 1: Ingest / Parse a URL via NiFi (or could source from a file)
Step 2: LinkProcessor. The url property can be hardcoded or set with an expression built from attributes of the previous processor.
Step 3: UpdateAttribute to rename the resulting file (add a JSON extension, make the name unique).
Building the Link Processor
Run the included build.sh (on Linux or OSX), or run mvn install. Building requires JDK 8, Maven, and internet access.
Deploy the NAR:
scp nifi-process-nar/target/nifi-linkextractor-nar-1.0-SNAPSHOT.nar PLACE:/place/
or copy it to your local NIFI/lib directory. You can also get a release of the NAR from GitHub.
Maven Build Script (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at
  http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.dataflowdeveloper</groupId>
        <artifactId>linkextractor</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>nifi-process-processors</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-processor-utils</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mock</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <!-- jsoup HTML parser library @ http://jsoup.org/ -->
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.0</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>jitpack.io</id>
            <url>https://jitpack.io</url>
        </repository>
    </repositories>
</project>
Upgrades Recommended
When I add this to a production flow, I would upsert this into Phoenix or convert to ORC for a Hive table. See my other articles listed below for examples of doing just that.
Example File
hdfs dfs -cat /linkprocessor/379875e9-5d99-4f88-82b1-fda7cdd7bc98.json

[{"link":"","descr":"http://www.dataflowdeveloper.com/#"},{"link":"","descr":"http://twitter.com/paasdev"},{"link":"","descr":"http://www.dataflowdeveloper.com/#"},{"link":"","descr":"http://www.dataflowdeveloper.com/#"},{"link":"DataFlow Developer","descr":"http://www.dataflowdeveloper.com/"},{"link":"Programmable OCR with Tesseract","descr":"http://www.dataflowdeveloper.com/2016/09/21/programmable-ocr-with-tesseract/"},{"link":"Python Text Searchinv","descr":"https://pypi.python.org/pypi/Whoosh/"},{"link":"Simple Bayes Text Classifier","descr":"https://pypi.python.org/pypi/simplebayes/"},{"link":"Python Wrapper for Tesseract","descr":"https://github.com/jflesch/pyocr/"},{"link":"Lector","descr":"https://github.com/zdenop/lector"},{"link":"VietOCR","descr":"http://vietocr.sourceforge.net/"},{"link":"OCRivist","descr":"http://www.ocrivist.com/"},{"link":"TesseractGUI","descr":"http://tesseract-gui.sourceforge.net/"},{"link":"Tesseract4J","descr":"https://github.com/tesseract4java/tesseract4java"},{"link":"Java wrapper for Tesseract","descr":"http://tess4j.sourceforge.net/"},{"link":"Basic Tesseract OCR Engine","descr":"https://github.com/tesseract-ocr/tesseract"},{"link":"Command Line Example Usage","descr":"https://github.com/tesseract-ocr/tesseract/wiki/Command-Line-Usage"},{"link":"Tesseract OCR Wiki","descr":"https://github.com/tesseract-ocr/tesseract/wiki"},{"link":"OCR Engine PDF","descr":"https://github.com/tesseract-ocr/docs/blob/master/das_tutorial2016/7Building%20a%20Multi-Lingual%20OCR%20Engine.pdf"},{"link":"Homebrew","descr":"http://brew.sh/"},{"link":"Running Tesseract from NiFI","descr":"https://issues.apache.org/jira/browse/NIFI-1815"},
Source Code
https://github.com/tspannhw/linkextractorprocessor
Articles for Storing to Phoenix, ORC and More
Reference
Example Flow File