Member since: 02-25-2016
Posts: 72
Kudos Received: 34
Solutions: 5
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2454 | 07-28-2017 10:51 AM
 | 1826 | 05-08-2017 03:11 PM
 | 717 | 04-03-2017 07:38 PM
 | 1680 | 03-21-2017 06:56 PM
 | 571 | 02-09-2017 08:28 PM
02-13-2018
02:13 AM
@Dinesh Chitlangia
... View more
02-13-2018
02:13 AM
Hi Team, I am trying to stream log files from a Kafka consumer to Hive using Spark in Python. It is throwing the error below:

18/02/12 20:41:18 WARN AbstractLifeCycle: FAILED SelectChannelConnector@x.x.x.x:4040: java.net.BindException: Address already in use
java.net.BindException: Address already in use
18/02/12 20:41:18 WARN AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@xxxxxx: java.net.BindException: Address already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
File "/sourcefiles/streamingspkf.py", line 16, in <module>
b3 = hc.createDataFrame(b2)
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/context.py", line 425, in createDataFrame
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/context.py", line 338, in _createFromLocal
TypeError: 'TransformedDStream' object is not iterable
18/02/12 20:41:20 INFO SparkContext: Invoking stop() from shutdown hook

Any idea what is leading to this? Please let me know if you need any further information.
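The BindException warnings usually just mean another Spark UI is already bound to port 4040; the job actually fails on the TypeError, because a DStream cannot be passed to createDataFrame directly. Below is a minimal sketch of the usual pattern, assuming b2 is the transformed DStream and hc is a HiveContext or SQLContext as in the traceback; the target table name is a placeholder, and each record is assumed to be something createDataFrame can infer a schema from.

def save_batch(rdd):
    # createDataFrame works on the RDD inside each micro-batch, not on the DStream itself
    if not rdd.isEmpty():
        df = hc.createDataFrame(rdd)
        df.write.mode('append').saveAsTable('default.kafka_logs')  # hypothetical target table

b2.foreachRDD(save_batch)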
... View more
Labels:
- Apache Hive
- Apache Kafka
- Apache Spark
01-30-2018
08:32 PM
Thank you @Ramya Jayathirtha. I downloaded the jar file from Maven and executed it, and it got past that error. Now it is blocked by a different error:

File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/sourcefiles/spkf.py", line 9, in <lambda>
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
File "/usr/lib64/python2.7/json/__init__.py", line 338, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python2.7/json/decoder.py", line 366, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib64/python2.7/json/decoder.py", line 384, in raw_decode
raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded
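The ValueError means the Kafka message value handed to json.loads is not valid JSON (for example plain text or an empty string). A small defensive sketch, assuming kafkaStream yields (key, value) pairs as in the traceback: peek at a few raw values first, and skip records that do not parse instead of failing the whole batch.

import json

def parse_value(kv):
    # kv is a (key, value) pair from the Kafka stream; drop values that are not JSON
    try:
        return [json.loads(kv[1])]
    except ValueError:
        return []

kafkaStream.map(lambda kv: kv[1]).pprint()   # peek at the raw values the producer is actually sending
parsed = kafkaStream.flatMap(parse_value)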
... View more
01-30-2018
07:16 PM
Hi Team, I am trying to copy streaming data from a Kafka topic to an HDFS directory. It is throwing the error 'java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper'. Any kind of assistance to help me resolve this is much appreciated. Here are the steps I followed:

Step 1 - Created topic -> topicXYZ
Step 2 - Created producer and linked it to topicXYZ
Step 3 - Created consumer and linked it to topicXYZ

PySpark program to stream and copy data to the HDFS directory:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

sc = SparkContext(appName="kafksteststreaming")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)
kafkaStream = KafkaUtils.createStream(ssc, 'xxxx:2181', 'raw-event-streaming-consumer', {'topicXYZ': 1})
parsed = kafkaStream.map(lambda (k, v): json.loads(v))
parsed.saveAsTextFiles('/tmp/folderxyz')
ssc.start()
ssc.awaitTermination()

spark-submit --jars /usr/hdp/current/spark-client/lib/spark-assembly-*.jar spkf.py

The above is throwing the error:

Spark Streaming's Kafka libraries not found in class path. Try one of the following.
- Include the Kafka library and its dependencies with in the spark-submit command as $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.6
- Download the JAR of the artifact from Maven Central http://search.maven.org/, Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = 1.4.0. Then, include the jar in the spark-submit command as
... View more
Labels:
- Apache Kafka
- Apache Spark
11-09-2017
06:53 PM
1 Kudo
Hi Team, I have a requirement to read an existing Hive table, massage a few columns and overwrite the same Hive table. Below is the code:

lp = hc.sql('select * from logistics_prd')
adt = hc.sql('select * from senty_audit.maintable')
cmb_data = adt.unionAll(lp)
cdc_data = cmb_data.distinct()
cdc_data.write.mode('overwrite').saveAsTable('senty_audit.temptable')

In step 2 I am reading senty_audit.maintable from Hive. Then I am joining it with the other dataframe, and in the last step I am trying to load it back (overwrite) into the same Hive table. In this case Spark is throwing the error 'org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from'. Can you please help me understand how I should proceed in this scenario?
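A common workaround is to break the cycle so the table is not read and overwritten in the same step: materialize the combined data into a staging table first, then overwrite the original from that staging copy. A sketch under those assumptions (hc and the table names come from the post; the staging table name is hypothetical):

lp = hc.sql('select * from logistics_prd')
adt = hc.sql('select * from senty_audit.maintable')
cdc_data = adt.unionAll(lp).distinct()

# 1. Write to a staging table so senty_audit.maintable is no longer being read.
cdc_data.write.mode('overwrite').saveAsTable('senty_audit.maintable_stage')

# 2. Overwrite the original table from the staging copy.
hc.sql('insert overwrite table senty_audit.maintable select * from senty_audit.maintable_stage')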
... View more
Labels:
- Apache Hive
- Apache Spark
11-02-2017
07:19 PM
Hi Team, is there a way to write Kafka messages to HDFS? Any kind of suggestion would be of great help. Thank you.
... View more
Labels:
11-01-2017
06:28 PM
Hi team, I am looking to convert a Unix timestamp field to a human-readable format. Can someone help me with this? I am using unix_timestamp('Timestamp', "yyyy-MM-ddThh:mm:ss"), but it is not working. Any suggestions would be of great help.
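If the field holds epoch seconds, from_unixtime converts it to a readable string; unix_timestamp goes the other way (string to epoch), and a literal T in the pattern has to be single-quoted. A sketch assuming a HiveContext named hc, a column called Timestamp, and a placeholder table name:

# Epoch seconds -> readable string
hc.sql("select from_unixtime(Timestamp, 'yyyy-MM-dd HH:mm:ss') as readable_ts from my_table").show()

# String like 2017-11-01T06:28:00 -> epoch seconds (quote the literal T, use HH for 24-hour time)
hc.sql("select unix_timestamp(Timestamp, \"yyyy-MM-dd'T'HH:mm:ss\") as epoch_seconds from my_table").show()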
... View more
Labels:
- Apache Ambari
10-25-2017
03:28 PM
@Aditya Sirna thank you for your response. I have tried that too; it is still creating a folder named 'finename.txt' with a single 'part-00000' file inside it. Kindly let me know if there is a way to create a file directly at the specified path.
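saveAsTextFile always produces a directory of part files, so one common workaround (a sketch only, paths taken from this thread, temp directory name hypothetical) is to write a single partition to a temporary directory and then move the lone part file to the exact path wanted:

import subprocess

rdd.coalesce(1).saveAsTextFile('/path/finename_tmp')   # temporary output directory (hypothetical)
# Move the single part file to the desired file name, then remove the temporary directory.
subprocess.check_call(['hdfs', 'dfs', '-mv', '/path/finename_tmp/part-00000', '/path/finename.txt'])
subprocess.check_call(['hdfs', 'dfs', '-rm', '-r', '/path/finename_tmp'])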
... View more
10-25-2017
03:01 PM
Hi Team, I am using Python Spark 1.6 and trying to save an RDD to an HDFS path. When I run rdd.saveAsTextFile('/path/finename.txt') it creates a directory at the specified path with part files in it, instead of creating a single file. Can you suggest a way to create a file at the specified HDFS path instead of a directory with files in it?
... View more
Labels:
- Apache Spark
10-19-2017
03:15 AM
1 Kudo
rdd.sortByKey() sorts in ascending order. I want to sort in descending order. I tried rdd.sortByKey("desc") but it did not work
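For reference, sortByKey takes a boolean ascending flag rather than a string. A minimal sketch, assuming an existing SparkContext sc:

pairs = sc.parallelize([('b', 2), ('a', 1), ('c', 3)])
# Pass ascending=False to sort keys in descending order.
print(pairs.sortByKey(ascending=False).collect())   # [('c', 3), ('b', 2), ('a', 1)]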
... View more
Tags:
- Hadoop Core
- rdd
- Spark
Labels:
- Apache Spark
10-13-2017
07:22 PM
@Dinesh Chitlangia Thank you for the explanation. In that case I would rather use reduceByKey() to get the number of occurrences. Thanks for the info on countByValue().
... View more
10-13-2017
06:02 PM
1 Kudo
Hi Team, I am working on counting the number of occurrences of each value in a file.

RDD1 = RDD.flatMap(lambda x: x.split('|'))
RDD2 = RDD1.countByValue()

I want to save the output of RDD2 to a text file. I am able to see the output with for x, y in RDD2.items(): print(x, y), but when I tried to save it to a text file using RDD2.saveAsTextFile(\path) it did not work; it threw 'AttributeError: 'collections.defaultdict' object has no attribute 'saveAsTextFile''. Can you please help me understand if I am missing something here, or how to save the countByValue output to a text file?
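countByValue() is an action that returns a plain Python dict on the driver, which is why it has no saveAsTextFile. A sketch of two options, assuming RDD1 and sc from the post (output paths are placeholders): parallelize the dict back into an RDD, or keep the counting distributed with map plus reduceByKey.

# Option 1: turn the driver-side dict back into an RDD before saving.
counts = RDD1.countByValue()
sc.parallelize(counts.items()).saveAsTextFile('/tmp/counts_from_dict')

# Option 2: stay distributed end to end.
RDD1.map(lambda w: (w, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .saveAsTextFile('/tmp/counts_distributed')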
... View more
Labels:
- Apache Spark
10-10-2017
01:13 PM
1 Kudo
Hi Team, I was trying to load a dataframe into a Hive table, bucketed by one of the columns, and I am facing an error:

File "<stdin>", line 1, in <module>
AttributeError: 'DataFrameWriter' object has no attribute 'bucketBy'

Here is the statement I am trying to run:

rs.write.bucketBy(4, "Column1").sortBy("column2").saveAsTable("database.table")

Can you please help me out with this?
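bucketBy on the DataFrameWriter only exists in newer Spark releases, which is why it raises AttributeError here (other posts in this thread are on Spark 1.6). One hedged workaround sketch: create the bucketed table through HiveQL and insert into it, assuming a HiveContext named hc, the names from the post, and placeholder column types. Note that older Spark versions may not fully enforce Hive bucketing on write, so the final insert may need to be run from Hive/Beeline instead.

# Create the bucketed target table up front (column types here are assumptions).
hc.sql("""
  CREATE TABLE IF NOT EXISTS database.table (Column1 STRING, column2 STRING)
  CLUSTERED BY (Column1) SORTED BY (column2) INTO 4 BUCKETS
  STORED AS ORC
""")
hc.sql("SET hive.enforce.bucketing=true")
rs.registerTempTable("rs_tmp")   # temporary view name is a placeholder
hc.sql("INSERT OVERWRITE TABLE database.table SELECT Column1, column2 FROM rs_tmp")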
... View more
Labels:
- Apache Spark
09-18-2017
06:16 PM
@rahul gulati - To address your scenario, you will have to consider partitioning the source table wisely. Based on the data size of each partition, you can then load the data in chunks into the target table. Hope this helps.
... View more
08-30-2017
07:01 PM
1 Kudo
Hello Team, can someone please help me understand the comparison/difference between the Hive CLI and Beeline?
... View more
Labels:
- Apache Hive
08-15-2017
02:05 PM
1 Kudo
@Joe Olson Here you are trying to join large datasets, one of which is in the low billions. Let's assume:

table1: record count 800 million, 20 columns, data size 4 GB
table2: record count 4 million, 10 columns, data size 500 MB

When you run an inner/outer join on these 2 tables:

select A.*, B.Col2 from table1 A, table2 B where A.Col1 = B.Col1;

YARN will join both datasets and generate a huge internal dataset, which could be greater than 10-15 GB. To avoid that, we split the query. Take the key columns from both tables (Table1 - Col1, Table2 - Col2); we are interested in getting Col2 from table2. This requires creating a few temporary tables:

Create Table Table1_temp as Select Col1 from Table1;
Create Table Table2_temp as Select Col2 from Table2;
Create Table Table_columnsfromTable2 as Select A.Col1, B.Col2 from table1 A, table2 B where A.Col1 = B.Col1;

This generates a much smaller dataset while performing the join. Now join the Table_columnsfromTable2 temporary table with Table1 to get your desired output:

select A.*, B.Col2 from table1 A, Table_columnsfromTable2 B where A.Col1 = B.Col1;

Tuning of HQL varies for each scenario. In some cases we can partition and bucket the tables. Other cases may need a look at whether the incoming data is sorted and at the number of files. If possible, create temporary tables.
... View more
07-28-2017
11:04 AM
1 Kudo
We generally encounter such errors when the delimiter specified in the command doesn't match the delimiter in the input file. Also make sure you are giving the complete and correct path of the file. Please try the syntax below:

A = load '/path_to_file' using PigStorage('|') as (aa,bb,cc,dd,ee);
... View more
07-28-2017
10:51 AM
1 Kudo
The time taken for query execution depends on multiple factors:
1. Mainly the Hive query design, the joins and the columns being pulled
2. The YARN/Tez container size allocated, which depends on where you are running
3. The queue you are running your job in; check whether the queue is free

To answer your question on why one of the reducers is taking 1000 tasks, please check the hive.exec.reducers.max value defined. If you want to experiment with the number of reducers, try changing the value of hive.exec.reducers.bytes.per.reducer (preferably assign a smaller value, as it is inversely proportional to the number of reducers).
... View more
07-26-2017
05:54 PM
1 Kudo
Hi, I was running a Hive query which initiated 2 reducers. It was running for around 15 minutes, and then the job was killed. We checked the Tez view for logs, where we found that the job was killed due to 'Interrupted while waiting for task to complete'. Here is the message in the logs: 'Got a shouldDie notification via heartbeats for container container_XXXX'. Can you please help me understand under which scenario I would get such a message?
... View more
Labels:
- Apache Hive
07-25-2017
12:07 AM
1 Kudo
Hi Experts, I am trying to create an Ambari Pig View. It is throwing an error. Can you please help me understand where I went wrong?
... View more
Labels:
- Apache Ambari
- Apache Hadoop
07-21-2017
09:19 PM
1 Kudo
@Varun Please see below for the properties that control the number of reducers:

mapred.reduce.tasks = -1; -- this property lets Tez determine the number of reducers to initiate
hive.tez.auto.reducer.parallelism = true; -- when this property is set to TRUE, Hive estimates data sizes and sets parallelism estimates; Tez samples the source vertices' output sizes and adjusts the estimates at run time. This is the first property that determines the initial number of reducers once Tez starts the query.
hive.tez.min.partition.factor = 0.25; -- when auto parallelism is enabled, this property puts a lower limit on the number of reducers that Tez specifies
hive.tez.max.partition.factor = 2.0; -- this property tells Tez to over-partition the data in shuffle edges
hive.exec.reducers.max = 1099 by default -- the maximum number of reducers
hive.exec.reducers.bytes.per.reducer = 256 MB, which is 268435456 bytes

Now, to calculate the number of reducers, we put all of this together with the formula below. From the explain plan we also need the size of the reducer stage output; let's assume 200,000 bytes.

Max(1, Min(hive.exec.reducers.max [1099], Reducer stage estimate / hive.exec.reducers.bytes.per.reducer)) x hive.tez.max.partition.factor [2]
= Max(1, Min(1099, 200000 / 268435456)) x 2
= Max(1, Min(1099, 0.00074505805)) x 2
= Max(1, 0.0007) x 2
= 1 x 2
= 2

Tez will spawn 2 reducers. In this case we can legitimately make Tez initiate a higher number of reducers by lowering the value of hive.exec.reducers.bytes.per.reducer, for example to roughly 10 KB (10432 bytes):

= Max(1, Min(1099, 200000 / 10432)) x 2
= Max(1, 19) x 2
= 38

Please note that a higher number of reducers doesn't necessarily mean better performance.
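For clarity, the same arithmetic as a small Python snippet (all numbers are the ones assumed above):

reducers_max = 1099
bytes_per_reducer = 268435456          # 256 MB default
stage_estimate = 200000                # bytes, taken from the explain plan
max_partition_factor = 2.0

# Default setting -> 2 reducers
print(int(max(1, min(reducers_max, stage_estimate / float(bytes_per_reducer))) * max_partition_factor))
# Lowered to ~10 KB per reducer -> 38 reducers
print(int(max(1, min(reducers_max, stage_estimate / 10432.0)) * max_partition_factor))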
... View more
07-21-2017
08:44 PM
1 Kudo
@Varun R Optimization varies in every case; it depends on the incoming data and file sizes. In general, please use these settings for fine tuning.

Enable predicate pushdown (PPD) to filter at the storage layer:
SET hive.optimize.ppd=true;
SET hive.optimize.ppd.storage=true;

Vectorized query execution processes data in batches of 1024 rows instead of one by one:
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;

Enable the cost-based optimizer (CBO) for efficient query execution based on cost, and fetch table statistics:
SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;
Partition and column statistics are fetched from the metastore. Use this with caution: if you have too many partitions and/or columns, it could degrade performance.

Control reducer output:
SET hive.tez.auto.reducer.parallelism=true;

Partition the table on the necessary column, and also bucket the tables (identify the column wisely). It also depends on how you want to tune your query based on the explain plan; please check the number of mappers and reducers spawned.
... View more
07-21-2017
06:33 PM
2 Kudos
@Simran Kaur I see in stdout that the Oozie launcher failed. Are you trying to run a Hive action in Oozie? If that is the case, please use the command yarn logs -applicationId application_1499692338187_45811 to get the logs, or follow the KB article below to trace the logs and debug further: https://community.hortonworks.com/articles/9148/troubleshooting-an-oozie-flow.html
... View more
07-21-2017
06:14 PM
2 Kudos
@Helmi Khalifa Please use the syntax below to load data from HDFS into Hive tables:

LOAD DATA INPATH '/hdfs/path' OVERWRITE INTO TABLE TABLE_NAME;

In case you are trying to load into a specific partition of the table:

LOAD DATA INPATH '/hdfs/path' OVERWRITE INTO TABLE TABLE_NAME PARTITION (ds='2008-08-15');
... View more
07-14-2017
06:19 PM
2 Kudos
For every reducer, a certain number of tasks are created. Can someone explain what factor decides the number of tasks to be created for each reducer?
... View more
Labels:
- Apache Hadoop
- Apache YARN
06-23-2017
02:05 AM
Get the HDFS path where the Hive table files are stored, then use hdfs dfs -du -s -h /hdfs_path to get the size in a readable format.
... View more
06-22-2017
07:32 PM
After adding this setting, I am getting the same explain plan. Nothing additional.
... View more
06-22-2017
06:33 PM
2 Kudos
I am trying to understand a Hive explain plan. A sample explain plan for my query is below:

Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 2 (SIMPLE_EDGE), Map 3 (BROADCAST_EDGE)
Reducer 3 <- Map 4 (SIMPLE_EDGE), Map 5 (BROADCAST_EDGE), Reducer 2 (SIMPLE_EDGE)
Reducer 4 <- Map 6 (SIMPLE_EDGE), Map 7 (BROADCAST_EDGE), Map 8 (BROADCAST_EDGE), Reducer 3 (SIMPLE_EDGE)

Can someone help me understand what SIMPLE_EDGE and BROADCAST_EDGE are? What should I interpret from BROADCAST_EDGE and SIMPLE_EDGE?
... View more
Labels:
- Apache Hive
06-14-2017
08:22 PM
1 Kudo
Couldn't agree more with @Dinesh Chitlangia. The same is the case with the WHERE clause: if you use an alias in the WHERE clause, you will face a similar error, the reason being that the WHERE clause is evaluated before the SELECT clause.

select col1 as c1, col2 as c2 from test_table where c1 = "ABC";

We make it work by re-tweaking the above query as:

select * from (
select col1 as c1, col2 as c2 from test_table
) t1 where c1 = "ABC";
... View more