Created 12-16-2018 04:06 PM
Below is my custom processor which selects data from mongoDB and write it in flowFile stream. Code is working fine but I need to modify the connection part so it uses connection pool instead of traditional connection. Similar to GetMongo processor
String db_name = "cms", host_name = "localhost"; MongoClient mongo_client = new MongoClient(host_name, 27017); MongoDatabase db = mongo_client.getDatabase(db_name);
How to achieve this,
Custom processor Below
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package test.processors.test2; import com.mongodb.MongoClient; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.ReadsAttributes; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.io.OutputStreamWriter; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.FlowFileAccessException; import org.bson.Document; @Tags({"example"}) @CapabilityDescription("Provide a description") @SeeAlso({}) @ReadsAttributes({ @ReadsAttribute(attribute = "", description = "")}) @WritesAttributes({ @WritesAttribute(attribute = "", description = "")}) public class MyProcessor extends AbstractProcessor { public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder().name("MY_PROPERTY") .displayName("Prefix property") .description("Prefix Property") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("Success") .description("Sucessful relationship") .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() .name("Fail") .description("Failure relationship") .build(); private List<PropertyDescriptor> descriptors; private Set<Relationship> relationships; @Override protected void init(final ProcessorInitializationContext context) { final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); descriptors.add(MY_PROPERTY); this.descriptors = Collections.unmodifiableList(descriptors); final Set<Relationship> relationships = new HashSet<Relationship>(); relationships.add(REL_SUCCESS); relationships.add(REL_FAILURE); this.relationships = Collections.unmodifiableSet(relationships); } @Override public Set<Relationship> getRelationships() { return this.relationships; } @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { return descriptors; } @OnScheduled public void onScheduled(final ProcessContext context) { } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); if (flowFile == null) { return; } final ComponentLog logger = getLogger(); final String prefixStr = context.getProperty(MY_PROPERTY).getValue(); try { flowFile = session.write(flowFile, (final InputStream inputStream, final OutputStream outputStream) -> { BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)); String line; while ((line = bufferedReader.readLine()) != null) { String xxx = mongoDB( prefixStr, line); String aggregatedText = prefixStr + ": " + line + ": " + xxx; bufferedWriter.write(aggregatedText); bufferedWriter.newLine(); } bufferedWriter.flush(); }); session.transfer(flowFile, REL_SUCCESS); logger.info("successfully processed FlowFile {}", new Object[]{flowFile}); } catch (FlowFileAccessException e) { session.transfer(flowFile, REL_FAILURE); logger.info("Failed to add prefix for FlowFile: " + e.getMessage()); } } String db_name = "cms", host_name = "localhost"; MongoClient mongo_client = new MongoClient(host_name, 27017); MongoDatabase db = mongo_client.getDatabase(db_name); public String mongoDB(String arg1, String arg2) { final ComponentLog logger = getLogger(); String result = ""; try { String db_col_name = arg1; MongoCollection<Document> coll = db.getCollection(db_col_name); Document query = new Document(); query.put("msisdn", arg2); logger.info(query.toJson()); FindIterable<Document> fi = coll.find(query); try (MongoCursor<Document> cursor = fi.iterator()) { while (cursor.hasNext()) { result = result + cursor.next().toJson(); } } } catch (Exception e) { logger.info("Error: " + e.getMessage()); } return result; } }
Created 12-17-2018 08:42 AM
Any help...