Member since: 09-11-2018
Posts: 39
Kudos Received: 1
Solutions: 0
01-06-2019
01:05 PM
Friends, is there any update on these two questions? Sadly, after so many days there is still no reply. Regards
01-06-2019
05:07 AM
Friends, is there any update on these two questions? Sadly, after many days there is still no reply. Regards
12-21-2018
07:57 PM
Hello Friends, we have an upcoming project, and for it I am learning Spark Streaming (with a focus on PySpark). So far I have completed a few simple case studies from online material, but I am stuck on two scenarios, described below. I would be highly obliged if you could share your thoughts or point me to any web page that helps with a solution.

1. Writing a Streaming Aggregation to File
# spark-submit --master local[*] /home/training/santanu/ssfs_2.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
spark = SparkSession.builder.appName("FileStream_Sink_Group").getOrCreate()
source = "file:///home/training/santanu/logs"
tgt = "file:///home/training/santanu/hdfs"
chk = "file:///home/training/santanu/checkpoint"
schema1 = StructType([StructField('agent',StringType()),StructField('code',IntegerType())])
df1 = spark.readStream.csv(source,schema=schema1,sep=",")
df2 = df1.filter("code > 300").select("agent").groupBy("agent").count()
df3 = df2.select("agent","count").withColumnRenamed("count","group_count")
query = df3.writeStream.format("csv").option("path",tgt).option("checkpointLocation",chk).start() # Error
query.awaitTermination()
spark.stop()

Error message (a watermark-based sketch is included at the end of this post):
# AnalysisException: Append output mode not supported when there are streaming aggregations on DataFrames without watermark;

2. Reading from Kafka (Consumer) using Streaming
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 /home/training/santanu/sskr.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Kafka_Consumer").getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","MyTopic_1").load()
print(type(df)) # <class 'pyspark.sql.dataframe.DataFrame'>
df.printSchema() # printing schema hierarchy
query = df.selectExpr("CAST(value AS STRING)").writeStream.format("console").start() # Error
query.awaitTermination()
spark.stop()

Error message:
# NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)

Please help. Thanking you, Santanu Ghosh
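For the first error (the streaming aggregation without a watermark), here is a minimal, untested sketch of the watermark-based variant I am considering, not a confirmed solution. The event_time column is hypothetical (my current files only have agent and code), and I am assuming that a watermark plus an event-time window is what lets the file sink accept append mode.

# Untested sketch: windowed aggregation with a watermark so append mode is accepted by the file sink.
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

spark = SparkSession.builder.appName("FileStream_Sink_Group_Watermark").getOrCreate()

schema = StructType([StructField("agent", StringType()),
                     StructField("code", IntegerType()),
                     StructField("event_time", TimestampType())])   # hypothetical event-time column

df = spark.readStream.csv("file:///home/training/santanu/logs", schema=schema, sep=",")

agg = (df.filter("code > 300")
         .withWatermark("event_time", "10 minutes")
         .groupBy(window("event_time", "5 minutes"), "agent")
         .count()
         .withColumnRenamed("count", "group_count")
         .selectExpr("window.start AS window_start", "window.end AS window_end", "agent", "group_count"))

query = (agg.writeStream.format("csv")
            .outputMode("append")                                   # the file sink only supports append
            .option("path", "file:///home/training/santanu/hdfs")
            .option("checkpointLocation", "file:///home/training/santanu/checkpoint")
            .start())
query.awaitTermination()
spark.stop()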
Labels:
- Apache Kafka
- Apache Spark
12-21-2018
07:28 AM
Hello Friends, we have an upcoming project, and for it I am learning Spark Streaming (with a focus on Structured Streaming). So far I have completed a few simple case studies from online material, but I am stuck on two scenarios, described below. I would be highly obliged if you could share your thoughts or point me to any web page that helps with a solution.

1. Writing a Streaming Aggregation to File
# spark-submit --master local[*] /home/training/santanu/ssfs_2.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
spark = SparkSession.builder.appName("FileStream_Sink_Group").getOrCreate()
source = "file:///home/training/santanu/logs"
tgt = "file:///home/training/santanu/hdfs"
chk = "file:///home/training/santanu/checkpoint"
schema1 = StructType([StructField('agent',StringType()),StructField('code',IntegerType())])
df1 = spark.readStream.csv(source,schema=schema1,sep=",")
df2 = df1.filter("code > 300").select("agent").groupBy("agent").count()
df3 = df2.select("agent","count").withColumnRenamed("count","group_count")
query = df3.writeStream.format("csv").option("path",tgt).option("checkpointLocation",chk).start() # Error
query.awaitTermination()
spark.stop()

Error I am getting:
# Append output mode not supported when there are streaming aggregations on DataFrames without watermark;

2. Reading from Kafka (Consumer) using Streaming
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 /home/training/santanu/sskr.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Kafka_Consumer").getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","MyTopic_1").load()
print(type(df)) # <class 'pyspark.sql.dataframe.DataFrame'>
df.printSchema() # printing schema hierarchy
query = df.selectExpr("CAST(value AS STRING)").writeStream.format("console").start() # Error
query.awaitTermination()
spark.stop()

Error I am getting:
# NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)

Please help. Thanking you, Santanu Ghosh
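Regarding the NoSuchMethodError, my assumption (not a confirmed diagnosis) is that an older kafka-clients jar on the classpath is shadowing the 0.10.x client that spark-sql-kafka-0-10 expects, so the fix is more about dependencies than about the code. Below is an untested sketch of the consumer flow I am aiming for once that is sorted out; the explicit kafka-clients pin in the comment is only something I intend to try.

# Untested sketch, assuming a single 0.10.x-or-newer kafka-clients on the classpath,
# e.g. by pinning it explicitly:
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.kafka:kafka-clients:0.10.0.1 sskr.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Kafka_Consumer_Sketch").getOrCreate()

df = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "MyTopic_1")
          .option("startingOffsets", "earliest")    # optional: read the topic from the beginning
          .load())

# The Kafka source always exposes the same columns:
# key, value (binary), topic, partition, offset, timestamp, timestampType.
messages = df.select(col("value").cast("string").alias("value"))

query = messages.writeStream.format("console").outputMode("append").start()
query.awaitTermination()
spark.stop()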
Labels:
- Apache Spark
10-04-2018
04:48 PM
Hello Friends, I have just read the news that Cloudera and Hortonworks have merged into one company. I was both shocked and surprised. Now I am eager to know: what will be the future of Hadoop, which is still open-source, free software under Apache? Also, will the Hortonworks certifications such as HDPCD and others still hold any value at the industry level? Please do let me know your thoughts and ideas. Thanking you, Santanu
Labels:
- Apache Hadoop
09-30-2018
10:01 AM
@Rahul Soni, I am creating one Avro file using a Flume regex interceptor and multiplexing, but that file contains content like the sample below, and when I try to generate a schema with the avro-tools getschema option it gives only "headers" and "body" as the two fields. Please advise how to resolve this.

Objavro.codenullavro.schema▒{"type":"record","name":"Event","fields":[{"name":"headers","type":{"type":"map","values":"string"}},{"name":"body","type":"bytes"}]}▒LA▒▒;ڍ▒(▒▒▒=▒YBigDatJava▒Y{"created_at":"Thu Sep 27 11:40:44 +0000 2018","id":1045277052822269952,"id_str":"1045277052822269952","text":"RT @SebasthSeppel: #Jugh \ud83d\udce3 heute ist wieder JUGH !\nHeute haben wir @gschmutz bei uns mit dem spannenden Thema: Streaming Data Ingestion in\u2026","source":"\u003ca href=\"http:\/\/twitter.com\/download\/android\" rel=\"nofollow\"\u003eTwitter for Android\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":716199874157547520,"id_str":"716199874157547520","name":"Alexander Gomes","screen_name":"nEinsKnull","location":"Kassel, Hessen","url":null,"description":"CTO of family, work at @Micromata, loves #tec #rc #mountainbikes and #paragliding","trans.........

Thanking you, Santanu
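For reference, here is the small, untested sketch I intend to use to inspect such a file locally. It assumes the fastavro package is installed (pip install fastavro) and that the HDFS file has been copied to the local filesystem; the filename below is only a placeholder.

# Untested sketch: read a Flume-written Avro container and decode its body.
# Flume's avro_event serializer wraps every event as {headers, body}, which is
# why getschema only ever reports those two fields.
import json
from fastavro import reader

with open("flume_event_sample.avro", "rb") as fh:    # hypothetical local copy of the HDFS file
    for record in reader(fh):
        headers = record["headers"]                  # e.g. {"BigData": "Java"} from the interceptor
        body = record["body"]                        # raw bytes of the event body
        # Assuming each body really is one complete JSON line from the source file:
        tweet = json.loads(body.decode("utf-8"))
        print(headers, tweet.get("text"))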
09-28-2018
04:21 AM
Hi Friends, I need some help with Avro file processing using Flume and Kafka. In short, I am reading a JSON file, using an interceptor and a selector to split specific values into an Avro sink, and then reading that Avro source to write to HDFS as an Avro file. The Flume configurations I am using are given below. The problem I am facing is that the Avro file gets written to HDFS with only a header and a body, so with the "java -jar avro-tools-1.8.2.jar getschema" option I do not get the desired schema of the Avro file; every time it shows only "headers" and "body" as the two fields. Please suggest how to resolve this problem. It is very urgent.

1. flume-ng agent --name ta1 --conf conf --conf-file /home/cloudera/santanu/flume_interceptor_multiplexing.conf -Dflume.root.logger=DEBUG,console
## Flume Agent with Json Source , Kafka and Memory Channels , Avro Sink
ta1.sources = twitter
ta1.sinks = avrofile
ta1.channels = memchannel kafkachannel
## Properties
## Sources
ta1.sources.twitter.type = exec
ta1.sources.twitter.command = tail -F /home/cloudera/workspace/Cloudera_Share/Bigdata.json
## Sinks
ta1.sinks.avrofile.type = avro
ta1.sinks.avrofile.hostname = 192.XXX.x.x
ta1.sinks.avrofile.port = 4141
## Channel 1
ta1.channels.memchannel.type = memory
ta1.channels.memchannel.capacity = 5000
ta1.channels.memchannel.transactionCapacity = 500
## Channel 2
ta1.channels.kafkachannel.type = org.apache.flume.channel.kafka.KafkaChannel
ta1.channels.kafkachannel.kafka.bootstrap.servers = 192.XXX.x.x:9092
ta1.channels.kafkachannel.kafka.topic = MyTopic_1
ta1.channels.kafkachannel.kafka.consumer.group.id = my_group
## Interceptor
ta1.sources.twitter.interceptors = i1
ta1.sources.twitter.interceptors.i1.type = regex_extractor
ta1.sources.twitter.interceptors.i1.regex = (?i)(Python|Java|Scala|Perl|Sqoop|Flume|Kafka|Hive|Spark|Jethro|NoSQL)
ta1.sources.twitter.interceptors.i1.serializers = s1
ta1.sources.twitter.interceptors.i1.serializers.s1.name = BigData
## Source Selector
ta1.sources.twitter.selector.type = multiplexing
ta1.sources.twitter.selector.header = BigData
ta1.sources.twitter.selector.mapping.Python = kafkachannel
ta1.sources.twitter.selector.mapping.Java = kafkachannel
ta1.sources.twitter.selector.mapping.Scala = kafkachannel
ta1.sources.twitter.selector.mapping.Perl = kafkachannel
ta1.sources.twitter.selector.mapping.Sqoop = memchannel
ta1.sources.twitter.selector.mapping.Flume = memchannel
ta1.sources.twitter.selector.mapping.Kafka = memchannel
ta1.sources.twitter.selector.mapping.Hive = memchannel
ta1.sources.twitter.selector.mapping.Spark = memchannel
ta1.sources.twitter.selector.mapping.Jethro = memchannel
ta1.sources.twitter.selector.mapping.NoSQL = memchannel
## Mapping
ta1.sources.twitter.channels = kafkachannel memchannel
ta1.sinks.avrofile.channel = memchannel

2. flume-ng agent --name ta2 --conf conf --conf-file /home/cloudera/santanu/flume_avro_hdfs.conf -Dflume.root.logger=DEBUG,console
## Flume Agent with Avro Source and HDFS Sink
ta2.sources = avrofile
ta2.sinks = hdfsfile
ta2.channels = memchannel
## Properties
## Source
ta2.sources.avrofile.type = avro
ta2.sources.avrofile.bind = 192.XXX.x.x
ta2.sources.avrofile.port = 4141
## Sink
ta2.sinks.hdfsfile.type = hdfs
ta2.sinks.hdfsfile.hdfs.path = /user/cloudera/flume_avro
ta2.sinks.hdfsfile.hdfs.filePrefix = Hadoop
ta2.sinks.hdfsfile.hdfs.fileSuffix = .avro
ta2.sinks.hdfsfile.hdfs.fileType = DataStream
ta2.sinks.hdfsfile.hdfs.writeFormat = Text
ta2.sinks.hdfsfile.hdfs.rollInterval = 5
ta2.sinks.hdfsfile.serializer = avro_event
ta2.sinks.hdfsfile.compressionCodec = snappy
## Channel
ta2.channels.memchannel.type = memory
ta2.channels.memchannel.capacity = 5000
ta2.channels.memchannel.transactionCapacity = 500
## Interceptor
ta2.sources.avrofile.interceptors = i2
ta2.sources.avrofile.interceptors.i2.type = remove_header
ta2.sources.avrofile.interceptors.i2.withName = BigData
## Mapping
ta2.sources.avrofile.channels = memchannel
ta2.sinks.hdfsfile.channel = memchannel

3. The Avro file in HDFS ends up with "headers" and "body" as its only two columns.

Thanking you, Santanu
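For completeness, here is a small, untested check I plan to run to confirm that the multiplexing selector really routes the Python/Java/Scala/Perl events into MyTopic_1. It assumes the kafka-python package is installed (pip install kafka-python); the broker address is the masked one from the configuration above, and the group id is a hypothetical one so it does not interfere with the Flume channel's consumer group.

# Untested sketch: consume a few events from the Kafka channel's topic to verify routing.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "MyTopic_1",
    bootstrap_servers="192.XXX.x.x:9092",   # masked broker address from the config above
    group_id="my_group_check",              # hypothetical group id, separate from Flume's my_group
    auto_offset_reset="earliest",           # start from the beginning if no committed offset
    consumer_timeout_ms=10000,              # stop iterating after 10 seconds without messages
)

for msg in consumer:
    # Each value is whatever the Flume Kafka channel stored; by default the channel
    # Avro-wraps the whole Flume event, so it may not be the raw JSON line itself.
    print(msg.partition, msg.offset, msg.value[:120])

consumer.close()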
Labels:
- Apache Flume
- Apache Kafka
- HDFS
09-20-2018
01:43 PM
Hello Friends, I have recently started working with Hadoop, and only from version 2.x onwards. I understand the concept of NameNode (NN) High Availability in Hadoop 2.x, but recently someone told me that NN HA was possible even in Hadoop 1.x, because Hadoop 1.x had ZooKeeper (ZK) and the quorum journal nodes (QJN). If so, why do most articles on the web say the NN was a single point of failure (SPoF) in Hadoop 1.x? Please let me know the answer. Thanking you, Santanu
Labels:
- Apache Hadoop
- Apache Zookeeper
07-02-2018
07:42 PM
Thanks, Vini. Based on your suggestion I am able to connect. Thanking you, Santanu