Member since 09-12-2017
41 Posts
0 Kudos Received
1 Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
4108 | 07-15-2018 08:34 AM |
08-19-2019
08:34 PM
This is a little away from your question, but this solution may work: if you set up an FTP server on your local machine that serves the target folder, and expose your machine through a public IP, then you can connect to your local folder from anywhere in the cloud or from some other remote machine. For this, you have to switch to PutFTP instead of PutFile.
06-06-2019
09:52 AM
Using NiFi or through Spark programming.
02-07-2019
09:48 AM
Hey Raghav, can you please share your template file for Apache Drill connectivity with the ExecuteSQL NiFi processor? We are doing the same thing but are facing an issue somewhere, and just want to cross-check against your configuration.
08-03-2018
05:16 AM
Can you send me the exact lines of the JobRunner code, with comments?
07-15-2018
08:34 AM
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.processors.rangareddy_doubt;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.*;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class ClientProcessor extends AbstractProcessor {
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
.Builder().name("My Property")
.description("Example Property")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
.name("my_relationship")
.description("Example relationship")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(MY_PROPERTY);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(MY_RELATIONSHIP);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
final AtomicReference<String> value = new AtomicReference<>();
if ( flowFile == null ) {
return;
}
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException { // "in" is the content of the incoming FlowFile, e.g. from GetFile or your SFTP server
String input = IOUtils.toString(in, "UTF-8");
JobRunner job = new JobRunner(input); // JobRunner is your own class; you can pass it the InputStream "in" directly, or the file content as a String
job.run();
value.set(input);
}
});
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(value.get().getBytes("UTF-8")); // same charset as the read above, not the platform default
}
});
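flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/csv");
session.transfer(flowFile, MY_RELATIONSHIP); // every FlowFile must be transferred or removed before the session commits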
}
}
This is the code to create a custom processor for NiFi.
07-14-2018
07:03 PM
Use the onTrigger method for whatever code you currently have in your main method, and instead of passing the full file path to the constructor, pass the input stream of that file. Give me one day and I will come back with a NAR file of that custom processor.
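As a rough sketch of what that change could look like: the real JobRunner class is not shown in this thread, so `JobRunnerSketch`, `fromStream` and the line-counting `run()` below are hypothetical stand-ins. The only point is that the class accepts file content (a String or an InputStream) instead of a file path, so onTrigger can hand it the FlowFile content.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for JobRunner: it works on content, not on a file path.
final class JobRunnerSketch {
    private final String content;

    // Accepts the file content directly, as in new JobRunner(input).
    JobRunnerSketch(String content) {
        this.content = content;
    }

    // Alternative entry point: read the whole stream as UTF-8 text.
    static JobRunnerSketch fromStream(InputStream in) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[8192];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            return new JobRunnerSketch(new String(buffer.toByteArray(), StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Placeholder for the logic that used to live in main(): it only counts lines.
    int run() {
        if (content.isEmpty()) {
            return 0;
        }
        return content.split("\r?\n").length;
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream("a,b\n1,2\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(JobRunnerSketch.fromStream(in).run()); // prints 2
    }
}
```

Inside the processor, the InputStream given to the read callback would take the place of the ByteArrayInputStream used in `main`.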
07-13-2018
04:02 PM
Let me describe the environment I am working in. I am using the files.bcogc.ca FTP server, which uses FTPS (FTP over explicit SSL). When I connect using FileZilla, a certificate-acceptance pop-up appears; after accepting it, the connection opens and I am able to list files. Similarly, I used the ftp-ssl client from the Ubuntu CLI and the connection opened. But when I use the FTP NiFi processor in passive mode, it gives a connection timeout; I tried active mode and still got the same error. The NiFi VM is in the Azure cloud, and outgoing ports are open, along with some incoming ports. Below is the stack trace.

This is the log that I got from nifi-app.log:
2018-07-13 15:53:40,428 ERROR [Timer-Driven Process Thread-8] o.a.nifi.processors.standard.ListFTP ListFTP[id=01641010-7d8e-102b-2639-020d0b6d6867] Failed to perform listing on remote host due to java.net.ConnectException: Connection timed out (Connection timed out): {}
java.net.ConnectException: Connection timed out (Connection timed out)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.commons.net.SocketClient._connect(SocketClient.java:243)
at org.apache.commons.net.SocketClient.connect(SocketClient.java:181)
at org.apache.nifi.processors.standard.util.FTPTransfer.getClient(FTPTransfer.java:582)
at org.apache.nifi.processors.standard.util.FTPTransfer.getListing(FTPTransfer.java:227)
at org.apache.nifi.processors.standard.util.FTPTransfer.getListing(FTPTransfer.java:191)
at org.apache.nifi.processors.standard.ListFileTransfer.performListing(ListFileTransfer.java:106)
at org.apache.nifi.processor.util.list.AbstractListProcessor.onTrigger(AbstractListProcessor.java:405)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
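The trace above fails inside SocketClient.connect, that is, while opening the connection to the server, before any listing is attempted. One way to narrow this down might be a plain TCP check run from the NiFi VM itself. The sketch below uses only the JDK; the class name `PortCheck`, the 5-second timeout and the default port 21 are my assumptions, so substitute whatever port the processor is configured with.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {
    // Returns true if a TCP connection to host:port opens within timeoutMillis.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers timeouts, refused connections and unresolved host names.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "files.bcogc.ca";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 21; // assumed control port
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 5000));
    }
}
```

If this reports the port as unreachable from the VM while FileZilla works from another machine, the problem is more likely in the network path (outbound rules, routing) than in the processor configuration.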
07-03-2018
09:13 AM
It is not fetching any data from the FTP server when I use my cloud NiFi server, but with the same FTP server configuration on my local NiFi, the GetFTP or ListFTP processor is able to fetch or queue data from the FTP server. If I use the ListFTP processor in cloud NiFi, it throws the exception "Failed to perform listing on remote host due to java.net.SocketException" after some time (a 15-minute interval). If I use the GetFTP processor instead, it shows no exception or error message in the right corner of the processor, and there is no error message or exception in the logs either. While using the GetFTP processor, the log only shows:
2018-06-30 10:24:18,631 INFO [NiFi Web Server-18] o.a.n.c.s.StandardProcessScheduler Starting GetFTP[id=01641000-7d8e-102b-7a2f-83073ddb9d5f]
2018-06-30 10:24:18,632 INFO [Monitor Processor Lifecycle Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled GetFTP[id=01641000-7d8e-102b-7a2f-83073ddb9d5f] to run with 1 threads
2018-06-30 10:24:19,029 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@587ea2b // Another save pending = false
06-30-2018
10:30 AM
I am having an issue with GetFTP and ListFTP on my cloud NiFi server. The configuration I have for GetFTP works on my local NiFi server, but the same configuration does not work in the cloud environment. Some other FTP servers work, but this specific FTP server does not work from the cloud. The other FTP servers I have run in passive mode, but the FTP server I want to fetch data from runs in active connection mode. I checked from the command line on that cloud machine, and it works and shows connectivity. On the firewall side, I disabled the firewall. I cannot figure out how to fetch data from that FTP server. Please, someone help me, it's very urgent.

I checked my logs, but nothing shows up there: no exceptions or errors, only the GetFTP processor id and something about the write-ahead log, nothing else.

Log:
2018-06-30 10:24:18,631 INFO [NiFi Web Server-18] o.a.n.c.s.StandardProcessScheduler Starting GetFTP[id=01641000-7d8e-102b-7a2f-83073ddb9d5f]
2018-06-30 10:24:18,632 INFO [Monitor Processor Lifecycle Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled GetFTP[id=01641000-7d8e-102b-7a2f-83073ddb9d5f] to run with 1 threads
2018-06-30 10:24:19,029 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@587ea2b // Another save pending = false

I even changed the NiFi version. I am now using nifi-1.7.0, and it still does not fetch from that particular FTP server.
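Since active versus passive mode matters here, a small illustration of why the two behave differently behind a cloud firewall may help. In passive mode the server answers PASV with a 227 reply containing six numbers (h1,h2,h3,h4,p1,p2), and the client then opens a second, outbound connection to port p1*256+p2 (RFC 959). In active mode the server connects back to the client instead, which needs an inbound port open on the NiFi VM. The sketch below only decodes such a reply; the class name `PasvReply` and the sample reply text are made up for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PasvReply {
    // Matches the (h1,h2,h3,h4,p1,p2) group of a 227 reply.
    private static final Pattern ADDRESS = Pattern.compile(
            "\\((\\d{1,3}),(\\d{1,3}),(\\d{1,3}),(\\d{1,3}),(\\d{1,3}),(\\d{1,3})\\)");

    private static Matcher match(String reply) {
        Matcher m = ADDRESS.matcher(reply);
        if (!m.find()) {
            throw new IllegalArgumentException("No address found in reply: " + reply);
        }
        return m;
    }

    // Host the client must connect to for the data channel.
    static String dataHost(String reply) {
        Matcher m = match(reply);
        return m.group(1) + "." + m.group(2) + "." + m.group(3) + "." + m.group(4);
    }

    // Port the client must connect to for the data channel: p1 * 256 + p2.
    static int dataPort(String reply) {
        Matcher m = match(reply);
        return Integer.parseInt(m.group(5)) * 256 + Integer.parseInt(m.group(6));
    }

    public static void main(String[] args) {
        String reply = "227 Entering Passive Mode (192,0,2,10,195,80)"; // made-up sample
        System.out.println(dataHost(reply) + ":" + dataPort(reply)); // prints 192.0.2.10:50000
    }
}
```

So with passive mode the cloud VM needs outbound access to the server's data port range, while with active mode it needs inbound access from the server, which is often what a cloud network blocks.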
Labels: Apache NiFi
06-28-2018
06:40 PM
I have used the same, but I am getting this error in the ListFTP processor:
java.lang.reflect.InvocationTargetException: null
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:70)
at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotation(ReflectionUtils.java:47)
at org.apache.nifi.controller.StandardProcessorNode.lambda$initiateStart$3(StandardProcessorNode.java:1455)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to properly migrate state to State Manager
at org.apache.nifi.processor.util.list.AbstractListProcessor.updateState(AbstractListProcessor.java:251)
... 15 common frames omitted