
Log parsing and loading to Hive/Impala tables

Expert Contributor

We have data ingestion of raw Apache access logs into HDFS through Flume working. I'm looking for ways to parse the logs into fields like timestamp, IP, query params, etc. and load the data into the appropriate Hive/Impala tables.

 

Can all this be done as part of Flume?

 

Thanks!


Re: Log parsing and loading to Hive/Impala tables

Contributor

Hi!

 

Yes, you can use Flume for this use case. You might want to take a look at this example:

 

https://github.com/kite-sdk/kite-examples/tree/master/json

 

It's designed for JSON data, but the same principles will apply here. You can send the data to a suitable Flume source and then use Morphlines to parse the data into Avro. If you use the Kite DatasetSink, then you can easily target a Hive-backed dataset which will map to a Hive/Impala table.
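As a rough sketch of how those pieces could fit together in a Flume agent config (the source, channel, sink, and dataset names below are placeholders, not taken from the example):

~~~~
# Hypothetical wiring: source -> morphline interceptor (parse to Avro) -> channel -> Kite DatasetSink
agent.sources.logSource.interceptors = morphline
agent.sources.logSource.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent.sources.logSource.interceptors.morphline.morphlineFile = /path/to/morphlines.conf
agent.sources.logSource.interceptors.morphline.morphlineId = morphline1

# The Kite DatasetSink targets a Hive-backed dataset, which maps to a Hive/Impala table
agent.sinks.kiteSink.type = org.apache.flume.sink.kite.DatasetSink
agent.sinks.kiteSink.channel = memChannel
agent.sinks.kiteSink.kite.repo.uri = repo:hive
agent.sinks.kiteSink.kite.dataset.name = weblogs
~~~~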

 

Let me know if you need more details!

 

-Joey

Re: Log parsing and loading to Hive/Impala tables

Expert Contributor

While trying to parse the Apache access log using the morphline interceptor in Flume, I'm running into an OOM error. I've verified that the grok regex works as expected using the online Grok Debugger.

 

Please let me know if I'm missing something in the morphlines. 

 

Here is the morphlines.conf:

~~~~
commands : [
  {
    # Parse input attachment and emit a record for each input line
    readLine {
      charset : UTF-8
    }
  }

  {
    grok {
      # Consume the output record of the previous command and pipe another
      # record downstream.
      #
      # A grok-dictionary is a config file that contains prefabricated
      # regular expressions that can be referred to by name. grok patterns
      # specify such a regex name, plus an optional output field name.
      # The syntax is %{REGEX_NAME:OUTPUT_FIELD_NAME}
      # The input line is expected in the "message" input field.
      dictionaryFiles : [/path/to/grok-dictionaries]
      expressions : {
        message : """%{IP:ip} %{WORD:geo} - \[%{HTTPDATE:date}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request}(?: HTTP/%{NUMBER:httpversion})?|-)\" %{QS:referrer} %{QS:useragent}"""
      }
    }
  }
]
~~~~

Here's the interceptor in flume.conf:

~~~~
agent.sources.kafkaSource.interceptors = morphlineinterceptor
agent.sources.kafkaSource.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent.sources.kafkaSource.interceptors.morphlineinterceptor.morphlineFile = /path/to/morphlines.conf
agent.sources.kafkaSource.interceptors.morphlineinterceptor.morphlineId = morphline1
~~~~

 

Error:

~~~~
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.util.zip.ZipCoder.toString(ZipCoder.java:59)
    at java.util.zip.ZipFile.getZipEntry(ZipFile.java:531)
    at java.util.zip.ZipFile.access$900(ZipFile.java:56)
    at java.util.zip.ZipFile$1.nextElement(ZipFile.java:513)
    at java.util.zip.ZipFile$1.nextElement(ZipFile.java:483)
    at java.util.jar.JarFile$1.nextElement(JarFile.java:243)
    at java.util.jar.JarFile$1.nextElement(JarFile.java:238)
    at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$Scanner.scanJar(ClassPath.java:341)
    at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$Scanner.scanFrom(ClassPath.java:286)
    at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$Scanner.scan(ClassPath.java:274)
    at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath.from(ClassPath.java:82)
    at org.kitesdk.morphline.api.MorphlineContext.getTopLevelClasses(MorphlineContext.java:149)
    at org.kitesdk.morphline.api.MorphlineContext.importCommandBuilders(MorphlineContext.java:91)
    at org.kitesdk.morphline.stdlib.Pipe.<init>(Pipe.java:43)
    at org.kitesdk.morphline.stdlib.PipeBuilder.build(PipeBuilder.java:40)
    at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:126)
    at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:55)
    at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.configure(MorphlineHandlerImpl.java:101)
    at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor.<init>(MorphlineInterceptor.java:135)
    at org.apache.flume.sink.solr.morphline.MorphlineInterceptor.<init>(MorphlineInterceptor.java:55)
    at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder.build(MorphlineInterceptor.java:112)
    at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder.build(MorphlineInterceptor.java:103)
    at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
    at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:80)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
~~~~

Re: Log parsing and loading to Hive/Impala tables

Expert Contributor

I was able to resolve the OOM errors by passing '-Xmx1024m' to the flume-ng agent.
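For reference, a minimal sketch of one common way to set this, assuming the agent is started through the standard flume-ng wrapper that sources flume-env.sh:

~~~~
# flume-env.sh (picked up by the flume-ng launcher)
# Give the agent JVM a 1 GB heap so the morphline compiler's classpath scan
# has enough headroom.
export JAVA_OPTS="-Xmx1024m"
~~~~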

Re: Log parsing and loading to Hive/Impala tables

Contributor
Great! Let us know if you need any more help.

Re: Log parsing and loading to Hive/Impala tables

Expert Contributor

Without the morphline interceptor, I'm able to get the data from Kafka to HDFS through Flume, but when I introduce the morphlines I get empty files generated in HDFS.

 

Here are the flume.conf and morphlines.conf that I'm currently using; is there some other config that I'm missing?

 

~~~~
agent.sources.kafkaSource.interceptors = morphlineinterceptor
agent.sources.kafkaSource.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent.sources.kafkaSource.interceptors.morphlineinterceptor.morphlineFile = /path/to/morphlines.conf
agent.sources.kafkaSource.interceptors.morphlineinterceptor.morphlineId = morphline1

agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = memChannel
agent.sinks.hdfsSink.hdfs.path = /path/to/output
agent.sinks.hdfsSink.hdfs.filePrefix = access.log
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.rollInterval = 60
agent.sinks.hdfsSink.hdfs.rollSize = 0
agent.sinks.hdfsSink.hdfs.rollCount = 10000
agent.sinks.hdfsSink.hdfs.batchSize = 1000
agent.sinks.hdfsSink.hdfs.retryInterval = 5

agent.channels.memChannel.type = memory
agent.channels.memChannel.capacity = 100000
agent.channels.memChannel.transactionCapacity = 1000
~~~~

~~~~
commands : [
  {
    # Parse input attachment and emit a record for each input line
    readLine {
      charset : UTF-8
    }
  }

  {
    grok {
      # Consume the output record of the previous command and pipe another
      # record downstream.
      #
      # A grok-dictionary is a config file that contains prefabricated
      # regular expressions that can be referred to by name. grok patterns
      # specify such a regex name, plus an optional output field name.
      # The syntax is %{REGEX_NAME:OUTPUT_FIELD_NAME}
      # The input line is expected in the "message" input field.
      dictionaryFiles : [/home/hadoopdev/a8c/nosara/tracks-etl-flume/grok-dictionaries]
      expressions : {
        message : """%{IP:ip} %{WORD:geo} - \[%{HTTPDATE:date}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request}(?: HTTP/%{NUMBER:httpversion})?|-)\" %{QS:referrer} %{QS:useragent}"""
      }
    }
  }

  { logDebug { format : "output record: {}", args : [ "@{}"] } }
]
~~~~

 

Also I've been trying to debug the morphlines and posted the question here:

http://community.cloudera.com/t5/Kite-SDK-includes-Morphlines/How-to-test-morphlines/m-p/22314#U2231...

 

Thanks!

Re: Log parsing and loading to Hive/Impala tables

Expert Contributor

I was able to view the output records and I do see the expected output from the morphlines in the log. However, Flume is still storing empty records in HDFS. Do we need to apply any other transformation in order for the Flume HDFS sink to save these output records?

 

Thanks!

Re: Log parsing and loading to Hive/Impala tables

Contributor

The morphline file you posted will end up extracting values into new fields in the morphline record. Those fields are then turned into headers in the Flume event. You need to add an additional command to convert the morphline record into the body of the Flume event. Assuming you want to serialize the data to Avro, your best bet would be to add a toAvro command followed by a writeAvroToByteArray command to the end of your morphline config. That's what we do in the JSON example:

 

https://github.com/kite-sdk/kite-examples/blob/master/json/morphline.conf
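A minimal sketch of how those two commands might be appended to the end of the commands list above; the schema path is a placeholder, and the .avsc file would need to declare the fields extracted by grok:

~~~~
  # Assemble an Avro record from the extracted fields, then replace the
  # event body with the serialized Avro bytes.
  { toAvro { schemaFile : /path/to/accesslog.avsc } }
  { writeAvroToByteArray { format : containerlessBinary } }
~~~~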

 

Let me know if you need it in some other format.

 

You may also consider using the DatasetSink if you do want Avro. That will let you get features like partitioning for free.

Re: Log parsing and loading to Hive/Impala tables

Expert Contributor

Let me give more info on what the input records are and what I would like to achieve. The input records are Apache access logs with events logged as a query string, e.g. x.gif?action=view&prop1=val1&prop2=val2, where action takes a predefined set of values and the associated properties that are logged may vary for each action.

 

Ultimately I need to be able to query the data using Impala or Hive. I got morphlines to break the query string into individual components.

 

* I notice that toAvro requires a schema, but for a dynamic dataset, how does one define a schema?

* If I were to write certain types of records (each with its corresponding Avro schema) into their own HDFS locations, how can I achieve this in Flume?

 

Thanks!

 

 

Re: Log parsing and loading to Hive/Impala tables

Contributor

For dynamic data you need to convert the dynamic portion of the schema to an Avro map. That way you can handle whatever query variables and values are set. So your schema would have the static fields of the log and would parse out the dynamic portion into the map field.
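As an illustration, such a schema might look roughly like the following; the record and field names here are assumptions, not something defined in this thread:

~~~~
{
  "type": "record",
  "name": "QueryLogEvent",
  "fields": [
    {"name": "ip",     "type": "string"},
    {"name": "date",   "type": "string"},
    {"name": "action", "type": "string"},
    {"name": "params", "type": {"type": "map", "values": "string"}}
  ]
}
~~~~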

 

I don't think Morphline's toAvro supports nested data, so you might need a custom morphlines command to do everything you want.

 

Once you convert the records to Avro and serialize the Avro record to bytes, you can easily write it to HDFS using the DatasetSink (http://flume.apache.org/FlumeUserGuide.html#kite-dataset-sink-experimental). You'd start by creating the dataset from the Avro schema you wrote, using the CLI:

 

kite-dataset create -s querylog.avsc logs

 

That will create a Hive/Impala table called logs for the data, and you'd then configure the sink with properties such as:

 

kite.repo.uri=repo:hive

kite.dataset.name=logs

 

For this to work, you need to make sure that the Hive configuration directory and Hive jars are in the classpath. You can see an example of that in this Kite example:

 

https://github.com/kite-sdk/kite-examples/tree/master/logging

 

If you're using CDH 5.2, then most of these steps are already done for you, but I think you still need to add the Hive configuration directory to the FLUME_CLASSPATH.
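A minimal sketch of that, assuming the Hive configuration lives in the usual /etc/hive/conf:

~~~~
# flume-env.sh: make hive-site.xml visible to the DatasetSink
export FLUME_CLASSPATH="/etc/hive/conf:${FLUME_CLASSPATH}"
~~~~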

 

You'll be able to query all of the data with Hive. Impala does not yet support nested data types, so you won't be able to query the map field from Impala, but support for nested types is coming in a future release.
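For example, assuming the dynamic portion lands in a map column named params, a Hive query over it might look like:

~~~~
SELECT params['action'] AS action, COUNT(*) AS hits
FROM logs
GROUP BY params['action'];
~~~~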

 

 
