Created on 06-30-2017 03:00 PM
PROBLEM DESCRIPTION:
Flume agent configured without any sources fails to start in Ambari. However, a message in the service status log indicates that the flume agent has started successfully.
The following sample configuration file works in flume node if ran manually with flume-ng command. However, the same configuration fails with Ambari.
# Flume agent config
agent1.sinks = HdfsSink1
agent1.channels = channel1
agent1.channels.channel1.type=org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.channel1.brokerList=node11.openstacklocal:6667
agent1.channels.channel1.kafka.topic=test
agent1.channels.channel1.zookeeperConnect=node11.openstacklocal:2181
agent1.channels.channel1.capacity=10000
agent1.channels.channel1.transactionCapacity=1000
agent1.channels.channel1.parseAsFlumeEvent=false
agent1.channels.channel1.kafka.consumer.group.id=test.hdfs-c
agent1.sinks.HdfsSink1.channel=channel1
agent1.sinks.HdfsSink1.hdfs.appendTimeout=10000
agent1.sinks.HdfsSink1.hdfs.batchSize=1000
agent1.sinks.HdfsSink1.hdfs.callTimeout=10000
agent1.sinks.HdfsSink1.hdfs.filePrefix=xrs-SegmentEventData
agent1.sinks.HdfsSink1.hdfs.fileSuffix=.avro
agent1.sinks.HdfsSink1.hdfs.fileType=DataStream
agent1.sinks.HdfsSink1.hdfs.maxOpenFiles=50
##agent1.sinks.HdfsSink1.hdfs.path=/data/%{topic}/%y-%m-%d
agent1.sinks.HdfsSink1.hdfs.path=/tmp/%y-%m-%d
agent1.sinks.HdfsSink1.hdfs.rollCount=1000
agent1.sinks.HdfsSink1.hdfs.rollInterval=60
agent1.sinks.HdfsSink1.hdfs.rollSize=0
agent1.sinks.HdfsSink1.hdfs.rollTimerPoolSize=1
agent1.sinks.HdfsSink1.hdfs.threadsPoolSize=100
agent1.sinks.HdfsSink1.hdfs.txnEventMax=40000
agent1.sinks.HdfsSink1.hdfs.useLocalTimeStamp=true
agent1.sinks.HdfsSink1.hdfs.writeFormat=Text
agent1.sinks.HdfsSink1.type=hdfsAmbari logs for service startup shows that command ran successfully.
[..]
2016-11-09 09:25:11,411 - File['/etc/hadoop/conf/topology_script.py'] {'content': StaticFile('topology_script.py'),
'only_if': 'test -d /etc/hadoop/conf', 'mode': 0755}
2016-11-09 09:25:11,715 - File['/var/run/flume/ambari-state.txt'] {'content': 'INSTALLED'}
2016-11-09 09:25:11,719 - Writing File['/var/run/flume/ambari-state.txt'] because contents don't match
2016-11-09 09:25:11,723 - Directory['/var/run/flume'] {'owner': 'flume', 'group': 'hadoop'}
2016-11-09 09:25:11,723 - Directory['/usr/hdp/current/flume-server/conf'] {'owner': 'flume', 'create_parents': True}
2016-11-09 09:25:11,724 - Directory['/var/log/flume'] {'owner': 'flume', 'group': 'hadoop', 'create_parents': True,
'mode': 0755, 'cd_access': 'a'}
2016-11-09 09:25:11,726 - File['/var/run/flume/ambari-state.txt'] {'content': 'STARTED'}
2016-11-09 09:25:11,726 - Writing File['/var/run/flume/ambari-state.txt'] because contents don't match
Command completed successfully!# vim /var/lib/ambari-server/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
[..]
def build_flume_topology(content):
result = {}
agent_names = []
for line in content.split('\n'):
rline = line.strip()
if 0 != len(rline) and not rline.startswith('#'):
pair = rline.split('=')
lhs = pair[0].strip()
# workaround for properties that contain '='
rhs = "=".join(pair[1:]).strip()
part0 = lhs.split('.')[0]
if lhs.endswith(".sources"):
agent_names.append(part0)
if not result.has_key(part0):
result[part0] = {}
result[part0][lhs] = rhs
# trim out non-agents
for k in result.keys():
if not k in agent_names:
del result[k]
return result
[..]# Flume agent config agent1.sources = dummysource agent1.sinks = HdfsSink1 agent1.channels = channel1