Overview

As mentioned in the parent article, the objective of part 1 is to ingest data from the New York Times API, run HTML scraping and personality detection on it, and publish the result to a Kafka topic, as depicted in the architecture below:

86650-huuty.png

The end goal is thus to implement a NiFi flow like the following:

86651-t8syo.png

To implement it, this article is divided into 4 sections:

  • Section 1: Create a NYT API consumer
  • Section 2: Create a custom BoilerPipe Extractor Processor
  • Section 3: Create a custom Mairesse Personality Processor
  • Section 4: Create and configure a Kafka topic

Section 1: Create a NYT API consumer

Disclaimer: I basically copy-pasted this from my previous article, Determining the big 5 traits of Personality Psychology of news articles using NiFi, Hive & Zeppelin.

Step 1: Obtaining an API Key

This step is very straightforward. Go to https://developer.nytimes.com/signup and sign up for a key.

86652-k0azp.png

Step 2: Configuring InvokeHTTP

The InvokeHTTP processor is used here with all default parameters, except for the URL. Here are some key configuration items and a screenshot of the processor configuration:

  • HTTP Method: GET
  • Remote URL: http://api.nytimes.com/svc/search/v2/articlesearch.json?fq=source:("The New York Times")&page=0&sort=newest&fl=web_url,snippet,headline,pub_date,document_type,news_desk,byline&api-key=[YOUR_KEY] (This URL selects articles with the New York Times as a source, and returns only the fields I am interested in: web_url, snippet, headline, pub_date, document_type, news_desk, byline.)
  • Content-Type: ${mime.type}
  • Run Schedule: 5 mins (could be set a little higher; I'm not sure how frequently new articles are published)

86653-dgmua.png
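InvokeHTTP accepted the URL exactly as written above in my flow. If you want to try the same request from another client, the quotes, spaces and parentheses in the fq parameter may need to be percent-encoded first. Here is a minimal sketch of that, assuming standard form encoding is acceptable to the API; the class and method names are hypothetical and only the endpoint and field list come from the configuration above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Sketch: assemble the Article Search URL with each query value percent-encoded. */
final class NytSearchUrlSketch {

    // Endpoint and field list are copied from the InvokeHTTP configuration above.
    static final String ENDPOINT = "http://api.nytimes.com/svc/search/v2/articlesearch.json";
    static final String FIELDS = "web_url,snippet,headline,pub_date,document_type,news_desk,byline";

    /** Encodes one query-string value as UTF-8 form data (spaces become '+'). */
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is available on every JVM, so this branch should never run.
            throw new IllegalStateException(e);
        }
    }

    /** Builds the full request URL for the given API key (hypothetical helper). */
    static String build(String apiKey) {
        return ENDPOINT
                + "?fq=" + encode("source:(\"The New York Times\")")
                + "&page=0"
                + "&sort=newest"
                + "&fl=" + encode(FIELDS)
                + "&api-key=" + encode(apiKey);
    }

    public static void main(String[] args) {
        // Replace YOUR_KEY with the key obtained in Step 1.
        System.out.println(build("YOUR_KEY"));
    }
}
```

Pasting the printed URL into a browser or curl is a quick way to check the key before wiring up the flow.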

Step 3: Extracting results from Invoke HTTP response

The API call parameter page=0 returns results 0-9; for this exercise, I'm only interested in the latest article, so I set up an EvaluateJsonPath processor to take care of that, as you can see below:

86654-fvuyw.png
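Since the screenshot may not be readable here, the sketch below lists the kind of attribute-to-JSONPath pairs that could be entered as dynamic properties in EvaluateJsonPath, with Destination set to flowfile-attribute so that later steps can reference ${web_url} and friends. The attribute names come from this article; the paths are assumptions about the response layout (articles under response.docs, nested headline and byline objects), so check them against a real response. It is written as a Java map purely for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: attribute-name to JSONPath pairs one might enter in EvaluateJsonPath. */
final class JsonPathMappingSketch {

    /** Returns the hypothetical dynamic properties, all pointing at the first result. */
    static Map<String, String> properties() {
        // Assumption: the response wraps articles in response.docs, newest first.
        String first = "$.response.docs[0].";
        Map<String, String> props = new LinkedHashMap<>();
        props.put("web_url", first + "web_url");
        props.put("snippet", first + "snippet");
        props.put("headline", first + "headline.main");    // assumed nested field
        props.put("pub_date", first + "pub_date");
        props.put("document_type", first + "document_type");
        props.put("news_desk", first + "news_desk");
        props.put("byline", first + "byline.original");    // assumed nested field
        return props;
    }

    public static void main(String[] args) {
        // Print one "attribute = path" line per dynamic property.
        properties().forEach((attribute, path) -> System.out.println(attribute + " = " + path));
    }
}
```

Selecting index 0 is what restricts the flow to the latest article when the request is sorted by newest.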

Section 2: Create a custom BoilerPipe Extractor Processor

The goal of this section is to create a simple processor that extracts article text based on a URL, using the BoilerPipe library.

Step 1: Install Maven

Having Maven installed is a prerequisite (so is having Java installed, a computer and the knowledge & will to run this tutorial). There are many ways to install Maven, depending on your machine, but on macOS I recommend using the Homebrew package manager and simply running:

$ brew install maven

Step 2: Generate a NiFi processor archetype

Once Maven is installed, create a new folder wherever you'd like and generate a NiFi processor archetype as follows:

$ mkdir tutorial
$ cd tutorial/
$ mvn archetype:generate
[INFO] Scanning for projects...
[INFO] 
[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] --------------------------------[ pom ]---------------------------------
[INFO] 
[INFO] >>> maven-archetype-plugin:3.0.1:generate (default-cli) > generate-sources @ standalone-pom >>>
[INFO] 
[INFO] <<< maven-archetype-plugin:3.0.1:generate (default-cli) < generate-sources @ standalone-pom <<<
[INFO] 
[INFO] 
[INFO] --- maven-archetype-plugin:3.0.1:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] No archetype defined. Using maven-archetype-quickstart (org.apache.maven.archetypes:maven-archetype-quickstart:1.0)
Choose archetype:
1: remote -> am.ik.archetype:maven-reactjs-blank-archetype (Blank Project for React.js)
2: remote -> am.ik.archetype:msgpack-rpc-jersey-blank-archetype (Blank Project for Spring Boot + Jersey)
3: remote -> am.ik.archetype:mvc-1.0-blank-archetype (MVC 1.0 Blank Project)
4: remote -> am.ik.archetype:spring-boot-blank-archetype (Blank Project for Spring Boot)
[...]
2214: remote -> xyz.luan.generator:xyz-generator (-)
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): 1224: nifi
Choose archetype:
1: remote -> org.apache.nifi:nifi-processor-bundle-archetype (-)
2: remote -> org.apache.nifi:nifi-service-bundle-archetype (-)
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): : 1
Choose org.apache.nifi:nifi-processor-bundle-archetype version: 
1: 0.0.2-incubating
2: 0.1.0-incubating
3: 0.2.0-incubating
4: 0.2.1
5: 0.3.0
6: 0.4.0
7: 0.4.1
8: 0.5.0
9: 0.5.1
10: 0.6.0
11: 0.6.1
12: 0.7.0
13: 0.7.1
14: 0.7.2
15: 0.7.3
16: 0.7.4
17: 1.0.0-BETA
18: 1.0.0
19: 1.0.1
20: 1.1.0
21: 1.1.1
22: 1.1.2
23: 1.2.0
24: 1.3.0
25: 1.4.0
26: 1.5.0
27: 1.6.0
28: 1.7.0
29: 1.7.1
Choose a number: 29: 29
Define value for property 'groupId': com.github.paulvid
Define value for property 'artifactId': boilerpipe-article-extractor
Define value for property 'version' 1.0-SNAPSHOT: : 1.7.0.3.2.0.0-520
Define value for property 'artifactBaseName': BoilerpipeArticleExtractor
Define value for property 'package' com.github.paulvid.processors.BoilerpipeArticleExtractor: : 
[INFO] Using property: nifiVersion = 1.7.1
Confirm properties configuration:
groupId: com.github.paulvid
artifactId: boilerpipe-article-extractor
version: 1.7.0.3.2.0.0-520
artifactBaseName: BoilerpipeArticleExtractor
package: com.github.paulvid.processors.BoilerpipeArticleExtractor
nifiVersion: 1.7.1
 Y: : Y
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: nifi-processor-bundle-archetype:1.7.1
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.github.paulvid
[INFO] Parameter: artifactId, Value: boilerpipe-article-extractor
[INFO] Parameter: version, Value: 1.7.0.3.2.0.0-520
[INFO] Parameter: package, Value: com.github.paulvid.processors.BoilerpipeArticleExtractor
[INFO] Parameter: packageInPathFormat, Value: com/github/paulvid/processors/BoilerpipeArticleExtractor
[INFO] Parameter: package, Value: com.github.paulvid.processors.BoilerpipeArticleExtractor
[INFO] Parameter: artifactBaseName, Value: BoilerpipeArticleExtractor
[INFO] Parameter: version, Value: 1.7.0.3.2.0.0-520
[INFO] Parameter: groupId, Value: com.github.paulvid
[INFO] Parameter: artifactId, Value: boilerpipe-article-extractor
[INFO] Parameter: nifiVersion, Value: 1.7.1
[INFO] Project created from Archetype in dir: /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 02:38 min
[INFO] Finished at: 2018-08-23T10:36:04-04:00
[INFO] ------------------------------------------------------------------------

A few important points about the configuration of the archetype:

  • Maven has a ton of archetypes, filter by using "nifi" and then select 1: remote -> org.apache.nifi:nifi-processor-bundle-archetype (-)
  • Choose the version that works with your environment (1.7.1 and 1.7.0 would work in my case)
  • Properties don't really matter for this tutorial but here is what I used:
    • Define value for property 'groupId': com.github.paulvid
    • Define value for property 'artifactId': boilerpipe-article-extractor
    • Define value for property 'version' 1.0-SNAPSHOT: : 1.7.0.3.2.0.0-520
    • Define value for property 'artifactBaseName': BoilerpipeArticleExtractor
    • Define value for property 'package' com.github.paulvid.processors.BoilerpipeArticleExtractor

Step 3: Configure your processor

Once the archetype is successfully built, use your favorite IDE and open the folder containing the code.

Adding the BoilerPipe dependency

Open the file [YOUR_PATH]/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/pom.xml

Add the following dependency:

86655-ta47o.png

Rename the processor

By default, the processor under [YOUR_PATH]/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/src/main/java/com/github/paulvid/processors/BoilerpipeArticleExtractor is called MyProcessor. Using your IDE, rename the processor to BoilerpipeArticleExtractor and the corresponding test class to BoilerpipeArticleExtractorTest.

Add code to processor

Modify your BoilerpipeArticleExtractor.java to contain the following code:

package com.github.paulvid.processors.BoilerpipeArticleExtractor;
import de.l3s.boilerpipe.BoilerpipeProcessingException;
import de.l3s.boilerpipe.extractors.ArticleExtractor;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.expression.ExpressionLanguageScope;
import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
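@Tags({"boilerpipe", "article", "extract", "html", "text"})
@CapabilityDescription("Fetches the page at the configured URL and replaces the FlowFile content with the article text extracted by BoilerPipe")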
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class BoilerpipeArticleExtractor extends AbstractProcessor {
    public static final PropertyDescriptor URL_PROPERTY = new PropertyDescriptor
            .Builder().name("URL")
            .displayName("URL")
            .description("URL of the Article to Extract")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("A FlowFile is routed to this relationship after the article has been successfully extracted")
            .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("A FlowFile is routed to this relationship if an error occurred during the article extraction")
            .build();
    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(URL_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);
        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            java.net.URL articleURL = new URL(String.valueOf(context.getProperty(URL_PROPERTY).evaluateAttributeExpressions(flowFile).getValue()));
            String text = ArticleExtractor.INSTANCE.getText(articleURL);
            flowFile = session.write(flowFile, out -> {
                try (OutputStream outputStream = new BufferedOutputStream(out)) {
                    outputStream.write(text.getBytes(StandardCharsets.UTF_8));
                }
            });
            session.transfer(flowFile, REL_SUCCESS);
        } catch (MalformedURLException mue) {
            getLogger().error("Failed to get URL {} for {} due to {}", new Object[]{context.getProperty(URL_PROPERTY).evaluateAttributeExpressions(flowFile).getValue(), flowFile, mue});
            session.transfer(flowFile, REL_FAILURE);
            return;
        } catch (BoilerpipeProcessingException bpe) {
            getLogger().error("Failed to extract article for {} due to {}", new Object[]{flowFile, bpe});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
    }
}
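The URL property is only validated as non-empty, so a malformed value is caught at run time by the MalformedURLException branch and the FlowFile is routed to failure. The sketch below isolates that one check using only the JDK, so you can see which strings would be rejected before any page is fetched. The class and method names are hypothetical, and a "success" here only means the string parsed as a URL; in the real processor the extraction itself can still fail.

```java
import java.net.MalformedURLException;
import java.net.URL;

/** Sketch: mimic the processor's URL parsing step without NiFi or BoilerPipe. */
final class UrlRoutingSketch {

    /** Returns the relationship name the parsing step alone would lead to. */
    static String relationshipFor(String rawUrl) {
        try {
            // Same constructor the processor uses; it throws when no valid protocol is present.
            new URL(rawUrl);
            return "success";
        } catch (MalformedURLException e) {
            return "failure";
        }
    }

    public static void main(String[] args) {
        // A full URL parses; a URL without a scheme does not.
        System.out.println(relationshipFor("https://www.nytimes.com/"));
        System.out.println(relationshipFor("www.nytimes.com/article.html"));
    }
}
```

This is why the web_url attribute returned by the API, which includes the scheme, can be passed to the processor as is.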

Create test case

Modify your BoilerpipeArticleExtractorTest.java to contain the following code:

package com.github.paulvid.processors.BoilerpipeArticleExtractor;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class BoilerpipeArticleExtractorTest {
    private TestRunner testRunner;
    @Before
    public void init() {
        testRunner = TestRunners.newTestRunner(BoilerpipeArticleExtractor.class);
    }
    @Test
    public void testProcessor() {
        testRunner.setValidateExpressionUsage(false);
        testRunner.setProperty(BoilerpipeArticleExtractor.URL_PROPERTY,"https://www.nytimes.com/reuters/2018/08/17/business/17reuters-usa-tunnels-china.html");
        testRunner.enqueue("Mock FlowFile");
        testRunner.run();
        testRunner.assertTransferCount(BoilerpipeArticleExtractor.REL_SUCCESS, 1);
        testRunner.assertTransferCount(BoilerpipeArticleExtractor.REL_FAILURE, 0);
        List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(BoilerpipeArticleExtractor.REL_SUCCESS);
        flowFiles.get(0).assertContentEquals("Musk's Tunnel-Boring Firm Seeks U.S. Tariff Exemption for Chinese Parts\n" +
                "By Reuters\n" +
                "Aug. 17, 2018\n" +
                "WASHINGTON — A company owned by Elon Musk that is trying to lower the cost of building high-speed transit tunnels has asked the Trump administration to exempt it from tariffs for some Chinese-made tunnel boring machine components, warning the tariffs could significantly delay a planned tunnel between New York and Washington.\n" +
                "In a July 31 letter posted last week on a government website, the Boring Co asked the U.S. Trade Representative to exempt parts like cutterheads, screw conveyors and related machinery. Boring seeks \"limited parts from China in the near-term for use in a small number of tunnel boring machines.\" The letter added those parts are \"readily available only from China.\"\n" +
                "Privately held Boring added that it is \"working to develop and manufacture our own tunnel boring machines\" and wants to \"restore the now-dormant American tunnel boring machine industry.\"\n" +
                "The company said for planned tunnels, including a project between Washington and Baltimore, it will \"use machines that are majority-composed of U.S. content.\"\n" +
                "The tariffs could cause \"severe economic harm\" to the company and U.S. interests and could result in a delay of one to two years in the construction of a proposed Washington-to-Baltimore tunnel that it plans to eventually extend to New York.\n" +
                "Exempting the parts will not harm U.S. industry, the company said, and noted that tunneling is not one of 10 sectors identified in China's \"Made in 2025\" plan. The company said its business model is \"predicated upon substantially reducing the cost of tunneling.\"\n" +
                "Musk, who is also chief executive of Tesla Inc, in June proposed building a $1 billion underground transit system in Chicago. The plan would send people from Chicago’s downtown Loop district to O’Hare International Airport at 150 miles (241 km) per hour.\n" +
                "The Boring Co has been promoting its plans for tunnels that would allow high-speed travel between cities. The company initially plans to ferry passengers between Washington and Baltimore on autonomous electric vehicles carrying 8 to 16 passengers at 125-150 miles per hour, but would not use tracks or railway equipment.\n" +
                "On Wednesday, the company proposed to build a 3.6-mile tunnel between Dodger Stadium in Los Angeles and the city's subway system.\n" +
                "The U.S. Trade Representative's Office and Boring did not immediately respond to requests for comment.\n" +
                "Boring said in an earlier letter it was converting from diesel-powered to electric-powered construction equipment and had innovated in \"concrete mixing, segment production, excavation and hauling practices.\"\n" +
                "Other companies including General Motors Co have sought exemptions from new U.S. tariffs imposed on Chinese imports.\n" +
                "(Reporting by David Shepardson; Editing by David Gregorio)\n" +
                "Advertisement\n");
    }
}

Compile your processor and create a nar

In your command line, go to [YOUR_PATH]/tutorial/boilerpipe-article-extractor/ and type the following:

 $ mvn install

You should see an output like this:

 $ mvn install
[INFO] Scanning for projects...
[INFO] Inspecting build with total of 3 modules...
[INFO] Installing Nexus Staging features:
[INFO]   ... total of 3 executions of maven-deploy-plugin replaced with nexus-staging-maven-plugin
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO] 
[INFO] boilerpipe-article-extractor                                       [pom]
[INFO] nifi-BoilerpipeArticleExtractor-processors                         [jar]
[INFO] nifi-BoilerpipeArticleExtractor-nar                                [nar]
[INFO] 
[INFO] ----------< com.github.paulvid:boilerpipe-article-extractor >-----------
[INFO] Building boilerpipe-article-extractor 1.7.0.3.2.0.0-520            [1/3]
[INFO] --------------------------------[ pom ]---------------------------------
[INFO] 
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven-version) @ boilerpipe-article-extractor ---
[INFO] 
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven) @ boilerpipe-article-extractor ---
[INFO] 
[INFO] --- buildnumber-maven-plugin:1.4:create (default) @ boilerpipe-article-extractor ---
[INFO] 
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) @ boilerpipe-article-extractor ---
[INFO] 
[INFO] --- maven-compiler-plugin:3.6.0:testCompile (groovy-tests) @ boilerpipe-article-extractor ---
[INFO] No sources to compile
[INFO] 
[INFO] --- maven-site-plugin:3.7:attach-descriptor (attach-descriptor) @ boilerpipe-article-extractor ---
[INFO] No site descriptor found: nothing to attach.
[INFO] 
[INFO] --- maven-install-plugin:2.5.2:install (default-install) @ boilerpipe-article-extractor ---
[INFO] Installing /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/pom.xml to /Users/pvidal/.m2/repository/com/github/paulvid/boilerpipe-article-extractor/1.7.0.3.2.0.0-520/boilerpipe-article-extractor-1.7.0.3.2.0.0-520.pom
[INFO] 
[INFO] ---< com.github.paulvid:nifi-BoilerpipeArticleExtractor-processors >----
[INFO] Building nifi-BoilerpipeArticleExtractor-processors 1.7.0.3.2.0.0-520 [2/3]
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven-version) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] 
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] 
[INFO] --- buildnumber-maven-plugin:1.4:create (default) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] 
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] 
[INFO] --- maven-resources-plugin:3.0.2:resources (default-resources) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] Copying 3 resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.6.0:compile (default-compile) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/target/classes
[INFO] 
[INFO] --- maven-resources-plugin:3.0.2:testResources (default-testResources) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/src/test/resources
[INFO] Copying 3 resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.6.0:testCompile (default-testCompile) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/target/test-classes
[INFO] 
[INFO] --- maven-compiler-plugin:3.6.0:testCompile (groovy-tests) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Changes detected - recompiling the module!
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-surefire-plugin:2.20.1:test (default-test) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] 
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.github.paulvid.processors.BoilerpipeArticleExtractor.BoilerpipeArticleExtractorTest
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.907 s - in com.github.paulvid.processors.BoilerpipeArticleExtractor.BoilerpipeArticleExtractorTest
[INFO] 
[INFO] Results:
[INFO] 
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO] 
[INFO] 
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Building jar: /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/target/nifi-BoilerpipeArticleExtractor-processors-1.7.0.3.2.0.0-520.jar
[INFO] 
[INFO] --- maven-site-plugin:3.7:attach-descriptor (attach-descriptor) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Skipping because packaging 'jar' is not pom.
[INFO] 
[INFO] --- maven-install-plugin:2.5.2:install (default-install) @ nifi-BoilerpipeArticleExtractor-processors ---
[INFO] Installing /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/target/nifi-BoilerpipeArticleExtractor-processors-1.7.0.3.2.0.0-520.jar to /Users/pvidal/.m2/repository/com/github/paulvid/nifi-BoilerpipeArticleExtractor-processors/1.7.0.3.2.0.0-520/nifi-BoilerpipeArticleExtractor-processors-1.7.0.3.2.0.0-520.jar
[INFO] Installing /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-processors/pom.xml to /Users/pvidal/.m2/repository/com/github/paulvid/nifi-BoilerpipeArticleExtractor-processors/1.7.0.3.2.0.0-520/nifi-BoilerpipeArticleExtractor-processors-1.7.0.3.2.0.0-520.pom
[INFO] 
[INFO] -------< com.github.paulvid:nifi-BoilerpipeArticleExtractor-nar >-------
[INFO] Building nifi-BoilerpipeArticleExtractor-nar 1.7.0.3.2.0.0-520     [3/3]
[INFO] --------------------------------[ nar ]---------------------------------
[INFO] 
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven-version) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] 
[INFO] --- maven-enforcer-plugin:1.4.1:enforce (enforce-maven) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] 
[INFO] --- buildnumber-maven-plugin:1.4:create (default) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] 
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] 
[INFO] --- maven-resources-plugin:3.0.2:resources (default-resources) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/src/main/resources
[INFO] Copying 3 resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.6.0:compile (default-compile) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] No sources to compile
[INFO] 
[INFO] --- maven-resources-plugin:3.0.2:testResources (default-testResources) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/src/test/resources
[INFO] Copying 3 resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.6.0:testCompile (default-testCompile) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] No sources to compile
[INFO] 
[INFO] --- maven-compiler-plugin:3.6.0:testCompile (groovy-tests) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] No sources to compile
[INFO] 
[INFO] --- maven-surefire-plugin:2.20.1:test (default-test) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] 
[INFO] --- nifi-nar-maven-plugin:1.2.0:nar (default-nar) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] Copying nifi-BoilerpipeArticleExtractor-processors-1.7.0.3.2.0.0-520.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/nifi-BoilerpipeArticleExtractor-processors-1.7.0.3.2.0.0-520.jar
[INFO] Copying nekohtml-1.9.21.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/nekohtml-1.9.21.jar
[INFO] Copying xercesImpl-2.11.0.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/xercesImpl-2.11.0.jar
[INFO] Copying xml-apis-1.4.01.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/xml-apis-1.4.01.jar
[INFO] Copying nifi-utils-1.7.1.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/nifi-utils-1.7.1.jar
[INFO] Copying jericho-html-3.3.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/jericho-html-3.3.jar
[INFO] Copying boilerpipe-1.2.3.jar to /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/classes/META-INF/bundled-dependencies/boilerpipe-1.2.3.jar
[INFO] Building jar: /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/nifi-BoilerpipeArticleExtractor-nar-1.7.0.3.2.0.0-520.nar
[INFO] 
[INFO] --- maven-site-plugin:3.7:attach-descriptor (attach-descriptor) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] Skipping because packaging 'nar' is not pom.
[INFO] 
[INFO] --- maven-install-plugin:2.5.2:install (default-install) @ nifi-BoilerpipeArticleExtractor-nar ---
[INFO] Installing /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target/nifi-BoilerpipeArticleExtractor-nar-1.7.0.3.2.0.0-520.nar to /Users/pvidal/.m2/repository/com/github/paulvid/nifi-BoilerpipeArticleExtractor-nar/1.7.0.3.2.0.0-520/nifi-BoilerpipeArticleExtractor-nar-1.7.0.3.2.0.0-520.nar
[INFO] Installing /Users/pvidal/Documents/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/pom.xml to /Users/pvidal/.m2/repository/com/github/paulvid/nifi-BoilerpipeArticleExtractor-nar/1.7.0.3.2.0.0-520/nifi-BoilerpipeArticleExtractor-nar-1.7.0.3.2.0.0-520.pom
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO] 
[INFO] boilerpipe-article-extractor 1.7.0.3.2.0.0-520 ..... SUCCESS [  1.487 s]
[INFO] nifi-BoilerpipeArticleExtractor-processors ......... SUCCESS [ 14.513 s]
[INFO] nifi-BoilerpipeArticleExtractor-nar 1.7.0.3.2.0.0-520 SUCCESS [  0.281 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 17.712 s
[INFO] Finished at: 2018-08-23T11:18:13-04:00
[INFO] ------------------------------------------------------------------------

Step 4: Upload the generated nar

The file nifi-BoilerpipeArticleExtractor-nar-1.7.0.3.2.0.0-520.nar should be generated under [YOUR_PATH]/tutorial/boilerpipe-article-extractor/nifi-BoilerpipeArticleExtractor-nar/target. Upload it onto your server under the library folder of NiFi (typically /usr/hdf/current/nifi/lib/). Restart NiFi and you should see the BoilerpipeArticleExtractor processor available.

Step 5: Configure your processor

Configure the processor to use the web_url attribute as a property, as follows:

86657-screen-shot-2018-08-23-at-112352-am.png

Section 3: Create a custom Mairesse Personality Processor

This section is a little advanced. I built a custom processor using this personality recognizer (http://farm2.user.srcf.net/research/personality/recognizer). If you want to see the source code, check out my GitHub. For this tutorial, we will concentrate only on uploading my compiled code and the necessary dependencies.

Step 1: Upload the Mairesse Processor

Download the file nifi-MairessePersonalityRecognition-nar-1.5.0.3.1.2.0-7.nar from my GitHub. Upload it onto your server under the library folder of NiFi (typically /usr/hdf/current/nifi/lib/). Restart NiFi and you should see the MairessePersonalityRecognition processor available.

Step 2: Upload the library files

Upload the files in the folder to-upload-to-nifi-lib-folder from my GitHub to a location on your server (e.g. /home/nifi/perso-recognition/lib/).

Step 3: Configure your processor

Configure the processor as follows:

86658-screen-shot-2018-08-23-at-114243-am.png

Section 4: Create and configure a Kafka topic

Step 1: Create a schema in Schema Registry

Go to your schema registry and create a new schema called personality-recognition:

86663-screen-shot-2018-08-23-at-121703-pm.png

The schema you will enter should be as follows:

  {
 "type": "record",
 "name": "personalityrecognition",
 "fields": [
  {
   "name": "web_url",
   "type": "string",
   "default": null
  },
  {
   "name": "snippet",
   "type": "string",
   "default": null
  },
  {
   "name": "byline",
   "type": "string",
   "default": null
  },
  {
   "name": "pub_date",
   "type": "string",
   "default": null
  },
  {
   "name": "headline",
   "type": "string",
   "default": null
  },
  {
   "name": "document_type",
   "type": "string",
   "default": null
  },
  {
   "name": "news_desk",
   "type": "string",
   "default": null
  },
  {
   "name": "timestamp",
   "type": "string",
   "default": null
  },
  {
   "name": "extraversion",
   "type": "string",
   "default": null
  },
  {
   "name": "emotional_stability",
   "type": "string",
   "default": null
  },
  {
   "name": "agreeableness",
   "type": "string",
   "default": null
  },
  {
   "name": "conscientiousness",
   "type": "string",
   "default": null
  },
  {
   "name": "openness_to_experience",
   "type": "string",
   "default": null
  }
 ]
}

Step 2: Update attribute to add the schema name

Add an UpdateAttribute processor and configure it to set schema.name to personality-recognition and to remove double quotes from the headline and snippet attributes:


86664-screen-shot-2018-08-23-at-121912-pm.png
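The quotes need to go because the attribute values are later pasted between double quotes to build JSON, and a stray quote inside a headline would end the string early. In NiFi Expression Language this could be written as something like ${headline:replaceAll('"', '')}, which is my assumption about the screenshot rather than a transcription of it. The plain Java equivalent below is a minimal sketch with hypothetical names.

```java
/** Sketch: the quote removal applied to headline and snippet, in plain Java. */
final class QuoteStrippingSketch {

    /** Removes every double quote; a missing value becomes an empty string. */
    static String stripDoubleQuotes(String value) {
        return value == null ? "" : value.replace("\"", "");
    }

    public static void main(String[] args) {
        // A snippet with embedded quotes, similar to the Reuters article used in the test case.
        String snippet = "Boring seeks \"limited parts from China\" in the near-term";
        System.out.println(stripDoubleQuotes(snippet));
    }
}
```

Dropping the quotes loses a little punctuation; escaping them instead would preserve the text, but removal is the simpler option and is what this flow does.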

Step 3: Create Json from attributes

Using a ReplaceText processor, configure the replacement value to create the appropriate JSON:

87410-screen-shot-2018-08-28-at-33505-pm.png

The replacement value you will enter should be as follows:

  {
 "web_url": "${web_url}",
 "snippet": "${snippet}",
 "byline": "${byline}",
 "pub_date": "${pub_date}",
 "headline": "${headline}",
 "document_type": "${document_type}",
 "news_desk": "${news_desk}",
 "timestamp": "${now()}",
 "extraversion": "${mairessepersonalityrecognition.extraversion}",
 "emotional_stability": "${mairessepersonalityrecognition.emotional_stability}",
 "agreeableness": "${mairessepersonalityrecognition.agreeableness}",
 "conscientiousness": "${mairessepersonalityrecognition.conscientiousness}",
 "openness_to_experience": "${mairessepersonalityrecognition.openness_to_experience}"
}
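To make the substitution concrete, here is a simplified stand-in for what ReplaceText does with this template: each ${name} reference is swapped for the matching FlowFile attribute. This is only a sketch under stated assumptions. It handles plain attribute references and leaves function calls such as ${now()} untouched, it renders missing attributes as empty strings, and the class name and sample values are hypothetical. NiFi's real Expression Language does far more.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch: replace ${attribute} references in a template with attribute values. */
final class TemplateRenderSketch {

    // Matches ${name} where name is letters, digits, underscores or dots (no function calls).
    private static final Pattern REFERENCE = Pattern.compile("\\$\\{([A-Za-z0-9_.]+)\\}");

    /** Returns the template with every plain attribute reference substituted. */
    static String render(String template, Map<String, String> attributes) {
        Matcher matcher = REFERENCE.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            // Sketch choice: an unknown attribute becomes an empty string.
            String value = attributes.getOrDefault(matcher.group(1), "");
            // quoteReplacement keeps '$' and '\' in values from being read as group references.
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("web_url", "https://www.nytimes.com/example.html"); // made-up value
        attributes.put("mairessepersonalityrecognition.extraversion", "3.5"); // made-up value
        String template = "{ \"web_url\": \"${web_url}\", "
                + "\"extraversion\": \"${mairessepersonalityrecognition.extraversion}\" }";
        System.out.println(render(template, attributes));
    }
}
```

Because values are inserted verbatim between quotes, any double quote left in an attribute would produce invalid JSON, which is why the headline and snippet attributes are cleaned beforehand.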

Step 4: Create Kafka topic

On your server running Kafka, find the shell scripts to create topics (typically under /usr/hdp/current/kafka-broker/bin). Run the following command:

$ ./kafka-topics.sh --zookeeper localhost:2181 --topic personality-recognition --create --partitions 1 --replication-factor 1

Step 5: Configure PublishKafkaRecord

Configure Hortonworks Schema Registry Controller

In your controller configuration add a new Hortonworks Schema Registry controller:

86666-screen-shot-2018-08-23-at-122806-pm.png

The Schema registry URL should look something like this:

  http://pvidal-personality-recognition0.field.hortonworks.com:7788/api/v1/

Configure JsonPathReader Controller

In your controller configuration add a new JsonPathReader Controller to use the schema name and the Hortonworks Schema Registry controller we created, as follows:

86667-screen-shot-2018-08-23-at-123309-pm.png

Add all the fields from the JSON output we created

86668-screen-shot-2018-08-23-at-123319-pm.png

Configure AvroRecordSetWriter Controller

In your controller configuration add a new AvroRecordSetWriter Controller to use the schema name and the Hortonworks Schema Registry controller we created, as follows:

86669-screen-shot-2018-08-23-at-123509-pm.png

Configure PublishKafkaRecord Processor

Add a new processor to your flow, configured to use the controllers we created, as follows:

86670-screen-shot-2018-08-23-at-124138-pm.png

The Kafka Broker URL should look something like this:

  pvidal-personality-recognition0.field.hortonworks.com:6667

Step 6: Run flow and consume data via command line

On your server running Kafka, find the shell scripts, including the console consumer (typically under /usr/hdp/current/kafka-broker/bin). Run the following command:

$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic personality-recognition

Run your flow in NiFi and you should start seeing data flowing!


screen-shot-2018-08-23-at-122201-pm.png