Community Articles

Find and share helpful community-sourced technical articles.
Announcements
Celebrating as our community reaches 100,000 members! Thank you!
Labels (1)
avatar
Rising Star

PROBLEM DESCRIPTION:

Flume agent configured without any sources fails to start in Ambari. However, a message in the service status log indicates that the flume agent has started successfully.

The following sample configuration file works in flume node if ran manually with flume-ng command. However, the same configuration fails with Ambari.

# Flume agent config
agent1.sinks = HdfsSink1
agent1.channels = channel1

agent1.channels.channel1.type=org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.channel1.brokerList=node11.openstacklocal:6667
agent1.channels.channel1.kafka.topic=test
agent1.channels.channel1.zookeeperConnect=node11.openstacklocal:2181
agent1.channels.channel1.capacity=10000
agent1.channels.channel1.transactionCapacity=1000
agent1.channels.channel1.parseAsFlumeEvent=false
agent1.channels.channel1.kafka.consumer.group.id=test.hdfs-c

agent1.sinks.HdfsSink1.channel=channel1
agent1.sinks.HdfsSink1.hdfs.appendTimeout=10000
agent1.sinks.HdfsSink1.hdfs.batchSize=1000
agent1.sinks.HdfsSink1.hdfs.callTimeout=10000
agent1.sinks.HdfsSink1.hdfs.filePrefix=xrs-SegmentEventData
agent1.sinks.HdfsSink1.hdfs.fileSuffix=.avro
agent1.sinks.HdfsSink1.hdfs.fileType=DataStream
agent1.sinks.HdfsSink1.hdfs.maxOpenFiles=50
##agent1.sinks.HdfsSink1.hdfs.path=/data/%{topic}/%y-%m-%d 
agent1.sinks.HdfsSink1.hdfs.path=/tmp/%y-%m-%d
agent1.sinks.HdfsSink1.hdfs.rollCount=1000
agent1.sinks.HdfsSink1.hdfs.rollInterval=60
agent1.sinks.HdfsSink1.hdfs.rollSize=0
agent1.sinks.HdfsSink1.hdfs.rollTimerPoolSize=1
agent1.sinks.HdfsSink1.hdfs.threadsPoolSize=100
agent1.sinks.HdfsSink1.hdfs.txnEventMax=40000
agent1.sinks.HdfsSink1.hdfs.useLocalTimeStamp=true
agent1.sinks.HdfsSink1.hdfs.writeFormat=Text
agent1.sinks.HdfsSink1.type=hdfs

Ambari logs for service startup shows that command ran successfully

[..]
2016-11-09 09:25:11,411 - File['/etc/hadoop/conf/topology_script.py'] {'content': StaticFile('topology_script.py'),
 'only_if': 'test -d /etc/hadoop/conf', 'mode': 0755}
2016-11-09 09:25:11,715 - File['/var/run/flume/ambari-state.txt'] {'content': 'INSTALLED'}
2016-11-09 09:25:11,719 - Writing File['/var/run/flume/ambari-state.txt'] because contents don't match
2016-11-09 09:25:11,723 - Directory['/var/run/flume'] {'owner': 'flume', 'group': 'hadoop'}
2016-11-09 09:25:11,723 - Directory['/usr/hdp/current/flume-server/conf'] {'owner': 'flume', 'create_parents': True}
2016-11-09 09:25:11,724 - Directory['/var/log/flume'] {'owner': 'flume', 'group': 'hadoop', 'create_parents': True,
 'mode': 0755, 'cd_access': 'a'}
2016-11-09 09:25:11,726 - File['/var/run/flume/ambari-state.txt'] {'content': 'STARTED'}
2016-11-09 09:25:11,726 - Writing File['/var/run/flume/ambari-state.txt'] because contents don't match

Command completed successfully!

ROOT CAUSE:

This issue occurs because the flume.py script is designed to checks for the agent name by parsing the line containing "sources" definition in flume.conf. And it ignores the other parameters like channels and sinks.
# vim /var/lib/ambari-server/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py 

[..] 
def build_flume_topology(content): 

  result = {} 
  agent_names = [] 

  for line in content.split('\n'): 
    rline = line.strip() 
    if 0 != len(rline) and not rline.startswith('#'): 
      pair = rline.split('=') 
      lhs = pair[0].strip() 
      # workaround for properties that contain '=' 
      rhs = "=".join(pair[1:]).strip() 

      part0 = lhs.split('.')[0] 

      if lhs.endswith(".sources"): 
        agent_names.append(part0) 

      if not result.has_key(part0): 
        result[part0] = {} 

      result[part0][lhs] = rhs 

  # trim out non-agents 
  for k in result.keys(): 
    if not k in agent_names: 
      del result[k] 

  return result 
[..]

SOLUTION/WORKAROUND

Add a dummy source definition in the starting of the flume.conf. This will ensure that Ambari detects the flume agent and adds the same in the array of flume agents. Sample source definition:
# Flume agent config
agent1.sources = dummysource
agent1.sinks = HdfsSink1
agent1.channels = channel1 
994 Views
0 Kudos