11-08-2017
10:39 PM
1 Kudo
Distributed system concepts are derived from the concepts at work on a single operating system (machine). The motivation here is therefore to understand those aspects of an operating system first, which in turn helps in understanding the bigger and more complex architecture of a distributed system.

Main Idea | OS concepts | Distributed System concept |
---|---|---|
resource | CPU, RAM, Network | YARN |
filesystem | NTFS, ext3 | HDFS |
process | java, perl, python process | SPARK, MR |
database | mysql | NoSql |
authentication | PAM module | Knox |
authorization | NSS module | Ranger |

Holistically, security is based on the foundation of:
- Authentication
- Authorization
- Audit

Securely exchanging data during the above-mentioned processes is based on the concepts of cryptography:
- Symmetric-key
- Asymmetric-key
- Hashing

1. Symmetric-key
- The same key is used to encrypt and decrypt the message.
- Example: AES

2. Asymmetric-key
- A message can be encrypted with the public key and decrypted with the private key, or vice versa.
- A message encrypted with one key of the pair can only be decrypted with the other key, never with the same one.
- The private key is kept in a safe location and the public key is distributed.
- A message encrypted with the private key can be decrypted by anyone holding the distributed public key.
- Example: RSA encryption algorithm (PKCS#1)

3. Hashing
- For a given message a consistent hash is generated.
- The message can never be retrieved from the hash.
- Example: SHA (e.g. SHA-256). Note that DSA is a digital signature algorithm that uses a hash, not a hash function itself.

===============================================================================================

Let's focus on the systems and mechanisms which enable authentication and authorization on the OS and for services:
- SSL
- SSH
- LDAP
- Kerberos
- PAM
- NSS
- SSSD

1. SSL
- The server has a public-private key pair. The public key is shared with any client that wants to interact with the server.
- The Certificate Authority (CA) has its own public-private key pair. Its public key is distributed to anyone who wants it. In Firefox, check Tools -> Page Info -> Security to see the public key of a known Certificate Authority.
- The server gets a certificate issued by the CA: the CA takes the public key of the server, url ... and generates a hash out of it. It then encrypts the hash with its own private key.
- The server sends this certificate (which contains the public key) to the client. The client calculates the hash itself, decrypts the encrypted hash present in the certificate using the CA public key, and compares the two to check the validity of the public key.
- The client generates a random symmetric key and encrypts it with the server's public key, so that it can only be decrypted by the server.
- The symmetric key is used to encrypt the subsequent messages. The asymmetric keys are only used for the initial exchange of the symmetric key.
- Remember that the server cannot keep data secret by encrypting it with its own private key, as it can be decrypted by anyone having its public key. The symmetric key is what secures the data transfer.
- KeyStore (server side: private key + signed public-key certificate) and TrustStore (client side: CA public-key certificates)
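The three primitives and the key exchange described above can be tried out with the JDK alone. The following is a minimal sketch, not the real TLS handshake: the class and method names are made up for illustration, the cipher choices (RSA-OAEP, AES-GCM, SHA-256) are assumptions of this sketch, and certificate validation is left out entirely.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.Key;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.MessageDigest;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical illustration class: mirrors the idea of the exchange, not the TLS protocol.
final class HybridCryptoSketch {

    // Hashing: the same message always yields the same digest, and it cannot be reversed.
    static byte[] sha256(String message) {
        try {
            return MessageDigest.getInstance("SHA-256")
                    .digest(message.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Asymmetric: encrypt with the public key, decrypt with the private key.
    static byte[] rsa(int mode, Key key, byte[] data) {
        try {
            Cipher cipher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
            cipher.init(mode, key);
            return cipher.doFinal(data);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Symmetric: the same AES key encrypts and decrypts.
    static byte[] aes(int mode, SecretKey key, byte[] iv, byte[] data) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(mode, key, new GCMParameterSpec(128, iv));
            return cipher.doFinal(data);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Client picks a symmetric key, sends it under the server's public key,
    // then both sides use the symmetric key for the actual message.
    static String exchange(String message) {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            KeyPair server = generator.generateKeyPair();

            KeyGenerator keyGenerator = KeyGenerator.getInstance("AES");
            keyGenerator.init(128);
            SecretKey clientKey = keyGenerator.generateKey();

            byte[] wrapped = rsa(Cipher.ENCRYPT_MODE, server.getPublic(), clientKey.getEncoded());
            SecretKey serverKey =
                    new SecretKeySpec(rsa(Cipher.DECRYPT_MODE, server.getPrivate(), wrapped), "AES");

            byte[] iv = new byte[12];
            new SecureRandom().nextBytes(iv);
            byte[] cipherText = aes(Cipher.ENCRYPT_MODE, clientKey, iv,
                    message.getBytes(StandardCharsets.UTF_8));
            byte[] plain = aes(Cipher.DECRYPT_MODE, serverKey, iv, cipherText);
            return new String(plain, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(exchange("hello over a secure channel"));
        System.out.println(sha256("hello").length + " byte digest");
    }
}
```

Only the short symmetric key goes through RSA; the message itself is encrypted with AES, which is the reason the asymmetric step is limited to the initial exchange.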
1) The first and major difference between trustStore and keyStore is that trustStore is used by TrustManager and keyStore is used by the KeyManager class in Java. KeyManager and TrustManager perform different jobs in Java: TrustManager determines whether the remote connection should be trusted, i.e. whether the remote party is who it claims to be, and KeyManager decides which authentication credentials should be sent to the remote host for authentication during the SSL handshake. If you are an SSL server, you use the private key during the key exchange algorithm and send the certificates corresponding to your public keys to the client; these certificates are taken from the keyStore. On the SSL client side, if it is written in Java, the certificates stored in the trustStore are used to verify the identity of the server. SSL certificates most commonly come as a .cer file, which is added to the keyStore or trustStore using any key management utility, e.g. keytool. See my post How to add certificates into trustStore for a step-by-step guide on adding certificates into a keyStore or trustStore in Java.
2) Another difference between trustStore and keyStore, in rather simple terms, is that the keyStore contains private keys and is required only if you are running a server on an SSL connection or you have enabled client authentication on the server side. The trustStore, on the other hand, stores public keys or certificates from CAs (Certificate Authorities), which are used to
trust remote party or SSL connection.
Read more: http://javarevisited.blogspot.com/2012/09/difference-between-truststore-vs-keyStore-Java-SSL.html#ixzz4xy8qlhdd

2. SSH
1. The server has a private-public key pair. When a client connects, it fetches the public key from the server.
2. The client has to accept the server's public key, which eventually gets saved in the known_hosts file.
3. Client and server agree on a symmetric key using the classic Diffie-Hellman algorithm.
4. Note that with this algorithm the symmetric key is known to both sides without ever being sent over the wire.
5. The client sends the password, encrypted with the symmetric key, for authentication.

For passwordless authentication:
1. The client generates a public-private key pair. The client's public key is manually placed in the authorized_keys file on the server.
2. In this case the server generates a random number, encrypts it with the client's public key found in authorized_keys, and sends it to the client.
3. The client is able to decrypt it using its private key, re-encrypts it using the symmetric key, and sends it back to the server.
4. The server decrypts it using the symmetric key, and if it is the same as the original number, passwordless authentication succeeds.
5. For testing use testLink

3. Kerberos.
1. The KDC is the Key Distribution Center, which also contains the AS (Authentication Server) and the TGS (Ticket Granting Server).
2. The client password is manually saved in the KDC.
3. The client password is never sent over the network.
4. The client username is sent to initiate the interaction between the client and the Authentication Server.
5. The AS sends the client a symmetric key encrypted with the client password.
6. A Kerberos realm is a set of managed nodes that share the same Kerberos database.

4. LDAP
The Lightweight Directory Access Protocol (LDAP; /ˈɛldæp/) is an open, vendor-neutral, industry standard application protocol for accessing and maintaining distributed directory information services over an Internet Protocol (IP) network. Directory services play an important role in developing intranet and Internet applications by allowing the sharing of information about users, systems, networks, services, and applications throughout the network.
The most important terms used are:
1. DN - Distinguished Name (unique path)
2. OU - Organizational Unit (e.g. a department)
3. DC - Domain Component (not domain controller, for once), e.g. com, org
4. CN - Common Name (the end entry)

================================================================================================

Authentication and authorization in the OS: PAM, NSS, SSSD

1. PAM
PAM is a framework that assists applications in performing what I'll
call "authentication-related activities". The core pieces of PAM are a
library (libpam) and a collection of PAM modules, which are dynamically
linked libraries (.so files) in the folder /lib/security. PAM configuration files are stored in the /etc/pam.d/ directory.

2. NSS
The Name Service Switch (NSS) is a facility in Unix-like operating systems that provides a variety of sources for common configuration databases and name resolution mechanisms. These sources include local operating system files (such as /etc/passwd, /etc/group, and /etc/hosts), the Domain Name System (DNS), the Network Information Service (NIS), and LDAP.
NSS relies on the group, passwd, and shadow files for authorization:
- Group: https://www.cyberciti.biz/faq/understanding-etcgroup-file/
- Shadow: https://www.cyberciti.biz/faq/understanding-etcshadow-file/
- Passwd: https://www.cyberciti.biz/faq/understanding-etcpasswd-file-format/

Both PAM and NSS can be linked to LDAP. LDAP also has an independent ldap client which can be used to access LDAP. It is possible that a user does not exist locally on an operating system but does exist in LDAP.

It helps to break things down like this in your head:

NSS - A module based system for controlling how
various OS-level databases are assembled in memory. This includes (but
is not limited to) passwd, group, shadow (this is important to note), and hosts. UID lookups use the passwd database, and GID lookups use the group database.

PAM - A module based system for allowing service based authentication and accounting. Unlike NSS, you are not extending existing databases; PAM modules can use whatever logic they like, though shell logins still depend on the passwd and group databases of NSS. (You always need UID/GID lookups.)

The important difference is that PAM does nothing on its own. If an application does not link against the PAM library and make calls to it, PAM will never get used. NSS is core to the operating system, and the databases are fairly ubiquitous to normal operation of the OS.

Now that we have that out of the way, here's the curve ball: while pam_ldap is the popular way to authenticate against LDAP, it's not the only way.
- If shadow is pointing at the ldap service within /etc/nsswitch.conf, any authentication that runs against the shadow database will succeed if the attributes for those shadow field mappings (particularly the encrypted password field) are present in LDAP and would permit login. This in turn means that pam_unix.so can potentially result in authentication against LDAP, as it authenticates against the shadow database (which is managed by NSS, and may be pointing at LDAP).
- If a PAM module performs calls against a daemon that in turn queries the LDAP database (say, pam_sss.so, which hooks sssd), it's possible that LDAP will be referenced.

SSSD troubleshooting
The sssd daemon acts as the spider in the web,
controlling the login process and more. The login program communicates
with the configured pam and nss modules, which
in this case are provided by the SSSD package. These modules
communicate with the corresponding SSSD responders, which in turn talk
to the SSSD Monitor. SSSD looks up the user in the LDAP directory, then
contacts the Kerberos KDC for authentication and to acquire tickets. (PAM and NSS can also talk to LDAP directly using pam_ldap and nss_ldap respectively. However, SSSD provides additional functionality.)

Of course, a lot of this depends on how SSSD has been configured; there are lots of different scenarios. For example, you can configure SSSD to do authentication directly with LDAP, or to authenticate via Kerberos.

The sssd daemon does not actually do much that cannot be done with a system that has been "assembled by hand", but it has the advantage that it handles everything in a centralised place. Another important benefit of SSSD is that it caches the credentials, which eases the load on servers and makes it possible to go offline and still log in. This way you don't need a local account on the machine for offline authentication.

In a nutshell, SSSD is able to provide what nss_ldap, pam_ldap, pam_krb5, and nscd used to provide, in a seamless way.

Please follow this Link to start digging into how authentication, authorization and audit are provided for a cluster.

Please keep in mind that there are multiple ways to log onto the cluster, and hence all the paths need to be secured:
1. Ambari views
2. ssh onto a node
3. Login to a node through the OS UI
4. Knox

All of these components should talk to an LDAP to maintain a predefined set of users, and provide authorization and authentication using Ranger and Knox.
PFA : sssd.pdf
11-07-2017
02:53 PM
Basics about Avro (which differentiate it from Thrift) in light of the missing schema registry:
1. In this setup, the Avro-serialized data carries no schema with it.
2. The user has to provide the schema, both at write time and at read time.
3. Avro provides a utility to check schema-evolution compatibility, so it is up to the user to make sure the Avro schema evolution is compatible.

In your case you will have to provide the schema while reading:
1. Avro will make a best effort to convert the saved data based on the "read" schema.
2. If the read schema has missing or extra fields compared with the write schema, the extra fields are filled from their default values and the missing ones are dropped.
3. If the "write" and "read" schemas are incompatible, you will get an exception.

The compatibility of a reader schema against a writer schema can be checked with:
SchemaCompatibility.SchemaPairCompatibility compatResult = SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);

Please follow the link http://bytepadding.com/big-data/spark/avro/avro-schema-compatibility-test/
11-07-2017
01:10 PM
2 Kudos
1) "Currently, I'm not using any combiner. My map phase output <key,value> pair is <string/text,string/text>. As my value is string/text in map phase output <key,value> pair, I think that it will be difficult to write the combiner. Usually, the function of the combiner is same as the reducer. Here, I'm not able to think of writing the combiner for this particular problem."
Is it possible to emit key, List<String>? You can byte-serialize the value, or use a Thrift definition to hold the values together in one structure. You save by not emitting the same key again. Use this if your specific use case permits. A mapper generating more data than its input means it is emitting more records than it received, which (generally speaking) means key duplication is happening.

2) "Currently, we tried with this compression for map output "-D mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.Lz4Codec". Is this configuration enough to compress map output? Do we have to modify or write some statements in our mapreduce code to use this compression?"
The codec alone is not enough; map output compression also has to be switched on:
conf.set("mapreduce.map.output.compress", "true");
conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
conf.set("mapreduce.map.output.compress.codec", "Use one enabled in your cluster LZ4/SNAPPY/GzipCodec");

3) "May I know where you get this rule of thumb "A reducer should process 1 GB of data"?"
This is mostly used in all frameworks like Pig, and generally the reducer heap is in the order of 1.5 GB. It is a default number; please tune it according to your needs.
Time taken to process X amount of data = time to spawn the process (scheduling time) + time to do IO from file + time to process the logic.
1 GB is the minimum size below which the job spawning time is comparable to the time taken for processing.

4) "When I have 24 logical cores in one data node, why have you mentioned 20 * 7? I think that it should be 24 * 7?"
I left 4 * 7 containers for other services and other jobs running in your cluster. (It is always good to underestimate while doing calculations for performance.)

5) "How to handle a skewed key? Can I handle it using a partitioner? Do we have any other way?"
There are many ways; the best is to have a look at your skewed key and come to a logical conclusion.
1. Look at the Pig skewed join implementation (add salt to your key and then reduce twice; divide and conquer).
2. Take the top N events for a given key (if the logic permits).
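The "add salt and reduce twice" idea from point 5 can be sketched locally in plain Java. This is only an illustration under assumptions: it is not Pig's skewed join implementation, the class and method names are hypothetical, the round-robin salt stands in for whatever salting scheme the job would use, and the two methods play the role of the two reduce passes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of two-stage aggregation with salted keys.
final class SaltedCountSketch {
    static final String SEP = "#";

    // Stage 1: count per salted key, so one hot key is spread over `buckets` reducers.
    static Map<String, Long> countSalted(List<String> keys, int buckets) {
        Map<String, Long> partial = new TreeMap<>();
        int i = 0;
        for (String key : keys) {
            String salted = key + SEP + (i++ % buckets); // round-robin salt; a random salt works too
            partial.merge(salted, 1L, Long::sum);
        }
        return partial;
    }

    // Stage 2: strip the salt and combine the (now small) partial counts.
    static Map<String, Long> mergeSalted(Map<String, Long> partial) {
        Map<String, Long> total = new TreeMap<>();
        for (Map.Entry<String, Long> entry : partial.entrySet()) {
            String salted = entry.getKey();
            String key = salted.substring(0, salted.lastIndexOf(SEP));
            total.merge(key, entry.getValue(), Long::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            keys.add("hot"); // the skewed key
        }
        keys.add("rare");
        Map<String, Long> partial = countSalted(keys, 4);
        System.out.println("after first reduce:  " + partial);
        System.out.println("after second reduce: " + mergeSalted(partial));
    }
}
```

The first pass never sends more than roughly 1/buckets of the hot key to a single reducer; the second pass only has to combine one small record per bucket.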
11-07-2017
11:33 AM
1 Kudo
Let's start with the basics and try to answer your questions.
1. Split size = HDFS block size by default. Changing the split size has an impact on the number of mappers, not reducers. A 128 MB split size is good to start with.
2. Rule of thumb: a reducer should ideally process 1 GB of data. Going by this logic you should have 2.5 TB / 1 GB = 2500 reducers.
3. You have 20 * 7 = 140 containers (available in one go) to run reducers. Running 2500 reducers will take 2500 / 140 ≈ 18 rounds, which is a lot. Hence I would fix the number of reducers somewhere around 800 to 900.
4. Your mappers are producing more data as intermediate output. What are you doing in this step? Can you use a combiner (is it possible?) to make this intermediate data smaller? Can you move some filter operation to the map stage?
5. If your reducers are getting stuck at 26%, there can be several reasons:
   1. You have a skewed key, which results in one reducer getting stuck.
   2. 26% means it is stuck in the shuffle phase itself, which indicates that one reducer is getting a lot of data (another indication of skewed joins).
   3. Have you enabled compression for map output?
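The reducer arithmetic above can be written down as a small helper. This is a sketch with hypothetical names; it assumes decimal units (1 TB = 1000 GB, matching the 2500 figure in the post) and rounds partial rounds up, since a partly filled round still has to run.

```java
// Hypothetical helper for the back-of-the-envelope sizing in the post.
final class ReducerPlanSketch {

    // Number of reducers if each one should process `bytesPerReducer` bytes (rounded up).
    static long reducersFor(long inputBytes, long bytesPerReducer) {
        return (inputBytes + bytesPerReducer - 1) / bytesPerReducer;
    }

    // Number of scheduling rounds when only `containers` reducers can run at once (rounded up).
    static long rounds(long reducers, long containers) {
        return (reducers + containers - 1) / containers;
    }

    public static void main(String[] args) {
        long gb = 1_000_000_000L;       // decimal GB, an assumption of this sketch
        long input = 2_500L * gb;       // 2.5 TB of reducer input
        long containers = 20L * 7L;     // 140 containers available in one go

        long reducers = reducersFor(input, gb);
        System.out.println("reducers at 1 GB each: " + reducers);
        System.out.println("rounds needed: " + rounds(reducers, containers));
        System.out.println("rounds with 850 reducers: " + rounds(850, containers));
    }
}
```

Capping the reducer count at 800 to 900 trades a larger share of data per reducer for far fewer scheduling rounds.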
11-02-2017
11:33 AM
Trying to understand Spark from a MapReduce perspective.

If we look at the history of ETL, we started with SQL on RDBMS => Excel => MapReduce => Pig/Hive => Spark. It is just like the evolution of programming languages from C => C++ => Java. In each stage of evolution the problems of the predecessors were tackled and new features were added. The same holds true for the evolution of ETL (Extract, Transform and Load).

When thinking about a problem, it is very important to understand the level of abstraction at which we are trying to find the solution. For example, when reading keys from a file and doing a lookup in an RDBMS, we think in terms of the logical flow of events rather than how the file buffer is being maintained by Java or the operating system.

In Big Data, MapReduce makes us think at a very low level. Pig/Hive make us think at a higher abstraction level of ETL and later translate it into lower-level MapReduce. Spark works at a higher abstraction similar to Pig/Hive, internally translating the ETL into optimized ETL tasks.

A direct translation from MapReduce to Spark is a little difficult, as the level of abstraction at which they work is different, yet the concepts remain the same. Please read through Spark Code analysis for getting crystal clear clarity.

Let's take the Word Count example from MapReduce and Spark and try to see what is happening.

MapReduce Word Count

Mapper:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // As TextInputFormat has been used,
    // the key is the offset of the line in the file; the actual line goes in the value.
    // Reuse the writables to avoid GC.
    private final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            // Writable.set() replaces the previous content of
            // the writable object with the new content
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}

Reducer:
// Imports as for the mapper, with org.apache.hadoop.mapreduce.Reducer instead of Mapper.
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

Spark Word Count
JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(inputPath);
stringJavaRDD
    // On Spark 2.x the flatMap function must return an Iterator: Arrays.asList(line.split(" ")).iterator()
    .flatMap(line -> Arrays.asList(line.split(" ")))
    // A new tuple is formed for every word in the input
    .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
    // The reduceByKey API only takes the values; the key is not fed into the function.
    // reduceByKey results in a shuffle, hence it breaks the DAG into stages.
    .reduceByKey((a, b) -> a + b)
    //.reduceByKey((count1, count2) -> count1 + count2)
    // How many partitions to split the output into
    .repartition(conf.getInt(conf.get(NUM_PARTITIONS), 1))
    .saveAsTextFile(outputPath);
Map Reduce Word Count | Spark Word Count |
---|---|
InputFormat = TextInputFormat | InputFormat = TextInputFormat (input and output formats come from Hadoop, hence the same) |
Mappers: read data from the split. job.setMapperClass(WordCountMapper.class); job.setInputFormatClass(TextInputFormat.class); FileInputFormat.setInputPaths(job, inputPath); | Read data from the split. Based on the InputFormat used, number of spawned map tasks = number of splits. JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(inputPath); |
In the mapper you get key and value. Key = line offset, value = line. | In the map task you get only values, as there is no concept of keys in Spark. One gets only values, i.e. the whole lines, known as a JavaRDD. Remember it is the same TextInputFormat and LineRecordReader; Spark just takes the value from the RecordReader. |
Inside the mapper, each line is split into words. StringTokenizer tokenizer = new StringTokenizer(line); | flatMap(line -> Arrays.asList(line.split(" "))). flatMap is just a transformation that is applied to each input line. It is just like writing a UDF in Pig/Hive: this function is called for every row. A transformation can be attached to a map task, a reduce task or another transformation task. Transformations are just chaining of functions: input line => function1 => function2 => function3 ... |
In the mapper, for each word, attach 1 as the value. context.write(word, one); | Again chain another transformation to get another transformed value: JavaRDD => flatMap => mapToPair. mapToPair(word -> new Tuple2<String, Integer>(word, 1)). This generates key, value like the mapper in MapReduce. If you look at the Spark DAG, all these transformations happen inside the same map task. |
On the reducer you collect all the counts for a word. for (IntWritable value : values) { sum += value.get(); } | reduceByKey((a,b)->a+b). Just like MapReduce you do a reduce; in Spark the key is not part of the reduce function. Just like MapReduce, reduceByKey results in a shuffle and a reduce task is executed. |
The number of reducers is set in the driver. job.setNumReduceTasks() | repartition(conf.getInt(conf.get(NUM_PARTITIONS), 1)) |
Write to HDFS: context.write(key, new IntWritable(sum)); | Write to HDFS: saveAsTextFile(outputPath); |
Central object which holds all info about the job: Context context | JavaSparkContext sparkContext = new JavaSparkContext(sparkConf) |
Output FileFormat = TextOutputFormat | Output FileFormat = TextOutputFormat |
We have a Mapper and a Reducer running to perform the word count. | The Spark code is scanned and translated to tasks (mapper and reducer). |
We have separate Driver, Mapper and Reducer code in MapReduce. | In Spark the driver and the tasks (map, reduce) are part of the same code. Whatever code (lambda functions) we write inside the transformations (flatMap, map, mapPartitions) is instantiated on the driver, serialized and sent to the executor to run as task code. |
In terms of MapReduce we have Mapper and Reducer. The reducer leads to a shuffle. | In terms of Spark, whenever a shuffle happens a stage is formed, and a job is divided into stages. Every time you do a shuffle, a stage is created. |
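To play with the "chain of functions" idea from the comparison without a cluster, the same sequence of steps can be imitated with plain Java streams. This is only a local analogy with hypothetical names: it is not Spark, and there are no partitions, shuffles or stages here.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical single-JVM analogy of the flatMap => mapToPair => reduceByKey chain.
final class LocalWordCountSketch {

    static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
                // like flatMap: one line in, many words out
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .filter(word -> !word.isEmpty())
                // like mapToPair: attach 1 to every word
                .map(word -> new AbstractMap.SimpleEntry<String, Integer>(word, 1))
                // like reduceByKey: the merge function only sees the values, never the key
                .collect(Collectors.toMap(
                        entry -> entry.getKey(),
                        entry -> entry.getValue(),
                        (a, b) -> a + b,
                        TreeMap::new));
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("to be or not to be", "be quick")));
    }
}
```

As in reduceByKey, the merge function `(a, b) -> a + b` receives two values for the same key and nothing else.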
10-31-2017
04:32 PM
1 Kudo
Please have a look at the code of FixedLengthInputFormat as provided on GitHub. The basic criterion is that each record must be of the same length.
That means each record in your file must be exactly "fixedlengthinputformat.record.length" bytes long, and the record includes the delimiter too.
1. Please understand that TextInputFormat was created for reading a file whose records are delimited.
2. There can be a file which holds multiple fixed-length records without any delimiter. We save on disk space, as the delimiter is redundant in these files. Only the record length determines where one record ends and where the next starts. Such a file looks like a file with one big row, hence we use FixedLengthInputFormat.

Two solutions:
1. Provide fixedlengthinputformat.record.length in the conf object and set it to 23. Remove the delimiter in the map method.
Configuration conf = new Configuration(true);
conf.set("fs.default.name", "file:///");
conf.setInt("fixedlengthinputformat.record.length", 23);
job.setInputFormatClass(FixedLengthInputFormat.class);
2. Use TextInputFormat, but it does not check that all records are of the same length; you will have to do that inside your map method.
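How a fixed record length alone determines record boundaries can be sketched in plain Java. This is a standalone illustration with hypothetical names, not the Hadoop FixedLengthInputFormat code; the sample uses 5-byte records (4 characters plus the newline delimiter) instead of the 23 bytes from the question.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: cuts a byte array into records of exactly recordLength bytes.
final class FixedLengthRecordsSketch {

    static List<String> split(byte[] data, int recordLength) {
        // This sketch rejects input that does not divide evenly into records.
        if (recordLength <= 0 || data.length % recordLength != 0) {
            throw new IllegalArgumentException(
                    "data length " + data.length + " is not a multiple of " + recordLength);
        }
        List<String> records = new ArrayList<>();
        for (int offset = 0; offset < data.length; offset += recordLength) {
            // The delimiter, if present, is simply part of the record bytes.
            records.add(new String(data, offset, recordLength, StandardCharsets.US_ASCII));
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] data = "AAAA\nBBBB\n".getBytes(StandardCharsets.US_ASCII);
        for (String record : split(data, 5)) {
            // Equivalent of "remove the delimiter in the map method".
            System.out.println("[" + record.trim() + "]");
        }
    }
}
```

Because the delimiter counts towards the record length, a 4-character payload followed by a newline has to be configured as length 5, not 4.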
10-30-2017
04:14 PM
By order, do you mean the sequence of rows in the Pig output and the Hive output? If yes, then that will never match. Pig writes into a file in HDFS using MR / TEZ. This file is mapped to a Hive table. When you run a query on Hive such as select * from Table, Hive spawns a map-only job to read the file. Which mapper completes first and displays its output is not deterministic.
10-25-2017
04:19 PM
1 Kudo
Let's think of the basics. The RDD being saved is distributed across machines. If all of them started writing to the same file in HDFS, they could only append, and the writes would go through a huge number of locks because multiple clients are writing at the same time. It is a classical case of distributed concurrent clients trying to write to one file (imagine multiple threads writing to the same log file). That is the reason a directory is made and each task writes into its own file. Collectively, all the files present in your output directory are the output of your job.

Solutions:
1. rdd.coalesce(1).saveAsTextFile('/path/outputdir'), and then in your driver use hdfs mv to move part-00000 to filename.txt.
2. Assuming the data is small (as you want to write to a single file), perform an rdd.collect() and write to HDFS in the driver, by getting an HDFS handle.
10-25-2017
12:12 PM
1. dfs -mv is the fastest compared to -cp or distcp. If possible, move mydirectory instead of mydirectory/* into /targetdirectory.
10-25-2017
09:47 AM
"java.lang.OutOfMemoryError: Java heap space" error in driver. Code is something like this ... Can you please share some more info . Why is data flowing back to your driver resulting in OOM ?