Community Articles

Find and share helpful community-sourced technical articles.
Labels (1)
avatar
Master Guru

Business Need

I needed to extract links from web pages using JSoup. I originally wrote a microservice for my NiFi MP3 Jukebox. I built a custom NiFi Processor instead. It's a pretty simple process and there's a ton of great articles on how to do it referenced below.

Custom NiFi Process Development Process

One thing I found useful was having at least one JUnit to test running your processor. Deploying takes a while especially if you need to deploy to a cluster of servers. I found a lot of great NIFI Custom Processor Unit and Integration tests online (see reference area). It's really easy to develop custom processors in Java. In your tests you can input files and get out real files. In your tests you should have a saved copy of what the valid file should be and then you can compare the output. Your JUnits can be triggered from Jenkins or other build tools. NiFi Custom Processors can be developed in your standard development process (TDD, CD, CI, Autodeploy). For the NAR you just need to SCP your NAR file to all the NIFI nodes lib directory and restart NIFI. If you want to deploy your flow templates, you can do so with this tool.

Test

package com.dataflowdeveloper.processors.process;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;

public class LinkProcessorTest {

 private TestRunner testRunner;

 @Before
 public void init() {
  testRunner = TestRunners.newTestRunner(LinkProcessor.class);
 }

 @Test
 public void testProcessor() {
  testRunner.setProperty("url", "http://sparkdeveloper.com");
  try {
   testRunner.enqueue(new FileInputStream(new File("src/test/resources/test.csv")));
  } catch (FileNotFoundException e) {
   e.printStackTrace();
  }

  testRunner.run();
  testRunner.assertValid();
  List<MockFlowFile> successFiles = testRunner.getFlowFilesForRelationship(LinkProcessor.REL_SUCCESS);

  for (MockFlowFile mockFile : successFiles) {
   try {
    System.out.println("FILE:" + new String(mockFile.toByteArray(), "UTF-8"));
   } catch (UnsupportedEncodingException e) {
    e.printStackTrace();
   }
  }
 }
}

10631-linkprocessorflow.png

Step 1: Ingest / Parse a URL via NiFi (or could source from a file)

10609-configurelinkprocessor.png

Step 2: LinkProcessor url could be hardcoded or expression from attributes from previous processor.

10610-linkprocessoradd.png

Step 3: UpdateAttribute to change the name of the resulting filename (add JSON, make unique).

Building the Link Processor

Run the included build.sh (on Linux or OSX), or run mvn install. Requires JDK 8 and Maven and internet access to build. Deploy the NAR! scp nifi-process-nar/target/nifi-linkextractor-nar-1.0-SNAPSHOT.nar PLACE:/place/ or copy it to your NIFI/lib directory locally. You can also get a release of the NAR from github.

Maven Build Script (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at
  http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.dataflowdeveloper</groupId>
        <artifactId>linkextractor</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
  
    <artifactId>nifi-process-processors</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-processor-utils</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mock</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
  <dependency>
  <!-- jsoup HTML parser library @ http://jsoup.org/ -->
  <groupId>org.jsoup</groupId>
  <artifactId>jsoup</artifactId>
  <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.0</version>
</dependency>

    </dependencies>
    <repositories>
  <repository>
      <id>jitpack.io</id>
      <url>https://jitpack.io</url>
  </repository>
 </repositories>
</project>

Upgrades Recommended

When I add this to a production flow, I would upsert this into Phoenix or convert to ORC for a Hive table. See my other articles listed below for examples of doing just that.

Example File

hdfs dfs -cat /linkprocessor/379875e9-5d99-4f88-82b1-fda7cdd7bc98.json
[{"link":"","descr":"http://www.dataflowdeveloper.com/#"},{"link":"","descr":"http://twitter.com/paasdev"},{"link":"","descr":"http://www.dataflowdeveloper.com/#"},{"link":"","descr":"http://www.dataflowdeveloper.com/#"},{"link":"DataFlow Developer","descr":"http://www.dataflowdeveloper.com/"},{"link":"Programmable OCR with Tesseract","descr":"http://www.dataflowdeveloper.com/2016/09/21/programmable-ocr-with-tesseract/"},{"link":"Python Text Searchinv","descr":"https://pypi.python.org/pypi/Whoosh/"},{"link":"Simple Bayes Text Classifier","descr":"https://pypi.python.org/pypi/simplebayes/"},{"link":"Python Wrapper for Tesseract","descr":"https://github.com/jflesch/pyocr/"},{"link":"Lector","descr":"https://github.com/zdenop/lector"},{"link":"VietOCR","descr":"http://vietocr.sourceforge.net/"},{"link":"OCRivist","descr":"http://www.ocrivist.com/"},{"link":"TesseractGUI","descr":"http://tesseract-gui.sourceforge.net/"},{"link":"Tesseract4J","descr":"https://github.com/tesseract4java/tesseract4java"},{"link":"Java wrapper for Tesseract","descr":"http://tess4j.sourceforge.net/"},{"link":"Basic Tesseract OCR Engine","descr":"https://github.com/tesseract-ocr/tesseract"},{"link":"Command Line Example Usage","descr":"https://github.com/tesseract-ocr/tesseract/wiki/Command-Line-Usage"},{"link":"Tesseract OCR Wiki","descr":"https://github.com/tesseract-ocr/tesseract/wiki"},{"link":"OCR Engine PDF","descr":"https://github.com/tesseract-ocr/docs/blob/master/das_tutorial2016/7Building%20a%20Multi-Lingual%20OCR%20Engine.pdf"},{"link":"Homebrew","descr":"http://brew.sh/"},{"link":"Running Tesseract from NiFI","descr":"https://issues.apache.org/jira/browse/NIFI-1815"},

Source Code

https://github.com/tspannhw/linkextractorprocessor

Articles for Storing to Phoenix, ORC and More

Reference

Example Flow File

link-processor.xml


linkprocess1.png
15,104 Views