New Contributor
Posts: 3
Registered: ‎05-03-2017

FLUME breaking json file into malformed events

Hello, please find below my Flume conf.


I am using Flume to ingest data into HDFS, along with an interceptor to alter the JSON data before ingestion.

Flume splits the JSON file into events, but while parsing the events in the interceptor I am getting a malformed JSON exception, because there are multiple events and none of them is split at a logical boundary.

I want one event per JSON file.


HDFS flume


localSource.sources = avro-collection-source
localSource.sinks = hdfs-sink
localSource.channels = mem-channel

localSource.sources.avro-collection-source.channels = mem-channel
localSource.sinks.hdfs-sink.channel = mem-channel
localSource.channels.mem-channel.type = memory

localSource.channels.mem-channel.capacity = 1000000
localSource.channels.mem-channel.transactionCapacity = 100000
localSource.sources.avro-collection-source.type = avro
localSource.sources.avro-collection-source.bind =

localSource.sources.avro-collection-source.port = 41414

localSource.sinks.hdfs-sink.type = hdfs
localSource.sinks.hdfs-sink.hdfs.writeFormat = Text
localSource.sinks.hdfs-sink.hdfs.filePrefix =  help_now_prod

localSource.sinks.hdfs-sink.hdfs.rollSize = 3000000
localSource.sinks.hdfs-sink.hdfs.rollCount = 0

localSource.sinks.hdfs-sink.hdfs.batchSize = 10

localSource.sinks.hdfs-sink.hdfs.rollInterval = 0
localSource.sinks.hdfs-sink.hdfs.fileType = DataStream
localSource.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/user/flume/HelpNowData

localflume.channels.ch1.type = memory

localflume.sources.spool.type = spooldir
localflume.sources.spool.channels = ch1
localflume.sources.spool.spoolDir = /app/logs/hdfs/datacenter/HelpNowLocal1
localflume.sources.spool.fileHeader = true

localflume.sinks.fr1.type = file_roll
localflume.sinks.fr1.channel = ch1
localflume.sinks.fr1.rollInterval = 3600
localflume.sinks.fr1.batchSize = 1000
localflume.sinks.fr1.rollCount = 0

localflume.sinks.fr1.rollsize = 100000
localflume.sinks.fr1.sink.directory = /app/logs/hdfs/datacenter/HelpNowLocal2

localflume.channels = ch1
localflume.channels.ch1.capacity = 10000
localflume.channels.ch1.transactionCapacity = 1000
localflume.sources = spool
localflume.sinks = fr1



Java class

package com.example.flume.interceptors;

import java.io.FileReader;
import java.nio.charset.StandardCharsets;
import java.util.*;

import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import org.apache.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

// Assumption: CSVReader comes from opencsv; the original post did not show this import.
import com.opencsv.CSVReader;

class eventTweaker implements Interceptor {

    private static final Logger LOG = Logger.getLogger(eventTweaker.class);
    List<String> dataCenterData = readCsv();
    String firstBody = "";
    private boolean flag = false;

    // private means that only Builder can build me.
    private eventTweaker() {}

    public void initialize() {}

    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        LOG.debug("KYE BE " + body);
        try {
            JSONObject jsonObject = new JSONObject(body);
            String host_name_json = jsonObject.get("short_desscription").toString();
            // The original loop used "i > size()" and therefore never ran.
            // Assumed intent: tag the event with the first known host that
            // appears in the description.
            for (int i = 0; i < dataCenterData.size(); i++) {
                //int indexHost = StringUtils.indexOfAny(dataCenterData.toString(), host_name_json);
                if (host_name_json.contains(dataCenterData.get(i))) {
                    jsonObject.put("host_name", dataCenterData.get(i));
                    break;
                }
            }
            LOG.debug("@@@@@@@@@@@@@@@ " + jsonObject.toString());
            // Write the altered JSON back, otherwise the change is lost.
            event.setBody(jsonObject.toString().getBytes(StandardCharsets.UTF_8));
        } catch (JSONException e) {
            // Pass the throwable itself so the full stack trace is logged.
            LOG.error("Received this log message that is not formatted in json: " + body, e);
        }
        LOG.debug(new String(event.getBody(), StandardCharsets.UTF_8));
        return event;
    }

    public List<String> readCsv() {
        // String strFile = "/app/logs/hdfs/datacenter/hostnames/hostnames.csv";
        String strFile = "/home/iot/flume_sink/hostnames.csv";
        List<String> dataCenterData = new ArrayList<String>();
        String[] nextLine;
        // try-with-resources closes the reader even when reading fails.
        try (CSVReader reader = new CSVReader(new FileReader(strFile))) {
            while ((nextLine = reader.readNext()) != null) {
                // nextLine[] is an array of values from the line
                System.out.println("Line # " + nextLine[2]);
                // Assumption: column 2 holds the host name, as printed above.
                dataCenterData.add(nextLine[2]);
            }
        } catch (Exception e) {
            // Covers a missing file, I/O errors, and the validation
            // exception thrown by newer opencsv versions.
            LOG.error("Could not read " + strFile, e);
        }
        // Drop duplicates while keeping the original order.
        dataCenterData = new ArrayList<String>(new LinkedHashSet<String>(dataCenterData));
        return dataCenterData;
    }

    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<Event>(events.size());
        LOG.debug("^^^^^^^^^^^^^^ " + events.size());
        for (Event event : events) {
            // Intercept any event
            Event interceptedEvent = intercept(event);
            interceptedEvents.add(interceptedEvent);
        }
        return interceptedEvents;
    }

    public void close() {}

    public static class Builder implements Interceptor.Builder {

        public Interceptor build() {
            return new eventTweaker();
        }

        public void configure(Context context) {}
    }
}

Cloudera Employee
Posts: 198
Registered: ‎01-09-2014

Re: FLUME breaking json file into malformed events

Look into using the Morphline Interceptor. You can use the readJson command if the events are formatted as JSON, and you can then manipulate the event or add metadata.
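
As a rough sketch of how that wiring might look in the agent configuration: the interceptor name "morph", the morphline file path, and the morphline id below are hypothetical placeholders, and attaching it to the spool source is an assumption, since the thread does not show where the current interceptor is configured. The morphline file itself would hold the readJson command.

```properties
# Hypothetical interceptor name "morph" on the spooldir source
localflume.sources.spool.interceptors = morph
localflume.sources.spool.interceptors.morph.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
# Placeholder path and id; point these at your own morphline definition
localflume.sources.spool.interceptors.morph.morphlineFile = /etc/flume-ng/conf/morphline.conf
localflume.sources.spool.interceptors.morph.morphlineId = morphline1
```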

New Contributor
Posts: 3
Registered: ‎05-03-2017

Re: FLUME breaking json file into malformed events

I am able to capture JSON events with my current interceptor too, but the issue arises when my source file has more than one JSON string.

With a file containing more than one JSON string, the file gets converted into multiple events (say 2 or 3) during transfer. When I try to manipulate these events, I get a malformed JSON exception.



String body = new String(event.getBody());

When I try to get the event body, my JSON string is split in half and I get an exception.
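
To illustrate the symptom, here is a minimal, self-contained sketch of a guard an interceptor could run before parsing, so that a fragment is logged and skipped instead of raising an exception. The class and method names (JsonFragmentCheck, looksComplete) are hypothetical, and the check only tracks bracket depth and string quoting; it is not a JSON validator and does not repair the split itself.

```java
public class JsonFragmentCheck {

    /**
     * Returns true if the text is a single top-level {...} or [...] value
     * whose brackets are balanced, ignoring brackets inside string literals.
     */
    public static boolean looksComplete(String text) {
        String s = text.trim();
        if (s.isEmpty() || (s.charAt(0) != '{' && s.charAt(0) != '[')) {
            return false;
        }
        int depth = 0;
        boolean inString = false;
        boolean escaped = false;
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (inString) {
                if (escaped) {
                    escaped = false;          // character after a backslash
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;         // closing quote
                }
                continue;
            }
            if (c == '"') {
                inString = true;
            } else if (c == '{' || c == '[') {
                depth++;
            } else if (c == '}' || c == ']') {
                depth--;
                if (depth < 0) {
                    return false;             // closer without an opener
                }
                if (depth == 0 && i != s.length() - 1) {
                    return false;             // data after the first value
                }
            }
        }
        return depth == 0 && !inString;
    }

    public static void main(String[] args) {
        String whole = "{\"short_desscription\": \"disk full on host {a}\", \"n\": [1, 2]}";
        String firstHalf = whole.substring(0, whole.length() / 2);
        System.out.println(looksComplete(whole));
        System.out.println(looksComplete(firstHalf));
    }
}
```

In an interceptor, a false result would be the cue to log the body and return the event unchanged rather than hand it to the JSON parser.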

I want to know how I should modify my Flume conf to get one file as a single event, or how to get fully formed JSON events.
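
One setting that may be relevant here: according to the Flume user guide, the spooling directory source uses the LINE deserializer by default, which emits one event per line and cuts any line longer than deserializer.maxLineLength (2048 characters by default) into several events. The guide also describes a BlobDeserializer that reads a whole file into a single event. A minimal sketch, assuming the spool source from the configuration above is the one reading the JSON files and that the Flume morphline Solr sink jar, which ships this class, is on the agent's classpath:

```properties
# Read each spooled file as one event instead of one event per line
localflume.sources.spool.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
# Upper bound in bytes on what is read into a single event (the documented default)
localflume.sources.spool.deserializer.maxBlobLength = 100000000
```

The whole blob is buffered in memory, so this only suits files that comfortably fit in the agent's heap.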

New Contributor
Posts: 3
Registered: ‎05-03-2017

Re: FLUME breaking json file into malformed events

I have a JSON object in my source file which contains multiple JSON strings as elements.