Spark Load Testing

The Spark load testing framework is built on a number of distributed technologies, including Gatling, Livy, Akka, and HDP. Using an Akka server powered by Livy (Spark as a Service) provides the following benefits:

• friendly and Docker friendly
• Low latency execution
• Sharing cache across jobs
• Separation of concerns
• Multi-tenancy
• Direct Spark SQL execution
• Configuration in one place
• Auditing and logging
• Complete statement history and metrics

Livy Server

Livy is an open source REST interface for interacting with
Apache Spark from anywhere. It supports executing snippets of code or programs
in a Spark context that runs locally or in Apache Hadoop YARN.

Livy offers three modes to run Spark jobs:

• Using the programmatic API
• Running interactive statements through the REST API
• Submitting batch applications with the REST API

Livy provides the following features:

• Interactive Scala, Python, and R shells
• Batch submissions in Scala, Java, and Python
• Multiple users can share the same server (impersonation support)
• Can be used for submitting jobs from anywhere with REST
• Does not require any code change to your programs
• Supports Spark 1/Spark 2 and Scala 2.10/2.11 within one build

Livy provides the following advantages:
• Upload a JAR file and run the job. You can add additional applications that connect to the same cluster and upload a JAR with the next job. If you use spark-submit, you must manually upload the JAR file to the cluster and run the command; everything must be prepared before the run.
• Use Spark in interactive mode, which is hard to do with spark-submit or Thrift Server at scale.
• Security. Reduce exposure of the cluster to the outside world.
• Stability. Spark is a complex framework, and there are many factors that can affect its long-term performance and stability. Decoupling the Spark context from the application makes it possible to handle Spark issues gracefully, without full downtime of the application.

Gatling Server

Gatling is a highly capable load testing tool. It is designed
for ease of use, maintainability and high performance. Gatling server provides
the following benefits.
• Powerful scripting using Scala
• Akka + Netty
• Run multiple scenarios in one simulation
• Scenarios = code + DSL
• Graphical reports with clear and concise graphs

Gatling’s architecture is asynchronous as long as the underlying protocol, such as HTTP, can be implemented in a non-blocking way. This kind of architecture lets us implement virtual users as messages instead of dedicated threads, making them very cheap in resources. Thus, running thousands of concurrent virtual users is not an issue.

    val theScenarioBuilder =
      scenario("Interactive Spark Command Scenario Using LIVY Rest Services $sessionId").exec(
        /* The string passed to http(...) is a name that describes the request. */
        http("Interactive Spark Command Simulation")
      ).pause(4 seconds)
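
For context, the Livy REST calls that an interactive scenario like this exercises can be sketched outside Gatling. The minimal Python sketch below only builds the two requests (create a session, then run a statement in it) and does not send them. The host name `livy-host`, the session id `0`, and the Scala snippet are hypothetical placeholders; port 8998 is Livy's default.

```python
import json
from urllib import request

# Assumption: Livy runs on a host called "livy-host" on its default port 8998.
LIVY_URL = "http://livy-host:8998"


def livy_post(path, payload):
    """Build (but do not send) a JSON POST request for the Livy REST API."""
    return request.Request(
        url=LIVY_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Step 1: create an interactive Scala session.
create_session = livy_post("/sessions", {"kind": "spark"})

# Step 2: run a statement in that session. The id 0 is a placeholder; in
# practice it comes from the JSON response to step 1.
run_statement = livy_post(
    "/sessions/0/statements",
    {"code": "sc.parallelize(1 to 100).sum()"},
)

for req in (create_session, run_statement):
    print(req.get_method(), req.full_url, req.data.decode("utf-8"))
```

Passing either request to `urllib.request.urlopen` would actually submit it; a load test replays the second call many times against an existing session.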
So, this is great: we can load test our Spark interactive command with one user! Let’s increase the number of users.

To increase the number of simulated users, all you have to do is change the configuration of the simulation as follows:

    setUp(
    ).protocols(theHttpProtocolBuilder)

If you want to simulate 3000
users, you might not want them to start at the same time. Indeed, real users
are more likely to connect to your web application gradually.

Gatling provides rampUsers to implement this behavior. The value of the ramp indicates the duration over which the users will be linearly started. In our scenario, let’s have 10 regular users and ramp them over 10 seconds so we don’t hammer the Livy server:

    setUp(
      theScenarioBuilder.inject(rampUsers(10) over (10 seconds))
    ).protocols(theHttpProtocolBuilder)
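
To make the linear ramp concrete, here is a minimal Python sketch of the start offsets that an even ramp implies. It illustrates the idea only and is not Gatling's scheduler, which may space users slightly differently. The function name `ramp_start_times` is hypothetical.

```python
def ramp_start_times(users, duration_seconds):
    """Start offsets (in seconds) for `users` virtual users spread evenly over the ramp."""
    interval = duration_seconds / users
    return [round(i * interval, 3) for i in range(users)]


# 10 users ramped over 10 seconds: one new user per second.
print(ramp_start_times(10, 10))
```

Under this model, ramping 10 users over 10 seconds starts one new user every second instead of hitting the Livy server with 10 sessions at once.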

@sudheer Could you please run a major compaction after the ETL ingestion? Please find the ALTER statement below for reference:

    alter table <<table_name>> compact 'MAJOR';

@timc c Can you also check the following properties and modify them according to your needs?

    webhcat.proxyuser.knox.hosts *
    webhcat.proxyuser.knox.groups *

As Jay pointed out, once you set the appropriate permissions, restart all the necessary components and all Hive services.

It seems to be an issue with the SasParser file. Could you please check whether you have the latest SasFileParser lib? Please find my SBT config below:

    libraryDependencies ++= Seq(
      "com.databricks" % "spark-csv_2.11" % "1.5.0",
      "org.slf4j" % "slf4j-api" % "1.7.5"
    )

Can you try with S3A instead of S3N and post the outcome here?

What is your cloud storage? WASB should be an issue.

Also, please check the max container size and the NM capacities. It looks like these parameters are set incorrectly on the cluster.

Could you please check the following items in your cluster?

1. Log in to the ZooKeeper client using the following command, where $hostName is the hostname of the ZooKeeper server:
   -server $hostName:2181
2. Verify the ACLs set on the znode:
   getAcl /llap-sasl/user-hive
   It should give results like:
   'sasl,'hive
   : cdrwa
   'world,'anyone
   : r
3. If the ACLs are not set properly, set them manually on the znode as a comma-separated list (no spaces):
   setAcl /llap-sasl/user-hive sasl:hive:cdrwa,world:anyone:r
4. Verify the ACLs set on the znode again:
   getAcl /llap-sasl/user-hive

Sqoop Overview

Sqoop is a tool designed to transfer data between Hadoop and relational databases or mainframes. You can use Sqoop to import data from a relational database management system (RDBMS) such as MySQL or Oracle, or from a mainframe, into the Hadoop Distributed File System (HDFS), transform the data in Hadoop MapReduce, and then export the data back into an RDBMS.

Sqoop automates most of this process, relying on the database to describe the schema for the data to be imported. Sqoop uses MapReduce to import and export the data, which provides parallel operation as well as fault tolerance.

Sqoop Performance Tuning Best Practices

Tune the following Sqoop arguments in the JDBC connection or Sqoop mapping to optimize performance:

• batch
• split-by and boundary-query
• direct
• fetch-size
• num-mappers

Inserting Data in Batches

Specifies that you can group the related SQL statements into
a batch when you export data. The JDBC interface exposes an API for doing batches in a prepared statement with multiple sets of values. With the --batch parameter, Sqoop can take advantage of this. This API is present in all JDBC drivers because it is required by the JDBC interface.

Enable JDBC batching using the --batch parameter:

    sqoop export \
      --connect <<JDBC URL>> \
      --username <<SQOOP_USER_NAME>> \
      --password <<SQOOP_PASSWORD>> \
      --table <<TABLE_NAME>> \
      --export-dir <<FOLDER_URI>> \
      --batch

The second option is to use the property sqoop.export.records.per.statement to specify the number of records that will be used in each insert statement:

    sqoop export \
      -Dsqoop.export.records.per.statement=<<RECORDS_PER_STATEMENT>> \
      --connect <<JDBC URL>> \
      --username <<SQOOP_USER_NAME>> \
      --password <<SQOOP_PASSWORD>> \
      --table <<TABLE_NAME>> \
      --export-dir <<FOLDER_URI>>

Finally, you can set how many insert statements are issued per transaction with the sqoop.export.statements.per.transaction property:

    sqoop export \
      -Dsqoop.export.statements.per.transaction=10 \
      --connect <<JDBC URL>> \
      --username <<SQOOP_USER_NAME>> \
      --password <<SQOOP_PASSWORD>> \
      --table <<TABLE_NAME>> \
      --export-dir <<FOLDER_URI>>

The default values can vary from connector to connector.
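
The two properties multiply: the number of rows committed per transaction is records per statement times statements per transaction. The following Python sketch works through that arithmetic for a single export task. It is a rough model under that assumption, not Sqoop's implementation; the function name and the row counts are hypothetical.

```python
import math


def export_batching(total_rows, records_per_statement, statements_per_transaction):
    """Estimate insert statements and commits for one export task (rough model)."""
    rows_per_transaction = records_per_statement * statements_per_transaction
    statements = math.ceil(total_rows / records_per_statement)
    transactions = math.ceil(statements / statements_per_transaction)
    return {
        "rows_per_transaction": rows_per_transaction,
        "statements": statements,
        "transactions": transactions,
    }


# Illustrative comparison: 1,000,000 rows with 100/100 versus 10/10.
print(export_batching(1_000_000, 100, 100))
print(export_batching(1_000_000, 10, 10))
```

Larger values mean fewer round trips and commits, at the cost of bigger statements and longer-running transactions on the database side.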
Sqoop defaults to disabled batching and to 100 for both the sqoop.export.records.per.statement and sqoop.export.statements.per.transaction properties.

Custom Boundary Queries

Specifies the range of values that you can import. You can use boundary-query if you do not get the desired results by using the split-by argument alone. When you configure the boundary-query argument, you must specify the min(id) and max(id) along with the table name. If you do not configure the argument, Sqoop determines the range itself by querying the minimum and maximum values of the split-by column. The following command supplies a custom boundary query:

    sqoop import \
      --connect <<JDBC URL>> \
      --username <<SQOOP_USER_NAME>> \
      --password <<SQOOP_PASSWORD>> \
      --query <<QUERY>> \
      --split-by <<ID>> \
      --target-dir <<TARGET_DIR_URI>> \
      --boundary-query "select min(<<ID>>), max(<<ID>>) from <<TABLE>>"

Importing Data Directly into Hive

Specifies the direct import fast path when you
import data from RDBMS. Rather than using the JDBC interface for
transferring data, the direct mode delegates the job of transferring data to
the native utilities provided by the database vendor. In the case of MySQL, the
mysqldump and mysqlimport will be used for retrieving data from the database
server or moving data back. In the case of PostgreSQL, Sqoop will take
advantage of the pg_dump utility to import data. Using native utilities will
greatly improve performance, as they are optimized to provide the best possible
transfer speed while putting less burden on the database server. There are
several limitations that come with this faster import. For one, not all
databases have available native utilities. This mode is not available for every
supported database. Out of the box, Sqoop has direct support only for MySQL and
PostgreSQL.

    sqoop import \
      --connect <<JDBC URL>> \
      --username <<SQOOP_USER_NAME>> \
      --password <<SQOOP_PASSWORD>> \
      --table <<TABLE_NAME>> \
      --direct

Importing Data using Fetch-size

Specifies the number of entries that Sqoop can
import at a time. Use the following syntax:

    --fetch-size=<n>

Where <n> represents the number of entries that Sqoop must fetch at a time. The default is 1000. Increase the value of the fetch-size argument based on the volume of data that needs to be read. Set the value based on the available memory and bandwidth.

Controlling Parallelism

Specifies the number of map tasks that can run in parallel.
The default is 4. To optimize performance, set the number of map tasks to a value lower than the maximum number of connections that the database supports.

Use the parameter --num-mappers if you want Sqoop to use a different number of mappers. For example, to suggest 10 concurrent tasks, use the following Sqoop command:

    sqoop import \
      --connect jdbc:mysql:// \
      --username sqoop \
      --password sqoop \
      --table cities \
      --num-mappers 10

Controlling the amount of parallelism that Sqoop will use to
transfer data is the main way to control the load on your database. Using more
mappers will lead to a higher number of concurrent data transfer tasks, which
can result in faster job completion. However, it will also increase the load on
the database as Sqoop will execute more concurrent queries.

Split-By

Specifies the column name based on which Sqoop must split the work units. Use the following syntax:

    --split-by <column name>

    sqoop import \
      --connect <<JDBC URL>> \
      --username <<SQOOP_USER_NAME>> \
      --password <<SQOOP_PASSWORD>> \
      --query <<QUERY>> \
      --split-by <<ID>>

Note: If you do not specify a column name, Sqoop splits the work units based on the primary key.
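
To illustrate how a split-by column, its minimum and maximum, and the mapper count combine into parallel work units, here is a minimal Python sketch that divides an integer key range into one contiguous chunk per mapper. It is a simplified model, not Sqoop's actual splitter, and the column name `id` and the function name `split_ranges` are hypothetical.

```python
def split_ranges(min_id, max_id, num_mappers):
    """Divide the inclusive range [min_id, max_id] into contiguous chunks, one per mapper."""
    total = max_id - min_id + 1
    base, extra = divmod(total, num_mappers)
    ranges, lo = [], min_id
    for i in range(num_mappers):
        # The first `extra` mappers take one additional key each.
        size = base + (1 if i < extra else 0)
        if size == 0:
            break  # more mappers than keys: the remaining mappers get nothing
        hi = lo + size - 1
        ranges.append((lo, hi))
        lo = hi + 1
    return ranges


# Each chunk becomes the WHERE clause of one mapper's query.
for lo, hi in split_ranges(1, 1000, 4):
    print(f"WHERE id >= {lo} AND id <= {hi}")
```

The chunks are equal in key range, not in row count. If the values of the split column are unevenly distributed, some mappers receive far more rows than others, which is when a different split-by column or a custom boundary query helps.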
