10-03-2018 07:04 AM
We use Spark 2.1.0 on YARN for batch processing of multiline records. Our job is written in PySpark and runs once every day. The input folder contains ~45000 very small files (1kB-100kB each), for a total of ~2GB. Every file contains a different number of multiline records. The first line of a record has a standard pattern: a timestamp followed by a Greek µ and some other info. For example:
28/09/2018 08:54:22µfirst record metadata
first record content with
undefined
number of
lines
28/09/2018 08:57:12µsecond record metadata
second record content
with a different
number of lines
This is how we read the files into our DataFrame:
from pyspark.sql.functions import explode, split

df = spark.sparkContext.binaryFiles(input_path).toDF(['filename', 'content'])
raw = df.select('filename', explode(split(df.content, r'(?=\d{2}\/\d{2}\/\d{4} \d{2}:\d{2}:\d{2}µ)'))).cache()
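As an aside, the zero-width lookahead in that pattern splits just before each timestamp line without consuming it, so every record keeps its header. A minimal pure-Python sketch of the same idea (using the re module instead of Spark, with the sample data from above):

```python
import re

# Same boundary pattern as in the Spark job: a zero-width lookahead that
# matches just before "dd/mm/yyyy hh:mm:ssµ", so the delimiter stays
# attached to the record that follows it.
BOUNDARY = r'(?=\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}µ)'

sample = (
    "28/09/2018 08:54:22µfirst record metadata\n"
    "first record content with\n"
    "undefined\nnumber of\nlines\n"
    "28/09/2018 08:57:12µsecond record metadata\n"
    "second record content\nwith a different\nnumber of lines\n"
)

# Drop the empty leading element produced by the zero-width match at offset 0.
records = [r for r in re.split(BOUNDARY, sample) if r]
print(len(records))  # 2
```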
The first line's output is a DataFrame with one entry per file; the second line's output is a DataFrame with one entry per record. The DataFrame is then cached and other operations are performed on it. We are currently testing the solution, and this is the deploy mode for the job (the memory settings are deliberately oversized):
spark2-submit --master yarn \
--conf spark.kryoserializer.buffer.max=1g \
--deploy-mode cluster \
--driver-memory 16g \
--driver-cores 1 \
--conf spark.yarn.driver.memoryOverhead=1g \
--num-executors 20 \
--executor-memory 16g \
--executor-cores 1 \
--conf spark.yarn.executor.memoryOverhead=1g \
spark_etl.py
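For reference, the total YARN memory requested by the flags above works out as follows (a quick back-of-the-envelope check using only the values shown in the submission):

```python
# Cluster memory requested by the spark2-submit flags above (in GB).
executors = 20
executor_mem, executor_overhead = 16, 1   # --executor-memory, memoryOverhead
driver_mem, driver_overhead = 16, 1       # --driver-memory, memoryOverhead

total_gb = executors * (executor_mem + executor_overhead) \
    + (driver_mem + driver_overhead)
print(total_gb)  # 357 GB of cluster memory for ~2GB of input
```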
The job runs fine almost every day and performs all its operations in 10-15 minutes, writing results to HDFS. The problem is that once every 7-10 days one of the ~45000 input files has a completely different size from the others: 100MB to 1GB (always less than 2GB). In this case our job (in particular, one of the executors) hangs and seems to be doing nothing the entire time: there are no new log lines after the first few minutes. We have never seen one of these jobs finish, because we have to kill them after a few hours. We suspect this is caused by the "big" file; in fact, the job runs fine if we remove it from the input folder. These are screenshots taken from our last run:
The PySpark documentation notes: "Small files are preferred, large file is also allowable, but may cause bad performance." We could accept worse performance, but we don't think that is what is happening here, because it seems to us that the job is simply doing nothing the whole time.
Is a 200MB file really a "large file" from Spark's point of view? If so, how can we improve the performance of our job, or at least understand whether it is actually doing something?
Thank you,
Sergio
10-10-2017 04:57 AM
Hello, I'm trying to set a name for indexing jobs launched with:
cmd_hdp=$(
HADOOP_OPTS="-Djava.security.auth.login.config=jaas.conf" hadoop --config /etc/hadoop/conf.cloudera.yarn \
jar /opt/cloudera/parcels/CDH/lib/solr/contrib/mr/search-mr-*-job.jar \
org.apache.solr.hadoop.MapReduceIndexerTool \
-D morphlineVariable.ZK_HOST=myZKHost:2181/solr \
-D morphlineVariable.COLLECTION=myCollection \
-D mapreduce.map.memory.mb=8192 \
-D mapred.child.java.opts=-Xmx4096m \
-D mapreduce.reduce.java.opts=-Xmx4096m \
-D mapreduce.reduce.memory.mb=8192 \
--output-dir hdfs://myHdfsHost:8020/tmp/my/data \
--morphline-file morphlines/myMorphline.conf \
--log4j log4j.properties \
--go-live \
--collection myCollection \
--zk-host myZKHost:2181/solr \
hdfs://myHdfsHost:8020/my/data/
)
The current job name is "org.apache.solr.hadoop.MapReduceIndexerTool/MorphlineMapper", but I'd like to change it. I already tried setting -D mapreduce.job.name=myName, but the name does not change.
08-29-2017 01:49 AM
Hi, yes, I forgot to say that I'm a colleague of the guy in the first post. Thanks for your help, I'll try to be as clear as possible.

"Usually, the java.opts parameters are set to 80% of the memory.mb one. Is there some specific reason you have set it to only 50%?" We didn't find good documentation about this; we just set these parameters based on our experience. Are there resources about this?

We upgraded our cluster since the first post. Now we have 8 workers, with a total of 200 vcores and 1.5TB of memory.

"How is your collection configured? How many shards (and how many replicas per shard)?" Our collections have 12 shards and 2 replicas per shard. The workload is balanced between machines, so there are 3 cores per machine.

"In the first post, it is said that the mapper phase takes 5 hours (for roughly 15GB of compressed data). What is the processing time of a single mapper task? How many mapper tasks are launched in this phase? Are the CPUs of the worker nodes overused during an indexation, or are they idle? How is Solr handling the load at the end of the indexing process (when Solr is loading the data)?" Our source produces about 20GB of compressed data every day, split into about 550 compressed files. The number of map tasks is the same as the number of input files, and we run one indexing job per day. A single MorphlineMapper map task takes about 20 minutes to complete. Considering the total number of cores, on a fully unloaded cluster the map phase takes about 1 hour to complete. During this phase the workers' CPUs are almost 100% loaded. The reduce phase takes almost 4 hours to complete. We tried two different approaches here:

- We started indexing without setting the "--reducers" parameter. In this case this phase takes 24 cores and almost 3 hours. When it ends, the TreeMergeMapper starts, which takes almost 2 hours to complete. As far as we know, during this phase 24 "virtual shards" are created and then merged into the final 12 desired shards.
- To avoid the TreeMergeMapper job, we tried setting the number of reducers to 12 (the same as the number of shards). In this case, however, the MorphlineMapper reduce phase takes 12 cores and almost 5 hours to complete. So we can't see any significant improvement with this strategy.

When the MorphlineMapper job (and eventually the TreeMergeMapper one) ends, the "Current" indicator in the Solr web UI's Statistics tab turns red, meaning there is something going on. We can't track this activity in YARN, and CPU and memory usage are not very high during this stage. What is it about? After 4-5 hours the indicator turns green again and the collection is fully available.

"What compression algorithm is used? Is it efficient (is the trade-off between compression ratio and performance acceptable)? 15GB of compressed data: how many lines does it represent (how many fields per line)?" Gzip is used to compress records; every compressed file contains a single txt file, which is the way our source sends data. For 20GB of data we have about (550 compressed files) x (37MB each). Every file contains about 320000 records, and every record is made of 23 text fields, some of which are dynamic:

<field name="id" stored="true" indexed="true" type="string" multiValued="false" required="true"/>
<field name="_version_" stored="true" indexed="true" type="long"/>
<field name="timestamp" indexed="true" stored='true' type="date"/>
<field name="file_name" stored="true" indexed="true" type="text_general"/>
<field name="cod_auth" default="null" stored="true" indexed="true" type="text_general"/>
<!-- field used for full text search -->
<field name="text" default="null" stored="true" indexed="true" type="text_general"/>
<dynamicField name="file_*" type="text_general" indexed="false" stored="false"/>
<dynamicField name="base_id*" type="text_general" indexed="false" stored="false"/>
<dynamicField name="*" type="text_general" indexed="true" stored="true"/>

"Is there some room to improve the processing time of the morphline script? Is it efficient enough? Is there no loadSolr instruction in the morphline?" Yes, we have a loadSolr instruction in the morphline:

{
  loadSolr {
    solrLocator : ${SOLR_LOCATOR}
  }
}
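As a sanity check on the map-phase figure mentioned above, the arithmetic (using only numbers stated in this reply) is:

```python
# Back-of-the-envelope check of the reported ~1 hour map phase.
num_map_tasks = 550      # one gzip input file -> one map task
minutes_per_task = 20    # observed MorphlineMapper task duration
total_vcores = 200       # 8 workers, 200 vcores in total

wall_clock_minutes = num_map_tasks * minutes_per_task / total_vcores
print(wall_clock_minutes)  # 55.0 -> about 1 hour on an unloaded cluster
```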
08-23-2017 02:07 AM
Hi, we are using the MapReduceIndexerTool and there are no network problems. We are reading compressed files from HDFS and decompressing them in our morphline. This is how we run our script:
cmd_hdp=$(
HADOOP_OPTS="-Djava.security.auth.login.config=jaas.conf" hadoop --config /etc/hadoop/conf.cloudera.yarn \
jar /opt/cloudera/parcels/CDH/lib/solr/contrib/mr/search-mr-*-job.jar \
org.apache.solr.hadoop.MapReduceIndexerTool \
-D morphlineVariable.ZK_HOST=hostname1:2181/solr \
-D morphlineVariable.COLLECTION=my_collection \
-D mapreduce.map.memory.mb=8192 \
-D mapred.child.java.opts=-Xmx4096m \
-D mapreduce.reduce.java.opts=-Xmx4096m \
-D mapreduce.reduce.memory.mb=8192 \
--output-dir hdfs://isilonhostname:8020/tmp/my_tmp_dir \
--morphline-file morphlines/my_morphline.conf \
--log4j log4j.properties \
--go-live \
--collection my_collection \
--zk-host hostname1:2181/solr \
hdfs://isilonhostname:8020/my_input_dir/
)
The MorphlineMapper phase takes all available resources, while the TreeMergeMapper takes only a couple of containers. We don't need to run queries for the moment; we just need to index historical data. We are wondering if there is a way to speed up indexing time and then optimize the collections for searching once indexing is complete. Thank you, Sergio