Member since: 07-14-2017
Posts: 99
Kudos Received: 5
Solutions: 4
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 708 | 09-05-2018 09:58 AM
 | 1128 | 07-31-2018 12:59 PM
 | 651 | 01-15-2018 12:07 PM
 | 635 | 11-23-2017 04:19 PM
02-18-2021
09:19 AM
Hi everybody, I am trying the following approach to write data into a Hive table.

import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.streaming.kafka import KafkaUtils
import datetime
from pyspark.sql.functions import lit, unix_timestamp
from os.path import *
from pyspark import Row

warehouseLocation = abspath("spark-warehouse")
spark = SparkSession.builder.appName("spark_streaming").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()

kafka = "kafka"
offsets = "earliest"
servers = "server_1:port,server_2:port"
security_protocol = "SSL"
keystore_location = "keystore"
keystore_password = "keystore_password"
kafka_topic = "kafka_topic"
checkpoint_location = "/checkpoint/location"

# foreachBatch handler: expose the micro-batch as a temp view and insert it into Hive
def hiveInsert(df, batchId):
    df.createOrReplaceTempView("updates")
    spark.sql("insert into hive_db.hive_table select value, time_stamp from updates")

# Read from Kafka, cast the value to string, and stamp each row with the current minute
df = spark.readStream.format(kafka) \
    .option("startingOffsets", offsets) \
    .option("kafka.bootstrap.servers", servers) \
    .option("kafka.security.protocol", security_protocol) \
    .option("kafka.ssl.keystore.location", keystore_location) \
    .option("kafka.ssl.keystore.password", keystore_password) \
    .option("subscribe", kafka_topic) \
    .load() \
    .selectExpr("CAST(value AS STRING)") \
    .select('value') \
    .withColumn('time_stamp', lit(datetime.datetime.now().strftime('%Y%m%d%H%M')))

query = df.writeStream.foreachBatch(hiveInsert).start()
query.awaitTermination()

The above code is not working. Any pointers would be of great help!
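One detail worth a quick sketch (an observation on my part, not a confirmed fix): checkpoint_location is defined above but never passed to the stream, so wiring it in is one thing worth trying.

# A sketch reusing the variables already defined above; only the checkpointLocation
# option is new, and treating it as the fix is an assumption.
query = df.writeStream \
    .foreachBatch(hiveInsert) \
    .option("checkpointLocation", checkpoint_location) \
    .start()
query.awaitTermination()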
Labels: Apache Spark
08-21-2019
10:29 AM
Hi,
I am trying to match multiple values in a string using Hive regexp, and I am looking for an optimal solution.
I want to match "first" and "1.11" in the text below.
Column name is col:
This string is the first string with two decimals 1.11 and 2.22 with a special char / and some more extra string.
Table name is t.
The query I was using:
select * from t where t.col regexp '(?=.*first)(?=.*1.11)'
Could you please help me?
Thank you
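For comparison, a sketch of two simpler ways to express the same match (assuming the goal is rows whose col contains both the word "first" and the literal value 1.11; note that the unescaped dots in 1.11 would otherwise match any character):

select * from t where t.col like '%first%' and t.col like '%1.11%';
-- or as a single regexp, with the dots escaped:
select * from t where t.col regexp 'first.*1\\.11|1\\.11.*first';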
Labels: Apache Hive
02-07-2019
04:04 PM
@Shu can you please help me?
02-06-2019
10:29 AM
Hi All, I have a string.
String:
some text with an ip 111.111.111.111 and a decimal 11.2323232 and some text here and then an int 1 and then some HTTP/1.1 with a 503 request and then another ip 222.222.222.222 and some imaginary 999.999.999.999
I want to output all the ip addresses, comma separated. I tried the below:
select regexp_replace(regexp_replace(String,'[^(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})]',' '),'\\s+',',');
+------------------------------------------------------------------------+--+
| _c0 |
+------------------------------------------------------------------------+--+
| ,111.111.111.111,11.2323232,1,1.1,503,222.222.222.222,999.999.999.999 |
+------------------------------------------------------------------------+--+
Expected output: 111.111.111.111,222.222.222.222,999.999.999.999. Could you please help me?
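A sketch of an alternative that avoids the replace-everything approach (assumptions: the text sits in a hypothetical table s with a column str, tokens are whitespace-separated, and anything shaped like a dotted quad counts as an ip):

SELECT concat_ws(',', collect_list(word)) AS ips
FROM (
  SELECT word
  FROM s LATERAL VIEW explode(split(str, '\\s+')) w AS word
  WHERE word RLIKE '^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$'
) tokens;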
Labels: Apache Hive
10-16-2018
01:35 PM
@sadapa you can never insert a "?" into a column that has datatype int, because "?" is not a number and Hive knows it. I am not sure why you want to do that, but if you still want to convert a "?" into a number that you can change back later, you can try ascii().
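For illustration, a small sketch of that round trip (assuming a Hive version that has both ascii() and chr()):

select ascii('?');   -- 63, which fits in an int column
select chr(63);      -- '?', to translate it back later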
10-02-2018
01:33 PM
@Shu Thank you
10-02-2018
11:40 AM
@Carlton Patterson myresults.coalesce(1).write.format('csv').save("/tmp/myresults.csv", header='true')
10-01-2018
01:14 PM
Hi All, I need help to get the below result. I have two tables.
Table name: match
+-----------------------------------+----------------+--+
| hint | remarks |
+-----------------------------------+----------------+--+
| 1.1.1.1 | ip |
| 123456789 | contact |
| http://123123123123123123.some_n | url |
+-----------------------------------+----------------+--+
Table name: t1
+-------------------------------------------------------------------------------+-------------------+--+
| t1.text | t1.b |
+-------------------------------------------------------------------------------+-------------------+--+
| This ip is found 1.1.1.1 and is matched with match | table name match |
| This ip is found 1.1.1.2 and is matched with match | table name match |
| This contact is found 123456789 and is matched with match | table name match |
| This contact is found 123456789123456789 and is matched with match | table name match |
| This url is found http://123456789123456789.some_n and is matched with match | table name match |
+-------------------------------------------------------------------------------+-------------------+--+
I want to search the hint column of the match table in the text column of the t1 table and get the complete text column values. So basically I want a query like: select t1.text from t1 join match where t1.text contains (any value in match.hint); It would be helpful if this can be done in Hive, but I can live with pyspark, so pyspark help is also welcome. P.S.: table t1 is a big table and match is a small table with limited values (say 1500). Thank you
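A sketch of the containment join (assumptions: plain substring containment is acceptable, and the small match table can be map-joined):

select distinct t1.text
from t1 cross join match m
where instr(t1.text, m.hint) > 0;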
Labels: Apache Hive
09-18-2018
01:12 PM
@Andy LoPresto That's a nice idea, but I don't have the leverage to use ExecuteScript or ExecuteStreamCommand, as there are no scripts/programs (including awk) waiting for me, and getting them is out of my hands, so I am looking for a solution within my flow. Thank you
09-18-2018
01:09 PM
@Shu
1. Sample data: every value is present in attributes (i.e. every flowfile is parsed and the values in the flowfile are assigned to attributes). There are multiple flowfiles with the same value (user_name) in the attributes. Example:
flowfile1 attributes:
user_name: mark, file_in: 2018-09-18 15:00:00, file_out: 2018-09-18 15:01:00
user_name: michelle, file_in: 2018-09-18 15:00:02, file_out: 2018-09-18 15:01:01
user_name: mark, file_in: 2018-09-18 15:00:05, file_out: 2018-09-18 15:01:01
flowfile2 attributes:
user_name: mark, file_in: 2018-09-18 15:01:00, file_out: 2018-09-18 15:01:10
user_name: stella, file_in: 2018-09-18 15:01:12, file_out: 2018-09-18 15:01:21
2. I want to count all the flowfiles that have a given user_name (in the above example, the count of mark across both flowfiles is 3).
3. The schema of the flowfile is just the above 3 fields, which are assigned to attributes.
Thank you
09-17-2018
01:32 PM
Hi All, I have a use case where I want to find the number of occurrences of a word and perform an action on it. Example:
1. I have multiple flowfiles coming in.
2. I want to extract a word (say, user_name) using the ExtractText processor.
3. Count the word.
4. If user_name_count = 10,
5. do a ReplaceText of 10 as 1, and
6. PutEmail to user_name that the user_name count is 10.
Can you please let me know which processors can be helpful for this use case? Suggestions are appreciated.
Labels: Apache NiFi
09-12-2018
01:57 PM
@rtheron for some reason I cannot follow the first approach. I tried creating an intermediate ORC table with partitions and loaded the data into it from the external table. Now when I load into the destination from the intermediate table, PutHiveQL is taking a lot of time. Any suggestions are appreciated.
09-07-2018
01:35 PM
Hi All, I have a 10GB file arriving every minute at a location (/dir), and there is an external table over that location. The file is as below:
karlon,n_d_1,26,6234,2019-09-08,1536278400
d'lov,research,20,1001,2019-09-08,1536278400
kris'a,b_x_3,20,4532,2019-09-08,1536278400
External table name: ex_t, with columns name, department, age, id, date, time, for example:
karlon | n_d_1    | 26 | 6234 | 2019-09-08 | 1536278400
d'lov  | research | 20 | 1001 | 2018-09-08 | 1536278400
I have a PutHiveQL processor in my flow which gets data from the external table and inserts it into multiple ORC tables.
ORC tables: table_1, table_2, table_3, table_4, table_5, table_6
Every ORC table has the same columns: name (string), department (string), age (int), id (int), date (string), partition_value (int).
The PutHiveQL processor has multiple insert queries in it:
INSERT INTO table_1 PARTITION(partition_value) SELECT name, department, age, id, date, cast(regexp_replace(date,'-','') as int) AS partition_value FROM ex_t WHERE department = 'research' AND time='1536278400';
INSERT INTO table_2 PARTITION(partition_value) SELECT name, department, age, id, date, cast(regexp_replace(date,'-','') as int) AS partition_value FROM ex_t WHERE department = 'n_d_1' AND time='1536278400';
INSERT INTO table_3 PARTITION(partition_value) SELECT name, department, age, id, date, cast(regexp_replace(date,'-','') as int) AS partition_value FROM ex_t WHERE department = 'b_x_3' AND time='1536278400';
INSERT INTO table_4 PARTITION(partition_value) SELECT name, department, age, id, date, cast(regexp_replace(date,'-','') as int) AS partition_value FROM ex_t WHERE department = 'research' AND time='1536278400';
INSERT INTO table_5 PARTITION(partition_value) SELECT name, department, age, id, date, cast(regexp_replace(date,'-','') as int) AS partition_value FROM ex_t WHERE department = 'x_in_1' AND time='1536278400';
INSERT INTO table_6 PARTITION(partition_value) SELECT name, department, age, id, date, cast(regexp_replace(date,'-','') as int) AS partition_value FROM ex_t WHERE department = 'z_e_3' AND time='1536278400';
The above is sent as a flowfile to PutHiveQL, which is scheduled every minute, as the file arrives every minute. PutHiveQL is very slow processing the above and the inserts are not happening frequently. Can you please suggest how to improve the performance of PutHiveQL? I have increased the concurrent tasks but it did not help; sometimes the flowfiles (which have the insert statements) get queued and never execute. Suggestions are highly appreciated.
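One option worth sketching (an assumption, not a confirmed fix): Hive's multi-table insert lets a single statement scan ex_t once and fan out to all the targets, which reduces the work submitted through PutHiveQL each minute. Only two targets are shown; the rest follow the same pattern:

FROM ex_t
INSERT INTO TABLE table_1 PARTITION(partition_value)
  SELECT name, department, age, id, date, cast(regexp_replace(date,'-','') as int)
  WHERE department = 'research' AND time = '1536278400'
INSERT INTO TABLE table_2 PARTITION(partition_value)
  SELECT name, department, age, id, date, cast(regexp_replace(date,'-','') as int)
  WHERE department = 'n_d_1' AND time = '1536278400';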
Labels: Apache Hive, Apache NiFi
09-07-2018
10:10 AM
@Diego A Labrador I have used the set parameters in the connection string in the controller service. Could you try and see?
09-06-2018
03:35 AM
@Bryan Bende I have checked my data; it has no extra blank spaces, but it was arriving in batches. I am merging the files and appending using PutHDFS. When I use the configurations you suggested, I sometimes get a new blank line at the beginning of the file that is appended using PutHDFS. Can you please help me avoid the blank line at the beginning of the file? Also, the file is big (1GB).
09-05-2018
09:58 AM
I found an alternate way of doing it. Thank you
09-05-2018
09:42 AM
Hi All, I have JSON data in multiple small files (sometimes only one line in a file). I want to merge all the small files into a single large file, but I am getting the large file in an unexpected format. Example:
file 1: {"code"="1", "color"="green"}
{"code"="2", "color"="blue"}
{"code"="3", "color"="orange"}
file 2: {"code"="4", "color"="yellow"}
{"code"="5", "color"="red"}
I am getting the below output after using MergeContent:
{"code"="1", "color"="green"}
{"code"="2", "color"="blue"}
{"code"="3", "color"="orange"}{"code"="4", "color"="yellow"}
{"code"="5", "color"="red"}
Expected output:
{"code"="1", "color"="green"}
{"code"="2", "color"="blue"}
{"code"="3", "color"="orange"}
{"code"="4", "color"="yellow"}
{"code"="5", "color"="red"}
Labels: Apache NiFi
08-31-2018
04:18 AM
Looks like the JPGs are not aligned as expected, but the names of the JPGs are listed below in order. Thank you
08-31-2018
04:14 AM
Hi, I am getting a plain JSON stream with a '\n' delimiter through TCP. I am listening to TCP using ListenTCP, with batch size set to 10000. My JSON has variable fields, e.g.:
{"a":"20180831","b":"b"}
{"a":"20180831","b":"b","c":"c"}
I want to add a partition_value field to every line in the JSON stream at once. The attribute a is always present in the JSON, so I want to use a as the partition_value. The result should look like:
{"a":"20180831","b":"b","partition_value":"20180831"}
{"a":"20180831","b":"b","c":"c","partition_value":"20180831"}
I have used "UpdateRecord" processor below are the configuration UpdateRecord JsonTreeReader AvroSchemaRegistry AvroRecordSetWriter I used UpdateRecord -> jsontreereader ->avroschemaregistry |_________ -> avrorecordsetwriter Then I have used avrotojson I am getting only one line as output, can you please suggest where it is happening wrong or let me know if there is a better way to do it Thank you {"a":"20180831","b":"b","c":null,"partition_value":"20180831"}
Labels: Apache NiFi
08-14-2018
03:17 PM
@Felix Albani Thanks for the helping hand, I will go through them and may ask for suggestions if required. Thank you.
08-14-2018
08:46 AM
@Felix Albani can you please suggest
08-13-2018
02:14 PM
Hi, I am receiving data from TCP as a JSON stream using pyspark. I want to save the files (append to files; basically a file is per minute, e.g. yyyyMMddHHmm, so all messages in one minute should go to the corresponding file), and in parallel I want to save the JSON to an ORC Hive table. I have two questions.
1. [path: '/folder/file'] When I receive data in a DStream, I flatMap and split("\n") and then repartition(1).saveAsTextFiles(path, "json"):
lines = ssc.socketTextStream("localhost", 9999)
flat_map = lines.flatMap(lambda x: x.split("\n"))
flat_map.repartition(1).saveAsTextFiles(path,"json")
The above saves to the path given, but instead of producing one single file per minute in the folder, it makes three folders, each with a _SUCCESS file and a part-00000 file, which is not expected. Please help me get the expected layout: basically one folder per day and one file per minute under that folder.
2. If I want to save the JSON to an ORC Hive table, can I do it from a DStream, or do I have to change the DStream to an RDD and then perform some processing to save it to ORC? As I am new to pyspark, please help with the above or with some examples.
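For the first question, a sketch of one way to get per-minute output (assumptions: the /folder/<day>/<minute> layout is hypothetical, and saveAsTextFile always writes a directory of part files, so a true single file would still need a downstream merge):

def save_batch(batch_time, rdd):
    # batch_time arrives as a Python datetime in pyspark's foreachRDD callback
    if not rdd.isEmpty():
        path = batch_time.strftime('/folder/%Y%m%d/%Y%m%d%H%M')  # one folder per day, one output per minute
        rdd.coalesce(1).saveAsTextFile(path)

flat_map.foreachRDD(save_batch)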
Labels: Apache Spark
08-10-2018
09:12 AM
@Veerendra Nath Jasthi What is the frequency of the files and how big are the files in the given path? Also, could you please check your JVM heap memory (this is a guess, not a solution)?
08-10-2018
09:08 AM
@Felix Albani Can you help me with the pyspark version of the above please.
07-31-2018
02:36 PM
@Veerendra Nath Jasthi Possibly you have a complicated computation (maybe a regex) running on GetFile which is taking a lot of time to complete; also check how many files it is picking up based on your regex, and that should fix it.
07-31-2018
01:51 PM
@Felix Albani Thank you for the quick response, I will go through the given info.
07-31-2018
01:45 PM
@Veerendra Nath Jasthi That should not be the case; which processors are you using, and where are you seeing the issue?
07-31-2018
01:15 PM
@veerendra If you are using NiFi below 1.7, the best way is to restart NiFi.
07-31-2018
01:10 PM
Hi All, I am a beginner to Spark and want to do the below. A port, 55500, sends JSONs as a stream (ex: {"one":"1","two":"2"}{"three":"3","four":"4"}). I have an ORC table in Hive with the columns given below:
one, two, three, four, spark_streaming_startingtime, spark_streaming_endingtime, partition_value
I want to load the streaming values into the Hive ORC table. Can you please guide me how to achieve this? Thank you for your support.
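A rough sketch of the structured-streaming shape this usually takes (assumptions: Spark 2.4+ for foreachBatch, newline-delimited JSON, and the host, checkpoint path, and output location are placeholders; the socket source is meant only for testing):

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("tcp_json_to_orc").enableHiveSupport().getOrCreate()

# Schema for the incoming JSON lines (string-typed here for simplicity)
schema = StructType([StructField(f, StringType()) for f in ("one", "two", "three", "four")])

raw = spark.readStream.format("socket").option("host", "localhost").option("port", 55500).load()
parsed = raw.select(from_json(col("value"), schema).alias("j")).select("j.*")

def write_batch(df, batch_id):
    # A placeholder sink: append each micro-batch as ORC files; the extra table columns
    # (timestamps, partition_value) would need to be derived and added before a real insert.
    df.write.mode("append").orc("/tmp/tcp_json_orc_out")

query = (parsed.writeStream
         .foreachBatch(write_batch)
         .option("checkpointLocation", "/tmp/tcp_json_to_orc_chk")
         .start())
query.awaitTermination()

Inserting into the actual Hive ORC table would additionally need the timestamp and partition_value columns populated to match its schema.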
Labels: Apache Spark
07-31-2018
12:59 PM
@Bryan Bende I checked the nifi-app.log; the JVM heap usage was at the max, which was rejecting the connections and failing the processor. It got resolved once the heap size issue was solved. Thank you for your support.