Member since: 11-12-2016
Posts: 71
Kudos Received: 11
Solutions: 0
01-05-2019
09:24 PM
Thanks a lot. Jira submitted
01-05-2019
09:21 PM
1 Kudo
Many thanks for your advice. I filed a Jira for that.
01-01-2019
09:10 PM
One side question: I'm trying to create a custom processor where the Scheduling tab is disabled or can't be modified. Is this possible?
01-01-2019
09:09 PM
Excellent topic, many thanks.
01-01-2019
07:53 PM
Thanks a lot, Andy. Is there any way to achieve this?
12-31-2018
03:13 PM
Hello, I'm trying to create a NiFi user who can modify a component but cannot operate it. This is needed to segregate duties, so that one person creates the components and another runs them. I tried the current privileges, but when I set the "Modify component" privilege, the user is able to operate the component as well, even after I removed the "Operate" privilege. Can you help? Screenshots for the test user are attached.
Labels: Apache NiFi
12-26-2018
11:29 AM
Hi @krajguru, I have the same issue. I was able to log in with the certificate "CN=Adminstrator, OU=NIFI", but when I switched to LDAP I got the error "Unknown user with identity 'CN=Adminstrator, OU=NIFI'. Contact the system administrator." Note that I created a user in NiFi called admin to use with LDAP, changed the "Initial User Identity" configuration to admin, and set the login identity provider as attached. Can you help? Thanks. Attachment: screenshot-2018-12-26-at-132734.png
12-17-2018
09:38 PM
Thank you @Otto Fowler, I changed it as per your advice. However, I still face the same issue: for some reason, the project doesn't include all of the jar/nar dependencies listed in the pom. Below is my pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>nifi</groupId>
<artifactId>custom1</artifactId>
<version>1.8.0</version>
</parent>
<artifactId>nifi-nifi-processors</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.8.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-client-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-client-service-api-nar</artifactId>
<version>1.8.0</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<version>1.8.0</version>
<type>jar</type>
</dependency>
</dependencies>
<version>1.8.0</version>
</project>
12-17-2018
03:18 PM
Hi, I have added the PropertyDescriptor below to my custom processor so that it can use a Mongo connection pool. However, NiFi won't start, and the errors below are generated. Can you advise? I also included the dependency below.
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-client-service-api</artifactId>
<version>1.8.0</version>
<type>jar</type>
</dependency>
import org.apache.nifi.mongodb.MongoDBClientService;
static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("mongo-client-service")
.displayName("Client Service")
.description("If configured, this property will use the assigned client service for connection pooling.")
.required(true)
.identifiesControllerService(MongoDBClientService.class)
.build();
2018-12-17 09:01:49,569 INFO [main] org.apache.nifi.web.server.JettyServer Loading WAR: /opt/nifi-1.8.0/./work/nar/framework/nifi-framework-nar-1.8.0.nar-unpacked/NAR-INF/bundled-dependencies/nifi-web-error-1.8.0.war with context path set to /
2018-12-17 09:01:49,638 INFO [main] org.apache.nifi.web.server.JettyServer Running in HTTP mode; host headers not restricted
2018-12-17 09:01:50,945 ERROR [main] org.apache.nifi.NiFi Failure to launch NiFi due to java.util.ServiceConfigurationError: org.apache.nifi.processor.Processor: Provider nifi.processors.nifi.MyProcessor could not be instantiated
java.util.ServiceConfigurationError: org.apache.nifi.processor.Processor: Provider nifi.processors.nifi.MyProcessor could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at org.apache.nifi.nar.ExtensionManager.loadExtensions(ExtensionManager.java:148)
at org.apache.nifi.nar.ExtensionManager.discoverExtensions(ExtensionManager.java:123)
at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:838)
at org.apache.nifi.NiFi.<init>(NiFi.java:157)
at org.apache.nifi.NiFi.<init>(NiFi.java:71)
at org.apache.nifi.NiFi.main(NiFi.java:296)
Caused by: java.lang.NoClassDefFoundError: org/apache/nifi/mongodb/MongoDBClientService
at nifi.processors.nifi.MyProcessor.<clinit>(MyProcessor.java:79)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
... 8 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.nifi.mongodb.MongoDBClientService
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 15 common frames omitted
2018-12-17 09:01:50,952 INFO [Thread-1] org.apache.nifi.NiFi Initiating shutdown of Jetty web server...
2018-12-17 09:01:50,954 INFO [Thread-1] org.apache.nifi.NiFi Jetty web server shutdown completed (nicely or otherwise).
Labels: Apache NiFi
12-16-2018
04:06 PM
Below is my custom processor, which selects data from MongoDB and writes it to the FlowFile stream. The code works fine, but I need to modify the connection part so that it uses a connection pool instead of a traditional connection, similar to the GetMongo processor:
String db_name = "cms", host_name = "localhost";
MongoClient mongo_client = new MongoClient(host_name, 27017);
MongoDatabase db = mongo_client.getDatabase(db_name);
How can I achieve this? The custom processor is below:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.processors.test2;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.io.OutputStreamWriter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.bson.Document;
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({
@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({
@WritesAttribute(attribute = "", description = "")})
public class MyProcessor extends AbstractProcessor {
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder().name("MY_PROPERTY")
.displayName("Prefix property")
.description("Prefix Property")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("Success")
.description("Successful relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("Fail")
.description("Failure relationship")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(MY_PROPERTY);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
final String prefixStr = context.getProperty(MY_PROPERTY).getValue();
try {
flowFile = session.write(flowFile, (final InputStream inputStream, final OutputStream outputStream) -> {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
String line;
while ((line = bufferedReader.readLine()) != null) {
String xxx = mongoDB( prefixStr, line);
String aggregatedText = prefixStr + ": " + line + ": " + xxx;
bufferedWriter.write(aggregatedText);
bufferedWriter.newLine();
}
bufferedWriter.flush();
});
session.transfer(flowFile, REL_SUCCESS);
logger.info("successfully processed FlowFile {}", new Object[]{flowFile});
} catch (FlowFileAccessException e) {
session.transfer(flowFile, REL_FAILURE);
logger.info("Failed to add prefix for FlowFile: " + e.getMessage());
}
}
String db_name = "cms", host_name = "localhost";
MongoClient mongo_client = new MongoClient(host_name, 27017);
MongoDatabase db = mongo_client.getDatabase(db_name);
public String mongoDB(String arg1, String arg2) {
final ComponentLog logger = getLogger();
String result = "";
try {
String db_col_name = arg1;
MongoCollection<Document> coll = db.getCollection(db_col_name);
Document query = new Document();
query.put("msisdn", arg2);
logger.info(query.toJson());
FindIterable<Document> fi = coll.find(query);
try (MongoCursor<Document> cursor = fi.iterator()) {
while (cursor.hasNext()) {
result = result + cursor.next().toJson();
}
}
}
catch (Exception e)
{
logger.info("Error: " + e.getMessage());
}
return result;
}
}
Labels: Apache NiFi
12-14-2018
08:25 PM
No reason, actually. I tried to use the code provided above, but it didn't recognise the inputStream object:
flowFile = session.write(flowFile, {inputStream, outputStream ->
    BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
    br.lines().forEach(line -> {
        String[] a = line.split("\\|", -1);
        outputStream.write("SOMETHING WITH a[]".getBytes(StandardCharsets.UTF_8));
    });
});
12-14-2018
01:05 PM
Thanks @Matt for your help. I followed the steps you highlighted; the code is below:
try {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
InputStream inputStream = session.read(flowFile) ;
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
flowFile = session.write(flowFile, outputStream -> {
br.lines().forEach(line -> {
try {
outputStream.write((line + " SOMETHING WITH a[]").getBytes(StandardCharsets.UTF_8));
} catch (IOException ex) {
Logger.getLogger(MyProcessor.class.getName() + line).log(Level.SEVERE, null, ex);
}
});
} );
inputStream.close();
session.transfer (flowFile, MY_RELATIONSHIP);
} catch (IOException ex) {
}
I'm not sure how I should close the stream. The error is below:
2018-12-14 13:35:55,706 ERROR [Timer-Driven Process Thread-10] nifi.processors.nifi.MyProcessor MyProcessor[id=ac7f9eb0-0167-1000-4632-c923e4c82eaf] MyProcessor[id=ac7f9eb0-0167-1000-4632-c923e4c82eaf] failed to process session due to java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed; Processor Administratively Yielded for 1 sec: java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed
java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3147)
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3142)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2632)
at nifi.processors.nifi.MyProcessor.onTrigger(MyProcessor.java:111)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
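The exception message above says that an InputStream created by ProcessSession.read(FlowFile) is still open when session.write is called. Below is a minimal sketch of one way around that, assuming the content is small enough to hold in memory: read every line and close the input first, and only then write. It uses only the JDK so that it can run on its own. The names ReadThenWrite, readLines and writeLines are hypothetical. In the processor, the InputStream would come from session.read(flowFile) and the OutputStream from the session.write callback.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class ReadThenWrite {

    // Reads all lines, then closes the stream; try-with-resources closes the
    // reader and, through it, the underlying InputStream.
    public static List<String> readLines(InputStream in) throws IOException {
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.toList());
        }
    }

    // Writes each buffered line followed by a suffix and a newline.
    public static void writeLines(List<String> lines, String suffix, OutputStream out)
            throws IOException {
        for (String line : lines) {
            out.write((line + suffix + "\n").getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-ins for the FlowFile content streams (assumption for this demo).
        InputStream in = new ByteArrayInputStream("a\nb\n".getBytes(StandardCharsets.UTF_8));
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        List<String> lines = readLines(in);      // input is fully read and closed here
        writeLines(lines, " SOMETHING", out);    // writing starts only afterwards

        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

Buffering all lines is only reasonable for small content. For large FlowFiles, a single callback that receives both the input and the output stream avoids holding the whole content in memory.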
12-14-2018
01:05 PM
Hello @Matt. Here is the new code as you advised; I had to use session.read(flowFile) for the inputStream:
BufferedReader br = new BufferedReader(new InputStreamReader(session.read(flowFile), StandardCharsets.UTF_8));
flowFile = session.write(flowFile, outputStream -> {
br.lines().forEach(line -> {
try {
outputStream.write((line + " SOMETHING").getBytes(StandardCharsets.UTF_8));
} catch (IOException ex) {
Logger.getLogger(MyProcessor.class.getName() + line).log(Level.SEVERE, null, ex);
}
});
} );
session.transfer (flowFile, MY_RELATIONSHIP);
I'm still getting the error below, "ProcessSession.read(FlowFile) has not been closed":
2018-12-14 13:35:55,707 WARN [Timer-Driven Process Thread-10] o.a.n.controller.tasks.ConnectableTask Administratively Yielding MyProcessor[id=ac7f9eb0-0167-1000-4632-c923e4c82eaf] due to uncaught Exception: java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed
java.lang.IllegalStateException: StandardFlowFileRecord[uuid=a85b94fb-fcad-4212-992e-5224ec7d4da1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1544786914428-1, container=default, section=1], offset=199, length=1],offset=0,name=a85b94fb-fcad-4212-992e-5224ec7d4da1,size=1] already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3147)
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3142)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2632)
at nifi.processors.nifi.MyProcessor.onTrigger(MyProcessor.java:111)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
12-13-2018
06:35 PM
Any help? How can I rewrite the code below in Java for a custom processor?
flowFile = session.write(flowFile, {inputStream, outputStream -> inputStream.eachLine { line -> outputStream.write("Test ABCABC\n".toString().getBytes(StandardCharsets.UTF_8)) } } as StreamCallback)
12-12-2018
07:20 PM
Hi all, I'm trying to create a simple custom processor. I used to work with the ExecuteScript processor and loop over the lines using the code below:
flowFile = session.write(flowFile, {inputStream, outputStream ->
inputStream.eachLine { line ->
def a = line.split("\\|", -1)
outputStream.write("$Test ABCABC\n".toString().getBytes(StandardCharsets.UTF_8))
}
} as StreamCallback)
How can I rewrite this in Java, to insert into the custom processor code below?
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hwx.processors.hwx;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
.Builder().name("MY_PROPERTY")
.displayName("My property")
.description("Example Property")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
.name("MY_RELATIONSHIP")
.description("Example relationship")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(MY_PROPERTY);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(MY_RELATIONSHIP);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
////// Code here
/////////////////
session.transfer(flowFile, MY_RELATIONSHIP);
}
}
Thanks a lot.
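A minimal sketch of how the Groovy closure from the question could look as plain Java, under the assumption that the work is done in a method taking an InputStream and an OutputStream. It uses only the JDK so that it runs without NiFi on the classpath. The names LineRewriter and rewrite are hypothetical; the split on "|" and the fixed output text are taken from the Groovy snippet.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

public class LineRewriter {

    // Java counterpart of the Groovy closure: read the input line by line
    // and write one output line per input line.
    public static void rewrite(InputStream in, OutputStream out) throws IOException {
        BufferedReader reader =
            new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        BufferedWriter writer =
            new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            // Same split as the Groovy code; the fields are available for building output.
            String[] a = line.split("\\|", -1);
            writer.write("Test ABCABC\n");
        }
        // Flush so buffered text reaches the underlying stream; the streams
        // themselves are left open for the caller to manage.
        writer.flush();
    }

    public static void main(String[] args) throws IOException {
        // Stand-ins for the FlowFile content streams (assumption for this demo).
        InputStream in = new ByteArrayInputStream("x|y\nz\n".getBytes(StandardCharsets.UTF_8));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        rewrite(in, out);
        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

Inside onTrigger, the same method could be passed to the session as a callback that receives both streams, for example flowFile = session.write(flowFile, (in, out) -> LineRewriter.rewrite(in, out)); placed where the "Code here" comment is.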
Labels: Apache NiFi
12-12-2018
05:22 PM
Thank you Matt. I have used try/catch, but the strange thing is that when I changed flowFilec to flowFile in session.transfer(flowFilec, REL_BACK), it generates an error but doesn't trigger the catch clause.
12-12-2018
01:14 PM
Thanks @Matt Burgess, I updated the code. I added "flowFilec", which is not declared, and this generates the UndeclaredThrowableException. I don't know why, or how to handle this exception.
12-12-2018
01:14 PM
Dears, I managed to create a simple InvokeScriptedProcessor, and the code is working fine. However, if I intentionally introduce an error in the code, I get the error below, UndeclaredThrowableException.
2018-12-11 12:30:59,400 WARN [Timer-Driven Process Thread-7] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding InvokeScriptedProcessor[id=115d1249-1fba-1c84-7af8-8918fa3e1ecd] due to uncaught Exception: java.lang.reflect.UndeclaredThrowableException
2018-12-11 12:30:59,400 WARN [Timer-Driven Process Thread-7] o.a.n.c.t.ContinuallyRunProcessorTask java.lang.reflect.UndeclaredThrowableException: null
Code:
import java.nio.charset.StandardCharsets
import java.io.IOException
class TESTSCRIPT implements Processor {
def REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The flowfile with the specified query results was successfully transferred.")
.build();
def REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("An error occurred while running the specified query.")
.build();
def REL_BACK = new Relationship.Builder()
.name("backup")
.description("Backup files.")
.build();
def PORT_NUMBER = new PropertyDescriptor.Builder()
.name('Port number')
.description('Port number')
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build()
def FILENAME = new PropertyDescriptor.Builder()
.name('File Name')
.description('Sets the filename attribute of the flowfile (do not include the extension).')
.required(true)
.expressionLanguageSupported(true)
.addValidator(Validator.VALID)
.build()
def ComponentLog log
@Override
void initialize(ProcessorInitializationContext context) {
log = context.getLogger()
}
@Override
Set<Relationship> getRelationships() {
return [REL_SUCCESS, REL_FAILURE, REL_BACK] as Set
}
@Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
///// Code
def session = sessionFactory.createSession()
def flowFile = session.get()
if(!flowFile) return
def FILENAME = context.getProperty(FILENAME).value
def PORT_NUMBER = context.getProperty(PORT_NUMBER).value
flowFile = session.write(flowFile, {inputStream, outputStream ->
inputStream.eachLine { line ->
def a = line.split("\\|", -1)
outputStream.write("${PORT_NUMBER}: Test\n".toString().getBytes(StandardCharsets.UTF_8))
}
} as StreamCallback)
flowFile = session.putAttribute(flowFile, "filename", FILENAME)
try {
FlowFile flowFilec = session.clone(flowFile);
//log.error("ExecuteScript: transfer file")
session.transfer(flowFile, REL_SUCCESS)
session.transfer(flowFilec, REL_BACK)
session.commit()
}
catch (e) {
log.error('Scripting error: ', e)
session.transfer(flowFile, REL_FAILURE)
//session.rollback(true)
session.commit()
}
}
////////////
@Override
Collection<ValidationResult> validate(ValidationContext context) { return null }
@Override
PropertyDescriptor getPropertyDescriptor(String name) {
switch(name) {
case 'File Name': return FILENAME
case 'Port number': return PORT_NUMBER
default: return null
}
}
@Override
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
@Override
List<PropertyDescriptor> getPropertyDescriptors() {
return [FILENAME, PORT_NUMBER] as List
}
@Override
String getIdentifier() { return 'TESTSCRIPT-InvokeScriptedProcessor' }
}
processor = new TESTSCRIPT()
I'm not sure why this exception is generated. How can I handle it so that I get only the errors related to the actual code problem?
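For background, one general Java mechanism that produces UndeclaredThrowableException is a dynamic proxy: when the code behind the proxy throws a checked exception that the interface method does not declare, the proxy wraps it, and the wrapper has no message of its own (which would match the ": null" in the log above). Whether InvokeScriptedProcessor reaches the script through such a proxy is an assumption here, not something the log confirms. The sketch below is plain JDK code with hypothetical names (UndeclaredThrowableDemo, Task, failingTask, rootCauseMessage) and a made-up error message; it shows the wrapping and how walking getCause() recovers the underlying error.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;

public class UndeclaredThrowableDemo {

    // The interface method declares no checked exceptions.
    public interface Task {
        void run();
    }

    // Builds a proxy whose handler throws a checked exception, standing in
    // for a script failure (hypothetical message).
    public static Task failingTask() {
        InvocationHandler handler = (proxy, method, args) -> {
            throw new Exception("script error: no such variable flowFilec");
        };
        return (Task) Proxy.newProxyInstance(
            Task.class.getClassLoader(), new Class<?>[] {Task.class}, handler);
    }

    // Follows the cause chain to the innermost throwable and returns its message.
    public static String rootCauseMessage(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current.getMessage();
    }

    public static void main(String[] args) {
        try {
            failingTask().run();
        } catch (UndeclaredThrowableException e) {
            // The wrapper's own message is null; the useful text is in the cause.
            System.out.println("wrapper message: " + e.getMessage());
            System.out.println("root cause: " + rootCauseMessage(e));
        }
    }
}
```

If the same mechanism applies to the script, logging the cause of the caught exception rather than only its message would be the way to see the actual script error.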
Labels: Apache NiFi
10-28-2017
07:01 PM
One silly question, please: where can I find the source files for the imports below? Which folder should I look in if I need to check their content or add more imports?
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql
10-27-2017
06:26 PM
Thanks @Shu and @Matt for your answers. @Shu, the controller service ID is different because I recreated it a couple of times before posting the questions and comments 🙂 What I did was move this part:
flowFile = session.get()
if(!flowFile) return
before defining the connection. The code has been working since yesterday, but I'm not sure whether this was the issue! @Matt, regarding your question: I was not able to find any other logs. I get this behaviour only on NiFi 1.4; version 1.0 works fine.
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql
flowFile = session.get()
if(!flowFile) return
def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId).getConnection()
def sql = new Sql(conn)
def sqlInsert = new Sql(conn)
10-26-2017
01:32 PM
1 Kudo
Thanks @Shu, I tried your suggestion as below. However, my connection still gets disconnected, and I can't see in the log that "select CURRENT_TIMESTAMP()" is being executed. Is this normal? Also, I don't know why the ConnectionPool does not show that ExecuteScript is referring to it (see below). Thanks a lot.
10-26-2017
07:21 AM
Dear Forum,
I have the NiFi flow below, which fetches data from a MySQL table and then loads it into another MySQL table (data enrichment). I used an ExecuteScript processor to connect to the DB and manipulate the data, as shown below (image 1.png). This is my code:
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql
def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId).getConnection()
def sql = new Sql(conn)
def sqlInsert = new Sql(conn)
try {
flowFile = session.get()
if(!flowFile) return
//log.error("ExecuteScript: Line records Exception: " + dbServiceName )
flowFile = session.write(flowFile, {inputStream, outputStream ->
inputStream.eachLine { line ->
def String[] lineCellArr = ["", "", "", "", "", "", ""]
def sqlSelectArr = ""
if (line != null){
//log.error("ExecuteScript: Line records Exception: " + dbServiceName )
lineArr = line.split(",", -1)
for(int i = 0; i < lineArr.length ; i++) {
lineCellArr[i] = lineArr[i];
}
sql.rows("SELECT * FROM Polygon.cells where cellId='" + lineCellArr[6] +"'").eachWithIndex { row, idx ->
sqlSelectArr = row.values().join(';').split(";", -1)
outputStream.write((line.toString() + row.values().join(';') + ";" + sqlSelectArr[1]).getBytes())
def insertResult1 = sqlInsert.executeUpdate("INSERT IGNORE INTO Polygon.recharges (rechargeNumber, rechargeTime, serviceClass, rechargeValue, rechargeType, balanceAfter, rechargeLocation, towerName) VALUES ('" + lineCellArr[0] + "','" + lineCellArr[1] + "','"+ lineCellArr[2] + "','"+ lineCellArr[3] + "','"+ lineCellArr[4] + "','"+ lineCellArr[5] + "','"+ lineCellArr[6] + "','"+ sqlSelectArr[1] + "')")
def insertResult2 = sqlInsert.executeUpdate("INSERT INTO Polygon.rechargesSummary (rechargeDate, serviceClass, rechargeValue, rechargeCount, rechargeType, rechargeLocation, towerName, latitude , longitude, region, subRegion) VALUES ('" + lineCellArr[1] + "','"+ lineCellArr[2] + "','"+ lineCellArr[3] + "','1','"+ lineCellArr[4] + "','"+ lineCellArr[6] + "','" + sqlSelectArr[1] + "','"+ sqlSelectArr[2] + "','"+ sqlSelectArr[3] + "','"+ sqlSelectArr[4] + "','"+ sqlSelectArr[5] + "') on DUPLICATE KEY UPDATE rechargeValue = rechargeValue + '"+ lineCellArr[3] +"', rechargeCount = rechargeCount + 1")
//write results
outputStream.write((";" + insertResult1.toString() +insertResult2.toString() + " \n").getBytes())
}
attrMap = ['Type': "Vouchers", 'Company': "Zain"]
}
}
} as StreamCallback)
flowFile = session.putAllAttributes(flowFile, attrMap)
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
log.error('Scripting error', e)
session.transfer(flowFile, REL_FAILURE)
}
conn.close()
Everything works fine, but after a couple of hours I start getting the error below:
Caused by: javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:159)
at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:264)
at org.apache.nifi.script.impl.GroovyScriptEngineConfigurator.eval(GroovyScriptEngineConfigurator.java:54)
at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:220)
... 11 common frames omitted
Caused by: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:355)
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:153)
... 14 common frames omitted
Caused by: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:84)
at com.sun.proxy.$Proxy77.getConnection(Unknown Source)
at org.apache.nifi.dbcp.DBCPService$getConnection.call(Unknown Source)
at Script4.run(Script4.groovy:19)
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:352)
... 15 common frames omitted
2017-10-26 09:55:16,873 ERROR [Timer-Driven Process Thread-12] o.a.nifi.processors.script.ExecuteScript ExecuteScript[id=7e50130c-1000-115f-dab6-0ef65aa69de2] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled: {}
org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled
at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:230)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
When I restart the connection pool, everything works fine for a couple of hours until I get this error again. Could you please advise? Thanks a lot.
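As a side note on the per-line parsing in the script, here is a minimal Java sketch of the same step: split a comma-separated line with limit `-1` and copy the fields into a fixed array of seven cells. `LineCellsSketch`, `toCells` and `CELL_COUNT` are hypothetical names. The upper bound on the copy is an addition for illustration; the script's own loop copies every field it finds.

```java
import java.util.Arrays;

class LineCellsSketch {
    static final int CELL_COUNT = 7; // matches the seven empty strings in lineCellArr

    static String[] toCells(String line) {
        String[] cells = new String[CELL_COUNT];
        Arrays.fill(cells, ""); // missing fields stay as empty strings
        String[] parts = line.split(",", -1); // limit -1 keeps trailing empty fields
        // Copy at most CELL_COUNT fields (this bound is an assumption added here).
        System.arraycopy(parts, 0, cells, 0, Math.min(parts.length, CELL_COUNT));
        return cells;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(toCells("a,b,,")));
    }
}
```

With the limit `-1`, a line that ends in empty fields still yields one array entry per field, which keeps index 6 (the cell ID used in the lookup query) in a predictable position.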
Labels: Apache NiFi
06-19-2017
03:26 PM
Hello Matt, you are right: I added "session.commit()" and now the files are generated. Thanks 🙂 Which method is triggered/called when I stop the processor? I want to stop the SMPP process when I stop the processor. Thanks a lot.
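On the stop question, a hedged sketch: NiFi calls methods annotated `@OnStopped` (the custom processor code on this page already has an empty one) once the processor is stopped and no thread is still inside `onTrigger`, and `@OnUnscheduled` as soon as it is no longer scheduled. A listener loop written as `while (true)` never returns, so it needs its own stop switch that such a method can call. The example below uses a plain `java.net.ServerSocket` as a stand-in for the SMPP listener; `StoppableServerSketch`, `stop()` and `startThenStop()` are hypothetical names, and nothing here is jsmpp or NiFi API.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

class StoppableServerSketch implements Runnable {
    private final ServerSocket listener;
    private volatile boolean running = true;

    StoppableServerSketch(int port) throws IOException {
        // Port 0 lets the OS pick a free port; bind to loopback only.
        this.listener = new ServerSocket(port, 50, InetAddress.getLoopbackAddress());
    }

    @Override
    public void run() {
        while (running) { // replaces while (true)
            try (Socket client = listener.accept()) {
                // handle the client here
            } catch (IOException e) {
                // accept() throws once stop() has closed the listener;
                // the loop condition then ends the loop
            }
        }
    }

    // Intended to be called from the stop hook (in NiFi, an @OnStopped or @OnUnscheduled method).
    void stop() throws IOException {
        running = false;
        listener.close(); // unblocks a thread waiting in accept()
    }

    // Starts the loop on a worker thread, stops it, and reports whether the thread ended.
    static boolean startThenStop() {
        try {
            StoppableServerSketch server = new StoppableServerSketch(0);
            Thread worker = new Thread(server);
            worker.start();
            server.stop();
            worker.join(5000);
            return !worker.isAlive();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("worker ended after stop(): " + startThenStop());
    }
}
```

Running the loop on its own thread, instead of inside `onTrigger`, is what allows the stop hook to be reached at all in this sketch.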
06-19-2017
12:21 AM
Hello forum,
I'm trying to create a custom NiFi processor that runs Java code (the processor acts as a server and generates a file once a request is received). Everything works so far, except that the processor does not pass any file to the next processor through MY_RELATIONSHIP.
MyProcessor code:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hwx.processors.demo;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import jsmpp.SMPPServerSimulator;
import org.apache.log4j.BasicConfigurator;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.scheduling.SchedulingStrategy;
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({
@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({
@WritesAttribute(attribute = "", description = "")})
public class MyProcessor extends AbstractProcessor {
private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder().name("MY_PROPERTY")
.displayName("SMPP Server port")
.description("SMPP Server port")
.required(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
.name("SUCCESS_RELATIONSHIP")
.description("Success relationship")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(MY_PROPERTY);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(MY_RELATIONSHIP);
this.relationships = Collections.unmodifiableSet(relationships);
System.out.println("This is a custom processor that will receive flow file");
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@OnStopped
public void onStopped(final ProcessContext context) {
}
public SchedulingStrategy getSchedulingStrategy() {
return schedulingStrategy.TIMER_DRIVEN;
}
@Override
public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
final int port = context.getProperty(MY_PROPERTY).evaluateAttributeExpressions().asInteger();
BasicConfigurator.configure();
SMPPServerSimulator smppServerSim = new SMPPServerSimulator(port, session, MY_RELATIONSHIP); // use the configured port rather than the hard-coded 8058
smppServerSim.run();
}
}
SMPPServerSimulator.java
public MessageId onAcceptSubmitSm(SubmitSm submitSm,
SMPPServerSession source) throws ProcessRequestException {
MessageId messageId = messageIDGenerator.newMessageId();
logger.debug("\nReceiving submit_sm {}, and return message id {}\n", new String(submitSm.getShortMessage()), messageId.getValue());
if (SMSCDeliveryReceipt.SUCCESS.containedIn(submitSm.getRegisteredDelivery()) || SMSCDeliveryReceipt.SUCCESS_FAILURE.containedIn(submitSm.getRegisteredDelivery())) {
execServiceDelReciept.execute(new DeliveryReceiptTask(source, submitSm, messageId));
}
flowfile = session.create();
flowfile = session.putAttribute(flowfile, "match", new String(submitSm.getShortMessage()));
System.out.println(session + " flowfile: " + flowfile + " Relation: " + SUCCESS_RELATIONSHIP);
session.transfer(flowfile, MY_RELATIONSHIP);
System.out.println("Message Recieved: " + new String(submitSm.getShortMessage()));
return messageId;
}
Below is the log output from the System.out.println command. The flowfile is created but not sent:
2017-06-18 17:31:46,449 INFO [NiFi logging handler] org.apache.nifi.StdOut StandardProcessSession[id=0] flowfile: StandardFlowFileRecord[uuid=ad3128b6-0032-491e-a5ba-ee73f99b8f0b,claim=,offset=0,name=50727827275499,size=0] Relation: SUCCESS_RELATIONSHIP
Can you help, please?
Labels: Apache NiFi
06-16-2017
01:18 PM
Any help, gurus? 🙂
06-15-2017
12:44 PM
Hello forum,
I have Java code that I want to include in the ExecuteScript processor. This is the Java code:
import java.io.IOException
import java.util.Date
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeoutException
import org.apache.log4j.BasicConfigurator
import org.jsmpp.PDUStringException
import org.jsmpp.SMPPConstant
import org.jsmpp.bean.CancelSm
import org.jsmpp.bean.DataCoding
import org.jsmpp.bean.DataSm
import org.jsmpp.bean.DeliveryReceipt
import org.jsmpp.bean.ESMClass
import org.jsmpp.bean.GSMSpecificFeature
import org.jsmpp.bean.MessageMode
import org.jsmpp.bean.MessageType
import org.jsmpp.bean.NumberingPlanIndicator
import org.jsmpp.bean.QuerySm
import org.jsmpp.bean.RegisteredDelivery
import org.jsmpp.bean.ReplaceSm
import org.jsmpp.bean.SMSCDeliveryReceipt
import org.jsmpp.bean.SubmitMulti
import org.jsmpp.bean.SubmitMultiResult
import org.jsmpp.bean.SubmitSm
import org.jsmpp.bean.TypeOfNumber
import org.jsmpp.extra.ProcessRequestException
import org.jsmpp.session.BindRequest
import org.jsmpp.session.DataSmResult
import org.jsmpp.session.QuerySmResult
import org.jsmpp.session.SMPPServerSession
import org.jsmpp.session.SMPPServerSessionListener
import org.jsmpp.session.ServerMessageReceiverListener
import org.jsmpp.session.ServerResponseDeliveryAdapter
import org.jsmpp.session.Session
import org.jsmpp.util.DeliveryReceiptState
import org.jsmpp.util.MessageIDGenerator
import org.jsmpp.util.MessageId
import org.jsmpp.util.RandomMessageIDGenerator
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.jsmpp.*
import jsmpp.*
public class SMPPServerSimulator extends ServerResponseDeliveryAdapter implements Runnable, ServerMessageReceiverListener {
private static final Integer DEFAULT_PORT = 8057
private static final Logger logger = LoggerFactory.getLogger(SMPPServerSimulator.class)
private final ExecutorService execService = Executors.newFixedThreadPool(5)
private final ExecutorService execServiceDelReciept = Executors.newFixedThreadPool(100)
private final MessageIDGenerator messageIDGenerator = new RandomMessageIDGenerator()
private int port
public SMPPServerSimulator(int port) {
this.port = port
}
public void run() {
try {
SMPPServerSessionListener sessionListener = new SMPPServerSessionListener(port)
logger.info("Listening on port {}", port)
while (true) {
SMPPServerSession serverSession = sessionListener.accept()
logger.info("Accepting connection for session {}", serverSession.getSessionId())
serverSession.setMessageReceiverListener(this)
serverSession.setResponseDeliveryListener(this)
execService.execute(new WaitBindTask(serverSession))
}
} catch (IOException e) {
logger.error("IO error occured", e)
}
}
@Override
public QuerySmResult onAcceptQuerySm(QuerySm querySm,
SMPPServerSession source) throws ProcessRequestException {
logger.info("Accepting query sm, but not implemented")
return null
}
@Override
public MessageId onAcceptSubmitSm(SubmitSm submitSm,
SMPPServerSession source) throws ProcessRequestException {
MessageId messageId = messageIDGenerator.newMessageId()
logger.debug("\nReceiving submit_sm {}, and return message id {}\n", new String(submitSm.getShortMessage()), messageId.getValue())
if (SMSCDeliveryReceipt.SUCCESS.containedIn(submitSm.getRegisteredDelivery()) || SMSCDeliveryReceipt.SUCCESS_FAILURE.containedIn(submitSm.getRegisteredDelivery())) {
execServiceDelReciept.execute(new DeliveryReceiptTask(source, submitSm, messageId))
}
flowFile = session.create()
flowFile = session.putAttribute(flowFile, 'myAttr', new String(submitSm.getShortMessage()))
session.transfer(flowFile, REL_SUCCESS)
System.out.println("Message Recieved: " + new String(submitSm.getShortMessage()))
return messageId
}
@Override
public void onSubmitSmRespSent(MessageId messageId,
SMPPServerSession source) {
logger.debug("submit_sm_resp with message_id {} has been sent", messageId)
}
@Override
public SubmitMultiResult onAcceptSubmitMulti(SubmitMulti submitMulti,
SMPPServerSession source) throws ProcessRequestException {
return null
}
@Override
public DataSmResult onAcceptDataSm(DataSm dataSm, Session source)
throws ProcessRequestException {
return null
}
@Override
public void onAcceptCancelSm(CancelSm cancelSm, SMPPServerSession source)
throws ProcessRequestException {
}
@Override
public void onAcceptReplaceSm(ReplaceSm replaceSm, SMPPServerSession source)
throws ProcessRequestException {
}
private class WaitBindTask implements Runnable {
private final SMPPServerSession serverSession
public WaitBindTask(SMPPServerSession serverSession) {
this.serverSession = serverSession
}
@Override
public void run() {
try {
BindRequest bindRequest = serverSession.waitForBind(1000)
logger.info("Accepting bind for session {}", serverSession.getSessionId())
try {
bindRequest.accept("sys")
} catch (PDUStringException e) {
logger.error("Invalid system id", e)
bindRequest.reject(SMPPConstant.STAT_ESME_RSYSERR)
}
} catch (IllegalStateException e) {
logger.error("System error", e)
} catch (TimeoutException e) {
logger.warn("Wait for bind has reach timeout", e)
} catch (IOException e) {
logger.error("Failed accepting bind request for session {}", serverSession.getSessionId())
}
}
}
private class DeliveryReceiptTask implements Runnable {
private final SMPPServerSession session
private final SubmitSm submitSm
private MessageId messageId
public DeliveryReceiptTask(SMPPServerSession session,
SubmitSm submitSm, MessageId messageId) {
this.session = session
this.submitSm = submitSm
this.messageId = messageId
}
@Override
public void run() {
try {
Thread.sleep(1000)
} catch (InterruptedException e1) {
e1.printStackTrace()
}
String stringValue = Integer.valueOf(messageId.getValue(), 16).toString()
try {
DeliveryReceipt delRec = new DeliveryReceipt(stringValue, 1, 1, new Date(), new Date(), DeliveryReceiptState.DELIVRD, null, new String(submitSm.getShortMessage()))
session.deliverShortMessage(
"mc",
TypeOfNumber.valueOf(submitSm.getDestAddrTon()),
NumberingPlanIndicator.valueOf(submitSm.getDestAddrNpi()),
submitSm.getDestAddress(),
TypeOfNumber.valueOf(submitSm.getSourceAddrTon()),
NumberingPlanIndicator.valueOf(submitSm.getSourceAddrNpi()),
submitSm.getSourceAddr(),
new ESMClass(MessageMode.DEFAULT, MessageType.SMSC_DEL_RECEIPT, GSMSpecificFeature.DEFAULT),
(byte) 0,
(byte) 0,
new RegisteredDelivery(0),
DataCoding.newInstance(0),
delRec.toString().getBytes())
logger.debug("Sending delivery reciept for message id " + messageId + ":" + stringValue)
} catch (Exception e) {
logger.error("Failed sending delivery_receipt for message id " + messageId + ":" + stringValue, e)
}
}
}
public static void main(String[] args) {
int port
try {
port = Integer.parseInt(System.getProperty("jsmpp.simulator.port", DEFAULT_PORT.toString()))
} catch (NumberFormatException e) {
port = DEFAULT_PORT
}
BasicConfigurator.configure()
SMPPServerSimulator smppServerSim = new SMPPServerSimulator(port)
smppServerSim.run()
}
}
The processor runs with no errors, but the script does not behave as expected: port 8057 should become ready to bind, since the code acts as an SMS gateway server. Any help, please? Note that the script above works if I execute it as Groovy code from the NetBeans IDE.
- Tags:
- executescript
- groovy