
Flume is not writing into HDFS


The following configuration consumes from Kafka (the topic contains some sample messages) but does not write anything to HDFS. There are no error messages in the log, even at debug level.

The HDFS path is: hdfs://quickstart.cloudera:8020/data/raw/%Y-%m-%d

The path /data/raw exists in HDFS and is owned by user hdfs. But I do not think the data ever reaches the sink and fails there; the events do not even seem to be consumed into the channel and passed on to the sink.
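For reference, since the sink sets hdfs.useLocalTimeStamp = true (and a timestamp interceptor is configured), the %Y-%m-%d escapes in hdfs.path expand strftime-style from a millisecond epoch timestamp. A quick illustration of the expansion (the timestamp value is just an example, and UTC is assumed here; Flume uses the agent's local timezone):

```python
from datetime import datetime, timezone

# Illustration only: Flume expands %Y-%m-%d in hdfs.path from the event's
# millisecond epoch timestamp. The same expansion done via strftime:
ts_millis = 1522586558279  # example epoch-millis value
day_dir = datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc) \
    .strftime("/data/raw/%Y-%m-%d")
print(day_dir)  # /data/raw/2018-04-01
```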

Metrics are:

{
  "CHANNEL.mem-channel": {
    "EventPutSuccessCount": "0",
    "Type": "CHANNEL",
    "EventPutAttemptCount": "0",
    "ChannelSize": "0",
    "StopTime": "0",
    "StartTime": "1522586558279",
    "EventTakeSuccessCount": "0",
    "ChannelCapacity": "100000",
    "EventTakeAttemptCount": "70"
  },
  "SOURCE.DefaultSource": {
    "KafkaEventGetTimer": "0",
    "Type": "SOURCE",
    "EventAcceptedCount": "0",
    "AppendReceivedCount": "0",
    "EventReceivedCount": "0",
    "KafkaCommitTimer": "0",
    "KafkaEmptyCount": "0",
    "OpenConnectionCount": "0",
    "AppendBatchReceivedCount": "0",
    "AppendBatchAcceptedCount": "0",
    "StopTime": "0",
    "StartTime": "1522586601952",
    "AppendAcceptedCount": "0"
  },
  "SINK.DefaultSink": {
    "BatchCompleteCount": "0",
    "ConnectionFailedCount": "0",
    "EventDrainAttemptCount": "0",
    "ConnectionCreatedCount": "0",
    "Type": "SINK",
    "BatchEmptyCount": "70",
    "ConnectionClosedCount": "0",
    "EventDrainSuccessCount": "0",
    "StopTime": "0",
    "StartTime": "1522586558465",
    "BatchUnderflowCount": "0"
  }
}
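To make the counter dump easier to read, a short script can pull out the three counters that describe event flow (this is plain JSON parsing, nothing Flume-specific; the dump below is abbreviated to the relevant keys):

```python
import json

# Metrics dump copied from the agent above, abbreviated to the counters that
# describe event flow; component names are the ones from my configuration.
metrics = json.loads('''
{"CHANNEL.mem-channel":{"EventPutAttemptCount":"0","EventPutSuccessCount":"0",
  "EventTakeAttemptCount":"70","EventTakeSuccessCount":"0","ChannelSize":"0"},
 "SOURCE.DefaultSource":{"EventReceivedCount":"0","EventAcceptedCount":"0"},
 "SINK.DefaultSink":{"EventDrainAttemptCount":"0","EventDrainSuccessCount":"0",
  "BatchEmptyCount":"70"}}
''')

def flow_summary(m):
    """Reduce the Flume counters to the three numbers that show event flow."""
    return {
        "source_received": int(m["SOURCE.DefaultSource"]["EventReceivedCount"]),
        "channel_put_attempts": int(m["CHANNEL.mem-channel"]["EventPutAttemptCount"]),
        "sink_drain_attempts": int(m["SINK.DefaultSink"]["EventDrainAttemptCount"]),
    }

print(flow_summary(metrics))
# All three counters are zero: the source never handed a single event to the
# channel, so the sink only ever sees empty batches (BatchEmptyCount climbs).
```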

Configuration is:

 

HDFS Service: Flume (Service-Wide) --> HDFS

Agent Name: Default Agent

 

 

DefaultAgent.sources = DefaultSource
DefaultAgent.channels = mem-channel
DefaultAgent.sinks = DefaultSink

 

#SOURCE
DefaultAgent.sources.DefaultSource.type = org.apache.flume.source.kafka.KafkaSource
DefaultAgent.sources.DefaultSource.channels = mem-channel
DefaultAgent.sources.DefaultSource.kafka.bootstrap.servers = quickstart.cloudera:9092
DefaultAgent.sources.DefaultSource.batchDurationMillis = 2000
DefaultAgent.sources.DefaultSource.kafka.topics = JSONFeed2
DefaultAgent.sources.DefaultSource.kafka.consumer.group.id = DefaultHDFS
DefaultAgent.sources.DefaultSource.interceptors = i1
DefaultAgent.sources.DefaultSource.interceptors.i1.type = timestamp
DefaultAgent.sources.DefaultSource.kafka.consumer.timeout.ms = 100


#CHANNEL
DefaultAgent.channels.mem-channel.type = memory
DefaultAgent.channels.mem-channel.capacity = 100000
DefaultAgent.channels.mem-channel.transactionCapacity = 100


#SINK
DefaultAgent.sinks.DefaultSink.type = hdfs
DefaultAgent.sinks.DefaultSink.hdfs.path = hdfs://quickstart.cloudera:8020/data/raw/%Y-%m-%d
DefaultAgent.sinks.DefaultSink.channel = mem-channel
DefaultAgent.sinks.DefaultSink.hdfs.useLocalTimeStamp = true
DefaultAgent.sinks.DefaultSink.hdfs.batchSize = 100
DefaultAgent.sinks.DefaultSink.hdfs.rollCount = 0
DefaultAgent.sinks.DefaultSink.hdfs.rollSize = 10
DefaultAgent.sinks.DefaultSink.hdfs.rollInterval = 3600
DefaultAgent.sinks.DefaultSink.hdfs.idleTimeout = 60000
DefaultAgent.sinks.DefaultSink.hdfs.filePrefix = defaultSink
DefaultAgent.sinks.DefaultSink.hdfs.fileType = DataStream
DefaultAgent.sinks.DefaultSink.hdfs.writeFormat = Text
DefaultAgent.sinks.DefaultSink.hdfs.minBlockReplicas = 1
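As a sanity check on the wiring itself (independent of Kafka and HDFS), a small script can confirm that the source and sink both reference the declared channel. This is only a sketch that understands simple `key = value` lines, not a full Flume config validator:

```python
# Minimal wiring check for a Flume properties file: every source's "channels"
# and every sink's "channel" entry must name a channel declared on the agent
# line. CONF below is trimmed to the wiring-relevant lines of my config.
CONF = """
DefaultAgent.sources = DefaultSource
DefaultAgent.channels = mem-channel
DefaultAgent.sinks = DefaultSink
DefaultAgent.sources.DefaultSource.channels = mem-channel
DefaultAgent.sinks.DefaultSink.channel = mem-channel
"""

def check_wiring(text, agent="DefaultAgent"):
    props = {}
    for line in text.splitlines():
        if "=" in line and not line.strip().startswith("#"):
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    declared = set(props.get(f"{agent}.channels", "").split())
    problems = []
    for src in props.get(f"{agent}.sources", "").split():
        for ch in props.get(f"{agent}.sources.{src}.channels", "").split():
            if ch not in declared:
                problems.append(f"source {src} -> unknown channel {ch}")
    for snk in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{snk}.channel", "")
        if ch not in declared:
            problems.append(f"sink {snk} -> unknown channel {ch}")
    return problems

print(check_wiring(CONF))  # [] here, so the channel wiring itself looks fine
```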

 

 

LOGS:

11:00:40.767 AM INFO FlumeConfiguration - Added sinks: DefaultSink Agent: DefaultAgent
11:00:40.767 AM INFO FlumeConfiguration - Processing:DefaultSink
11:00:40.767 AM INFO FlumeConfiguration - Processing:DefaultSink
11:00:40.780 AM INFO FlumeConfiguration - Post-validation flume configuration contains configuration for agents: [DefaultAgent]
11:00:40.780 AM INFO AbstractConfigurationProvider - Creating channels
11:00:40.787 AM INFO DefaultChannelFactory - Creating instance of channel mem-channel type memory
11:00:40.793 AM INFO AbstractConfigurationProvider - Created channel mem-channel
11:00:40.794 AM INFO DefaultSourceFactory - Creating instance of source DefaultSource, type org.apache.flume.source.kafka.KafkaSource
11:00:40.828 AM INFO DefaultSinkFactory - Creating instance of sink: DefaultSink, type: hdfs
11:00:40.841 AM INFO AbstractConfigurationProvider - Channel mem-channel connected to [DefaultSource, DefaultSink]
11:00:40.847 AM INFO Application - Starting new configuration:{ sourceRunners:{DefaultSource=PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:DefaultSource,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{DefaultSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@31798517 counterGroup:{ name:null counters:{} } }} channels:{mem-channel=org.apache.flume.channel.MemoryChannel{name: mem-channel}} }
11:00:40.848 AM INFO Application - Starting Channel mem-channel
11:00:40.889 AM INFO MonitoredCounterGroup - Monitored counter group for type: CHANNEL, name: mem-channel: Successfully registered new MBean.
11:00:40.896 AM INFO MonitoredCounterGroup - Component type: CHANNEL, name: mem-channel started
11:00:40.896 AM INFO Application - Starting Sink DefaultSink
11:00:40.897 AM INFO MonitoredCounterGroup - Monitored counter group for type: SINK, name: DefaultSink: Successfully registered new MBean.
11:00:40.898 AM INFO MonitoredCounterGroup - Component type: SINK, name: DefaultSink started
11:00:40.920 AM INFO Application - Starting Source DefaultSource
11:00:40.921 AM INFO KafkaSource - Starting org.apache.flume.source.kafka.KafkaSource{name:DefaultSource,state:IDLE}...
11:00:41.008 AM INFO ConsumerConfig - ConsumerConfig values:
request.timeout.ms = 40000
check.crcs = true
retry.backoff.ms = 100
ssl.truststore.password = null
ssl.keymanager.algorithm = SunX509
receive.buffer.bytes = 65536
ssl.cipher.suites = null
ssl.key.password = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.provider = null
sasl.kerberos.service.name = null
session.timeout.ms = 30000
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [quickstart.cloudera:9092]
client.id =
fetch.max.wait.ms = 500
fetch.min.bytes = 1
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
sasl.kerberos.kinit.cmd = /usr/bin/kinit
auto.offset.reset = latest
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
ssl.endpoint.identification.algorithm = null
max.partition.fetch.bytes = 1048576
ssl.keystore.location = null
ssl.truststore.location = null
ssl.keystore.password = null
metrics.sample.window.ms = 30000
metadata.max.age.ms = 300000
security.protocol = PLAINTEXT
auto.commit.interval.ms = 5000
ssl.protocol = TLS
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.trustmanager.algorithm = PKIX
group.id = DefaultHDFS8
enable.auto.commit = false
metric.reporters = []
ssl.truststore.type = JKS
send.buffer.bytes = 131072
reconnect.backoff.ms = 50
metrics.num.samples = 2
ssl.keystore.type = JKS
heartbeat.interval.ms = 3000

11:00:41.019 AM INFO log - Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
11:00:41.113 AM INFO log - jetty-6.1.26.cloudera.4
11:00:41.164 AM WARN ConsumerConfig - The configuration timeout.ms = 100 was supplied but isn't a known config.
11:00:41.181 AM INFO AppInfoParser - Kafka version : 0.9.0-kafka-2.0.2
11:00:41.181 AM INFO AppInfoParser - Kafka commitId : unknown
11:00:41.186 AM INFO log - Started SelectChannelConnector@0.0.0.0:41414
11:00:41.371 AM INFO AbstractCoordinator - Discovered coordinator quickstart.cloudera:9092 (id: 2147483614) for group DefaultHDFS8.
11:00:41.371 AM INFO ConsumerCoordinator - Revoking previously assigned partitions [] for group DefaultHDFS8
11:00:41.372 AM INFO AbstractCoordinator - (Re-)joining group DefaultHDFS8
11:00:44.423 AM INFO AbstractCoordinator - Successfully joined group DefaultHDFS8 with generation 1
11:00:44.424 AM INFO ConsumerCoordinator - Setting newly assigned partitions [JSONFeed2-0] for group DefaultHDFS8
11:00:44.424 AM INFO SourceRebalanceListener - topic JSONFeed2 - partition 0 assigned.
11:00:44.441 AM INFO KafkaSource - Kafka source DefaultSource started.
11:00:44.466 AM INFO MonitoredCounterGroup - Monitored counter group for type: SOURCE, name: DefaultSource: Successfully registered new MBean.
11:00:44.467 AM INFO MonitoredCounterGroup - Component type: SOURCE, name: DefaultSource started

 

 

Thanks

CK
