Created on 07-27-2018 02:26 PM - edited 09-16-2022 06:31 AM
Hi
I'm trying to ingest data from Kafka to HDFS using Flume.
The topology uses a Kafka channel to read data directly from a Kafka topic and an HDFS sink to write it out.
This is the config I use:
tier1.channels = kafka_chan
tier1.sinks = sink1

tier1.channels.kafka_chan.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.kafka_chan.kafka.bootstrap.servers = broker:9092
tier1.channels.kafka_chan.kafka.consumer.group.id = flume2
tier1.channels.kafka_chan.kafka.topic = test_hdfs4
tier1.channels.kafka_chan.kafka.parseAsFlumeEvent = false

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/user/flume/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 3600
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.round = true
tier1.sinks.sink1.hdfs.roundvalue = 60
tier1.sinks.sink1.hdfs.rountUnit = minutes
tier1.sinks.sink1.hdfs.useLocalTimeStamp = true
tier1.sinks.sink1.hdfs.filePrefix = %H%M%S
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = kafka_chan
When this runs, no data is written and the Flume agent throws the following exception:
Error while getting events from Kafka. This is usually caused by trying to read a non-flume event. Ensure the setting for parseAsFlumeEvent is correct
java.lang.IndexOutOfBoundsException
 at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:180)
 at org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
 at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
 at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
 at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430)
 at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
 at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:335)
 at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:321)
 at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
 at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
 at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
 at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
 at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
 at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
 at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
 at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.deserializeValue(KafkaChannel.java:635)
 at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:497)
 at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
 at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
 at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362)
 at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
 at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
 at java.lang.Thread.run(Thread.java:745)
I'm using CDH 5.14.
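A likely culprit, judging by the error text: in Flume's Kafka channel, parseAsFlumeEvent is a channel-level property rather than one under the kafka. prefix, so the kafka.parseAsFlumeEvent line in the config above is probably being ignored and the channel still tries to deserialize Avro-wrapped Flume events. A hedged sketch of the relevant lines (property names as documented for the Flume Kafka channel, not verified against this exact setup):

tier1.channels.kafka_chan.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.kafka_chan.kafka.bootstrap.servers = broker:9092
tier1.channels.kafka_chan.kafka.topic = test_hdfs4
tier1.channels.kafka_chan.kafka.consumer.group.id = flume2
# parseAsFlumeEvent sits directly on the channel, not under the kafka. prefix
tier1.channels.kafka_chan.parseAsFlumeEvent = false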
Created 07-27-2018 02:58 PM
Created on 07-27-2018 03:35 PM - edited 07-27-2018 03:45 PM
Thanks! I missed that one!
Now I have another error.
The channel seems to work, but I'm getting the following exceptions from the sink:
Serializer = TEXT, UseRawLocalFileSystem = false
Creating /tmp/flume//18-07-27/2230.1532730600004.tmp
Closing /tmp/flume//18-07-27/2228.1532730540064.tmp
Renaming /tmp/flume/18-07-27/2228.1532730540064.tmp to /tmp/flume/18-07-27/2228.1532730540064
Writer callback called.
Closing /tmp/flume//18-07-27/2229.1532730540112.tmp
Renaming /tmp/flume/18-07-27/2229.1532730540112.tmp to /tmp/flume/18-07-27/2229.1532730540112
Writer callback called.
Serializer = TEXT, UseRawLocalFileSystem = false
Creating /tmp/flume//18-07-27/2231.1532730660001.tmp
Closing /tmp/flume//18-07-27/2230.1532730600004.tmp
Renaming /tmp/flume/18-07-27/2230.1532730600004.tmp to /tmp/flume/18-07-27/2230.1532730600004
Writer callback called.
DFSOutputStream ResponseProcessor exception for block BP-1416623460-10.142.0.16-1528837910668:blk_1073858363_117614
java.io.EOFException: Premature EOF: no length prefix available
 at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2305)
 at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:235)
 at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:1093)
Error Recovery for block BP-1416623460-10.142.0.16-1528837910668:blk_1073858363_117614 in pipeline DatanodeInfoWithStorage[10.142.0.17:50010,DS-a09de94a-ae3b-41ca-af60-752c323479a3,DISK], DatanodeInfoWithStorage[10.142.0.18:50010,DS-3bbb6e8a-99bf-4cb2-ab4b-7d167d7f0674,DISK], DatanodeInfoWithStorage[10.142.0.15:50010,DS-16420612-d63b-4239-ae17-38dde742d065,DISK]: bad datanode DatanodeInfoWithStorage[10.142.0.17:50010,DS-a09de94a-ae3b-41ca-af60-752c323479a3,DISK]
Block Under-replication detected. Rotating file.
DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/flume/18-07-27/2231.1532730660001 (inode 964064): File is not open for writing. Holder DFSClient_NONMAPREDUCE_419858644_38 does not have any open files.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3776) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3679) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:726) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getAdditionalDatanode(AuthorizationProviderProxyClientProtocol.java:233) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:530) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275) at org.apache.hadoop.ipc.Client.call(Client.java:1504) at org.apache.hadoop.ipc.Client.call(Client.java:1441) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230) at com.sun.proxy.$Proxy19.getAdditionalDatanode(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:448) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104) at com.sun.proxy.$Proxy20.getAdditionalDatanode(Unknown Source) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1364) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1559) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1254) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:739) Error while syncing org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/flume/18-07-27/2231.1532730660001 (inode 964064): File is not open for writing. Holder DFSClient_NONMAPREDUCE_419858644_38 does not have any open files. 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3776) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3679) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:726) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getAdditionalDatanode(AuthorizationProviderProxyClientProtocol.java:233) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:530) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275) at org.apache.hadoop.ipc.Client.call(Client.java:1504) at org.apache.hadoop.ipc.Client.call(Client.java:1441) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230) at com.sun.proxy.$Proxy19.getAdditionalDatanode(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:448) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104) at com.sun.proxy.$Proxy20.getAdditionalDatanode(Unknown Source) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1364) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1559) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1254) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:739) Error while trying to hflushOrSync! pre-close flush failed org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/flume/18-07-27/2231.1532730660001 (inode 964064): File is not open for writing. Holder DFSClient_NONMAPREDUCE_419858644_38 does not have any open files. 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3776) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3679) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:726) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getAdditionalDatanode(AuthorizationProviderProxyClientProtocol.java:233) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:530) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275) at org.apache.hadoop.ipc.Client.call(Client.java:1504) at org.apache.hadoop.ipc.Client.call(Client.java:1441) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230) at com.sun.proxy.$Proxy19.getAdditionalDatanode(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:448) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104) at com.sun.proxy.$Proxy20.getAdditionalDatanode(Unknown Source) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1364) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1559) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1254) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:739) Closing /tmp/flume//18-07-27/2231.1532730660001.tmp Error while trying to hflushOrSync! failed to close() HDFSWriter for file (/tmp/flume//18-07-27/2231.1532730660001.tmp). Exception follows. org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/flume/18-07-27/2231.1532730660001 (inode 964064): File is not open for writing. Holder DFSClient_NONMAPREDUCE_419858644_38 does not have any open files. 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3776) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3679) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:726) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getAdditionalDatanode(AuthorizationProviderProxyClientProtocol.java:233) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:530) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275) at org.apache.hadoop.ipc.Client.call(Client.java:1504) at org.apache.hadoop.ipc.Client.call(Client.java:1441) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230) at com.sun.proxy.$Proxy19.getAdditionalDatanode(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:448) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104) at com.sun.proxy.$Proxy20.getAdditionalDatanode(Unknown Source) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1364) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1559) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1254) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:739)
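On the "Block Under-replication detected. Rotating file." lines: the HDFS sink rotates the current file when it sees the block as under-replicated, which here follows the datanode pipeline failure, and that rotation racing with the failed close is one plausible source of the LeaseExpiredException noise. If that is what's happening, a knob sometimes used for this is the sink's hdfs.minBlockReplicas property; a hedged sketch, not verified on this cluster:

# accept a single in-sync replica before considering the block healthy,
# so transient under-replication does not force a file rotation
tier1.sinks.sink1.hdfs.minBlockReplicas = 1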
When I used the same HDFS sink config with a kafka source -> memory channel -> hdfs sink topology, it worked.
The only thing I changed now is switching to kafka channel -> hdfs sink.
The sink seems to be working and data is written, but I wonder what these errors mean. Might they imply data loss?
Any idea?
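As far as I understand Flume's transaction semantics, a sink transaction that fails is rolled back to the Kafka channel and re-attempted (consumer offsets are only committed on a successful transaction), so errors like these usually mean retries and possible duplicates rather than silent loss; comparing a day's record count against the topic is still worth doing. Failed close() calls can, however, leave .tmp files behind. The HDFS sink has retry knobs for closing files; an illustrative sketch using documented property names, with values that are reportedly the defaults in recent Flume versions:

# 0 = keep retrying a failed close/rename indefinitely, once per retryInterval (seconds)
tier1.sinks.sink1.hdfs.closeTries = 0
tier1.sinks.sink1.hdfs.retryInterval = 180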