Created on 01-10-2017 10:21 AM - edited 08-19-2019 03:22 AM
I am trying to load a CSV file into Elasticsearch using PySpark. When I execute spark-submit, it shows an error.
I tried the sample code below:
"""Count occurrences of element_id in a CSV file and index the counts into
Elasticsearch via the elasticsearch-hadoop connector.

Reads /sample/hello.csv from HDFS, counts rows per element_id (assumed to be
the FIRST CSV column -- TODO confirm against the actual file layout), and
writes {"element_id": ..., "count": ...} documents to the "spark/data"
index/type.
"""
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

if __name__ == "__main__":
    conf = SparkConf().setAppName("WriteToES")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    # Connector settings for saveAsNewAPIHadoopFile.
    # NOTE(review): "es.nodes.client.only" requires a dedicated ES client
    # node; on a single-node sandbox "es.nodes.wan.only" is usually the
    # setting that is actually needed -- confirm against the cluster layout.
    es_conf = {
        "es.nodes": "sandbox.hortonworks.com",
        "es.port": "9200",
        "es.nodes.client.only": "true",
        "es.resource": "spark/data",
    }

    # Parse each CSV line into a list of fields. This yields a plain RDD,
    # NOT a DataFrame, so DataFrame-style groupBy("column_name") does not
    # exist here -- that was the first error in the original code.
    rows = sc.textFile("/sample/hello.csv").map(lambda line: line.split(","))

    # BUG FIX: the original chained rows.groupBy("element_id").count().map(...),
    # but RDD.groupBy expects a key *function* (a string is not callable) and
    # RDD.count() returns a plain int, so .map() on it fails. Count per key
    # with reduceByKey instead (element_id assumed to be column 0).
    counts = rows.map(lambda fields: (fields[0], 1)).reduceByKey(lambda a, b: a + b)

    # Shape each record as a (key, document-dict) pair for EsOutputFormat.
    # BUG FIX: the original used a Python-2-only tuple-parameter lambda
    # `lambda (a, b): ...`, which is a SyntaxError on Python 3; index the
    # tuple explicitly instead.
    es_docs = counts.map(
        lambda kv: ("id", {"element_id": kv[0], "count": kv[1]})
    )

    es_docs.saveAsNewAPIHadoopFile(
        path="-",  # ES connector ignores the path; "-" is the conventional placeholder
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_conf,
    )
# Submit to YARN in cluster mode, shipping the ES-Hadoop connector jar.
# NOTE: "--master yarn-cluster" is deprecated in Spark 2.x (which this
# elasticsearch-spark-20_2.10 jar targets); use --master yarn with an
# explicit --deploy-mode instead.
spark-submit --master yarn --deploy-mode cluster --jars /root/elasticsearch-spark-20_2.10-5.1.1.jar /root/es_spark_write.py