Member since
10-28-2016
392
Posts
7
Kudos Received
20
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
2785 | 03-12-2018 02:28 AM | |
4342 | 12-18-2017 11:41 PM | |
3092 | 07-17-2017 07:01 PM | |
2157 | 07-13-2017 07:20 PM | |
6494 | 07-12-2017 08:31 PM |
05-17-2019
05:56 AM
@Ravi Kumar Lanke , thanks for the response - when i run the same code on command-line, it works fine. When i run this on PyCharm, it is failing. $PATH, $JAVA_HOME, $SPARK_HOME, $PYTHON_PATH on command line & PyCharm is the same, I've tried setting it manually as well On PySpark Command Line : >>> os.environ['PATH']'
/Library/Frameworks/Python.framework/Versions/2.7/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/opt/X11/bin:/usr/local/bin/scala/bin:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/bin:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/bin:/Users/karanalang/Documents/Technology/maven/apache-maven-3.3.9//bin:/bin:/Users/karanalang/Documents/Technology/Storm/zookeeper/zookeeper-3.4.8/bin:/Users/karanalang/Documents/Technology/kafka/confluent-3.2.2/bin:/usr/local/etc/:/Users/karanalang/Documents/Technology/kafka/confluent-3.2.2/bin:/Users/karanalang/Documents/Technology/Gradle/gradle-4.3.1/bin:/Users/karanalang/Documents/Technology/HadoopInstallation-local/hadoop-2.6.5/bin:/Users/karanalang/Documents/Technology/HadoopInstallation-local/hadoop-2.6.5/sbin' >>> os.environ['JAVA_HOME']
'/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/ >>> os.environ['SPARK_HOME']
'/Users/karanalang/Documents/Technology/IncortaAnalytics/spark' >>> os.environ['PYTHONPATH']
'/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/lib/py4j-0.10.4-src.zip:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/lib/py4j-0.10.4-src.zip:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/:'
On PyCharm : (' spark_home -> ', '/Users/karanalang/Documents/Technology/IncortaAnalytics/spark JAVA_HOME => /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/
PATH => /Users/karanalang/PycharmProjects/Python2/venv/bin:/Library/Frameworks/Python.framework/Versions/2.7/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/opt/X11/bin:/usr/local/bin/scala/bin:/Users/karanalang/Documents/Technology/Hive/apache-hive-3.1.0-bin/bin:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/bin:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/bin:/Users/karanalang/Documents/Technology/maven/apache-maven-3.3.9//bin:/bin:/Users/karanalang/Documents/Technology/Storm/zookeeper/zookeeper-3.4.8/bin:/Users/karanalang/Documents/Technology/kafka/confluent-3.2.2/bin:/usr/local/etc/:/Users/karanalang/Documents/Technology/kafka/confluent-3.2.2/bin:/Users/karanalang/Documents/Technology/Gradle/gradle-4.3.1/bin:/Users/karanalang/Documents/Technology/HadoopInstallation-local/hadoop-2.6.5/bin:/Users/karanalang/Documents/Technology/HadoopInstallation-local/hadoop-2.6.5/sbin PYTHONPATH -> /Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/lib/py4j-0.10.4-src.zip:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/lib/py4j-0.10.4-src.zip:/Users/karanalang/Documents/Technology/IncortaAnalytics/spark/python/:
On PyCharm (based on similar issues on SO & other forums), i've tried setting PYSPARK_SUBMIT_ARGS, but it doesnt seems to be working os.environ['PYSPARK_SUBMIT_ARGS'] = "--master spark://Karans-MacBook-Pro-4.local:7077 pyspark-shell" Any input on which version might be a mismatch ? or what the root cause might be ?
... View more
05-16-2019
12:21 AM
@Ravi Kumar Lanke, @saichand
akella
- any ideas on this ?
... View more
05-16-2019
12:19 AM
hi @saichand
akella
- any ideas on this ?
... View more
05-16-2019
12:14 AM
Hello - i'm using PyCharm 2019, python 2.7.6, Apache Spark 2.2.0 for pyspark development, and running into issues when i try to run any spark code. Code : #! /bin/python
import os
import sys
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import pyarrow as pa
import pprint as pp
from pyspark.sql.functions import col, lit
import datetime
today = datetime.date.today().__format__("yyyymmdd")
print (" formatted date => " + today)
# d = {'a':1, 'b':2, 'c':3}
d = [(1, "20190101", "99990101", "false"), (2, "20190501", "99990101", "false"), (3, "20190101", "20190414", "false"), (3, "20190415", "99990101", "false")]
print(os.environ)
print(" spark_home -> ", os.environ['SPARK_HOME'])
print(" pythonpath -> ", os.environ['PYTHONPATH'])
print(" PYSPARK_SUBMIT_ARGS -> ", os.environ['PYSPARK_SUBMIT_ARGS'])
os.environ['PYSPARK_SUBMIT_ARGS'] = "--master spark://Karans-MacBook-Pro-4.local:7077 pyspark-shell"
print(" JAVA_HOME => " + os.environ['JAVA_HOME'])
print(" PATH => " + os.environ['PATH'])
print(" CLASSPATH => " + os.environ['CLASSPATH'])
sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python"))
sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python/lib/py4j-0.10.4-src.zip"))
# ERROR OBTAINED WHEN I CREATE SaprkSession object
spark = SparkSession.builder.master("local").appName("CreatingDF").getOrCreate()
sparkdf = spark.createDataFrame(d, ['pnalt', 'begda', 'endda', 'ref_flag'])
print(sparkdf)
ERROR (when i create the SparkSession object) 19/05/15 16:58:06 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[main,5,main]
java.util.NoSuchElementException: key not found: _PYSPARK_DRIVER_CALLBACK_HOST
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at org.apache.spark.api.python.PythonGatewayServer$$anonfun$main$1.apply$mcV$sp(PythonGatewayServer.scala:50)
at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1262)
at org.apache.spark.api.python.PythonGatewayServer$.main(PythonGatewayServer.scala:37)
at org.apache.spark.api.python.PythonGatewayServer.main(PythonGatewayServer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Traceback (most recent call last):
File "/Users/karanalang/PycharmProjects/Python2/karan/python2/falcon/py_createDF.py", line 55, in <module>
spark = SparkSession.builder.master("local").appName("CreatingDF").getOrCreate()
File "/Users/karanalang/PycharmProjects/Python2/venv/lib/python2.7/site-packages/pyspark/sql/session.py", line 173, in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
File "/Users/karanalang/PycharmProjects/Python2/venv/lib/python2.7/site-packages/pyspark/context.py", line 367, in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/Users/karanalang/PycharmProjects/Python2/venv/lib/python2.7/site-packages/pyspark/context.py", line 133, in __init__
SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
File "/Users/karanalang/PycharmProjects/Python2/venv/lib/python2.7/site-packages/pyspark/context.py", line 316, in _ensure_initialized
SparkContext._gateway = gateway or launch_gateway(conf)
File "/Users/karanalang/PycharmProjects/Python2/venv/lib/python2.7/site-packages/pyspark/java_gateway.py", line 46, in launch_gateway
return _launch_gateway(conf)
File "/Users/karanalang/PycharmProjects/Python2/venv/lib/python2.7/site-packages/pyspark/java_gateway.py", line 108, in _launch_gateway
raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number
forums on internet seem to be suggesting that it is a version related issue, however, i'm not sure how to debug/fix this issue. Any help on this is really appreciated. Attaching screenshot of the Pycharm Project interpretor as well.
... View more
Labels:
- Labels:
-
Apache Falcon
-
Apache Spark
-
Gateway
03-12-2018
02:28 AM
issue is fixed, since this is not an Avro file but just an Avro schema . this needs to read as a text file.
... View more
03-01-2018
11:43 PM
Hello - i'm reading an Avro file from HDFS - and it seems to be giving exception - Exception in thread "main" java.io.IOException: Not a data file.
at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:105)
at org.apache.avro.file.DataFileStream.<init>(DataFileStream.java:84)
at karan.scala.readuri.ReadAvroFromURI$.readAvroFromURI(ReadAvroFromURI.scala:52)
at karan.scala.readuri.ReadAvroFromURI$.delayedEndpoint$karan$scala$readuri$ReadAvroFromURI$1(ReadAvroFromURI.scala:29)
at karan.scala.readuri.ReadAvroFromURI$delayedInit$body.apply(ReadAvroFromURI.scala:24)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$anonfun$main$1.apply(App.scala:76)
at scala.App$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at karan.scala.readuri.ReadAvroFromURI$.main(ReadAvroFromURI.scala:24)
at karan.scala.readuri.ReadAvroFromURI.main(ReadAvroFromURI.scala) Here is the Code : conf.set("fs.defaultFS", "hdfs://localhost:9000")
val inputF = "hdfs://localhost:9000/avro/emp.avsc"
val inPath = new Path(inputF)
val fs = FileSystem.get(URI.create(inputF), conf)
val inStream = new BufferedInputStream(fs.open(inPath))
val reader = new DataFileStream(inStream, new GenericDatumReader()) the DataFileStream.java seems to be looking for magic bytes, to determine if this is Avro, and it is not finding this, and throwing error void initialize(InputStream in) throws IOException { this.header = new Header(); this.vin = DecoderFactory.get().binaryDecoder(in, vin); byte[] magic = new byte[DataFileConstants.MAGIC.length]; try { vin.readFixed(magic); // read magic} catch (IOException e) { throw new IOException("Not a data file.");} if (!Arrays.equals(DataFileConstants.MAGIC, magic)) throw new IOException("Not a data file."); Any ideas on how to fix this ? The file is fine, and i'm able to do a cat of the file (shown below) : hdfs dfs -cat hdfs://localhost:9000/avro/emp.avsc
18/03/01 15:30:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
{"namespace": "tutorialspoint.com","type": "record","name": "emp","fields": [{"name": "name", "type": "string"},{"name": "id", "type": "int"},{"name": "salary", "type": "int"},{"name": "age", "type": "int"},{"name": "address", "type": "string"}]}
... View more
Labels:
- Labels:
-
Apache Hadoop
-
HDFS
12-22-2017
03:09 AM
@Venkata Sudheer Kumar M here is the update on this .. seems the script -> /usr/hdp/2.5.3.0-37/kafka/bin/kafka-producer-perf-test.sh has an issue in SSL mode, it seems to not be able to recognize the security-protocol=SSL & the config file passed i.e. when the truststore, password is passed through the config file .. instead, when i use the script -> /usr/hdp/2.5.3.0-37/kafka/bin/kafka-console-producer.sh, i'm able to produce the messages in SSL mode, by passing the security protocol & passing the same config files. /usr/hdp/2.5.3.0-37/kafka/bin/kafka-console-producer.sh --topic mmtest1 --security-protocol SSL --broker-list host1:9093 --producer.config /usr/hdp/2.5.3.0-37/kafka/mirror-maker/client-ssl.properties
I'm using Kafka 0.10, Hortonworks 2.5.3 Any ideas on this ?
... View more
12-21-2017
09:09 PM
Hello All -
I'm getting this error, when publishing messages to Kafka topic using SSL mode, Command to publish messages : /usr/hdp/2.5.3.0-37/kafka/bin/kafka-producer-perf-test.sh --messages 1000000 --message-size 1000 --topics mmtest4 \
--broker-list <host1>:9093,<host2>:9093,<host3>:9093, \
--threads 1 --compression-codec 3 --batch-size 10000 \
--security-protocol SSL --show-detailed-stats Error message :[2017-12-21 19:48:49,846] WARN Fetching topic metadata with correlation id 11 for topics [Set(mmtest4)] from broker [BrokerEndPoint(0,<host1>,9093)] failed (kafka.client.ClientUtils$)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:140)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:84)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:81)
at kafka.producer.SyncProducer.send(SyncProducer.scala:126)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:83)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:50)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$getPartitionListForTopic(DefaultEventHandler.scala:206)
at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:170)
at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:169)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:169)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:106)
at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:89)
at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:69)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:68)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:46) Seems SSL is setup correctly, when i run the following command, i get expected result -> <strong><em>openssl s_client -debug -connect host1:9093 -tls1</em></strong>
Setting done in the server.properties (using Ambari) -> listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
ssl.truststore.location=/etc/kafka/truststore/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=/etc/kafka/truststore//kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password These setting seem to be working on another environment, however - on this env (Prod),
it seem to be giving the error shown above. Any ideas on what need to be done to debug/fix the error ?
... View more
Labels:
- Labels:
-
Apache Ambari
-
Apache Kafka
12-18-2017
11:41 PM
the issue is fixed, the consumer config file - should have the producer bootstrap servers, while the producer config file - should have the consumer bootstrap server. i'd mixed up the 2, reversing that fixed the issue.
... View more