Member since: 05-09-2019
Posts: 5
Kudos Received: 0
Solutions: 0
07-23-2019 01:11 PM
In NiFi I am first converting JSON to Avro and then converting the Avro back to JSON. While converting from Avro to JSON I get the exception below:

2019-07-23 12:48:04,043 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.avro.ConvertAvroToJSON ConvertAvroToJSON[id=1db0939d-016c-1000-caa3-80d0993c3468] ConvertAvroToJSON[id=1db0939d-016c-1000-caa3-80d0993c3468] failed to process session due to org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40; Processor Administratively Yielded for 1 sec: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
at org.apache.nifi.processors.avro.ConvertAvroToJSON$1.process(ConvertAvroToJSON.java:161)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2887)
at org.apache.nifi.processors.avro.ConvertAvroToJSON.onTrigger(ConvertAvroToJSON.java:148)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Attached flow template: avro_to_json_and_json_to_avro.xml. [Flow image attached in the original post.]
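For context, Avro's "Malformed data. Length is negative" error generally means the decoder read a length prefix from bytes that are not in the layout it expects, which is typical when datafile-format Avro (with the container header) and bare-datum Avro get mixed up between processors. A minimal Java sketch of the two read paths, under the assumption that the flow is producing one of these two layouts (names are illustrative):

import java.io.ByteArrayInputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class AvroReadPaths {

    // Path 1: an Avro data file, which is self-describing and starts with the "Obj" magic bytes.
    static GenericRecord readFromDataFile(byte[] bytes) throws Exception {
        try (DataFileStream<GenericRecord> stream = new DataFileStream<>(
                new ByteArrayInputStream(bytes), new GenericDatumReader<GenericRecord>())) {
            return stream.next();
        }
    }

    // Path 2: a bare serialized datum with no header; the reader must be given the schema.
    static GenericRecord readBareDatum(byte[] bytes, Schema schema) throws Exception {
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }
}

Decoding bytes produced for one path with the reader for the other typically fails with exactly this kind of negative-length error, so it may be worth checking which output format the JSON-to-Avro step is configured to emit.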
05-26-2019 01:26 AM
I'm trying to integrate the NiFi REST APIs with my application: from the input/output mapping defined in my application, I call the NiFi REST API to create the flow. In most of my use cases I extract values from JSON and apply expression language functions to them. To keep all the use cases simple, I use an EvaluateJsonPath processor to fetch all the attributes via JSONPath and then apply the expression language functions in an extract processor. [Flow diagram attached in the original post.] Is this the right approach? For JSON-to-JSON manipulation with about 30 keys it is the simplest way, and because I am driving NiFi through its REST API I cannot generate JOLT transformation logic dynamically from the user mapping. Does heavy use of EvaluateJsonPath create performance issues for about 50 use cases with different transformation logic? The documentation says that heavy attribute usage can create performance (memory) issues.
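For reference, EvaluateJsonPath is built on the Jayway json-path library, so per flowfile the cost is roughly one JSON parse plus one path evaluation per configured attribute. A minimal sketch of that evaluation (the document and paths here are illustrative, not from the flow above):

import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;

public class JsonPathSketch {
    public static void main(String[] args) {
        String json = "{\"user\":{\"name\":\"alice\",\"age\":30}}";
        // Parse once, then evaluate each configured path against the parsed document --
        // essentially what EvaluateJsonPath does for every flowfile.
        DocumentContext doc = JsonPath.parse(json);
        String name = doc.read("$.user.name");
        int age = doc.read("$.user.age", Integer.class);
        System.out.println(name + " / " + age);
    }
}

As general guidance, the memory concern in the documentation is mainly about copying large content into attributes, since attributes are held on the heap and in the flowfile repository; thirty small scalar values per flowfile is usually modest, whereas extracting large sub-documents into attributes is what tends to hurt.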
05-22-2019 07:59 PM
In NiFi I am trying to configure SSL, authenticate through Knox, and authorize with Ranger. I am following https://community.hortonworks.com/articles/58009/hdf-20-enable-ssl-for-apache-nifi-from-ambari.html for the SSL configuration, but it is not working. NiFi is running in standalone mode and was installed through Ambari. Below is the exception I am getting:

Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'clusterCoordinationProtocolSender' defined in class path resource [nifi-cluster-protocol-context.xml]: Cannot resolve reference to bean 'protocolSocketConfiguration' while setting constructor argument; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'protocolSocketConfiguration': FactoryBean threw exception on object creation; nested exception is java.security.UnrecoverableKeyException: Cannot recover key
at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:359)
at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveValueIfNecessary(BeanDefinitionValueResolver.java:108)
at org.springframework.beans.factory.support.ConstructorResolver.resolveConstructorArguments(ConstructorResolver.java:648)
at org.springframework.beans.factory.support.ConstructorResolver.autowireConstructor(ConstructorResolver.java:145)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireConstructor(AbstractAutowireCapableBeanFactory.java:1198)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1100)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:511)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:481)
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:312)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:308)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:351)
... 76 common frames omitted
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'protocolSocketConfiguration': FactoryBean threw exception on object creation; nested exception is java.security.UnrecoverableKeyException: Cannot recover key
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:185)
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:103)
at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1640)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:323)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:351)
... 88 common frames omitted
Caused by: java.security.UnrecoverableKeyException: Cannot recover key
at sun.security.provider.KeyProtector.recover(KeyProtector.java:328)
at sun.security.provider.JavaKeyStore.engineGetKey(JavaKeyStore.java:146)
at sun.security.provider.JavaKeyStore$JKS.engineGetKey(JavaKeyStore.java:56)
at sun.security.provider.KeyStoreDelegator.engineGetKey(KeyStoreDelegator.java:96)
at sun.security.provider.JavaKeyStore$DualFormatJKS.engineGetKey(JavaKeyStore.java:70)
at java.security.KeyStore.getKey(KeyStore.java:1023)
at sun.security.ssl.SunX509KeyManagerImpl.<init>(SunX509KeyManagerImpl.java:133)
at sun.security.ssl.KeyManagerFactoryImpl$SunX509.engineInit(KeyManagerFactoryImpl.java:70)
at javax.net.ssl.KeyManagerFactory.init(KeyManagerFactory.java:256)
at org.apache.nifi.io.socket.SSLContextFactory.<init>(SSLContextFactory.java:70)
at org.apache.nifi.cluster.protocol.spring.SocketConfigurationFactoryBean.getObject(SocketConfigurationFactoryBean.java:45)
at org.apache.nifi.cluster.protocol.spring.SocketConfigurationFactoryBean.getObject(SocketConfigurationFactoryBean.java:30)
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:178)
... 93 common frames omitted
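The root cause, java.security.UnrecoverableKeyException: Cannot recover key, almost always means the password used to unlock the private-key entry is wrong — typically a keystore whose key entry has its own password while only the keystore password was supplied in the configuration. A minimal sketch to verify the two passwords outside NiFi (the path, alias handling, and passwords are placeholders):

import java.io.FileInputStream;
import java.security.Key;
import java.security.KeyStore;

public class KeystoreCheck {
    public static void main(String[] args) throws Exception {
        char[] storePass = "keystorePassword".toCharArray(); // placeholder
        char[] keyPass = "keyPassword".toCharArray();        // placeholder

        KeyStore ks = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream("/path/to/keystore.jks")) {
            ks.load(in, storePass); // a wrong store password fails here
        }

        // getKey is the same call that fails inside NiFi's SSLContextFactory;
        // a wrong key password throws UnrecoverableKeyException here too.
        String alias = ks.aliases().nextElement();
        Key key = ks.getKey(alias, keyPass);
        System.out.println("Recovered key for alias '" + alias + "': " + (key != null));
    }
}

If the two passwords turn out to differ, either supply the key password separately in the NiFi SSL settings or align them with keytool -keypasswd.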
05-11-2019 10:16 PM
In NiFi I am creating a custom processor that reads multi-row CSV data, converts each row into a separate JSON document, and sends it on. Below is the custom processor code:

package hwx.processors.demo;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {
public static final PropertyDescriptor EXPECTED_JSON = new PropertyDescriptor
.Builder().name("EXPECTED_JSON")
.displayName("EXPECTED_JSON")
.description("EXPECTED_JSON")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DELIMITER = new PropertyDescriptor
.Builder().name("DELIMITER")
.displayName("DELIMITER")
.description("DELIMITER For CSV")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship SUCCESS = new Relationship.Builder()
.name("SUCCESS")
.description("Example relationship")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(EXPECTED_JSON);
descriptors.add(DELIMITER);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
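// Holds the most recently rendered JSON row. Note this is instance state on the
// processor, shared across concurrent onTrigger invocations.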
final AtomicReference<String> value = new AtomicReference<>();
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
//String r=context.getProperty("MY_PROPERTY").toString();
//Pattern pattern = Pattern.compile("\\$(\\d+)");
session.read(flowFile, in -> {
String r="";
try {
r= IOUtils.toString(in);
/*if(r.contains("$"))
{
r=r.replaceAll("\\$", "\\\\\\$");
}*/
}
catch (Exception e) {
getLogger().error(e.getMessage() + " exception.", e);
}
String[] test=r.split(context.getProperty(DELIMITER).getValue());
for(String s1:test){
try{
String[] s=s1.replaceAll("\r\n", " ").split(" ");
String[] s2=s1.split(",");
Pattern pattern = Pattern.compile("\\$(\\d+)"); // match $1, $2, ... capturing the whole index
String text=context.getProperty(EXPECTED_JSON).getValue();
Matcher m = pattern.matcher(text);
StringBuffer sb = new StringBuffer();
while (m.find()) {
if(Integer.parseInt(m.group(1))>s2.length)
{
m.appendReplacement(sb, "");
}
else{
m.appendReplacement(sb, s2[Integer.parseInt(m.group(1)) - 1]);
}
}
m.appendTail(sb);
value.set(sb.toString());
sendRecord(session,flowFile);
} catch (Exception e) {
getLogger().error(e.getMessage() + " Routing to failure.", e);
}
}
});
}
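// Note: sendRecord below is called once per CSV row from inside the read callback in
// onTrigger, and it writes to, transfers, and commits the same flowfile being read.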
public void sendRecord(final ProcessSession session,FlowFile flowFile)
{
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(value.get().getBytes());
}
});
session.transfer(flowFile,SUCCESS);
session.commit();
}
}

Below is the custom processor configuration: in EXPECTED_JSON the user supplies the required JSON payload, and the processor replaces $1 with the first field of the CSV record, $2 with the second, and so on. Each row of the CSV has to be converted into a separate JSON and sent.

EXPECTED_JSON payload: {"test1":"$1","test2":"$2","test3":"$3","test4":"$4"}

Below is the exception from the custom processor:

2019-05-10 19:40:40,239 ERROR [Timer-Driven Process Thread-10] hwx.processors.demo.MyProcessor MyProcessor[id=a211cdad-016a-1000-7505-c725dfdbe9da] MyProcessor[id=a211cdad-016a-1000-7505-c725dfdbe9da] failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1557497351911-1, container=default, section=1], offset=0, length=533],offset=0,name=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,size=533] transfer relationship not specified; Processor Administratively Yielded for 1 sec: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1557497351911-1, container=default, section=1], offset=0, length=533],offset=0,name=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,size=533] transfer relationship not specified
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1557497351911-1, container=default, section=1], offset=0, length=533],offset=0,name=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,size=533] transfer relationship not specified
at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:271)
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:342)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-05-10 19:40:40,239 WARN [Timer-Driven Process Thread-10] o.a.n.controller.tasks.ConnectableTask Administratively Yielding MyProcessor[id=a211cdad-016a-1000-7505-c725dfdbe9da] due to uncaught Exception: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1557497351911-1, container=default, section=1], offset=0, length=533],offset=0,name=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,size=533] transfer relationship not specified
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1557497351911-1, container=default, section=1], offset=0, length=533],offset=0,name=9e2161a0-517a-4c5e-bee2-30a7f22f42f7,size=533] transfer relationship not specified
at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:271)
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:342)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

So, how do I resolve this exception and send each CSV record/row as a separate JSON?
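For what it's worth, "transfer relationship not specified" means that when the session was committed, it still held a flowfile that had been neither transferred nor removed. A common way to restructure this kind of processor is to read the content once, create one child flowfile per row, transfer the children, and remove the original, letting AbstractProcessor commit the session. A minimal sketch, assuming a hypothetical fillTemplate helper that performs the $n substitution for one row, plus org.apache.nifi.stream.io.StreamUtils:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile original = session.get();
    if (original == null) {
        return;
    }

    // Read the whole content up front, outside of any write callback.
    final byte[] buffer = new byte[(int) original.getSize()];
    session.read(original, in -> StreamUtils.fillBuffer(in, buffer));
    final String content = new String(buffer, StandardCharsets.UTF_8);

    for (String row : content.split(context.getProperty(DELIMITER).getValue())) {
        // fillTemplate is a hypothetical helper applying the $n substitution to one row.
        final String json = fillTemplate(context.getProperty(EXPECTED_JSON).getValue(), row);

        // One child flowfile per CSV row, each written and transferred independently.
        FlowFile child = session.create(original);
        child = session.write(child, out -> out.write(json.getBytes(StandardCharsets.UTF_8)));
        session.transfer(child, SUCCESS);
    }

    // The original must also end somewhere: remove it (or route it to an "original" relationship).
    session.remove(original);
    // No session.commit() here; AbstractProcessor commits after onTrigger returns.
}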
05-10-2019 03:39 PM
In NiFi I am trying to create a custom processor, and I want to fetch all of the properties entered by the user. How can I create such user-defined properties and fetch them in the custom processor?
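For reference, NiFi supports this through dynamic properties: overriding getSupportedDynamicPropertyDescriptor lets users add arbitrary properties in the processor configuration, and context.getProperties() returns all of them at run time. A minimal sketch (the validator choice and logging are illustrative):

import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

public class DynamicPropsProcessor extends AbstractProcessor {

    // Called for each property name the user adds that is not a declared descriptor;
    // returning a descriptor (rather than null) marks the property as supported.
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .dynamic(true)
                .build();
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // Iterate every configured property; isDynamic() singles out the user-entered ones.
        for (Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
            if (entry.getKey().isDynamic()) {
                getLogger().info("dynamic property {} = {}",
                        new Object[]{entry.getKey().getName(), entry.getValue()});
            }
        }
    }
}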