Member since: 10-28-2016
Posts: 392
Kudos Received: 7
Solutions: 20
My Accepted Solutions
Title | Views | Posted
--- | --- | ---
 | 1764 | 03-12-2018 02:28 AM
 | 2669 | 12-18-2017 11:41 PM
 | 1852 | 07-17-2017 07:01 PM
 | 1317 | 07-13-2017 07:20 PM
 | 3886 | 07-12-2017 08:31 PM
05-17-2019
05:56 AM
@Ravi Kumar Lanke, thanks for the response. When I run the same code on the command line, it works fine; when I run it in PyCharm, it fails. $PATH, $JAVA_HOME, $SPARK_HOME, and $PYTHONPATH are the same on the command line and in PyCharm (I've tried setting them manually as well).
On the PySpark command line:
>>> os.environ['PATH']
'/Library/Frameworks/Python.framework/Versions/2.7/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/opt/X11/bin:/usr/local/bin/scala/bin:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/bin:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/bin:/Users/karanalang/Documents/Technology/maven/apache-maven-3.3.9//bin:/bin:/Users/karanalang/Documents/Technology/Storm/zookeeper/zookeeper-3.4.8/bin:/Users/karanalang/Documents/Technology/kafka/confluent-3.2.2/bin:/usr/local/etc/:/Users/karanalang/Documents/Technology/kafka/confluent-3.2.2/bin:/Users/karanalang/Documents/Technology/Gradle/gradle-4.3.1/bin:/Users/karanalang/Documents/Technology/HadoopInstallation-local/hadoop-2.6.5/bin:/Users/karanalang/Documents/Technology/HadoopInstallation-local/hadoop-2.6.5/sbin'
>>> os.environ['JAVA_HOME']
'/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/'
>>> os.environ['SPARK_HOME']
'/Users/karanalang/Documents/Technology/IncortaAnalytics/spark'
>>> os.environ['PYTHONPATH']
'/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/lib/py4j-0.10.4-src.zip:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/lib/py4j-0.10.4-src.zip:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/:'
In PyCharm:
spark_home -> /Users/karanalang/Documents/Technology/IncortaAnalytics/spark
JAVA_HOME => /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/
PATH => /Users/karanalang/PycharmProjects/Python2/venv/bin:/Library/Frameworks/Python.framework/Versions/2.7/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/opt/X11/bin:/usr/local/bin/scala/bin:/Users/karanalang/Documents/Technology/Hive/apache-hive-3.1.0-bin/bin:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/bin:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/bin:/Users/karanalang/Documents/Technology/maven/apache-maven-3.3.9//bin:/bin:/Users/karanalang/Documents/Technology/Storm/zookeeper/zookeeper-3.4.8/bin:/Users/karanalang/Documents/Technology/kafka/confluent-3.2.2/bin:/usr/local/etc/:/Users/karanalang/Documents/Technology/kafka/confluent-3.2.2/bin:/Users/karanalang/Documents/Technology/Gradle/gradle-4.3.1/bin:/Users/karanalang/Documents/Technology/HadoopInstallation-local/hadoop-2.6.5/bin:/Users/karanalang/Documents/Technology/HadoopInstallation-local/hadoop-2.6.5/sbin
PYTHONPATH -> /Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/lib/py4j-0.10.4-src.zip:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/lib/py4j-0.10.4-src.zip:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/:
In PyCharm (based on similar issues on SO & other forums), I've also tried setting PYSPARK_SUBMIT_ARGS, but it doesn't seem to be working:
os.environ['PYSPARK_SUBMIT_ARGS'] = "--master spark://Karans-MacBook-Pro-4.local:7077 pyspark-shell"
Any input on which version might be a mismatch, or what the root cause might be?
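A hedged aside (not from the original thread): this failure is often reported when the pip-installed pyspark in the PyCharm venv does not match the Spark distribution at SPARK_HOME, so comparing the two versions is a quick first check. The paths below are reused from the dumps above:
/Users/karanalang/PycharmProjects/Python2/venv/bin/pip show pyspark   # version of the venv's pyspark package
$SPARK_HOME/bin/spark-submit --version                                # version of the Spark 2.2.0 distribution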
05-16-2019
12:21 AM
@Ravi Kumar Lanke, @saichand akella - any ideas on this?
05-16-2019
12:19 AM
Hi @saichand akella - any ideas on this?
05-16-2019
12:14 AM
Hello - I'm using PyCharm 2019, Python 2.7.6, and Apache Spark 2.2.0 for PySpark development, and I'm running into issues when I try to run any Spark code. Code:
#!/bin/python
import os
import sys
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import pyarrow as pa
import pprint as pp
from pyspark.sql.functions import col, lit
import datetime
today = datetime.date.today().strftime("%Y%m%d")  # strftime codes; __format__("yyyymmdd") would return the literal string "yyyymmdd"
print (" formatted date => " + today)
# d = {'a':1, 'b':2, 'c':3}
d = [(1, "20190101", "99990101", "false"), (2, "20190501", "99990101", "false"), (3, "20190101", "20190414", "false"), (3, "20190415", "99990101", "false")]
print(os.environ)
print(" spark_home -> ", os.environ['SPARK_HOME'])
print(" pythonpath -> ", os.environ['PYTHONPATH'])
print(" PYSPARK_SUBMIT_ARGS -> ", os.environ['PYSPARK_SUBMIT_ARGS'])
os.environ['PYSPARK_SUBMIT_ARGS'] = "--master spark://Karans-MacBook-Pro-4.local:7077 pyspark-shell"
print(" JAVA_HOME => " + os.environ['JAVA_HOME'])
print(" PATH => " + os.environ['PATH'])
print(" CLASSPATH => " + os.environ['CLASSPATH'])
sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python"))
sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python/lib/py4j-0.10.4-src.zip"))
# ERROR OBTAINED WHEN I CREATE SparkSession object
spark = SparkSession.builder.master("local").appName("CreatingDF").getOrCreate()
sparkdf = spark.createDataFrame(d, ['pnalt', 'begda', 'endda', 'ref_flag'])
print(sparkdf)
ERROR (when I create the SparkSession object):
19/05/15 16:58:06 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[main,5,main]
java.util.NoSuchElementException: key not found: _PYSPARK_DRIVER_CALLBACK_HOST
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at org.apache.spark.api.python.PythonGatewayServer$$anonfun$main$1.apply$mcV$sp(PythonGatewayServer.scala:50)
at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1262)
at org.apache.spark.api.python.PythonGatewayServer$.main(PythonGatewayServer.scala:37)
at org.apache.spark.api.python.PythonGatewayServer.main(PythonGatewayServer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Traceback (most recent call last):
File "/Users/karanalang/PycharmProjects/Python2/karan/python2/falcon/py_createDF.py", line 55, in <module>
spark = SparkSession.builder.master("local").appName("CreatingDF").getOrCreate()
File "/Users/karanalang/PycharmProjects/Python2/venv/lib/python2.7/site-packages/pyspark/sql/session.py", line 173, in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
File "/Users/karanalang/PycharmProjects/Python2/venv/lib/python2.7/site-packages/pyspark/context.py", line 367, in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/Users/karanalang/PycharmProjects/Python2/venv/lib/python2.7/site-packages/pyspark/context.py", line 133, in __init__
SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
File "/Users/karanalang/PycharmProjects/Python2/venv/lib/python2.7/site-packages/pyspark/context.py", line 316, in _ensure_initialized
SparkContext._gateway = gateway or launch_gateway(conf)
File "/Users/karanalang/PycharmProjects/Python2/venv/lib/python2.7/site-packages/pyspark/java_gateway.py", line 46, in launch_gateway
return _launch_gateway(conf)
File "/Users/karanalang/PycharmProjects/Python2/venv/lib/python2.7/site-packages/pyspark/java_gateway.py", line 108, in _launch_gateway
raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number
Forums on the internet suggest that this is a version-related issue; however, I'm not sure how to debug/fix it. Any help is really appreciated. Attaching a screenshot of the PyCharm project interpreter as well.
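One hedged way to rule out such a mismatch (findspark is an assumption here - it is not mentioned in the thread) is to make the interpreter load PySpark from SPARK_HOME itself rather than from the venv's site-packages:
# minimal sketch: findspark prepends $SPARK_HOME/python and its bundled py4j
# to sys.path, so the PySpark code that runs matches the Spark install
import findspark
findspark.init()  # reads the SPARK_HOME environment variable

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("CreatingDF").getOrCreate()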
Labels:
- Apache Falcon
- Apache Spark
- Gateway
03-12-2018
02:28 AM
The issue is fixed: this is not an Avro data file but just an Avro schema (.avsc), so it needs to be read as a text file.
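A minimal sketch of that fix (host and path reused from the question below; parsing the text with Avro's Schema.Parser is an assumption about the intended use):
import java.net.URI
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
conf.set("fs.defaultFS", "hdfs://localhost:9000")
val inputF = "hdfs://localhost:9000/avro/emp.avsc"
val fs = FileSystem.get(URI.create(inputF), conf)
val in = fs.open(new Path(inputF))
// an .avsc file holds schema JSON, not Avro data, so read it as plain text ...
val schemaJson = try scala.io.Source.fromInputStream(in).mkString finally in.close()
// ... and parse it with Schema.Parser instead of DataFileStream
val schema = new Schema.Parser().parse(schemaJson)
println(schema.getFullName)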
03-01-2018
11:43 PM
Hello - I'm reading an Avro file from HDFS, and it throws an exception:
Exception in thread "main" java.io.IOException: Not a data file.
at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:105)
at org.apache.avro.file.DataFileStream.<init>(DataFileStream.java:84)
at karan.scala.readuri.ReadAvroFromURI$.readAvroFromURI(ReadAvroFromURI.scala:52)
at karan.scala.readuri.ReadAvroFromURI$.delayedEndpoint$karan$scala$readuri$ReadAvroFromURI$1(ReadAvroFromURI.scala:29)
at karan.scala.readuri.ReadAvroFromURI$delayedInit$body.apply(ReadAvroFromURI.scala:24)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$anonfun$main$1.apply(App.scala:76)
at scala.App$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at karan.scala.readuri.ReadAvroFromURI$.main(ReadAvroFromURI.scala:24)
at karan.scala.readuri.ReadAvroFromURI.main(ReadAvroFromURI.scala)
Here is the code:
conf.set("fs.defaultFS", "hdfs://localhost:9000")
val inputF = "hdfs://localhost:9000/avro/emp.avsc"
val inPath = new Path(inputF)
val fs = FileSystem.get(URI.create(inputF), conf)
val inStream = new BufferedInputStream(fs.open(inPath))
val reader = new DataFileStream(inStream, new GenericDatumReader())
DataFileStream.java looks for Avro's magic bytes to determine whether this is an Avro data file, does not find them, and throws the error:
void initialize(InputStream in) throws IOException {
  this.header = new Header();
  this.vin = DecoderFactory.get().binaryDecoder(in, vin);
  byte[] magic = new byte[DataFileConstants.MAGIC.length];
  try {
    vin.readFixed(magic); // read magic
  } catch (IOException e) {
    throw new IOException("Not a data file.");
  }
  if (!Arrays.equals(DataFileConstants.MAGIC, magic))
    throw new IOException("Not a data file.");
Any ideas on how to fix this? The file is fine, and I'm able to cat it (shown below):
hdfs dfs -cat hdfs://localhost:9000/avro/emp.avsc
18/03/01 15:30:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
{"namespace": "tutorialspoint.com","type": "record","name": "emp","fields": [{"name": "name", "type": "string"},{"name": "id", "type": "int"},{"name": "salary", "type": "int"},{"name": "age", "type": "int"},{"name": "address", "type": "string"}]}
Tags:
- Avro
- avroschema
- Scala
Labels:
- Apache Hadoop
- HDFS
02-26-2018
03:30 AM
Hello - I'm trying to read an Avro schema stored at a URL. Here is the code I tried:
val html = Source.fromURL("https://drive.google.com/open?id=1evVW9bTy5bOqq6h_oCyhXDFv9Fuz6zUn")
val s = html.mkString
println("schema is => " + s)
This prints the contents shown below instead of the schema stored in the file. How do I read just the schema?
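A hedged guess at what is happening (not confirmed in the thread): the open?id= link returns Google Drive's HTML viewer page rather than the raw file, so mkString captures the page markup. A direct-download style URL usually returns the file body itself, which can then be parsed as a schema:
import org.apache.avro.Schema
import scala.io.Source

val fileId = "1evVW9bTy5bOqq6h_oCyhXDFv9Fuz6zUn"
// uc?export=download serves the raw bytes instead of the viewer page
val raw = Source.fromURL("https://drive.google.com/uc?export=download&id=" + fileId).mkString
val schema = new Schema.Parser().parse(raw)
println(schema.toString(true))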
02-25-2018
06:23 AM
@Timothy Spann - looping you in, any ideas on this?
02-25-2018
06:17 AM
I have an Avro-generated class, com.avro.Person, which has a method getClassSchema. I'm passing the class name to a method, and inside the method I need to get the Avro schema. Here is the code I'm using:
val pr = Class.forName(productCls) // where productCls = classOf[Product].getName
How do I call the method getClassSchema of this class?
Additional info - links to Person.java & Person.avsc:
Person.java [https://drive.google.com/open?id=1LOa_muN0tFSEm-VReOBDh3ozU3ufmLZQ]
Person.avsc [https://drive.google.com/open?id=1evVW9bTy5bOqq6h_oCyhXDFv9Fuz6zUn]
In Person.java, there is a method:
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
I'm trying to call this method to get the Avro schema corresponding to the class Person.
Code:
import com.avro.Person
val m1 = Map("schema.class" -> classOf[Person].getName)
This Map is passed on to a function, say foo, and I'm trying to get the schema based on the class:
def foo(m : Map[String, String]) {
  val personClass = m("schema.class") // look up the class name from the Map
  val pr = Class.forName(personClass)
  //TBD - to get the schema of the avro class
}
Now, using this "pr", I need to get the Avro schema of the Person class, i.e. call the method getClassSchema, which returns the schema. How do I do this? Any help on this is really appreciated.
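A sketch of one way this could be done (the reflection call below is an assumption, not the thread's accepted answer): since getClassSchema is a static method, it can be looked up by name and invoked with a null receiver.
import org.apache.avro.Schema

def schemaFor(className: String): Schema =
  Class.forName(className)
    .getMethod("getClassSchema") // the static method generated by Avro
    .invoke(null)                // null receiver because the method is static
    .asInstanceOf[Schema]

def foo(m: Map[String, String]): Unit = {
  val schema = schemaFor(m("schema.class"))
  println(schema.toString(true)) // pretty-printed schema JSON
}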
12-22-2017
03:09 AM
@Venkata Sudheer Kumar M, here is the update on this: the script /usr/hdp/2.5.3.0-37/kafka/bin/kafka-producer-perf-test.sh seems to have an issue in SSL mode - it does not appear to recognize security-protocol=SSL and the config file passed in (i.e. when the truststore and password are passed through the config file). When I instead use /usr/hdp/2.5.3.0-37/kafka/bin/kafka-console-producer.sh, I am able to produce messages in SSL mode by passing the same security protocol and config file:
/usr/hdp/2.5.3.0-37/kafka/bin/kafka-console-producer.sh --topic mmtest1 --security-protocol SSL --broker-list host1:9093 --producer.config /usr/hdp/2.5.3.0-37/kafka/mirror-maker/client-ssl.properties
I'm using Kafka 0.10 and Hortonworks 2.5.3. Any ideas on this?
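If the script on this build wraps the newer Java ProducerPerformance tool (an assumption - the stack trace in the question below shows the old Scala producer, kafka.producer.SyncProducer, which predates SSL support), the equivalent SSL-aware invocation would be:
/usr/hdp/2.5.3.0-37/kafka/bin/kafka-producer-perf-test.sh --topic mmtest1 \
  --num-records 1000000 --record-size 1000 --throughput -1 \
  --producer.config /usr/hdp/2.5.3.0-37/kafka/mirror-maker/client-ssl.properties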
12-21-2017
09:09 PM
Hello All -
I'm getting this error when publishing messages to a Kafka topic in SSL mode. Command used to publish messages:
/usr/hdp/2.5.3.0-37/kafka/bin/kafka-producer-perf-test.sh --messages 1000000 --message-size 1000 --topics mmtest4 \
--broker-list <host1>:9093,<host2>:9093,<host3>:9093, \
--threads 1 --compression-codec 3 --batch-size 10000 \
--security-protocol SSL --show-detailed-stats
Error message:
[2017-12-21 19:48:49,846] WARN Fetching topic metadata with correlation id 11 for topics [Set(mmtest4)] from broker [BrokerEndPoint(0,<host1>,9093)] failed (kafka.client.ClientUtils$)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:140)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:84)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:81)
at kafka.producer.SyncProducer.send(SyncProducer.scala:126)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:83)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:50)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$getPartitionListForTopic(DefaultEventHandler.scala:206)
at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:170)
at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:169)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:169)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:106)
at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:89)
at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:69)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:68)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:46)
SSL seems to be set up correctly - when I run the following command, I get the expected result:
openssl s_client -debug -connect host1:9093 -tls1
Settings in server.properties (done via Ambari):
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
ssl.truststore.location=/etc/kafka/truststore/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=/etc/kafka/truststore//kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
These settings work in another environment; however, in this environment (Prod), they give the error shown above. Any ideas on what needs to be done to debug/fix the error?
Labels:
- Apache Ambari
- Apache Kafka
12-18-2017
11:41 PM
The issue is fixed. In MirrorMaker, the consumer config must point at the cluster being mirrored from (the one the original producer writes to), and the producer config at the target cluster; I had mixed the two up, and reversing them fixed the issue.
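A sketch of the corrected wiring, assuming the two bootstrap.servers lists from the question below were simply swapped (the thread doesn't state which cluster is the source):
# mmConsumer-qa.properties - points at the cluster being mirrored FROM
bootstrap.servers=localhost:8092
group.id=mmtest
client.id=mm_consumer

# mmProducer-qa.properties - points at the cluster being mirrored TO
bootstrap.servers=localhost:7092,localhost:7082,localhost:7072
acks=1
client.id=mm_producer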
12-18-2017
11:27 PM
I'm setting up Kafka MirrorMaker to transfer data between 2 clusters, and somehow MirrorMaker is not able to pull data from the producer cluster. Here is the command used:
$CONFLUENT/bin/kafka-mirror-maker --consumer.config $CONFLUENT/mprops/mmConsumer-qa.properties --producer.config $CONFLUENT/mprops/mmProducer-qa.properties --whitelist="mmtest" --new.consumer --num.streams 4
mmConsumer-qa.properties:
bootstrap.servers=localhost:7092,localhost:7082,localhost:7072
group.id=mmtest
client.id=mm_consumer
mmProducer-qa.properties :
bootstrap.servers=localhost:8092
acks=1
batch.size=100
client.id=mm_producer
linger.ms=5
Any ideas on how to debug/fix this?
Labels:
- Apache Kafka
12-14-2017
11:46 PM
Thanks, that helps!
12-14-2017
09:27 PM
Hello - is there a way to get the YARN logs for an applicationId for a specific period? When I use the command: yarn logs -applicationId <applicationId> -log_files spark.log, it does not give me the older log files (e.g. log files from 2 days ago). Is there any way to get those log files without having to go to the consolidated YARN Resource Manager log files? BTW, the YARN log retention is set to 30 days in yarn-site.xml. Another question: what is the option -log_files used for, and what values can I provide for it?
Labels:
- Apache YARN
12-12-2017
07:23 AM
@bkosaraju - I re-checked this, and the issue seems to occur when I include the column deviceid (bigint) in the query.
12-12-2017
07:22 AM
FYI - I re-checked this, and the issue seems to occur when I include the column deviceid (bigint) in the query.
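A hedged way to confirm this (not from the thread): if deviceid was written to the Parquet files as a 32-bit int but the table declares it as bigint, the decodeToLong call in the stack trace is exactly what would fail. Reading one partition directly in spark-shell shows the types that were actually written (the path reuses the table LOCATION from the DDL below):
val df = spark.read.parquet("hdfs://application/apps/hive/warehouse/amp.db/power/year=2017/month=12/day=11")
df.printSchema() // compare deviceid's type here against the bigint in the DDL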
12-12-2017
06:54 AM
@bkosaraju - this is the query that was fired:
select deviceid, devicename, indicatorname, topic_k, partition_k, offset_k from powerpoll where year=2017 and month=12 and day=11 limit 5;
There is no column called format - can you please clarify what you meant?
12-12-2017
12:52 AM
@bkosaraju, @mqureshi - any ideas on this?
12-12-2017
12:32 AM
Hello - I'm getting an error when querying a Hive table (data in Parquet format):
select * from powerpoll_k1 where year=2017 and month=12 and day=11 limit 5;
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 102.0 failed 4 times, most recent failure: Lost task 0.3 in stage 102.0 (TID 20602, msc02-jag-dn-016.uat.gdcs.apple.com): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
at org.apache.parquet.column.Dictionary.decodeToLong(Dictionary.java:52)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:274)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Table DDL is shown below:
CREATE EXTERNAL TABLE `powerpoll_k1`(
  `topic_k` varchar(255), `partition_k` int, `offset_k` bigint, `timestamp_k` timestamp,
  `deviceid` bigint, `devicename` varchar(50), `deviceip` varchar(128), `peerid` int,
  `objectid` int, `objectname` varchar(256), `objectdesc` varchar(256), `oid` varchar(50),
  `pduoutlet` varchar(50), `pluginid` int, `pluginname` varchar(255), `indicatorid` int,
  `indicatorname` varchar(255), `format` int, `snmppollvalue` varchar(128) COMMENT 'value in kafka avsc',
  `time` double, `clustername` varchar(50) COMMENT 'rpp or power', `peerip` varchar(50))
COMMENT 'external table at /apps/hive/warehouse/amp.db/sevone_power'
PARTITIONED BY (`year` int, `month` int, `day` int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES ('serialization.format' = '1')
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://application/apps/hive/warehouse/amp.db/power'
TBLPROPERTIES ('transient_lastDdlTime' = '1513022286')
Any ideas on what the issue is?
Labels:
- Apache Hive
12-11-2017
11:42 PM
@bkosaraju - I'm getting the following error when querying the table, any ideas?
0: jdbc:hive2://msc02-jag-hve-002.uat.gdcs.ap> select deviceid, devicename, indicatorname, topic_k, partition_k, offset_k from powerpoll where year=2017 and month=12 and day=11 limit 5;
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 86.0 failed 4 times, most recent failure: Lost task 0.3 in stage 86.0 (TID 19049, msc02-jag-dn-011.uat.gdcs.apple.com): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
at org.apache.parquet.column.Dictionary.decodeToLong(Dictionary.java:52)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:274)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
12-11-2017
10:52 PM
Thanks @bkosaraju - that worked!
12-11-2017
09:43 PM
Hello - I have a Parquet file, and I've created an EXTERNAL Hive table on top of it. When I try to query the table, it gives 0 rows. Any ideas what the issue might be?
hdfs dfs -ls hdfs://abc/apps/hive/warehouse/amp.db/power/year=2017/month=12/day=01
-rw-r--r-- 2 pstl hdfs 141913174 2017-12-01 22:33 hdfs://abc/apps/hive/warehouse/amp.db/power/year=2017/month=12/day=01/part-00023-e749dbd1-63a9-499d-932e-a6eadf03a67c.c000.snappy.parquet
Table created:
CREATE EXTERNAL TABLE power_k1(
  topic_k varchar(255), partition_k int, offset_k bigint, timestamp_k timestamp,
  deviceid bigint, devicename varchar(50), deviceip varchar(128), peerid int,
  objectid int, objectname varchar(256), objectdesc varchar(256), oid varchar(50),
  pduoutlet varchar(50), pluginid int, pluginname varchar(255), indicatorid int,
  indicatorname varchar(255), format int, snmppollvalue varchar(128) COMMENT 'value in sevone kafka avsc',
  time double, clustername varchar(50) COMMENT 'rpp or power', peerip varchar(50))
COMMENT 'external table at /apps/hive/warehouse/amp.db/sevone_power'
PARTITIONED BY (year int, month int, day int)
STORED AS PARQUET
LOCATION '/apps/hive/warehouse/amp.db/power'
select count(1) from power_k1 -> returns 0 records
Any ideas what the issue might be & how to debug this?
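The accepted fix isn't quoted in this thread, but a hedged guess from the symptom: an external partitioned table returns no rows until its partitions are registered in the metastore, even when the files exist on HDFS:
MSCK REPAIR TABLE power_k1;
-- or register a single partition explicitly:
ALTER TABLE power_k1 ADD PARTITION (year=2017, month=12, day=1)
LOCATION '/apps/hive/warehouse/amp.db/power/year=2017/month=12/day=01';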
Labels:
- Apache Hive
09-19-2017
09:52 PM
Hello All -
I was able to set up SSL for the Kafka brokers using OpenSSL. However, I'm having issues setting up SSL using the pem file (i.e. the SSL certificate, certified by a CA, provided by the company). Here is what I've done -
created the server/client keystore & truststore files and imported the provided cert.pem file:
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file cert.pem
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file cert.pem
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file cert.pem
keytool -keystore kafka.client.keystore.jks -alias localhost -validity 365 -genkey
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file cert.pem
I have a console producer pushing data into the topic, and it gives the error shown below. Any ideas on what the issue might be?
Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence violation, state = 1, type = 1
at sun.security.ssl.ServerHandshaker.processMessage(ServerHandshaker.java:213)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:963)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1416)
at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:336)
at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:417)
... 7 more
Thanks for the help in advance!
Labels:
- Apache Kafka
09-05-2017
09:53 PM
@mqureshi, @Terry Stebbens - any ideas on this?
09-05-2017
09:38 PM
Hi All - I have Kafka 0.10. I've enabled SSL (non-Kerberized) for the Kafka broker on Node 4, and I'm able to produce/consume messages using console-producer & console-consumer from Node 4. However, when I try to enable an SSL connection between Node 4 and Node 5 and consume messages from Node 5 (using console-consumer), I'm facing issues. Here are the steps -
On Node 4:
Started console-producer, publishing on the SSL port (9192):
$CONFLUENT/bin/kafka-console-producer --broker-list node4:9192 --topic ssl --producer.config client-ssl.properties
Started console-consumer on Node 4, consuming the messages from the console-producer (port 9192):
$CONFLUENT/bin/kafka-console-consumer --bootstrap-server node4:9192 --topic ssl --new-consumer --consumer.config client-ssl.properties
Contents of client-ssl.properties:
security.protocol=SSL
ssl.truststore.location=/usr/hdp/2.5.3.0-37/confluent-3.2.2/kafkaSSL/kafka.client.truststore.jks
ssl.truststore.password=<passwd>
ssl.keystore.location=/usr/hdp/2.5.3.0-37/confluent-3.2.2/kafkaSSL/kafka.client.keystore.jks
ssl.keystore.password=<passwd>
ssl.key.password=<passwd>
This is working fine, and the consumer is able to consume the messages produced by the producer.
On Node 5, I have another Kafka instance, and I start another console-consumer there to consume the data from the console-producer on Node 4. To enable SSL between the client on Node 5 and the broker on Node 4, here are the steps on Node 4:
1) Create kafka.client05.keystore.jks (set the CN to Node 5):
keytool -keystore kafka.client05.keystore.jks -alias localhost -validity 365 -genkey
2) Export the certificate from the keystore:
keytool -keystore kafka.client05.keystore.jks -alias localhost -certreq -file cert05-file
3) Get the certificate signed by the CA:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert05-file -out cert05-signed -days 365 -CAcreateserial -passin pass:<passwd>
4) Import the CA certificate & the signed certificate into the keystore:
keytool -keystore kafka.client05.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client05.keystore.jks -alias localhost -import -file cert05-signed
5) Add the generated CA to the client's truststore:
keytool -keystore kafka.client05.truststore.jks -alias CARoot -import -file ca-cert
On Node 5, I do the same steps as above and start the console-consumer (to read the data produced by the console-producer on Node 4):
$CONFLUENT1/bin/kafka-console-consumer --bootstrap-server node4:9192 --topic ssl --new-consumer --consumer.config client-ssl.properties
This is not working. What do I need to do to make this work?
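One hedged check before touching the consumer (mirroring the openssl verification used elsewhere in these posts): confirm from Node 5 that the broker on Node 4 completes an SSL handshake on port 9192:
openssl s_client -connect node4:9192 -tls1
# a successful handshake prints the broker's certificate chain; a failure here
# would point at the keystores/truststores rather than the consumer config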
08-29-2017
04:55 AM
@mqureshi, @Constantin Stanca - looping you in, any ideas on this?
08-29-2017
04:53 AM
Hi All, I have Kafka 0.10 and am trying to view the producer/consumer metrics using JConsole. I have a basic question about viewing the metrics on the MBeans tab in JConsole. Ref -> https://community.hortonworks.com/articles/73897/monitor-kafka-producer-for-performance-1.html
Following the link: do I need to make any changes on the MBeans tab to view parameters such as kafka.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=console-producer? Or are these already configured when I specify a JMX_PORT and start the services? When I log in to MBeans, I see many of the metrics present. If I need to add additional metrics, where do I need to make the change?
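A hedged sketch of the usual setup (the port number is an example): the Kafka client MBeans are registered automatically by the client's JmxReporter, so nothing needs to be added by hand on the MBeans tab - exporting JMX_PORT before starting the tool is enough for the wrapper scripts:
export JMX_PORT=10102
$CONFLUENT/bin/kafka-console-producer --broker-list node4:9092 --topic test
# then attach: jconsole <host>:10102 and browse kafka.producer on the MBeans tab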
Labels:
- Apache Kafka