New Contributor
Posts: 4
Registered: ‎07-27-2018
Accepted Solution

Flume seems to ignore the parseAsFlumeEvent property in Kafka channel

Hi,

I'm trying to ingest data from Kafka into HDFS using Flume.

The topology uses a Kafka channel to read the data directly from a Kafka topic, and an HDFS sink to write it.

This is the config I use:

tier1.channels = kafka_chan
tier1.sinks = sink1

tier1.channels.kafka_chan.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.kafka_chan.kafka.bootstrap.servers = broker:9092
tier1.channels.kafka_chan.kafka.consumer.group.id = flume2
tier1.channels.kafka_chan.kafka.topic = test_hdfs4
tier1.channels.kafka_chan.kafka.parseAsFlumeEvent = false

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/user/flume/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 3600
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.round = true
tier1.sinks.sink1.hdfs.roundValue = 60
tier1.sinks.sink1.hdfs.roundUnit = minute
tier1.sinks.sink1.hdfs.useLocalTimeStamp = true
tier1.sinks.sink1.hdfs.filePrefix = %H%M%S
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = kafka_chan

When this runs, no data is written and the Flume agent throws the following exception:

Error while getting events from Kafka. This is usually caused by trying to read a non-flume event. Ensure the setting for parseAsFlumeEvent is correct
java.lang.IndexOutOfBoundsException
	at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:180)
	at org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
	at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
	at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
	at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:335)
	at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:321)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.deserializeValue(KafkaChannel.java:635)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:497)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:745)

I'm using CDH 5.14.

Cloudera Employee
Posts: 234
Registered: ‎01-09-2014

Re: Flume seems to ignore the parseAsFlumeEvent property in Kafka channel

Try it without the kafka prefix:
tier1.channels.kafka_chan.parseAsFlumeEvent = false

http://flume.apache.org/FlumeUserGuide.html#kafka-channel
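
For reference, a minimal sketch of the corrected channel block, assuming the rest of the original config stays unchanged. In the Flume user guide, parseAsFlumeEvent is a channel-level property (default true), while the kafka. prefix is used for settings such as kafka.bootstrap.servers, kafka.topic and kafka.consumer.group.id:

```
tier1.channels.kafka_chan.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.kafka_chan.kafka.bootstrap.servers = broker:9092
tier1.channels.kafka_chan.kafka.consumer.group.id = flume2
tier1.channels.kafka_chan.kafka.topic = test_hdfs4
# No "kafka." prefix on this key: the topic holds plain records,
# not Avro-wrapped Flume events.
tier1.channels.kafka_chan.parseAsFlumeEvent = false
```

With the prefixed key, the setting is presumably not recognized, so the channel keeps the default and tries to Avro-decode each record, which would match the deserializeValue frames in the stack trace above.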

-pd
New Contributor
Posts: 4
Registered: ‎07-27-2018

Re: Flume seems to ignore the parseAsFlumeEvent property in Kafka channel

Thanks! I missed that one!

Now I have another error.

The channel seems to work, but I'm getting the following exceptions from the sink:

Serializer = TEXT, UseRawLocalFileSystem = false
Creating /tmp/flume//18-07-27/2230.1532730600004.tmp
Closing /tmp/flume//18-07-27/2228.1532730540064.tmp
Renaming /tmp/flume/18-07-27/2228.1532730540064.tmp to /tmp/flume/18-07-27/2228.1532730540064
Writer callback called.
Closing /tmp/flume//18-07-27/2229.1532730540112.tmp
Renaming /tmp/flume/18-07-27/2229.1532730540112.tmp to /tmp/flume/18-07-27/2229.1532730540112
Writer callback called.
Serializer = TEXT, UseRawLocalFileSystem = false
Creating /tmp/flume//18-07-27/2231.1532730660001.tmp
Closing /tmp/flume//18-07-27/2230.1532730600004.tmp
Renaming /tmp/flume/18-07-27/2230.1532730600004.tmp to /tmp/flume/18-07-27/2230.1532730600004
Writer callback called.
DFSOutputStream ResponseProcessor exception  for block BP-1416623460-10.142.0.16-1528837910668:blk_1073858363_117614
java.io.EOFException: Premature EOF: no length prefix available
	at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2305)
	at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:235)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:1093)
Error Recovery for block BP-1416623460-10.142.0.16-1528837910668:blk_1073858363_117614 in pipeline DatanodeInfoWithStorage[10.142.0.17:50010,DS-a09de94a-ae3b-41ca-af60-752c323479a3,DISK], DatanodeInfoWithStorage[10.142.0.18:50010,DS-3bbb6e8a-99bf-4cb2-ab4b-7d167d7f0674,DISK], DatanodeInfoWithStorage[10.142.0.15:50010,DS-16420612-d63b-4239-ae17-38dde742d065,DISK]: bad datanode DatanodeInfoWithStorage[10.142.0.17:50010,DS-a09de94a-ae3b-41ca-af60-752c323479a3,DISK]
Block Under-replication detected. Rotating file.
DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/flume/18-07-27/2231.1532730660001 (inode 964064): File is not open for writing. Holder DFSClient_NONMAPREDUCE_419858644_38 does not have any open files.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3776)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3679)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:726)
	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getAdditionalDatanode(AuthorizationProviderProxyClientProtocol.java:233)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:530)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)

	at org.apache.hadoop.ipc.Client.call(Client.java:1504)
	at org.apache.hadoop.ipc.Client.call(Client.java:1441)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
	at com.sun.proxy.$Proxy19.getAdditionalDatanode(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:448)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
	at com.sun.proxy.$Proxy20.getAdditionalDatanode(Unknown Source)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1364)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1559)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1254)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:739)
Error while syncing
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/flume/18-07-27/2231.1532730660001 (inode 964064): File is not open for writing. Holder DFSClient_NONMAPREDUCE_419858644_38 does not have any open files.
	... (stack trace identical to the DataStreamer Exception trace above)
Error while trying to hflushOrSync!
pre-close flush failed
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/flume/18-07-27/2231.1532730660001 (inode 964064): File is not open for writing. Holder DFSClient_NONMAPREDUCE_419858644_38 does not have any open files.
	... (stack trace identical to the DataStreamer Exception trace above)
Closing /tmp/flume//18-07-27/2231.1532730660001.tmp
Error while trying to hflushOrSync!
failed to close() HDFSWriter for file (/tmp/flume//18-07-27/2231.1532730660001.tmp). Exception follows.
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/flume/18-07-27/2231.1532730660001 (inode 964064): File is not open for writing. Holder DFSClient_NONMAPREDUCE_419858644_38 does not have any open files.
	... (stack trace identical to the DataStreamer Exception trace above)

When I used the same HDFS sink config with a Kafka source -> memory channel -> HDFS sink topology, it worked.

The only thing I changed is switching to a Kafka channel -> HDFS sink topology.

The sink seems to be working and data is written, but I wonder what these errors mean. Could they imply data loss?

Any ideas?
