1973 Posts | 1225 Kudos Received | 124 Solutions
06-08-2016
09:15 PM
@Ancil McBarnett is there an upgrade? HDP 2.4 was just mentioned as supported by EMC.
... View more
06-01-2016
01:54 PM
2 Kudos
I will add the NiFi flow information here tomorrow. As a rough draft, I wanted to show what it can do, because it's pretty cool.
cat all.txt | jq --raw-output '.["text"]' | syntaxnet/demo.sh
From NiFi I collect a stream of Twitter data and send it to a file as JSON (all.txt). There are many ways to parse that, but I am a fan of the simple command-line tool jq, which is awesome for parsing JSON from the command line and is available for Mac OS X and Linux. From the Twitter feed I just grab the tweet text to parse with Parsey. Initially I was going to install TensorFlow SyntaxNet (Parsey McParseface) on the HDP 2.4 Sandbox, but CentOS 6 and TensorFlow do not play well together. So for now the easiest route is to install HDF on a Mac and build SyntaxNet on your Mac. The install instructions are very detailed, but the build is very particular and very machine intensive. It's best to let the build run and go do something else, with everything else shut down (no Chrome, VMs, editors, ...). After running McParseface, here are some results:
Input: RT @ Data_Tactical : Scary and fascinating : The future of big data https : //t.co/uwHoV8E49N # bigdata # datanews # datascience # datacenter https : ...
Parse:
Data_Tactical JJ ROOT
+-- RT NNP nn
+-- @ IN nn
+-- : : punct
+-- Scary JJ dep
| +-- and CC cc
| +-- fascinating JJ conj
+-- future NN dep
| +-- The DT det
| +-- of IN prep
| +-- data NNS pobj
| +-- big JJ amod
+-- https ADD dep
+-- # $ dep
| +-- //t.co/uwHoV8E49N CD num
| +-- datanews NNS dep
| | +-- bigdata NNP nn
| | +-- # $ nn
| +-- # $ dep
| +-- datacenter NN dep
| | +-- # NN nn
| | +-- datascience NN nn
| +-- https ADD dep
+-- ... . punct
INFO:tensorflow:Read 4 documents
Input: u_t=11x^2u_xx+ -LRB- 11x+2t -RRB- u_x+-1u https : //t.co/NHXcebT9XC # trading # bigdata https : //t.co/vOM8S5Ewwq
Parse:
u_t=11x^2u_xx+ LS ROOT
+-- 11x+2t LS dep
| +-- -LRB- -LRB- punct
| +-- -RRB- -RRB- punct
+-- u_x+-1u CD dep
+-- https ADD dep
+-- : : punct
+-- //t.co/vOM8S5Ewwq CD dep
Input: RT @ weloveknowles : When Beyoncé thinks the song is over but the hive has other ideas https : //t.co/0noxKaYveO
Parse:
RT NNP ROOT
+-- @ IN prep
| +-- weloveknowles NNS pobj
+-- : : punct
+-- thinks VBZ dep
| +-- When WRB advmod
| +-- Beyoncé NNP nsubj
| +-- is VBZ ccomp
| | +-- song NN nsubj
| | | +-- the DT det
| | +-- over RB advmod
| +-- but CC cc
| +-- has VBZ conj
| +-- hive NN nsubj
| | +-- the DT det
| +-- ideas NNS dobj
| | +-- other JJ amod
| +-- https ADD advmod
+-- //t.co/0noxKaYveO ADD dep
Input: RT @ KirkDBorne : Enabling the # BigData Revolution -- An International # OpenData Roadmap : https : //t.co/e89xNNNkUe # Data4Good HT @ Devbd https : / ...
Parse:
RT NNP ROOT
+-- @ IN prep
| +-- KirkDBorne NNP pobj
+-- : : punct
+-- Enabling VBG dep
| +-- Revolution NNP dobj
| +-- the DT det
| +-- # $ nn
| +-- BigData NNP nn
| +-- -- : punct
| +-- Roadmap NNP dep
| | +-- An DT det
| | +-- International NNP nn
| | +-- OpenData NNP nn
| | +-- # NN nn
| +-- : : punct
| +-- https ADD dep
| +-- //t.co/e89xNNNkUe LS dep
| +-- @ NN dep
| +-- Data4Good CD nn
| | +-- # $ nn
| +-- HT FW nn
| +-- Devbd NNP dep
| +-- https ADD dep
| +-- : : punct
+-- / NFP punct
+-- ... . punct
Input: RT @ DanielleAlberti : It 's like 10 , 000 bees when all you need is a hive. https : //t.co/ElGLLbykN8
Parse:
RT NNP ROOT
+-- @ IN prep
| +-- DanielleAlberti NNP pobj
+-- : : punct
+-- 's VBZ dep
| +-- It PRP nsubj
| +-- like IN prep
| | +-- 10 CD pobj
| +-- , , punct
| +-- bees NNS appos
| +-- 000 CD num
| +-- https ADD rcmod
| +-- when WRB advmod
| +-- all DT nsubj
| | +-- need VBP rcmod
| | +-- you PRP nsubj
| +-- is VBZ cop
| +-- a DT det
| +-- hive. NN nn
+-- //t.co/ElGLLbykN8 ADD dep
I am going to wire this up to NiFi to drop these parses into HDFS for further data analysis in Zeppelin. The main problem is that you need very specific versions of Python (2.7), Bazel (0.2.0 - 0.2.2b), NumPy, Protobuf, asciitree and others, and some of these don't play well with older versions of CentOS. If you are on a clean Mac or Ubuntu, things should go smoothly. My CentOS was missing a bunch of libraries, so I tried to install them:
sudo yum -y install swig
pip install -U protobuf==3.0.0b2
pip install asciitree
pip install numpy
pip install nose
wget https://github.com/bazelbuild/bazel/releases/download/0.2.2b/bazel-0.2.2b-installer-linux-x86_64.sh
sudo yum -y install libstdc++
./configure
sudo yum -y install pkg-config zip g++ zlib1g-dev unzip
cd ..
bazel test syntaxnet/... util/utf8/...
# On Mac, run the following:
bazel test --linkopt=-headerpad_max_install_names \
  syntaxnet/... util/utf8/...
cat /etc/redhat-release
CentOS release 6.7 (Final)
sudo yum -y install glibc
sudo yum -y install epel-release
sudo yum -y install gcc gcc-c++ python-pip python-devel atlas atlas-devel gcc-gfortran openssl-devel libffi-devel
pip install --upgrade virtualenv
virtualenv --system-site-packages ~/venvs/tensorflow
source ~/venvs/tensorflow/bin/activate
pip install --upgrade numpy scipy wheel cryptography # optional
pip install --upgrade https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.8.0-cp27-none-linux_x86_64.whl
# or the GPU build below, but CUDA and cuDNN are required; see the docs for more install instructions
pip install --upgrade https://storage.googleapis.com/tensorflow/linux/gpu/tensorflow-0.8.0-cp27-none-linux_x86_64.whl
sudo yum -y install python-numpy swig python-dev
sudo yum -y upgrade
yum install python27
It's worth a try for the patient or for people with a newer CentOS. Your mileage may vary!
References:
http://googleresearch.blogspot.com/2016/05/announcing-syntaxnet-worlds-most.html
http://arxiv.org/abs/1603.06042
https://www.cis.upenn.edu/~treebank/
http://googleresearch.blogspot.com/2011/03/building-resources-to-syntactically.html
https://github.com/tensorflow/models/tree/master/syntaxnet
https://github.com/tensorflow/models/tree/master/syntaxnet#getting-started
http://hoolihan.net/blog-tim/2016/03/02/installing-tensorflow-on-centos/
https://dmngaya.com/2015/10/25/installing-python-2-7-on-centos-6-7/
... View more
Labels:
05-26-2016
09:35 PM
2 Kudos
Alluxio is available to install on HDP and works with your existing HDFS.
http://www.alluxio.org/documentation/en/Configuring-Alluxio-with-HDFS.html
http://www.alluxio.com/2016/04/getting-started-with-alluxio-and-spark/
To use it on HDP, you need to edit /opt/alluxio-1.0.1/conf/alluxio-env.sh after you create it and change the HDFS port from 9000 to 8020.
You can also override it on the command line:
export ALLUXIO_UNDERFS_ADDRESS=hdfs://localhost:8020
To set up Alluxio, first download it, then format and start it:
bin/alluxio format
[root@sandbox alluxio-1.0.1]# bin/alluxio format
Connecting to localhost as root...
Warning: Permanently added 'localhost' (RSA) to the list of known hosts.
Formatting Alluxio Worker @ sandbox.hortonworks.com
Connection to localhost closed.
Formatting Alluxio Master @ localhost
[root@sandbox alluxio-1.0.1]# bin/alluxio-start.sh local
Killed 0 processes on sandbox.hortonworks.com
Killed 0 processes on sandbox.hortonworks.com
Connecting to localhost as root...
Killed 0 processes on sandbox.hortonworks.com
Connection to localhost closed.
Formatting RamFS: /mnt/ramdisk (1gb)
Starting master @ localhost. Logging to /opt/alluxio-1.0.1/logs
Starting worker @ sandbox.hortonworks.com. Logging to /opt/alluxio-1.0.1/logs
==> /opt/alluxio-1.0.1/logs/user.log <==
2016-05-25 17:22:45,622 INFO logger.type (Format.java:formatFolder) - Formatting JOURNAL_FOLDER:/opt/alluxio-1.0.1/journal/
2016-05-25 17:22:45,656 INFO logger.type (Format.java:formatFolder) - Formatting BlockMaster_JOURNAL_FOLDER:/opt/alluxio-1.0.1/journal/BlockMaster
2016-05-25 17:22:45,657 INFO logger.type (Format.java:formatFolder) - Formatting FileSystemMaster_JOURNAL_FOLDER:/opt/alluxio-1.0.1/journal/FileSystemMaster
2016-05-25 17:22:45,659 INFO logger.type (Format.java:formatFolder) - Formatting LineageMaster_JOURNAL_FOLDER:/opt/alluxio-1.0.1/journal/LineageMaster
==> /opt/alluxio-1.0.1/logs/worker.log <==
2016-05-26 20:46:19,189 INFO server.AbstractConnector (AbstractConnector.java:doStart) - Started SelectChannelConnector@0.0.0.0:30000
2016-05-26 20:46:19,189 INFO logger.type (UIWebServer.java:startWebServer) - Alluxio Worker Web service started @ 0.0.0.0/0.0.0.0:30000
2016-05-26 20:46:19,191 INFO logger.type (AbstractClient.java:connect) - Alluxio client (version 1.0.1) is trying to connect with BlockMasterWorker master @ sandbox.hortonworks.com/10.0.2.15:19998
2016-05-26 20:46:19,199 INFO logger.type (AbstractClient.java:connect) - Client registered with BlockMasterWorker master @ sandbox.hortonworks.com/10.0.2.15:19998
2016-05-26 20:46:19,312 INFO logger.type (AlluxioWorker.java:start) - Started worker with id 1
2016-05-26 20:46:19,312 INFO logger.type (AlluxioWorker.java:start) - Alluxio Worker version 1.0.1 started @ sandbox.hortonworks.com/10.0.2.15:29998
2016-05-26 20:46:20,311 INFO logger.type (AbstractClient.java:connect) - Alluxio client (version 1.0.1) is trying to connect with FileSystemMasterWorker master @ sandbox.hortonworks.com/10.0.2.15:19998
2016-05-26 20:46:20,311 INFO logger.type (AbstractClient.java:connect) - Client registered with FileSystemMasterWorker master @ sandbox.hortonworks.com/10.0.2.15:19998
2016-05-26 20:46:20,313 INFO logger.type (AbstractClient.java:connect) - Alluxio client (version 1.0.1) is trying to connect with FileSystemMasterWorker master @ sandbox.hortonworks.com/10.0.2.15:19998
2016-05-26 20:46:20,314 INFO logger.type (AbstractClient.java:connect) - Client registered with FileSystemMasterWorker master @ sandbox.hortonworks.com/10.0.2.15:19998
To validate your install, run the tests:
./bin/alluxio runTests
All tests passed
You can view the files from the command line (http://www.alluxio.org/documentation/en/Command-Line-Interface.html):
[root@sandbox alluxio-1.0.1]# ./bin/alluxio fs ls /default_tests_files
80.00B 05-26-2016 20:49:01:243 In Memory /default_tests_files/BasicFile_CACHE_PROMOTE_MUST_CACHE
84.00B 05-26-2016 20:49:02:877 In Memory /default_tests_files/BasicNonByteBuffer_CACHE_PROMOTE_MUST_CACHE
80.00B 05-26-2016 20:49:04:432 In Memory /default_tests_files/BasicFile_CACHE_PROMOTE_CACHE_THROUGH
84.00B 05-26-2016 20:49:08:236 In Memory /default_tests_files/BasicNonByteBuffer_CACHE_PROMOTE_CACHE_THROUGH
80.00B 05-26-2016 20:49:12:342 In Memory /default_tests_files/BasicFile_CACHE_PROMOTE_THROUGH
84.00B 05-26-2016 20:49:16:392 In Memory /default_tests_files/BasicNonByteBuffer_CACHE_PROMOTE_THROUGH
80.00B 05-26-2016 20:49:20:851 In Memory /default_tests_files/BasicFile_CACHE_PROMOTE_ASYNC_THROUGH
84.00B 05-26-2016 20:49:23:190 In Memory /default_tests_files/BasicNonByteBuffer_CACHE_PROMOTE_ASYNC_THROUGH
80.00B 05-26-2016 20:49:25:152 In Memory /default_tests_files/BasicFile_CACHE_MUST_CACHE
84.00B 05-26-2016 20:49:26:975 In Memory /default_tests_files/BasicNonByteBuffer_CACHE_MUST_CACHE
80.00B 05-26-2016 20:49:28:595 In Memory /default_tests_files/BasicFile_CACHE_CACHE_THROUGH
84.00B 05-26-2016 20:49:32:375 In Memory /default_tests_files/BasicNonByteBuffer_CACHE_CACHE_THROUGH
80.00B 05-26-2016 20:49:36:505 In Memory /default_tests_files/BasicFile_CACHE_THROUGH
84.00B 05-26-2016 20:49:40:823 In Memory /default_tests_files/BasicNonByteBuffer_CACHE_THROUGH
80.00B 05-26-2016 20:49:44:827 In Memory /default_tests_files/BasicFile_CACHE_ASYNC_THROUGH
84.00B 05-26-2016 20:49:47:248 In Memory /default_tests_files/BasicNonByteBuffer_CACHE_ASYNC_THROUGH
80.00B 05-26-2016 20:49:49:614 In Memory /default_tests_files/BasicFile_NO_CACHE_MUST_CACHE
84.00B 05-26-2016 20:49:52:384 In Memory /default_tests_files/BasicNonByteBuffer_NO_CACHE_MUST_CACHE
80.00B 05-26-2016 20:49:55:107 In Memory /default_tests_files/BasicFile_NO_CACHE_CACHE_THROUGH
84.00B 05-26-2016 20:49:59:675 In Memory /default_tests_files/BasicNonByteBuffer_NO_CACHE_CACHE_THROUGH
80.00B 05-26-2016 20:50:03:639 Not In Memory /default_tests_files/BasicFile_NO_CACHE_THROUGH
84.00B 05-26-2016 20:50:07:425 Not In Memory /default_tests_files/BasicNonByteBuffer_NO_CACHE_THROUGH
80.00B 05-26-2016 20:50:11:384 In Memory /default_tests_files/BasicFile_NO_CACHE_ASYNC_THROUGH
84.00B 05-26-2016 20:50:13:310 In Memory /default_tests_files/BasicNonByteBuffer_NO_CACHE_ASYNC_THROUGH
You can access these files from Spark and Flink. Alluxio has configurable storage tiers (memory, HDD, SSD) and can sit on top of HDFS.
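For example, here is a minimal sketch (not from the original post) of reading one of the generated test files from spark-shell in Scala; it assumes the Alluxio 1.0.1 client jar is on the Spark classpath and uses the master RPC port (19998) shown in the worker log above:
// Sketch: count the lines of a file that "alluxio runTests" created.
// Assumes the Alluxio client jar is on the driver/executor classpath.
val testFile = sc.textFile("alluxio://sandbox.hortonworks.com:19998/default_tests_files/BasicFile_CACHE_THROUGH")
println("Lines read from Alluxio: " + testFile.count())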
To browse the Alluxio file system and view metrics, open:
http://localhost:19999/home
References:
Presentation on Alluxio (formerly Tachyon): http://www.slideshare.net/TachyonNexus/tachyon-presentation-at-ampcamp-6-november-2015
Unified Namespace: http://www.alluxio.com/2016/04/unified-namespace-allowing-applications-to-access-data-anywhere/
Getting Started with Alluxio and Spark: http://www.alluxio.com/2016/04/getting-started-with-alluxio-and-spark/
... View more
Labels:
05-25-2016
04:30 PM
1 Kudo
Twitter has open sourced another real-time, distributed, fault-tolerant stream processing engine called Heron. They see it as the successor to Storm, and it is backwards compatible with Storm's topology API. First I followed the getting started guide, downloading and installing on Mac OS X:
➜ Downloads ./heron-client-install-0.14.0-darwin.sh --user
Heron client installer
----------------------
Uncompressing......
Heron is now installed!
Make sure you have "/usr/local/bin" in your path.
See http://heronstreaming.io/docs/getting-started.html for how to use Heron.
heron.build.version : 0.14.0
heron.build.time : Tue May 24 22:44:01 PDT 2016
heron.build.timestamp : 1464155053000
heron.build.host : tw-mbp-kramasamy
heron.build.user : kramasamy
heron.build.git.revision : be87b09f348e0ed05f45503340a2245a4ef68a35
heron.build.git.status : Clean
➜ Downloads export PATH=$PATH::/usr/local/bin
➜ Downloads ./heron-tools-install-0.14.0-darwin.sh --user
Heron tools installer
---------------------
Uncompressing......
Heron Tools is now installed!
Make sure you have "/usr/local/bin" in your path.
See http://heronstreaming.io/docs/getting-started.html for how to use Heron.
heron.build.version : 0.14.0
heron.build.time : Tue May 24 22:44:01 PDT 2016
heron.build.timestamp : 1464155053000
heron.build.host : tw-mbp-kramasamy
heron.build.user : kramasamy
heron.build.git.revision : be87b09f348e0ed05f45503340a2245a4ef68a35
heron.build.git.status : Clean
http://twitter.github.io/heron/docs/getting-started/
Run the example to make sure everything is installed:
heron submit local ~/.heron/examples/heron-examples.jar com.twitter.heron.examples.ExclamationTopology ExclamationTopology
[2016-05-25 16:16:32 -0400] com.twitter.heron.scheduler.local.LocalLauncher INFO: For checking the status and logs of the topology, use the working directory /Users/tspann/.herondata/topologies/local/tspann/ExclamationTopology
INFO: Topology 'ExclamationTopology' launched successfully
INFO: Elapsed time: 4.722s.
heron activate local ExclamationTopology
[2016-05-25 16:19:38 -0400] com.twitter.heron.spi.utils.TMasterUtils SEVERE: Topology is already activateed
INFO: Successfully activated topology 'ExclamationTopology'
INFO: Elapsed time: 2.739s.
Run the UI:
sudo heron-ui
25 May 2016 16:20:31-INFO:main.py:101: Listening at http://192.168.1.5:8889
25 May 2016 16:20:31-INFO:main.py:102: Using tracker url: http://localhost:8888
So as not to step on HDP ports, I change the port:
sudo heron-tracker --port 8881
25 May 2016 16:24:14-INFO:main.py:183: Running on port: 8881
25 May 2016 16:24:14-INFO:main.py:184: Using config file: /usr/local/herontools/conf/heron_tracker.yaml
Look at the Heron tracker endpoint: http://localhost:8881/topologies
{"status": "success", "executiontime": 4.291534423828125e-05, "message": "", "version": "1.0.0", "result": {}}
Let's run the UI:
sudo heron-ui --port 8882 --tracker_url http://localhost:8881
25 May 2016 16:28:53-INFO:main.py:101: Listening at http://192.168.1.5:8882
25 May 2016 16:28:53-INFO:main.py:102: Using tracker url: http://localhost:8881
Look at the Heron cluster: http://localhost:8881/clusters
{"status": "success", "executiontime": 1.9073486328125e-05, "message": "", "version": "1.0.0", "result": ["localzk", "local"]}
Using the Heron CLI:
heron
usage: heron <command> <options> ...
Available commands:
activate Activate a topology
deactivate Deactivate a topology
help Prints help for commands
kill Kill a topology
restart Restart a topology
submit Submit a topology
version Print version of heron-cli
Getting more help:
heron help <command> Prints help and options for <command>
For detailed documentation, go to http://heronstreaming.io
If you need to restart a topology:
heron restart local ExclamationTopology
INFO: Successfully restarted topology 'ExclamationTopology'
INFO: Elapsed time: 3.928s.
Look at my topology: http://localhost:8881/topologies#/all/all/ExclamationTopology
{
"status": "success", "executiontime": 7.104873657226562e-05, "message": "",
"version": "1.0.0",
"result": {"local": {"default": ["ExclamationTopology"]}}
}
Adding --verbose will add a ton of debug logs. Attached are some screenshots. The Heron UI is decent. I am hoping Heron screens will be integrated into Ambari.
... View more
Labels:
05-22-2016
08:29 PM
1 Kudo
Create a Hive table stored as an ORC file through Spark SQL in Zeppelin:
%sql
create table default.logs_orc_table (clientIp STRING, clientIdentity STRING, user STRING, dateTime STRING, request STRING, statusCode INT, bytesSent FLOAT, referer STRING, userAgent STRING) stored as orc
Load data from a DataFrame into this table:
%sql
insert into table default.logs_orc_table select t.* from accessLogsDF t
I can create a table in the Hive View from Ambari:
CREATE TABLE IF NOT EXISTS survey
( firstName STRING, lastName STRING, gender STRING,
phone STRING, email STRING,
address STRING,
city STRING,
postalcode STRING,
surveyanswer STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n' STORED AS TEXTFILE;
Then it is really easy to load some data from a CSV file:
LOAD DATA INPATH '/demo/survey1.csv' OVERWRITE INTO TABLE survey;
I can also create an ORC-based table from the Hive View in Ambari, or from the Spark / Spark SQL or Hive paragraphs in Zeppelin:
create table survey_orc(
firstName varchar(255),
lastName varchar(255),
gender varchar(255),
phone varchar(255),
email varchar(255),
address varchar(255),
city varchar(255),
postalcode varchar(255),
surveyanswer varchar(255)
) stored as orc tblproperties
("orc.compress"="NONE");
I can do the same insert from Hive:
%hive
insert into table default.survey_orc select t.* from survey t
I can query Hive tables from Spark SQL or Hive easily.
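For example, a quick check of the ORC table from the Spark interpreter in Zeppelin might look like the sketch below (Spark 1.6 HiveContext; the table and column names come from the DDL above):
// Sketch: query the ORC-backed Hive table from Spark SQL.
// Assumes sc is the SparkContext provided by Zeppelin's Spark interpreter.
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val byCity = hiveContext.sql("select city, count(*) as responses from default.survey_orc group by city")
byCity.show()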
... View more
Labels:
05-20-2016
03:20 PM
I will give this a try and post the results. For Windows and DBVisualizer, there's an article with step-by-step details (DBVisualizer Windows).
For Tableau: http://kb.tableau.com/articles/knowledgebase/connecting-to-hive-server-2-in-secure-mode
For Squirrel SQL: https://community.hortonworks.com/questions/17381/hive-with-dbvisualiser-or-squirrel-sql-client.html
... View more
05-19-2016
04:33 PM
3 Kudos
Example log line:
94.158.95.124 - - [24/Feb/2016:00:11:58 -0500] "GET / HTTP/1.1" 200 91966 "http://creativelabs.biz" "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:40.0) Gecko/20100101 Firefox/40.0"
SBT build script:
name := "Logs"
version := "1.0"
scalaVersion := "2.10.6"
jarName in assembly := "Logs.jar"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided"
libraryDependencies += "com.databricks" %% "spark-avro" % "2.0.1"
Scala program pieces:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.{KryoSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._
case class LogRecord( clientIp: String, clientIdentity: String, user: String, dateTime: String, request:String,statusCode:Int, bytesSent:Long, referer:String, userAgent:String )
object Logs {
val PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)"""".r
def parseLogLine(log: String): LogRecord = {
try {
val res = PATTERN.findFirstMatchIn(log)
if (res.isEmpty) {
println("Rejected Log Line: " + log)
LogRecord("Empty", "-", "-", "", "", -1, -1, "-", "-" )
}
else {
val m = res.get
// NOTE: HEAD does not have a content size.
if (m.group(9).equals("-")) {
LogRecord(m.group(1), m.group(2), m.group(3), m.group(4),
m.group(5), m.group(8).toInt, 0, m.group(10), m.group(11))
}
else {
LogRecord(m.group(1), m.group(2), m.group(3), m.group(4),
m.group(5), m.group(8).toInt, m.group(9).toLong, m.group(10), m.group(11))
}
}
} catch
{
case e: Exception =>
println("Exception on line:" + log + ":" + e.getMessage);
LogRecord("Empty", "-", "-", "", "-", -1, -1, "-", "-" )
}
}
//// Main Spark Program
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
Logger.getLogger("com.hortonworks.spark.Logs").setLevel(Level.INFO)
val log = Logger.getLogger("com.hortonworks.spark.Logs")
log.info("Started Logs Analysis")
val sparkConf = new SparkConf().setAppName("Logs")
sparkConf.set("spark.cores.max", "16")
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
sparkConf.set("spark.sql.tungsten.enabled", "true")
sparkConf.set("spark.eventLog.enabled", "true")
sparkConf.set("spark.app.id", "Logs")
sparkConf.set("spark.io.compression.codec", "snappy")
sparkConf.set("spark.rdd.compress", "false")
sparkConf.set("spark.suffle.compress", "true")
val sc = new SparkContext(sparkConf)
val logFile = sc.textFile("data/access3.log")
val accessLogs = logFile.map(parseLogLine).filter(!_.clientIp.equals("Empty"))
log.info("# of Partitions %s".format(accessLogs.partitions.size))
try {
println("===== Log Count: %s".format(accessLogs.count()))
accessLogs.take(5).foreach(println)
try {
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df1 = accessLogs.toDF()
df1.registerTempTable("accessLogsDF")
df1.printSchema()
df1.describe("bytesSent").show()
df1.first()
df1.head()
df1.explain()
df1.write.format("avro").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("statusCode").avro("avroresults")
} catch {
case e: Exception =>
log.error("Writing files after job. Exception:" + e.getMessage);
e.printStackTrace();
}
// Calculate statistics based on the content size.
val contentSizes = accessLogs.map(log => log.bytesSent)
val contentTotal = contentSizes.reduce(_ + _)
println("===== Number of Log Records: %s Content Size Total: %s, Avg: %s, Min: %s, Max: %s".format(
contentSizes.count,
contentTotal,
contentTotal / contentSizes.count,
contentSizes.min,
contentSizes.max))
sc.stop()
}
}
First, I set up a Scala Case Class that will hold the parsed record (clientIP, clientID, User, DateTime, Request, StatusCode, BytesSent, Referer, UserAgent). Next I have a regex and method to parse the logs (one line at a time) into case classes. I filter out the empty records. I use Spark SQL to examine the data. Then I write out the data to AVRO and finally do some counts.
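Since the parsed records are registered as the temp table accessLogsDF, a natural follow-on query (a sketch, reusing the sqlContext and temp table created in the program above) is to summarize traffic by status code:
// Sketch: query the registered temp table with Spark SQL.
// Assumes sqlContext and df1.registerTempTable("accessLogsDF") from the program above.
val byStatus = sqlContext.sql(
  "select statusCode, count(*) as hits, sum(bytesSent) as totalBytes " +
  "from accessLogsDF group by statusCode order by hits desc")
byStatus.show()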
... View more
Labels:
05-19-2016
01:06 PM
3 Kudos
It is very easy to connect various tools to Hive. Apache Zeppelin is my favorite tool for querying Hive via HiveQL, but you have a ton of options. First, DBVisualizer; I am using the Free Edition for Mac OS X. For these tools you will need to copy (or acquire) two Hive JARs:
hadoop-common.jar
hive-jdbc-1.2.1000.2.4.0.0-169-standalone.jar
On HDP 2.4 they are located at:
/usr/hdp/current/hive-client/lib/hive-jdbc-1.2.1000.2.4.0.0-169-standalone.jar
/usr/hdp/current/hadoop-client/hadoop-common.jar
Another tool is DBeaver, which has a unique twist: it finds and downloads the JARs itself via Maven, though the initial driver install takes a bit. It also supports Phoenix, Drill and GemfireXD. I also set up SquirrelSQL; for setup details, check out the official Hive docs. The connection settings are:
jdbc:hive2://localhost:10000/default
org.apache.hive.jdbc.HiveDriver
For the Hive JDBC URL format, see the Hive docs. I also connected from SQLWorkbench/J, a free SQL tool.
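If you want to sanity-check connectivity without a GUI tool, plain JDBC works with the same driver class and URL. Here is a minimal sketch (the two JARs above must be on the classpath; the user and password are placeholders for a default, non-secured sandbox):
// Sketch: connect to HiveServer2 with the standard Hive JDBC driver and list tables.
import java.sql.DriverManager

object HiveJdbcCheck extends App {
  Class.forName("org.apache.hive.jdbc.HiveDriver")
  // placeholder credentials; adjust for your cluster's security setup
  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "")
  val rs = conn.createStatement().executeQuery("show tables")
  while (rs.next()) println(rs.getString(1))
  conn.close()
}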
... View more
Labels:
05-19-2016
10:40 AM
Having run a bunch of Spark jobs locally, in Spark Standalone clusters, and in HDP YARN clusters, I have found a few JVM settings that help with debugging non-production jobs and assist with better garbage collection. This is important even with off-heap storage and bare-metal optimizations.
spark-submit --driver-java-options "-XX:+PrintGCDetails -XX:+UseG1GC -XX:MaxGCPauseMillis=400"
You can also set extra options in the runtime environment (see the Spark documentation). For HDP / Spark, you can add them from Ambari. In your Scala Spark program:
sparkConf.set("spark.cores.max", "4")
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
sparkConf.set("spark.sql.tungsten.enabled", "true")
sparkConf.set("spark.eventLog.enabled", "true")
sparkConf.set("spark.app.id", "MyAppIWantToFind")
sparkConf.set("spark.io.compression.codec", "snappy")
sparkConf.set("spark.rdd.compress", "false")
sparkConf.set("spark.suffle.compress", "true")
Make sure you have Tungsten on, the KryoSerializer, the eventLog enabled, and use logging:
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
val log = Logger.getLogger("com.hortonworks.myapp")
log.info("Started Logs Analysis") Also, whenever possible include relevant filters on your datasets: "filter(!_.clientIp.equals("Empty"))".
... View more
Labels:
05-13-2016
07:50 PM
4 Kudos
Below is a short Scala program and SBT build script for a Spark application to submit to YARN that will run on the HDP Sandbox with Spark 1.6. We consume Kafka messages in microbatches of 2 seconds. We set a number of parameters for the performance of the Spark SQL write to HDFS, including enabling Tungsten, Snappy compression, backpressure and the KryoSerializer. We set up the Spark Context and the Spark Streaming Context, then use KafkaUtils to prepare a stream of generic Avro data records from a byte array. From there we convert to a Scala case class that models the Twitter tweet from our source. We convert to a DataFrame, register it as a temp table and then save to ORC, Avro, Parquet and JSON.
To build this application:
sbt clean assembly
Create the Kafka topic:
cd /usr/hdp/current/kafka-broker
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic meetup
To prepare the Hadoop environment:
# Create HDFS directories (also avro, orc, json, ...)
hdfs dfs -mkdir /parquetresults
hdfs dfs -chmod -R 777 /parquetresults
hdfs dfs -ls /
To deploy to YARN:
# Check YARN
yarn application --list
# If need be
# yarn application --kill <id from list>
spark-submit --class "com.sparkdeveloper.receiver.KafkaConsumer" \
--master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m --conf spark.ui.port=4244 kafkaconsumer.jar
Spark Scala program:
package com.sparkdeveloper.receiver
import java.io.ByteArrayOutputStream
import java.util.HashMap
import org.apache.avro.SchemaBuilder
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import kafka.serializer._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.io.{ByteArrayOutputStream, File, IOException}
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.avro.io._
import org.apache.avro.SchemaBuilder
import org.apache.avro.Schema
import org.apache.avro._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import com.databricks.spark.avro._
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
case class HashtagEntities(text: String, start: Double, end: Double)
case class User(id: Double, name: String,
screenName: String, location: String, description: String, url: String, statusesCount: Double)
case class Tweet(text: String, createdAt: String, lang: String, source: String, expandedURL: String,
url: String, screenName: String, description: String, name: String, retweetCount: Double, timestamp: Long,
favoriteCount: Double, user: Option[User], hashtags: HashtagEntities)
/**
* Created by timothyspann
*/
object KafkaConsumer {
val tweetSchema = SchemaBuilder
.record("tweet")
.fields
.name("tweet").`type`().stringType().noDefault()
.name("timestamp").`type`().longType().noDefault()
.endRecord
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
val logger: Logger = Logger.getLogger("com.sparkdeveloper.receiver.KafkaConsumer")
val sparkConf = new SparkConf().setAppName("Avro to Kafka Consumer")
sparkConf.set("spark.cores.max", "24") // For my sandbox
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
sparkConf.set("spark.sql.tungsten.enabled", "true")
sparkConf.set("spark.eventLog.enabled", "true")
sparkConf.set("spark.app.id", "KafkaConsumer") // want to know your app in the UI
sparkConf.set("spark.io.compression.codec", "snappy")
sparkConf.set("spark.rdd.compress", "true")
sparkConf.set("spark.streaming.backpressure.enabled", "true")
sparkConf.set("spark.sql.parquet.compression.codec", "snappy")
sparkConf.set("spark.sql.parquet.mergeSchema", "true")
sparkConf.set("spark.sql.parquet.binaryAsString", "true")
val sc = new SparkContext(sparkConf)
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
val ssc = new StreamingContext(sc, Seconds(2))
try {
val kafkaConf = Map(
"metadata.broker.list" -> "sandbox.hortonworks.com:6667",
"zookeeper.connect" -> "sandbox.hortonworks.com:2181", // Default zookeeper location
"group.id" -> "KafkaConsumer",
"zookeeper.connection.timeout.ms" -> "1000")
val topicMaps = Map("meetup" -> 1)
// Create a new stream which can decode byte arrays.
val tweets = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder]
(ssc, kafkaConf,topicMaps, StorageLevel.MEMORY_ONLY_SER)
try {
tweets.foreachRDD((rdd, time) => {
if (rdd != null) {
try {
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val rdd2 = rdd.map { case (k, v) => parseAVROToString(v) }
try {
val result = rdd2.mapPartitions(records => {
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
records.flatMap(record => {
try {
Some(mapper.readValue(record, classOf[Tweet]))
} catch {
case e: Exception => None;
}
})
}, true)
val df1 = result.toDF()
logger.error("Registered tweets: " + df1.count())
df1.registerTempTable("tweets")
// To show how easy it is to write multiple formats
df1.write.format("orc").mode(org.apache.spark.sql.SaveMode.Append).orc("orcresults")
df1.write.format("avro").mode(org.apache.spark.sql.SaveMode.Append).avro("avroresults")
df1.write.format("parquet").mode(org.apache.spark.sql.SaveMode.Append).parquet("parquetresults")
df1.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).json("jsonresults")
} catch {
case e: Exception => None;
}
}
catch {
case e: Exception => None;
}
}
})
} catch {
case e: Exception =>
println("Writing files after job. Exception:" + e.getMessage);
e.printStackTrace();
}
} catch {
case e: Exception =>
println("Kafka Stream. Writing files after job. Exception:" + e.getMessage);
e.printStackTrace();
}
ssc.start()
ssc.awaitTermination()
}
def parseAVROToString(rawTweet: Array[Byte]): String = {
try {
if (rawTweet.isEmpty) {
println("Rejected Tweet")
"Empty"
}
else {
deserializeTwitter(rawTweet).get("tweet").toString
}
} catch {
case e: Exception =>
println("Exception:" + e.getMessage);
"Empty"
}
}
def deserializeTwitter(tweet: Array[Byte]): GenericRecord = {
try {
val reader = new GenericDatumReader[GenericRecord](tweetSchema)
val decoder = DecoderFactory.get.binaryDecoder(tweet, null)
reader.read(null, decoder)
} catch {
case e: Exception => None;
null;
}
}
}
// scalastyle:on println
build.sbt
name := "KafkaConsumer"
version := "1.0"
scalaVersion := "2.10.6"
jarName in assembly := "kafkaconsumer.jar"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "provided"
libraryDependencies += "com.databricks" %% "spark-avro" % "2.0.1"
libraryDependencies += "org.apache.avro" % "avro" % "1.7.6" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0"
libraryDependencies += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
libraryDependencies += "com.google.code.gson" % "gson" % "2.3"
mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
case "log4j.properties" => MergeStrategy.discard
case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
}
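Once the job has written a few microbatches, a quick sanity check is to read one of the output directories back from spark-shell or Zeppelin. A sketch that reads the Parquet output (the relative path matches the parquetresults directory written above; sc is an existing SparkContext):
// Sketch: read back the Parquet results written by the streaming job.
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val tweets = sqlContext.read.parquet("parquetresults")
tweets.printSchema()
tweets.select("screenName", "text", "retweetCount").show(10)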
... View more
Labels: