06-24-2021
04:25 AM
1 Kudo
Hi, you can find the following tutorial: https://www.cloudera.com/tutorials/dataframe-and-dataset-examples-in-spark-repl.html
06-22-2021
02:19 AM
Hi @magicchu Please raise a Hive escalation; our Hive team will work on this issue.
06-09-2021
08:56 PM
22 Kudos
1. Introduction
Spark is an in-memory processing engine where all of the computation that a task does happens in memory. So, it is important to understand Spark memory management; it helps us develop Spark applications and tune their performance.
If the memory allocation is too large when submitting an application, it occupies cluster resources unnecessarily. If the memory allocation is too small, memory overflow and full-GC problems can easily occur.
Efficient memory use is critical for good performance, but the reverse is also true: inefficient memory use leads to bad performance.
Spark Architecture:
The Spark application includes two JVM processes: driver and executor.
The driver is the main control process, which is responsible for creating the SparkSession/SparkContext, submitting the job, converting the job to a task, and coordinating the task execution between executors.
The executor is mainly responsible for performing specific calculation tasks and returning the results to the driver.
The driver's memory management is relatively simple, and Spark does not apply any special management to it.
In this article, we analyze executor memory management.
2. Executor memory
The executor acts as a JVM process launched on a worker node. So, it is important to understand JVM memory management.
JVM memory management is categorized into two types:
On-Heap Memory Management (In-Heap Memory): Objects are allocated on the JVM Heap and bound by GC.
Off-Heap Memory Management (External Memory): Objects are allocated in memory outside the JVM by serialization, managed by the application, and are not bound by GC.
In general, the objects' read and write speed is:
on-heap > off-heap > disk
3. Memory Management
Spark memory management is divided into two types:
Static Memory Manager (Static Memory Management), and
Unified Memory Manager (Unified memory management)
Since Spark 1.6.0, Unified Memory Manager has been set as the default memory manager for Spark.
Static Memory Manager has been deprecated because of its lack of flexibility.
In both memory managers, a portion of the Java heap is allocated for processing Spark applications, while the rest of the memory is reserved for Java class references and metadata usage.
Note: There will only be one MemoryManager per JVM.
// Determine whether to use the old memory management mode
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    // The old version uses static memory management
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    // The new version uses unified memory management
    UnifiedMemoryManager(conf, numUsableCores)
  }
To track memory usage, both memory managers use two memory pools:
ExecutionMemoryPool and
StorageMemoryPool
4. Static Memory Manager (SMM)
From Spark 1.0, May 2014
Static Memory Manager (SMM) is the traditional model and simple scheme for memory management.
It divides memory into two fixed partitions statically.
The size of storage memory, execution memory, and other memory is fixed during application processing, but users can configure it before the application starts.
Note: This memory allocation method was removed in Spark 3.0.
Advantage:
The Static Memory Manager mechanism is simple to implement.
Disadvantage:
Even if free space is available in storage memory, execution cannot use it, so data spills to disk when execution memory fills up (and vice versa).
In Spark 1.6+, static memory management can be enabled via the spark.memory.useLegacyMode=true parameter.
Parameter | Description
---|---
spark.memory.useLegacyMode (default: false) | The option to divide heap space into fixed-size regions.
spark.shuffle.memoryFraction (default: 0.2) | The fraction of the heap used for aggregation and cogrouping during shuffles. Works only if spark.memory.useLegacyMode=true.
spark.storage.memoryFraction (default: 0.6) | The fraction of the heap used for Spark's memory cache. Works only if spark.memory.useLegacyMode=true.
spark.storage.unrollFraction (default: 0.2) | The fraction of spark.storage.memoryFraction used for unrolling blocks in memory. This is dynamically allocated by dropping existing blocks when there is not enough free storage space to unroll the new block in its entirety. Works only if spark.memory.useLegacyMode=true.
Static memory management does not support the use of off-heap memory for storage, so all of it is allocated to the execution space.
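For completeness, here is a minimal sketch (Spark 2.x only, since spark.memory.useLegacyMode was removed in Spark 3.0; the application name is illustrative) of how these legacy settings could be supplied when building a session. The values shown are simply the defaults from the table above.
import org.apache.spark.sql.SparkSession

// Hedged sketch for Spark 2.x: enable the Static Memory Manager and its fractions.
val spark = SparkSession.builder()
  .appName("StaticMemoryManagerExample")      // illustrative name
  .master("local[*]")
  .config("spark.memory.useLegacyMode", "true")   // enable static memory management
  .config("spark.shuffle.memoryFraction", "0.2")  // execution (shuffle) fraction
  .config("spark.storage.memoryFraction", "0.6")  // storage fraction
  .config("spark.storage.unrollFraction", "0.2")  // unroll fraction within storage
  .getOrCreate()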
5. Unified Memory Manager (UMM)
From Spark 1.6+, January 2016
Since Spark 1.6.0, a new memory manager has been adopted to replace the static memory manager and provide Spark with dynamic memory allocation.
It allocates a region of memory as a unified memory container that is shared by storage and execution.
When execution memory is not used, the storage memory can acquire all the available memory, and vice versa.
If either storage or execution memory needs more space, a function called acquireMemory() expands one of the memory pools and shrinks the other.
Borrowed storage memory can be evicted at any given time. Borrowed execution memory, however, will not be evicted in the first design due to complexities in implementation.
Advantages:
The boundary between storage memory and execution memory is not static, and in cases of memory pressure, the boundary would be moved, i.e., one region would grow by borrowing space from another one.
When the application has no cached data, execution can use all of the memory, avoiding unnecessary spills to disk.
When the application has cached data, it reserves a minimum amount of storage memory so that the cached data blocks are not affected.
This approach provides reasonable out-of-the-box performance for a variety of workloads without requiring user expertise in how memory is divided internally.
JVM has two types of memory:
On-Heap Memory
Off-Heap Memory
In addition to the above two JVM memory types, there is one more segment of memory that is accessed by Spark, i.e., external process memory. This kind of memory is mainly used for PySpark and SparkR applications. This is the memory used by the Python/R process, which resides outside the JVM.
5.1 On-Heap Memory
By default, Spark uses on-heap memory only. The size of the on-heap memory is configured by the --executor-memory or spark.executor.memory parameter when the Spark application starts.
The concurrent tasks running inside the executor share the JVM's on-heap memory.
Two main configurations control executor memory allocation:
Parameter | Description
---|---
spark.memory.fraction (default: 0.6) | Fraction of the heap space used for execution and storage. The lower this is, the more frequently spills and cached-data evictions occur. The purpose of this configuration is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records.
spark.memory.storageFraction (default: 0.5) | The size of the storage region within the space set aside by spark.memory.fraction. Cached data may only be evicted if total storage exceeds this region.
Note: In Spark 1.6, the default spark.memory.fraction value was 0.75 and the default spark.memory.storageFraction value was 0.5.
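As a rough illustration (not a tuning recommendation; the application name is made up), these two settings could be supplied when the session is created:
import org.apache.spark.sql.SparkSession

// Illustrative only: the values shown are the documented defaults.
val spark = SparkSession.builder()
  .appName("UnifiedMemoryFractions")
  .master("local[*]")
  .config("spark.memory.fraction", "0.6")        // execution + storage share of (heap - reserved)
  .config("spark.memory.storageFraction", "0.5") // storage region within spark.memory.fraction
  .getOrCreate()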
Apache Spark supports three memory regions:
Reserved Memory
User Memory
Spark Memory
Reserved Memory:
Reserved memory is the memory reserved for the system and is used to store Spark's internal objects.
As of Spark v1.6.0+, the value is 300MB. That means 300MB of RAM does not participate in Spark memory region size calculations (SPARK-12081).
Reserved memory's size is hard-coded and cannot be changed without recompiling Spark or setting spark.testing.reservedMemory, which is not recommended because it is a testing parameter not intended for production use.
Formula:
RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024 BYTES = 300 MB
UnifiedMemoryManager.scala
Note: If the executor memory is less than 1.5 times the reserved memory (1.5 * reserved memory = 450MB), then the Spark job will fail with the following exception message:
spark-shell --executor-memory 300m

21/06/21 03:55:51 ERROR repl.Main: Failed to initialize Spark session.
java.lang.IllegalArgumentException: Executor memory 314572800 must be at least 471859200. Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration.
at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:225)
at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:199)
UnifiedMemoryManager.scala
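The check can be approximated with a short sketch (not the actual UnifiedMemoryManager code, just the arithmetic implied by the error message above):
// 300 MB reserved; Spark requires executor memory >= 1.5 * reserved = 450 MB.
val reservedMemoryBytes = 300L * 1024 * 1024
val minSystemMemoryBytes = (reservedMemoryBytes * 1.5).toLong  // 471859200 bytes

def checkExecutorMemory(executorMemoryBytes: Long): Unit = {
  require(executorMemoryBytes >= minSystemMemoryBytes,
    s"Executor memory $executorMemoryBytes must be at least $minSystemMemoryBytes. " +
      "Please increase executor memory using the --executor-memory option " +
      "or spark.executor.memory in Spark configuration.")
}

checkExecutorMemory(314572800L)  // 300m, as in the log above -> IllegalArgumentException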
User Memory:
User Memory is the memory used to store user-defined data structures, Spark internal metadata, any UDFs created by the user, and the data needed for RDD conversion operations, such as RDD dependency information, etc.
For example, we can rewrite Spark aggregation by using the mapPartitions transformation to maintain a hash table for this aggregation to run, which would consume so-called user memory.
This memory segment is not managed by Spark. Spark will not be aware of or maintain this memory segment.
Formula:
(Java Heap - Reserved Memory) * (1.0 - spark.memory.fraction)
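As a hedged illustration of the mapPartitions-based aggregation mentioned above (assuming an active SparkSession named spark; the sample data is made up), the mutable hash table below lives in user memory rather than in Spark-managed memory:
import scala.collection.mutable

val sc = spark.sparkContext
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

val summed = pairs.mapPartitions { iter =>
  val counts = mutable.HashMap.empty[String, Int]  // hash table allocated in user memory
  iter.foreach { case (k, v) => counts(k) = counts.getOrElse(k, 0) + v }
  counts.iterator
}.reduceByKey(_ + _)  // combine per-partition results

summed.collect().foreach(println)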
Spark Memory (Unified Memory):
Spark Memory is the memory pool managed by Apache Spark. It is responsible for storing intermediate state during task execution (for example, join buffers) and for storing broadcast variables.
All the cached or persistent data will be stored in this segment, specifically in the storage memory of this segment.
Formula:
(Java Heap - Reserved Memory) * spark.memory.fraction
Spark tasks operate in two main memory regions:
Execution: Used for shuffles, joins, sorts, and aggregations.
Storage: Used to cache partitions of data.
The boundary between them is set by the spark.memory.storageFraction parameter, which defaults to 0.5 (50%).
Storage Memory:
Storage Memory is used for storing all of the cached data, broadcast variables, unrolled data, etc. “Unroll” is essentially a process of deserializing serialized data.
Any persistent option that includes memory in it will store that data in this segment.
Spark clears space for new cache requests by removing old cached objects based on the Least Recently Used (LRU) mechanism.
Once cached data no longer fits in storage memory, it is either written to disk or recomputed, depending on the persistence level. Broadcast variables are stored in the cache at the MEMORY_AND_DISK persistence level. This is where long-lived cached data is stored.
Formula:
(Java Heap - Reserved Memory) * spark.memory.fraction * spark.memory.storageFraction
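A minimal sketch, assuming an active SparkSession named spark, of how cached data ends up in this pool (the row count is arbitrary):
import org.apache.spark.storage.StorageLevel

val df = spark.range(0, 10000000L).toDF("id")
df.persist(StorageLevel.MEMORY_ONLY)  // cached blocks occupy storage memory; evicted via LRU, recomputed if needed
df.count()                            // materializes the cache
// The cached blocks and their sizes are visible on the Spark UI "Storage" tab.
df.unpersist()                        // releases the storage memory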
Execution Memory:
Execution Memory is used for storing the objects required during the execution of Spark tasks.
For example, it is used to store the shuffle intermediate buffer on the map side in memory. Also, it is used to store a hash table for the hash aggregation step.
This pool also supports spilling on disk if not enough memory is available, but the blocks from this pool cannot be forcefully evicted by other threads (tasks).
Execution memory tends to be more short-lived than storage memory. It is released immediately after each operation, making space for subsequent ones.
Formula:
(Java Heap - Reserved Memory) * spark.memory.fraction * (1.0 - spark.memory.storageFraction)
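A minimal sketch, again assuming an active SparkSession named spark: the sort and hash-aggregation buffers for the wide transformation below are taken from execution memory and are released (or spilled to disk) once the stage finishes.
import org.apache.spark.sql.functions.col

val big = spark.range(0, 5000000L).toDF("id")
val counts = big.groupBy(col("id") % 100).count()  // hash aggregation buffers use execution memory
counts.show(5)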
In Spark 1.6+, there is no hard boundary between execution memory and storage memory.
Due to the nature of execution memory, blocks cannot be forcefully evicted from this pool; otherwise, execution will break since the block it refers to won’t be found.
But when it comes to storage memory, blocks can be evicted from memory and written to disk or recomputed (if the persistence level is MEMORY_ONLY) as required.
Storage and execution pool borrowing rules (a simplified sketch follows this list):
Storage memory can borrow space from execution memory only if blocks are not used in execution memory.
Execution memory can also borrow space from storage memory if blocks are not used in storage memory.
If storage memory has borrowed space from execution memory and execution needs more memory, execution can forcefully evict the excess blocks occupied by storage memory.
If execution memory has borrowed space from storage memory and storage needs more memory, storage cannot forcefully evict the excess blocks occupied by execution memory; it ends up with a smaller memory area and waits until Spark releases the excess blocks held in execution memory before occupying them.
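To make the asymmetry concrete, here is a toy model of the rules above (deliberately simplified; it is not Spark's actual MemoryManager code):
// Toy model of a unified pool split by storageFraction.
class ToyUnifiedPool(total: Long, storageFraction: Double) {
  private val storageRegion = (total * storageFraction).toLong
  private var storageUsed = 0L
  private var executionUsed = 0L

  // Execution takes free space first and may evict storage blocks that grew
  // beyond the storage region (i.e., borrowed space).
  def acquireExecution(bytes: Long): Long = {
    val free = total - storageUsed - executionUsed
    val evictable = math.max(0L, storageUsed - storageRegion)
    val granted = math.min(bytes, free + evictable)
    val evicted = math.max(0L, granted - free)
    storageUsed -= evicted       // forcefully evict borrowed storage blocks
    executionUsed += granted
    granted
  }

  // Storage only uses free space; it never evicts execution blocks.
  def acquireStorage(bytes: Long): Boolean = {
    val free = total - storageUsed - executionUsed
    if (bytes <= free) { storageUsed += bytes; true } else false
  }
}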
Calculate the memory for 5 GB of executor memory:
To calculate reserved memory, user memory, spark memory, storage memory, and execution memory, we will use the following parameters:
spark.executor.memory=5g
spark.memory.fraction=0.6
spark.memory.storageFraction=0.5
Java Heap Memory = 5 GB = 5 * 1024 MB = 5120 MB
Reserved Memory = 300 MB
Usable Memory = (Java Heap Memory - Reserved Memory) = 5120 MB - 300 MB = 4820 MB
User Memory = Usable Memory * (1.0 - spark.memory.fraction) = 4820 MB * (1.0 - 0.6) = 4820 MB * 0.4 = 1928 MB
Spark Memory = Usable Memory * spark.memory.fraction = 4820 MB * 0.6 = 2892 MB
Spark Storage Memory = Spark Memory * spark.memory.storageFraction = 2892 MB * 0.5 = 1446 MB
Spark Execution Memory = Spark Memory * (1.0 - spark.memory.storageFraction) = 2892 MB * (1 - 0.5) = 2892 MB * 0.5 = 1446 MB

Summary:
Reserved Memory: 300 MB (5.85%)
User Memory: 1928 MB (37.65%)
Spark Memory: 2892 MB (56.48%)
Off-Heap Memory (External memory)
Off-heap memory means allocating memory objects (serialized to byte arrays) outside the heap of the Java virtual machine (JVM). This memory is managed directly by the operating system rather than the JVM and lives in native memory outside the process heap, so it is not processed by the garbage collector.
The result is a smaller heap, which reduces the impact of garbage collection on the application.
Accessing this data is slightly slower than accessing the on-heap storage, but still faster than reading/writing from a disk. The downside is that the user has to manually deal with managing the allocated memory.
This memory is not allocated within the JVM; instead, Spark uses the Java Unsafe API (comparable to calling malloc() in C) to request memory directly from the operating system. Because this memory is not subject to JVM memory management, frequent GC is avoided. The disadvantage is that the application must implement its own logic for allocating and releasing this memory.
Spark 1.6+ began to introduce Off-heap memory (SPARK-11389). Unified Memory Manager can optionally be allocated using off-heap memory.
Parameter | Description
---|---
spark.memory.offHeap.enabled (default: false) | The option to use off-heap memory for certain operations.
spark.memory.offHeap.size (default: 0) | The total amount of memory, in bytes, for off-heap allocation. It has no impact on heap memory usage, so make sure not to exceed your executor's total limits.
By default, off-heap memory is disabled, but we can enable it with the spark.memory.offHeap.enabled parameter and set the memory size with the spark.memory.offHeap.size parameter.
spark-shell \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=5g
Off-heap memory supports OFF_HEAP persistence level. Compared to the on-heap memory, the model of the off-heap memory is relatively simple, including only Storage Memory and Execution Memory.
If the off-heap memory is enabled, there will be both on-heap and off-heap memory in the Executor.
Spark UI with OffHeap Enabled
The Execution Memory in the Executor is the sum of the Execution memory inside the heap and the Execution memory outside the heap. The same is true for Storage Memory.
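A minimal sketch of enabling off-heap memory at session start and caching a DataFrame with the OFF_HEAP storage level (the application name and sizes are illustrative):
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder()
  .appName("OffHeapExample")
  .master("local[*]")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "2g")
  .getOrCreate()

val df = spark.range(0, 1000000L).toDF("id")
df.persist(StorageLevel.OFF_HEAP)  // cached blocks are stored in off-heap memory
println(df.count())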
Note: Project Tachyon supports in-memory storage for Spark RDDs in off-heap space.
Project Tungsten supports storing shuffle objects in off-heap space.
Advantage(s):
It reduces heap memory usage and the frequency of GC, improving program performance.
When an executor is killed, all cached data on its heap is gone, but data kept in off-heap memory can persist; the lifetime of the JVM and the lifetime of the cached data are decoupled.
Disadvantage(s):
Using OFF_HEAP does not replicate data, nor can it guarantee high data availability the way Alluxio does; lost data must be recomputed.
6. Understand the Memory Allocation using Spark UI
6.1 Using On Heap Memory:
Let's launch the Spark shell with 5GB On Heap Memory to understand the Storage Memory in Spark UI.
spark-shell \
  --driver-memory 5g \
  --executor-memory 5g
The available Storage Memory displayed on the Spark UI Executors tab is 2.7 GB, as follows:
Based on our 5GB calculation, we can see the following memory values:
Java Heap Memory = 5 GB
Reserved Memory = 300 MB
Usable Memory = 4820 MB
User Memory = 1928 MB
Spark Memory = 2892 MB = 2.8242 GB
Spark Storage Memory = 1446 MB = 1.4121 GB
Spark Execution Memory = 1446 MB = 1.4121 GB
From the Spark UI, the Storage Memory value is 2.7 GB, while our calculated Storage Memory value is 1.4121 GB. The two do not match because the Storage Memory value shown in the Spark UI is actually the sum of Storage Memory and Execution Memory:
Storage Memory = Spark Storage Memory + Spark Execution Memory = 1.4121 GB + 1.4121 GB = 2.8242 GB
The Spark UI Storage Memory (2.7 GB) still does not match the calculated Storage Memory (2.8242 GB) even though we set --executor-memory to 5g, because the executor obtains its heap size through Runtime.getRuntime.maxMemory, which returns 4772593664 bytes, so the Java heap memory is only 4772593664 bytes.
Java Heap Memory = 4772593664 bytes = 4772593664 / (1024 * 1024) = 4551 MB
Reserved Memory = 300 MB
Usable Memory = (Java Heap Memory - Reserved Memory) = (4551 - 300) MB = 4251 MB
User Memory = Usable Memory * (1 - spark.memory.fraction) = 1700.4 MB
Spark Memory = Usable Memory * spark.memory.fraction = 2550.6 MB
Spark Storage Memory = 1275.3 MB
Spark Execution Memory = 1275.3 MB
Spark Memory (2550.6 MB / 2.4908 GB) still does not match what is displayed on the Spark UI (2.7 GB) because we converted the Java heap bytes into MB using 1024 * 1024, whereas the Spark UI converts bytes by dividing by 1000 * 1000.
Java Heap Memory = 4772593664 bytes = 4772593664 / (1000 * 1000) = 4772.593664 MB
Reserved Memory = 300 MB
Usable Memory = (Java Heap Memory - Reserved Memory) = (4772.593664 - 300) MB = 4472.593664 MB
User Memory = Usable Memory * (1 - spark.memory.fraction) = 1789.0374656 MB
Spark Memory = Usable Memory * spark.memory.fraction = 2683.5561984 MB = ~2.7 GB
Spark Storage Memory = 1341.7780992 MB
Spark Execution Memory = 1341.7780992 MB
Logic for converting bytes into GB:
Spark 2.X
function formatBytes(bytes, type) {
  if (type !== 'display') return bytes;
  if (bytes == 0) return '0.0 B';
  var k = 1000;
  var dm = 1;
  var sizes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];
  var i = Math.floor(Math.log(bytes) / Math.log(k));
  return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i];
}
Spark 3.X
function formatBytes(bytes, type) {
  if (type !== 'display') return bytes;
  if (bytes <= 0) return '0.0 B';
  var k = 1024;
  var dm = 1;
  var sizes = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'];
  var i = Math.floor(Math.log(bytes) / Math.log(k));
  return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i];
}
6.2 Using On Heap Memory + Off Heap Memory:
Let's launch the spark shell with 1GB On Heap memory and 5GB Off Heap memory to understand the Storage Memory.
spark-shell \
  --driver-memory 1g \
  --executor-memory 1g \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=5g
The available Storage Memory displayed on the Spark UI Executors tab is 5.8 GB, as follows:
Storage Memory = On Heap Memory + Off Heap Memory
Memory Calculation:
On Heap Memory
Java Heap Memory = 954728448 bytes = 954728448/1000/1000 = 954 MB
Reserved Memory = 300 MB
Usable Memory = (Java Heap Memory - Reserved Memory) = (954 - 300) MB = 654 MB
User Memory = (Usable Memory * (1 -spark.memory.fraction)) = 261.6 MB
Spark Memory = (Usable Memory * spark.memory.fraction) = 392.4 MB
Spark Storage Memory = 196.2 MB
Spark Execution Memory = 196.2 MB
Off Heap Memory
spark.memory.offHeap.size = 5 GB = 5 * 1024 * 1024 * 1024 bytes = 5368709120 bytes = 5368709120 / (1000 * 1000) = ~5368.7 MB
Storage Memory
Storage Memory = On Heap Memory + Off Heap Memory
= 392.4 MB + 5368.7 MB
= 5761.1 MB
= ~5.8 GB
Simple Java Program to calculate the Spark Memory:
// JVM Arguments: -Xmx5g
public class SparkMemoryCalculation {

    private static final long MB = 1024 * 1024;
    private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * MB;
    private static final double SparkMemoryStorageFraction = 0.5;
    private static final double SparkMemoryFraction = 0.6;

    public static void main(String[] args) {
        long systemMemory = Runtime.getRuntime().maxMemory();
        long usableMemory = systemMemory - RESERVED_SYSTEM_MEMORY_BYTES;

        long sparkMemory = convertDoubleToLong(usableMemory * SparkMemoryFraction);
        long userMemory = convertDoubleToLong(usableMemory * (1 - SparkMemoryFraction));
        long storageMemory = convertDoubleToLong(sparkMemory * SparkMemoryStorageFraction);
        long executionMemory = convertDoubleToLong(sparkMemory * (1 - SparkMemoryStorageFraction));

        printMemoryInMB("Heap Memory\t\t", systemMemory);
        printMemoryInMB("Reserved Memory", RESERVED_SYSTEM_MEMORY_BYTES);
        printMemoryInMB("Usable Memory\t", usableMemory);
        printMemoryInMB("User Memory\t\t", userMemory);
        printMemoryInMB("Spark Memory\t", sparkMemory);
        printMemoryInMB("Storage Memory\t", storageMemory);
        printMemoryInMB("Execution Memory", executionMemory);

        System.out.println();

        printStorageMemoryInMB("Spark Storage Memory", sparkMemory);
        printStorageMemoryInMB("Storage Memory UI \t", storageMemory);
        printStorageMemoryInMB("Execution Memory UI", executionMemory);
    }

    private static void printMemoryInMB(String type, long memory) {
        System.out.println(type + " \t=\t" + (memory / MB) + " MB");
    }

    private static void printStorageMemoryInMB(String type, long memory) {
        System.out.println(type + " \t=\t" + (memory / (1000 * 1000)) + " MB");
    }

    private static long convertDoubleToLong(double val) {
        return (long) val;
    }
}
Thanks for visiting this article. If you liked this article, give kudos.
06-09-2021
07:28 AM
Hi @magicchu Could you please try the following example: https://github.com/rangareddy/spark-hive-udf. Build the application, copy the jar, and upload it to HDFS. After that, register it as a function and try the example. If it works, implement yours the same way.
06-04-2021
04:30 AM
Hi @jamesstarks Could you please enable Spark authentication using the following parameter: --conf spark.authenticate=true
05-18-2021
08:22 PM
2 Kudos
Introduction
In this article, we will learn how to query the Hive tables data by using column names with regular expressions in Spark.
Assume we have a table with column names like col1, col2, col3, col4, col5, etc. If we want to select the data, we will use queries like select col1, col2, col3, col4, col5 from the table. Instead of specifying col1, col2, col3, col4, col5, we can use regular expressions while selecting columns like select `col.*` from table.
From Hive, this feature is supported from Hive 0.13.0 onwards. By default, this feature is disabled, and in order to enable it, we need to use set hive.support.quoted.identifiers=none.
From Spark, this feature is supported from Spark 2.3.0 onwards. By default, this feature is disabled, and in order to enable it, we need to set spark.sql.parser.quotedRegexColumnNames=true.
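The same setting can also be applied programmatically on an existing session, as a brief sketch (it assumes the table created in the steps below):
scala> spark.conf.set("spark.sql.parser.quotedRegexColumnNames", "true")
scala> spark.sql("select `col.*` from regex_test.regex_test_tbl").show(false)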
Steps
Hive
Create the Hive table and insert the data: beeline -u jdbc:hive2://host:port
> create database if not exists regex_test;
> create external table if not exists regex_test.regex_test_tbl (col1 int, col2 string, col3 float, `timestamp` timestamp) stored as PARQUET;
> insert into regex_test.regex_test_tbl values (1, 'Ranga', 23000, cast('1988-01-06 00:59:59.345' as timestamp));
> insert into regex_test.regex_test_tbl values (2, 'Nishanth', 38000, cast('2018-05-29 17:32:59.345' as timestamp));
> insert into regex_test.regex_test_tbl values (3, 'Raja', 18000, cast('2067-05-29 17:32:59.345' as timestamp));
Select the regex_test.regex_test_tbl table data. > SET hive.cli.print.header=true;
> select * from regex_test.regex_test_tbl;
+----------------------+----------------------+----------------------+---------------------------+
| regex_test_tbl.col1 | regex_test_tbl.col2 | regex_test_tbl.col3 | regex_test_tbl.timestamp |
+----------------------+----------------------+----------------------+---------------------------+
| 1 | Ranga | 23000.0 | 1988-01-06 00:59:59.345 |
| 2 | Nishanth | 38000.0 | 2018-05-29 17:32:59.345 |
| 3 | Raja | 18000.0 | 2067-05-29 17:32:59.345 |
+----------------------+----------------------+----------------------+---------------------------+
Without setting hive.support.quoted.identifiers=none, try to run the query using regular expressions. The following error is noticed: > select `col.*` from regex_test.regex_test_tbl;
FAILED: SemanticException [Error 10004]: Line 1:7 Invalid table alias or column reference 'col.*': (possible column names are: col1, col2, col3, timestamp)
Now set the hive.support.quoted.identifiers=none and execute the above query. > set hive.support.quoted.identifiers=none;
> select `col.*` from regex_test.regex_test_tbl;
+----------------------+----------------------+----------------------+
| regex_test_tbl.col1 | regex_test_tbl.col2 | regex_test_tbl.col3 |
+----------------------+----------------------+----------------------+
| 1 | Ranga | 23000.0 |
| 2 | Nishanth | 38000.0 |
| 3 | Raja | 18000.0 |
+----------------------+----------------------+----------------------+
We can also select a specific column name while querying the data. > select `timestamp` from regex_test.regex_test_tbl;
+--------------------------+
| timestamp |
+--------------------------+
| 1988-01-06 00:59:59.345 |
| 2018-05-29 17:32:59.345 |
| 2067-05-29 17:32:59.345 |
+--------------------------+
Spark
Launch the spark shell and execute the following query to select the regex_test.regex_test_tbl table data. # spark-shell
scala> spark.sql("select * from regex_test.regex_test_tbl").show(truncate=false)
+----+--------+-------+-----------------------+
|col1|col2 |col3 |timestamp |
+----+--------+-------+-----------------------+
|2 |Nishanth|38000.0|2018-05-29 17:32:59.345|
|1 |Ranga |23000.0|1988-01-06 00:59:59.345|
|3 |Raja |18000.0|2067-05-29 17:32:59.345|
+----+--------+-------+-----------------------+
Without setting spark.sql.parser.quotedRegexColumnNames=true, try to run the query using regular expressions. We will get the following error: scala> spark.sql("select `col.*` from regex_test.regex_test_tbl").show(false)
org.apache.spark.sql.AnalysisException: cannot resolve '`col.*`' given input columns: [spark_catalog.regex_test.regex_test_tbl.col1, spark_catalog.regex_test.regex_test_tbl.col2, spark_catalog.regex_test.regex_test_tbl.col3, spark_catalog.regex_test.regex_test_tbl.timestamp]; line 1 pos 7;
'Project ['`col.*`]
+- SubqueryAlias spark_catalog.regex_test.regex_test_tbl
+- Relation[col1#203,col2#204,col3#205,timestamp#206] parquet
Now set the spark.sql.parser.quotedRegexColumnNames=true and execute the above query. scala> spark.sql("SET spark.sql.parser.quotedRegexColumnNames=true").show(false)
+---------------------------------------+-----+
|key |value|
+---------------------------------------+-----+
|spark.sql.parser.quotedRegexColumnNames|true |
+---------------------------------------+-----+
scala> spark.sql("select `col.*` from regex_test.regex_test_tbl").show(false)
+----+--------+-------+
|col1|col2 |col3 |
+----+--------+-------+
|2 |Nishanth|38000.0|
|1 |Ranga |23000.0|
|3 |Raja |18000.0|
+----+--------+-------+
We can also select a specific column name while querying the data. scala> spark.sql("select `timestamp` from regex_test.regex_test_tbl").show(false)
+-----------------------+
|timestamp |
+-----------------------+
|2018-05-29 17:32:59.345|
|1988-01-06 00:59:59.345|
|2067-05-29 17:32:59.345|
+-----------------------+

Note: Currently, Spark has a limitation when selecting data with an alias name:

scala> spark.sql("select `col1` as `col` from regex_test.regex_test_tbl").show(false)
org.apache.spark.sql.AnalysisException: Invalid usage of '*' in expression 'alias';
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
References:
REGEX Column Specification
SPARK-12139
Thanks for visiting this article and happy learning !!
05-10-2021
11:15 PM
4 Kudos
In this article, we will learn how to integrate Spark with Hive JdbcStorageHandler in CDP.
JdbcStorageHandler
By using the JdbcStorageHandler, we can connect Apache Hive to JDBC data sources (MySQL, PostgreSQL, Oracle, DB2, Derby, etc.). Currently, writing to a JDBC data source is not supported. To read from a JDBC data source, we need to create an external table using JdbcStorageHandler.
Example:
CREATE EXTERNAL TABLE employee
(
id BIGINT,
name STRING,
age INT,
salary DOUBLE
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "MYSQL",
"hive.sql.jdbc.driver" = "com.mysql.jdbc.Driver",
"hive.sql.jdbc.url" = "jdbc:mysql://localhost/test",
"hive.sql.dbcp.username" = "username",
"hive.sql.dbcp.password" = "password",
"hive.sql.table" = "EMPLOYEE",
"hive.sql.dbcp.maxActive" = "1"
);
Supported Data Types:
The column data type for a Hive JdbcStorageHandler table can be:
Numeric data type: byte, short, int, long, float, double
Decimal with scale and precision
String data type: string, char, varchar
Date
Timestamp
Note: Complex data types (struct, map, array) are not supported.
The following are the steps to integrate with MySQL database:
Create the employee mysql table: mysql> create database test;
Query OK, 1 row affected (0.00 sec)
mysql> use test;
Database changed
mysql> drop table if EXISTS test.EMPLOYEE;
mysql> CREATE TABLE test.EMPLOYEE(
id INT,
name varchar(255),
salary DECIMAL,
dob DATE NOT NULL DEFAULT '2021-05-01',
doj TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP(),
PRIMARY KEY (id)
);
Query OK, 0 rows affected (0.01 sec)
Insert the data into the employee table: mysql> INSERT INTO test.EMPLOYEE (id, name, salary, dob, doj) VALUES (1, "Ranga", 10000.00, '1988-06-01', '2020-03-16 09:00:01.000000');
Query OK, 1 row affected (0.01 sec)
mysql> INSERT INTO test.EMPLOYEE (id, name, salary,dob) VALUES (2, "Nishanth", 50000.00, '2018-05-29');
Query OK, 1 row affected (0.01 sec)
mysql> INSERT INTO test.EMPLOYEE (id, name, salary) VALUES (3, "Raja", 30000.00);
Query OK, 1 row affected (0.01 sec)
Check the data in MySQL employee table: mysql> select * from test.EMPLOYEE;
+----+----------+--------+------------+---------------------+
| id | name | salary | dob | doj |
+----+----------+--------+------------+---------------------+
| 1 | Ranga | 10000 | 1988-06-01 | 2020-03-16 09:00:01 |
| 2 | Nishanth | 50000 | 2018-05-29 | 2021-05-01 09:02:33 |
| 3 | Raja | 30000 | 2021-05-01 | 2021-05-01 09:02:45 |
+----+----------+--------+------------+---------------------+
3 rows in set (0.00 sec)
Login to Beeline: #beeline -u jdbc:hive2://<hiveserver2_host>:<port>/<db>
beeline -u jdbc:hive2://localhost:10000/default
Create an employee table in hive using JdbcStorageHandler: > create database if not exists db_test;
INFO : OK
> use db_test;
INFO : OK
> DROP TABLE IF EXISTS db_test.employee;
> CREATE EXTERNAL TABLE db_test.employee(
id INT,
name STRING,
salary DOUBLE,
dob DATE,
doj TIMESTAMP
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "MYSQL",
"hive.sql.jdbc.driver" = "com.mysql.jdbc.Driver",
"hive.sql.jdbc.url" = "jdbc:mysql://localhost/test",
"hive.sql.dbcp.username" = "ranga",
"hive.sql.dbcp.password" = "ranga",
"hive.sql.query" = "select * from test.EMPLOYEE",
"hive.sql.dbcp.maxActive" = "1"
);
> show tables;
INFO : OK
+-----------+
| tab_name |
+-----------+
| employee |
+-----------+
Check the employee data in the Hive table: > select * from db_test.employee;
INFO : OK
+--------------+----------------+------------------+---------------+------------------------+
| employee.id | employee.name | employee.salary | employee.dob | employee.doj |
+--------------+----------------+------------------+---------------+------------------------+
| 1 | Ranga | 10000.0 | 1988-06-01 | 2020-03-16 09:00:01.0 |
| 2 | Nishanth | 50000.0 | 2018-05-29 | 2021-05-01 09:02:33.0 |
| 3 | Raja | 30000.0 | 2021-05-01 | 2021-05-01 09:02:45.0 |
+--------------+----------------+------------------+---------------+------------------------+
3 rows selected (0.349 seconds)
> !exit
Launch the spark-shell in CDP. Note: Copy the mysql-connector-java.jar to any location and provide it to spark-shell. In my case, I have copied mysql-connector-java.jar to the /usr/share/java/ location: spark-shell --master yarn --jars /opt/cloudera/parcels/CDH/lib/hive/lib/hive-jdbc-handler.jar,/usr/share/java/mysql-connector-java.jar
Select the employee table data using spark-sql: scala> spark.sql("select * from db_test.employee").show(truncate=false)
Hive Session ID = 4e572b40-76c2-4991-b41e-ee7830c9bff7
+---+--------+-------+----------+-------------------+
|id |name |salary |dob |doj |
+---+--------+-------+----------+-------------------+
|1 |Ranga |10000.0|1988-06-01|2020-03-16 09:00:01|
|2 |Nishanth|50000.0|2018-05-29|2021-05-01 09:02:33|
|3 |Raja |30000.0|2021-05-01|2021-05-01 09:02:45|
+---+--------+-------+----------+-------------------+
References:
JDBC Storage Handler
Hive query SQL using jdbcstoragehandler
Thank You!
05-04-2021
02:18 AM
Hi @Sugumar As we discussed in the https://community.cloudera.com/t5/Support-Questions/Java-Spark-driver-and-executor-logs-in-cluster-mode/m-p/315859#M226599 post, if your cluster is DSE, please check with the DSE team. If it is YARN, you can use the following command: yarn logs -applicationId <Application_ID> > application_id.log
05-04-2021
01:48 AM
Hi @Sugumar I don't have much idea about DSE clusters. Please check the following link; it may help: https://docs.datastax.com/en/dse/6.7/dse-admin/datastax_enterprise/spark/sparkLogging.html
05-03-2021
11:56 PM
Hi @Sugumar When you run an application in client mode, you can see the driver logs in your console (where you submitted the application) and the executor logs in the respective containers. In cluster mode, the Spark driver is launched in one of the containers, so you will not see the driver logs in the console. To get the logs from YARN: yarn logs -applicationId <Application_ID> > application_id.log