New Contributor

Flume breaking JSON file into malformed events

Hello, please find below my Flume conf.

 

I am using Flume to ingest data into HDFS, and an interceptor to alter the JSON data before ingestion.

Flume turns the JSON file into events, but while parsing the events via the interceptor I get a malformed JSON exception: the file is split into multiple events, and none of the breaks falls on a logical JSON boundary.

I want one event per JSON file.
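To make the failure mode concrete, here is a minimal sketch (class name and sample values are hypothetical, using the same org.json library as the interceptor below) of what happens when an event carries only a fragment of a JSON document:

import org.json.JSONException;
import org.json.JSONObject;

public class SplitJsonDemo {
    public static void main(String[] args) {
        // A complete document parses fine.
        new JSONObject("{\"short_description\": \"host01 is down\"}");

        // A fragment cut mid-string, as happens when the file is split
        // into multiple events, throws a malformed-JSON exception.
        try {
            new JSONObject("{\"short_description\": \"host01 is");
        } catch (JSONException e) {
            System.out.println("malformed JSON: " + e.getMessage());
        }
    }
}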

 

HDFS Flume agent (localSource):

localSource.sources = avro-collection-source
localSource.sinks = hdfs-sink
localSource.channels = mem-channel

localSource.sources.avro-collection-source.channels = mem-channel
localSource.sinks.hdfs-sink.channel = mem-channel
localSource.channels.mem-channel.type = memory

localSource.channels.mem-channel.capacity = 1000000
localSource.channels.mem-channel.transactionCapacity = 100000
localSource.sources.avro-collection-source.type = avro
localSource.sources.avro-collection-source.bind = 0.0.0.0
localSource.sources.avro-collection-source.interceptors=tweaker

localSource.sources.avro-collection-source.interceptors.tweaker.type=com.example.flume.interceptors.eventTweaker$Builder
localSource.sources.avro-collection-source.port = 41414

localSource.sinks.hdfs-sink.type = hdfs
localSource.sinks.hdfs-sink.hdfs.writeFormat = Text
localSource.sinks.hdfs-sink.hdfs.filePrefix = help_now_prod
localSource.sinks.hdfs-sink.hdfs.rollSize = 3000000
localSource.sinks.hdfs-sink.hdfs.rollCount = 0
localSource.sinks.hdfs-sink.hdfs.batchSize = 10
localSource.sinks.hdfs-sink.hdfs.rollInterval = 0
localSource.sinks.hdfs-sink.hdfs.fileType = DataStream
localSource.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/user/flume/HelpNowData




Source Flume agent (localflume):

localflume.sources = spool
localflume.sinks = fr1
localflume.channels = ch1

localflume.channels.ch1.type = memory
localflume.channels.ch1.capacity = 10000
localflume.channels.ch1.transactionCapacity = 1000

localflume.sources.spool.type = spooldir
localflume.sources.spool.channels = ch1
localflume.sources.spool.spoolDir = /app/logs/hdfs/datacenter/HelpNowLocal1
localflume.sources.spool.fileHeader = true
localflume.sources.spool.interceptors = tweaker
localflume.sources.spool.interceptors.tweaker.type = com.example.flume.interceptors.eventTweaker$Builder

localflume.sinks.fr1.type = file_roll
localflume.sinks.fr1.channel = ch1
localflume.sinks.fr1.rollInterval = 3600
localflume.sinks.fr1.batchSize = 1000
localflume.sinks.fr1.rollCount = 0
localflume.sinks.fr1.rollsize = 100000
localflume.sinks.fr1.sink.directory = /app/logs/hdfs/datacenter/HelpNowLocal2
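A note on a likely cause of the splitting: if I read the Flume defaults correctly, the spooldir source's default deserializer is LINE, which emits one event per line of the input file, and its deserializer.maxLineLength defaults to 2048 characters, with longer lines spilling into subsequent events. Under that assumption, a single-line JSON file longer than 2048 characters would be cut mid-string exactly as described above; raising the limit is a quick way to test the theory (the value below is an arbitrary placeholder):

localflume.sources.spool.deserializer = LINE
localflume.sources.spool.deserializer.maxLineLength = 1000000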

 

 

Java interceptor class:

package com.example.flume.interceptors;

import java.io.FileReader;
import java.io.IOException;
import java.util.*;

import au.com.bytecode.opencsv.CSVReader;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import org.apache.log4j.Logger;
import org.json.JSONObject;





class eventTweaker implements Interceptor {

    private static final Logger LOG = Logger.getLogger(eventTweaker.class);

    // Host names loaded once from the data-center CSV.
    private final List<String> dataCenterData = readCsv();

    // private means that only Builder can build me.
    private eventTweaker() {}

    @Override
    public void initialize() {}



    @Override
    public Event intercept(Event event) {

        String body = new String(event.getBody());
        LOG.info("Event body: " + body);

        try {
            JSONObject jsonObject = new JSONObject(body);

            // Match the short description against the data-center host
            // list loaded from CSV, and tag the event with the host name.
            String shortDescription = jsonObject.get("short_description").toString();
            for (int i = 0; i < dataCenterData.size(); i++) {
                if (shortDescription.contains(dataCenterData.get(i))) {
                    jsonObject.put("host_name", dataCenterData.get(i));
                    break;
                }
            }

            LOG.info("@@@@@@@@@@@@@@@ " + jsonObject.toString());
            event.setBody(jsonObject.toString().getBytes());
        } catch (Exception e) {
            LOG.error("exception", e);
            LOG.info("Received this log message that is not formatted in json: " + body + "\n");
        }

        LOG.debug(new String(event.getBody()));
        return event;
    }



    // Loads the third CSV column (host names) into an order-preserving,
    // de-duplicated list.
    public List<String> readCsv() {

        // String strFile = "/app/logs/hdfs/datacenter/hostnames/hostnames.csv";
        String strFile = "/home/iot/flume_sink/hostnames.csv";
        List<String> dataCenterData = new ArrayList<String>();
        try {
            CSVReader reader = new CSVReader(new FileReader(strFile));
            try {
                String[] nextLine;
                while ((nextLine = reader.readNext()) != null) {
                    // nextLine[] is an array of values from the line;
                    // column index 2 holds the host name.
                    dataCenterData.add(nextLine[2]);
                }
            } finally {
                reader.close();
            }
        } catch (IOException e) {
            LOG.error(e.getMessage(), e);
        }
        return new ArrayList<String>(new LinkedHashSet<String>(dataCenterData));
    }


    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<Event>(events.size());
        LOG.info("^^^^^^^^^^^^^^ " + events.size());
        for (Event event : events) {
            // Intercept each event individually.
            Event interceptedEvent = intercept(event);
            interceptedEvents.add(interceptedEvent);
        }
        return interceptedEvents;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new eventTweaker();
        }

        @Override
        public void configure(Context context) {}
    }
}
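For quick local verification, a minimal test-harness sketch (class name and sample body are hypothetical; it must live in the same package to reach the package-private class, and the hostnames CSV read by readCsv() must exist, with "host01" in its third column, for the host_name field to be added):

package com.example.flume.interceptors;

import java.nio.charset.StandardCharsets;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

public class EventTweakerDemo {
    public static void main(String[] args) {
        Interceptor interceptor = new eventTweaker.Builder().build();
        interceptor.initialize();

        // Hypothetical event body mimicking a single complete JSON document.
        Event event = EventBuilder.withBody(
                "{\"short_description\": \"host01 is unreachable\"}"
                        .getBytes(StandardCharsets.UTF_8));

        Event out = interceptor.intercept(event);
        System.out.println(new String(out.getBody(), StandardCharsets.UTF_8));

        interceptor.close();
    }
}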
Cloudera Employee

Re: Flume breaking JSON file into malformed events

Look into using the morphline interceptor: you can use the readJson command if the events are formatted as JSON, and you can manipulate the event or add metadata.

http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#readJson
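A minimal sketch of that wiring on the avro source, assuming it replaces the existing tweaker interceptor (the interceptor type and its morphlineFile/morphlineId properties ship with Flume; the file path and morphline id here are placeholders):

localSource.sources.avro-collection-source.interceptors = morphline
localSource.sources.avro-collection-source.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
localSource.sources.avro-collection-source.interceptors.morphline.morphlineFile = /etc/flume-ng/conf/tweaker.conf
localSource.sources.avro-collection-source.interceptors.morphline.morphlineId = morphline1

and, in tweaker.conf, a morphline that parses each event body as JSON (per the readJson documentation linked above):

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      { readJson {} }
    ]
  }
]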


-pd
New Contributor

Re: Flume breaking JSON file into malformed events

I am able to capture JSON events via the current interceptor too, but the issue arises when my source file contains more than one JSON string.

With more than one JSON string in the file, the file gets converted into multiple events (say 2 or 3) during transfer. When I try to manipulate these events I get a malformed JSON exception.

In the line below:

String body = new String(event.getBody());

when I try to get the event body, my JSON string is split in half and I get an exception.

I want to know how I should modify my Flume conf to get one file as a single event, or how to get fully formed JSON events.
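One way to get exactly that, sketched under the assumption that each whole file fits comfortably in memory: the spooldir source lets you swap out the deserializer, and Flume ships a BLOB deserializer that reads an entire file into a single event (deserializer.maxBlobLength caps the event size; 100000000 bytes is its documented default):

localflume.sources.spool.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
localflume.sources.spool.deserializer.maxBlobLength = 100000000

With one event per file, the interceptor then sees a complete JSON document. The memory channel capacity and sink batch sizes would need to accommodate events of that size.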

New Contributor

Re: Flume breaking JSON file into malformed events

I have a JSON object in my source file which contains multiple JSON strings as elements.