Support Questions


Flume seems to ignore the parseAsFlumeEvent property in the Kafka channel

Explorer

Hi

I'm trying to ingest data from Kafka into HDFS using Flume. The topology uses a Kafka channel to read the data directly from a Kafka topic and an HDFS sink to write it out.

This is the config I use:

tier1.channels = kafka_chan
tier1.sinks = sink1

tier1.channels.kafka_chan.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.kafka_chan.kafka.bootstrap.servers = broker:9092
tier1.channels.kafka_chan.kafka.consumer.group.id = flume2
tier1.channels.kafka_chan.kafka.topic = test_hdfs4
tier1.channels.kafka_chan.kafka.parseAsFlumeEvent = false

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/user/flume/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 3600
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.round = true
tier1.sinks.sink1.hdfs.roundValue = 60
tier1.sinks.sink1.hdfs.roundUnit = minute
tier1.sinks.sink1.hdfs.useLocalTimeStamp = true
tier1.sinks.sink1.hdfs.filePrefix = %H%M%S
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = kafka_chan

When this runs, no data is written and the Flume agent logs the following exception:

Error while getting events from Kafka. This is usually caused by trying to read a non-flume event. Ensure the setting for parseAsFlumeEvent is correct
java.lang.IndexOutOfBoundsException
	at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:180)
	at org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
	at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
	at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
	at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:335)
	at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:321)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.deserializeValue(KafkaChannel.java:635)
	at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:497)
	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:745)

I'm using CDH 5.14.

1 ACCEPTED SOLUTION

Try it without the kafka prefix:
tier1.channels.kafka_chan.parseAsFlumeEvent = false
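
For reference, with that change the channel section would look roughly like this (just a sketch reusing the broker, consumer group, and topic settings from your config):

tier1.channels.kafka_chan.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.kafka_chan.kafka.bootstrap.servers = broker:9092
tier1.channels.kafka_chan.kafka.consumer.group.id = flume2
tier1.channels.kafka_chan.kafka.topic = test_hdfs4
tier1.channels.kafka_chan.parseAsFlumeEvent = false

Note that parseAsFlumeEvent sits directly under the channel name, not inside the kafka.* namespace, as described in the Kafka Channel section of the Flume User Guide linked below.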

http://flume.apache.org/FlumeUserGuide.html#kafka-channel

-pd


2 REPLIES


Explorer

Thanks! I missed that one!

 

Now I have another error. The channel seems to work, but I'm getting the following exceptions from the sink:

Serializer = TEXT, UseRawLocalFileSystem = false
Creating /tmp/flume//18-07-27/2230.1532730600004.tmp
Closing /tmp/flume//18-07-27/2228.1532730540064.tmp
Renaming /tmp/flume/18-07-27/2228.1532730540064.tmp to /tmp/flume/18-07-27/2228.1532730540064
Writer callback called.
Closing /tmp/flume//18-07-27/2229.1532730540112.tmp
Renaming /tmp/flume/18-07-27/2229.1532730540112.tmp to /tmp/flume/18-07-27/2229.1532730540112
Writer callback called.
Serializer = TEXT, UseRawLocalFileSystem = false
Creating /tmp/flume//18-07-27/2231.1532730660001.tmp
Closing /tmp/flume//18-07-27/2230.1532730600004.tmp
Renaming /tmp/flume/18-07-27/2230.1532730600004.tmp to /tmp/flume/18-07-27/2230.1532730600004
Writer callback called.
DFSOutputStream ResponseProcessor exception  for block BP-1416623460-10.142.0.16-1528837910668:blk_1073858363_117614
java.io.EOFException: Premature EOF: no length prefix available
	at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2305)
	at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:235)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:1093)
Error Recovery for block BP-1416623460-10.142.0.16-1528837910668:blk_1073858363_117614 in pipeline DatanodeInfoWithStorage[10.142.0.17:50010,DS-a09de94a-ae3b-41ca-af60-752c323479a3,DISK], DatanodeInfoWithStorage[10.142.0.18:50010,DS-3bbb6e8a-99bf-4cb2-ab4b-7d167d7f0674,DISK], DatanodeInfoWithStorage[10.142.0.15:50010,DS-16420612-d63b-4239-ae17-38dde742d065,DISK]: bad datanode DatanodeInfoWithStorage[10.142.0.17:50010,DS-a09de94a-ae3b-41ca-af60-752c323479a3,DISK]
Block Under-replication detected. Rotating file.
DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/flume/18-07-27/2231.1532730660001 (inode 964064): File is not open for writing. Holder DFSClient_NONMAPREDUCE_419858644_38 does not have any open files.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3776)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3679)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:726)
	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getAdditionalDatanode(AuthorizationProviderProxyClientProtocol.java:233)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:530)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)

	at org.apache.hadoop.ipc.Client.call(Client.java:1504)
	at org.apache.hadoop.ipc.Client.call(Client.java:1441)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
	at com.sun.proxy.$Proxy19.getAdditionalDatanode(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:448)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
	at com.sun.proxy.$Proxy20.getAdditionalDatanode(Unknown Source)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1364)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1559)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1254)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:739)
Error while syncing
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/flume/18-07-27/2231.1532730660001 (inode 964064): File is not open for writing. Holder DFSClient_NONMAPREDUCE_419858644_38 does not have any open files.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3776)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3679)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:726)
	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getAdditionalDatanode(AuthorizationProviderProxyClientProtocol.java:233)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:530)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)

	at org.apache.hadoop.ipc.Client.call(Client.java:1504)
	at org.apache.hadoop.ipc.Client.call(Client.java:1441)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
	at com.sun.proxy.$Proxy19.getAdditionalDatanode(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:448)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
	at com.sun.proxy.$Proxy20.getAdditionalDatanode(Unknown Source)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1364)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1559)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1254)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:739)
Error while trying to hflushOrSync!
pre-close flush failed
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/flume/18-07-27/2231.1532730660001 (inode 964064): File is not open for writing. Holder DFSClient_NONMAPREDUCE_419858644_38 does not have any open files.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3776)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3679)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:726)
	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getAdditionalDatanode(AuthorizationProviderProxyClientProtocol.java:233)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:530)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)

	at org.apache.hadoop.ipc.Client.call(Client.java:1504)
	at org.apache.hadoop.ipc.Client.call(Client.java:1441)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
	at com.sun.proxy.$Proxy19.getAdditionalDatanode(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:448)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
	at com.sun.proxy.$Proxy20.getAdditionalDatanode(Unknown Source)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1364)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1559)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1254)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:739)
Closing /tmp/flume//18-07-27/2231.1532730660001.tmp
Error while trying to hflushOrSync!
failed to close() HDFSWriter for file (/tmp/flume//18-07-27/2231.1532730660001.tmp). Exception follows.
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/flume/18-07-27/2231.1532730660001 (inode 964064): File is not open for writing. Holder DFSClient_NONMAPREDUCE_419858644_38 does not have any open files.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3776)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3679)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:726)
	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getAdditionalDatanode(AuthorizationProviderProxyClientProtocol.java:233)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:530)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)

	at org.apache.hadoop.ipc.Client.call(Client.java:1504)
	at org.apache.hadoop.ipc.Client.call(Client.java:1441)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
	at com.sun.proxy.$Proxy19.getAdditionalDatanode(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:448)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
	at com.sun.proxy.$Proxy20.getAdditionalDatanode(Unknown Source)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1364)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1559)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1254)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:739)

 

 

When I used the same HDFS sink config in a Kafka source -> memory channel -> HDFS sink topology, it worked.

The only thing I changed now is switching to a Kafka channel -> HDFS sink.
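
For comparison, the earlier working topology looked roughly like this (a rough sketch; the source and channel names and capacities here are illustrative, not my exact previous config):

tier1.sources = kafka_src
tier1.channels = mem_chan
tier1.sinks = sink1

tier1.sources.kafka_src.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.kafka_src.kafka.bootstrap.servers = broker:9092
tier1.sources.kafka_src.kafka.topics = test_hdfs4
tier1.sources.kafka_src.kafka.consumer.group.id = flume2
tier1.sources.kafka_src.channels = mem_chan

tier1.channels.mem_chan.type = memory
tier1.channels.mem_chan.capacity = 10000
tier1.channels.mem_chan.transactionCapacity = 1000

# same hdfs sink settings as above, just bound to the memory channel
tier1.sinks.sink1.channel = mem_chan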

 

The sink seems to be working and data is written, but I wonder what these errors mean. Might they imply data loss?

Any idea?