Member since: 09-23-2015
Posts: 800
Kudos Received: 897
Solutions: 185

My Accepted Solutions
Title | Views | Posted
---|---|---
 | 1991 | 08-12-2016 01:02 PM
 | 1194 | 08-08-2016 10:00 AM
 | 1177 | 08-03-2016 04:44 PM
 | 2455 | 08-03-2016 02:53 PM
 | 684 | 08-01-2016 02:38 PM
08-15-2016
08:39 AM
LinkedIn? There is only one Benjamin Leonhardi there.
08-12-2016
10:10 PM
@jovan karamacoski I think you might want to contact us for a services engagement. I strongly suspect that what you want to achieve and what you are asking about are not compatible. On Hadoop it is normally files, not specific blocks, that are hot, and files are by definition widely distributed across nodes, so moving specific "hot" drives will not make you happy. Also, especially for writes, having some nodes with more network bandwidth than others doesn't sound like a winning combination, since slow nodes become a bottleneck and it's all linked together. That's how HDFS works. If you want some files to be faster, you might want to look at HDFS storage tiering; using that you could put "hot" data on fast storage like SSDs. You could also look at node labels to put specific applications on fast nodes with lots of CPU, etc. But moving single drives? That will not make you happy. By definition HDFS will not care: one balancer run later and all your careful planning is gone. And lastly, there is no online move of data node storage. You always need to stop a data node, change the storage layout, and start it again; it will then send the updated block report to the Namenode.
08-12-2016
07:05 PM
1 Kudo
Just use doAs=true, make sure only the hive user can read the warehouse folder, and you are done. The Hive CLI can still start, but it cannot access anything.
08-12-2016
01:02 PM
1 Kudo
"If it runs in the Appmaster, what exactly are "the computed input splits" that jobclient stores into HDFS while submitting the Job ??" InputSplits are simply the work assignments of a mapper. I.e. you have the inputfolder /in/file1
/in/file2 And assume file1 has 200MB and file2 100MB ( default block size 128MB ) So the InputFormat per default will generate 3 input splits ( on the appmaster its a function of InputFormat) InputSplit1: /in/file1:0:128000000
InputSplit2: /in/file1:128000001:200000000
InputSplit3:/in/file2:0:100000000 ( per default one split = 1 block but he COULD do whatever he wants. He does this for example for small files where he uses MultiFileInputSplits which span multiple files ) "And how map works if the split spans over data blocks in two different data nodes??" So the mapper comes up ( normally locally to the block ) and starts reading the file with the offset provided. HDFS by definition is global and if you read non local parts of a file he will read it over the network but local is obviously more efficient. But he COULD read anything. The HDFS API makes it transparent. So NORMALLY the InputSplit generation will be done in a way that this does not happen. So data can be read locally but its not a necessary precondition. Often maps are non local ( you can see that in the resource manager ) and then he can simply read the data over the network. The API call is identical. Reading an HDFS file in Java is the same as reading a local file. Its just an extension to the Java FileSystem API.
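To illustrate that last point, here is a minimal sketch (the path, offset, and length are the hypothetical values from the example above) of reading a byte range of an HDFS file through the Java FileSystem API; whether the underlying blocks are local or remote is transparent to the caller:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadSplit {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);      // HDFS if fs.defaultFS points to it
        long start = 128000000L;                   // split offset from the example above
        long length = 72000000L;                   // split length (200MB file - 128MB)
        byte[] buffer = new byte[64 * 1024];
        try (FSDataInputStream in = fs.open(new Path("/in/file1"))) {
            in.seek(start);                        // jump to the split's start offset
            long remaining = length;
            while (remaining > 0) {
                int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (read == -1) break;
                remaining -= read;                 // process buffer[0..read) here
            }
        }
    }
}
```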
08-12-2016
09:24 AM
2 Kudos
Hello Jovan, yes you can simply move a folder; data nodes are beautifully simple that way. We just did it on our cluster: stop HDFS, copy the folder to a new location, and change the location in the Ambari configuration. Just try it with a single drive on a single node (using Ambari config groups); you can run hadoop fsck / to check for under-replicated blocks after the test. A single drive will not lead to inconsistencies in any case. In general, data nodes do not care where the blocks are as long as they still find the files with the right block id in the data folders. You can theoretically do it on a running cluster, but you need to use Ambari config groups, do it one server at a time, and make sure you do it quickly so the Namenode doesn't start to schedule a large number of replica additions because of the missing data node (HDFS waits a bit before it fixes under-replication in case a data node just reboots).
08-11-2016
06:58 PM
Not sure what you mean. Do you want to know WHY blocks get under-replicated? There are different possibilities for a block replica to vanish, but by and large it's simple: a) The block replica doesn't get written in the first place. This happens during a network or node failure during a write. HDFS will still report the write of a block as successful as long as one of the block replica writes was successful. So if, for example, the third designated datanode dies during the write process, the write is still successful but the block will be under-replicated. The write process doesn't care; it depends on the Namenode to schedule a copy later on. b) The block replicas get deleted later. That can have lots of different reasons: a node dies, a drive dies, you delete a block file on the drive. Blocks after all are simple, bog-standard Linux files named blk_xxxx, where xxxx is the block id. They can also get corrupted (HDFS does CRC checks regularly, and corrupted blocks will be replaced with a healthy copy). And many more... So perhaps you should be a bit more specific with your question?
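Since the Namenode only learns about replicas through block reports, it can help to look at what it actually knows about a file. A minimal Java sketch (the path is a placeholder) that lists which hosts hold each block of a file; a block reporting fewer hosts than the target replication factor is under-replicated:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockReplicaReport {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/in/file1");                 // placeholder path
        FileStatus status = fs.getFileStatus(file);
        short targetReplication = status.getReplication(); // desired replica count
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
        for (BlockLocation block : blocks) {
            // Fewer hosts than targetReplication means this block is under-replicated.
            System.out.printf("offset=%d length=%d replicas=%d hosts=%s%n",
                block.getOffset(), block.getLength(),
                block.getHosts().length, String.join(",", block.getHosts()));
        }
        System.out.println("target replication = " + targetReplication);
    }
}
```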
08-11-2016
12:53 PM
1 Kudo
The Namenode has a list of all the files, blocks, and block replicas in memory; a gigantic hashtable. Datanodes send block reports to it to give it an overview of all the blocks in the system. Periodically the Namenode checks whether all blocks have the desired replication level. If not, it schedules either block deletions (if the replication level is too high, which can happen when a crashed node is re-added to the cluster) or block copies.
08-09-2016
10:39 AM
No, the expunge should happen immediately, although it may take a bit until the datanodes actually get around to deleting the files; it shouldn't take long. So expunge doesn't help? Weird 🙂
08-09-2016
09:26 AM
2 Kudos
You see this line:

16/08/09 09:16:13 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 360 minutes, Emptier interval = 0 minutes.

Per default HDFS uses a trash. You can bypass it with hadoop fs -rm -skipTrash, or just empty the trash with hadoop fs -expunge.
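For what it's worth, the trash is a client-side convenience. A minimal Java sketch (the path is a placeholder) of the difference between moving a file into the trash the way the shell normally does and deleting it directly, which is roughly what -skipTrash amounts to:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;

public class TrashDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // picks up fs.trash.interval from core-site.xml
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/tmp/some/file");        // placeholder path

        // Roughly what a plain "hadoop fs -rm" does: move the file into the user's .Trash
        boolean movedToTrash = Trash.moveToAppropriateTrash(fs, p, conf);
        System.out.println("moved to trash: " + movedToTrash);

        // Roughly what "-skipTrash" does: delete() bypasses the trash entirely
        // boolean deleted = fs.delete(p, false);
    }
}
```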
08-08-2016
10:00 AM
1 Kudo
Gopal and I gave a couple of tips in here to increase the parallelism (since Hive is normally not tuned for cartesian joins and creates too few mappers): https://community.hortonworks.com/questions/44749/hive-query-running-on-tez-contains-a-mapper-that-h.html#comment-45388 Apart from that, my second point still holds: you should do some pre-filtering to reduce the number of points you need to compare. There are a ton of different ways to do this: https://en.wikipedia.org/wiki/Spatial_database#Spatial_index For example, you can put points into grid cells and exploit the fact that a point in one cell cannot be within your max distance of any point in a cell that is not adjacent.
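A minimal sketch of that grid idea (the cell size and names are made up for illustration, not from the linked thread): bucket each point by a grid key whose cell size equals the maximum match distance, then only compare points whose cells are the same or adjacent.

```java
// Bucket points into square grid cells sized by the maximum match distance.
// Two points closer than maxDistance must fall into the same or adjacent cells,
// so the cross join only has to compare neighbouring buckets.
public class GridKey {
    public static String cell(double x, double y, double maxDistance) {
        long cx = (long) Math.floor(x / maxDistance);
        long cy = (long) Math.floor(y / maxDistance);
        return cx + ":" + cy;
    }

    public static void main(String[] args) {
        double maxDistance = 0.5;                         // assumed threshold
        System.out.println(cell(12.7, 3.1, maxDistance)); // prints "25:6"
    }
}
```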
08-04-2016
10:32 AM
Why don't you want a Hive external table? It is just an entry in the metastore without any significant overhead. You can also use OrcStorage in Pig to write ORC files directly: http://pig.apache.org/docs/r0.15.0/func.html#OrcStorage Similar functions are available for Spark, or you might be able to write a custom MapReduce job using an ORC OutputFormat.
08-04-2016
10:22 AM
Puh, I think you need to make that decision yourself. A single big application will always be more efficient, i.e. faster. You can also modularize a Spark project so that working on one task doesn't change the code of the others. However, it becomes complex and, as said, you need to stop/start the application whenever you change any part of it. Also, if you use something CPU-heavy like Tika, the overhead of a separate topic in the middle doesn't sound too big anymore. So I think I would also go for something like: input sources -> parsing, Tika, normalization -> Kafka with a normalized document format (JSON?) -> analytical applications. But it's hard to give an intelligent answer from a mile away 🙂.
08-04-2016
09:29 AM
Yeah, see above. I think you just need a client like Hive that opens a TezClient, creates an ApplicationMaster, and then submits more DAGs to it. Specifically, Hive has per default one Tez session per JDBC connection. So if you run multiple queries over the same JDBC connection, they use the same TezClient, the same Tez session and, as long as the timeout is not reached, the same ApplicationMaster. Yes, it sounds a bit more magical than it is; the reuse is just session mode, where the client can send multiple DAGs to the same Tez AM. As said, in LLAP you will have shared long-running processes that can be discovered, so that is a bit different. http://hortonworks.com/blog/introducing-tez-sessions/
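A rough sketch of what that session reuse looks like at the API level (the session name and the buildDag helper are placeholders, not Hive's actual code, and a real DAG needs vertices before it can be submitted):

```java
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;

public class TezSessionSketch {
    public static void main(String[] args) throws Exception {
        TezConfiguration tezConf = new TezConfiguration();
        // Session mode: one ApplicationMaster accepts multiple DAGs.
        tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);

        TezClient client = TezClient.create("my-session", tezConf); // placeholder session name
        client.start();                       // launches the ApplicationMaster
        try {
            client.waitTillReady();
            DAG first = buildDag("query-1");  // placeholder helper; see note below
            DAG second = buildDag("query-2");
            client.submitDAG(first);          // both DAGs run in the same AM
            client.submitDAG(second);
        } finally {
            client.stop();                    // tears down the session and its AM
        }
    }

    private static DAG buildDag(String name) {
        // A real DAG must have at least one vertex (and usually edges) added here.
        return DAG.create(name);
    }
}
```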
08-04-2016
09:27 AM
Yeah, I have to say I didn't look into the Hive code, so I am not sure if you can actually "find" running Tez applications and attach to them. I think it's just the TezClient being kept open in HiveServer/Pig/whatever, which then submits more DAGs to the existing AM. But there might be ways for discovery. Basically, Tez doesn't take over much of what YARN does. This will be a bit different with LLAP, which is like a big YARN container running multiple Tez tasks; that one will have some workload management, scheduling, etc. https://tez.apache.org/releases/0.7.1/tez-api-javadocs/org/apache/tez/client/TezClient.html
08-03-2016
04:44 PM
2 Kudos
So he is definitely correct if you have a single application and you look purely at performance: you essentially get rid of the overhead of the second Kafka hop. However, there are situations where a second set of queues can be helpful, because you decouple parsing and cleansing from analysis, which has big advantages:
- You can have one parser app and then multiple analytical applications that you can start and stop as you please without impacting the parser or the other analytical apps.
- You can write simple analytical applications that consume a parsed, cleansed subset of the data they are interested in, so people get the data they actually want and don't have to worry about the initial parsing/cleansing.
08-03-2016
02:56 PM
Yeah, but without the error we cannot really help. I suppose you mean a ClassNotFoundException? So your UDF uses a lot of exotic imports?
08-03-2016
02:53 PM
3 Kudos
Hi Shiva, it's a Tez client API call you would need to make to find already existing ApplicationMasters of your user in the cluster; you can then hook up with them. The main user at the moment is Hive, which uses this to reduce the startup cost of a query. Essentially, each JDBC connection of a Hive session (if enabled) maps to one ApplicationMaster in YARN. So when you run a query, Hive checks whether an ApplicationMaster already exists (using the Tez client API calls) and uses that AM, or creates a new one otherwise.
08-01-2016
02:38 PM
1 Kudo
Hive was essentially a Java library that kicks off MapReduce jobs. The Hive CLI, for example, runs a full "server" inside the client; if you have multiple clients, all of them do their own SQL parsing/optimization etc. In Hive 1 there was a Thrift server which acted as a proxy for this, so a Thrift (data serialization framework) client could connect to it instead of doing all the computation locally. All of that is not relevant anymore: HiveServer2 has been the default for many years in all distributions and is a proper database server with concurrency/security/logging/workload management... You still have the Hive CLI available, but it will be deprecated soon in favor of beeline, a command-line client that connects to HiveServer2 through JDBC. This is desirable since the Hive CLI punches a lot of holes into central Hive administration. So forget about the HiveServer1 Thrift server and Thrift client.
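To illustrate the HiveServer2 path that beeline takes, a minimal JDBC sketch (host, port, database, and credentials are placeholders):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcExample {
    public static void main(String[] args) throws Exception {
        // The same HiveServer2 JDBC driver that beeline uses.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        String url = "jdbc:hive2://hs2-host:10000/default";   // placeholder host/port/database
        try (Connection conn = DriverManager.getConnection(url, "user", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW TABLES")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}
```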
08-01-2016
01:31 PM
@Steven Hirsch I think you can try it for one application. One possibility is to simply switch off ATS for a bit; that helped me once but not a second time (Tez still tries to log to it). If you really want to switch it off completely, you can add the following settings:

tez.history.logging.service.class=org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService

and, to still see the logs:

tez.simple.history.logging.dir=/mylogdir

Also remove the ATS hooks:

hive.exec.pre.hooks=
hive.exec.post.hooks=

and potentially reduce log levels:

hive.tez.log.level=ERROR

Then see if it makes things faster. Again, if you don't see a difference you may have other issues, but it's worth ruling out. ATS 1.5 has been enabled in HDP 2.4, and ATS 1.0 has some tuning options; if this is really your bottleneck, Hortonworks support may be able to help.
08-01-2016
11:46 AM
Again, see below: the block size doesn't really change anything apart from the total number of tasks. You still have the same number of tasks running at the same time, which is defined by executor-cores. Can you run top on the machines while the job is running? If your CPU utilization is low, you want to increase executor-cores; if it is close to max, reducing it a bit might help (to reduce task switching). If that doesn't change anything, you might just have to live with the performance, or buy more nodes.
08-01-2016
10:49 AM
1 Kudo
Running UDFs in Pig is what Pig is for; you should fix that problem. Have you registered your jars? http://pig.apache.org/docs/r0.16.0/udf.html#udf-java There are other possibilities as well: Spark comes to mind, and especially with Python it can be relatively easy to set up (although it also has its problems, like Python versions), and there are some ETL tools that can utilize Hadoop. But by and large, Pig with Java UDFs is a very straightforward way to do custom data cleaning on data in Hadoop. There is no reason you shouldn't get it to work.
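For reference, a Java UDF doesn't need much. A minimal sketch along the lines of the linked docs (the package, class, and jar names are made up), with the REGISTER/usage lines shown as comments:

```java
package myudfs;

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

// Example UDF that upper-cases its first argument.
// In the Pig script you would register and call it roughly like this:
//   REGISTER myudfs.jar;                          -- hypothetical jar name
//   B = FOREACH A GENERATE myudfs.Upper(name);
public class Upper extends EvalFunc<String> {
    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0 || input.get(0) == null) {
            return null;                            // Pig convention: null in, null out
        }
        return input.get(0).toString().toUpperCase();
    }
}
```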
08-01-2016
10:44 AM
Just want to support that answer: a repartition is not bad in any case, since I have seen some interesting characteristics with Kafka producers. If they use round robin, they send data to a random partition and switch it only every 10 minutes or so. So it is possible that a single partition in Kafka randomly gets ALL the data and blows up your Spark application (the Flume Kafka connector was my example). A repartition after the KafkaStream fixed that. You can parametrize this based on the number of executors, etc.
08-01-2016
09:42 AM
How about you just try it? I am pretty sure it will be the same, but just make two CTAS tables and test it quickly.
07-31-2016
07:57 PM
The number of cores is the number of task "slots" in the executor. This is the reference to "you want 2-3x the physical cores": you want to make sure that at any given time Spark runs more tasks than the CPU has cores (there is hyper-threading and some overhead). So assuming you have 15000 tasks and 100 executor cores in total, Spark will run them in 150 "waves". Think of it as a YARN within YARN. Now, you also have vcores in YARN, and executor cores are translated into vcore requests, but normally vcores are not switched on and are purely ornamental, i.e. YARN only uses memory for assignment.
07-30-2016
11:21 PM
Sounds like a bug to me. Support ticket? There was an issue with ATS 1.5, but it should definitely be fixed in 2.4.2.
07-30-2016
08:38 PM
2 Kudos
You do not need to reduce the number of tasks to 2x the cores; you need to reduce the number of tasks that run AT THE SAME TIME to 2-3 per core (so 5 * 16 * 2 = 160). You also don't need to change the block size or anything. Executors work best with 10-50GB of RAM, so 24GB executors seem fine. You can then set the number of concurrently running tasks with the --executor-cores y flag, which means an executor can run y tasks at the same time; in your case 16 * 2-3 might be a good value. The 15000 tasks will then be executed wave after wave, and you can tune these parameters accordingly. (You can also try two smaller executors per node, since garbage collection for long-running tasks is an actual concern, but as said, 24GB is not too much.) http://enterprisesecurityinjava.blogspot.co.uk/2016/04/slides-from-running-spark-in-production.html
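As a hedged illustration of those numbers (5 nodes, 16 physical cores, 24GB executors; the exact instance count is an assumption, not a recommendation for your cluster), the same sizing expressed as Spark configuration properties:

```java
import org.apache.spark.SparkConf;

public class ExecutorSizingSketch {
    public static void main(String[] args) {
        // 5 nodes, one executor per node, 16 physical cores * 2 task slots each:
        // --executor-cores 32 gives 5 * 32 = 160 tasks running at the same time.
        SparkConf conf = new SparkConf()
                .setAppName("executor-sizing-sketch")
                .set("spark.executor.instances", "5")
                .set("spark.executor.cores", "32")
                .set("spark.executor.memory", "24g");
        // The master URL (e.g. yarn) would normally be supplied by spark-submit.
        for (scala.Tuple2<String, String> entry : conf.getAll()) {
            System.out.println(entry._1() + " = " + entry._2());
        }
    }
}
```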
07-29-2016
09:24 PM
1 Kudo
I am pretty sure that Hive Strings are not limited to 32k; I think the limit is something like 2GB, and if a lower limit exists it will be specific to a client or something. But I will verify that when I come back. That link seems to corroborate it, and hive.apache.org also doesn't give a maximum: http://stackoverflow.com/questions/35030936/is-there-maximum-size-of-string-data-type-in-hive Also, since both Varchar and String are string values and use dictionaries, I am pretty sure the serialization will be pretty much identical, so why would Varchar be better? As I said, I don't know, but I would be surprised if there was a real difference. I assume VARCHAR is simply a layer on top of String that checks values during insert. https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC#LanguageManualORC-StringColumnSerialization
07-29-2016
10:45 AM
@Steven Hirsch the big issue is that ATS 1.0 often couldn't keep up with tens of queries per second on large clusters, and in some situations this limited the number of queries running in the cluster. Like really bad; like the cluster sitting empty because everything was waiting for ATS. There were some tuning options to make it better, but by and large the single ATS server and single LevelDB backend had limitations. So it's less about aesthetics and more about performance. ATS 1.5 improved things, and ATS 2.0 hopefully fixes the problem once and for all.
07-28-2016
04:20 PM
Yeah, if you set it to 7 days it should just start cleaning older values after a restart (potentially only after hitting the cleaning period, the interval_ms setting).
07-28-2016
01:09 PM
3 Kudos
1. Is there a way to restrict the max size that users can use for Spark executors and drivers when submitting jobs on a YARN cluster? You can set an upper limit for all containers (yarn.scheduler.maximum-allocation-mb, or similar, in yarn-site.xml), but I am not aware of a way to specifically restrict Spark applications or applications in one queue.

2. What is the best practice for determining the number of executors required for a job? Good question; there was an interesting presentation about that. The conclusion for executor size is: "It depends, but usually 10-40GB and 3-6 cores per executor is a good limit." A max number of executors is not that easy; it depends on the amount of data you want to analyze and the speed you need. So let's assume you have 4 cores per executor which can run 8 tasks each, you want to analyze 100GB of data, and you want around one 128MB block per task; you would then need roughly a thousand tasks in total. To run them all at the same time you could go up to about 100 executors for maximum performance, but you can also make it smaller; it would then be slower. Bottom line, it's not unlike a MapReduce job. If you want a rule of thumb, the upper limit is data amount / HDFS block size / (number of cores per executor x 2); more will not help you much. http://www.slideshare.net/HadoopSummit/running-spark-in-production-61337353 Is there a max limit that users can be restricted to? You can use YARN to create a queue for your Spark users. There is a YARN parameter, user limit, which keeps a single user from taking more than a specific share of a queue; user-limit = 0.25, for example, would restrict a user to at most 25% of the queue. Or you could give every user a queue.

3. How does the RM handle resource allocation if most of the resources are consumed by Spark jobs in a queue? How is preemption handled? Like with any other task in YARN; Spark is not special. Preemption will kill executors, and that is not great for Spark (although it can survive it for a while). I would avoid preemption if I could.
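To make the rule of thumb in point 2 concrete, a tiny worked sketch with the numbers from the example above (100GB of data, 128MB blocks, 4 cores per executor running 2 concurrent tasks each; all of these are the assumptions from the example, not universal values):

```java
public class ExecutorEstimate {
    public static void main(String[] args) {
        long dataBytes = 100L * 1024 * 1024 * 1024;   // 100 GB to scan (assumption)
        long blockBytes = 128L * 1024 * 1024;         // HDFS block size
        int coresPerExecutor = 4;                     // assumption
        int tasksPerCore = 2;                         // 2-3 concurrent tasks per core

        long tasks = dataBytes / blockBytes;          // ~800 tasks, one per block
        long maxUsefulExecutors = tasks / (coresPerExecutor * tasksPerCore); // ~100 executors

        System.out.println("tasks = " + tasks);
        System.out.println("max useful executors = " + maxUsefulExecutors);
    }
}
```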