Member since: 03-16-2020
Posts: 11
Kudos Received: 0
Solutions: 1
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 2248 | 09-07-2023 07:39 PM |
09-07-2023
07:39 PM
@RangaReddy Thanks, it was a server-level issue. I tried with a different edge node and it worked.
09-07-2023
07:37 PM
I am working with Spark Structured Streaming and have a requirement to perform an aggregation on my data stream. Specifically, I need to count the occurrences of the different statuses from the last 5 minutes and then display the result on the console. The data is coming from a Kafka topic. Each record has a timestamp, and I want to filter the records so that only those falling within the last 5 minutes (relative to the current timestamp) are considered for aggregation.

Here's the desired outcome: if the batch executes at 6:10 PM, it should consider only the data from the last 5 minutes. Similarly, if it executes at 6:11 PM, it should again consider only the data from the last 5 minutes.

Example:

```
For 6:10 PM:
TXN00053,pending,2023-09-05 18:05:45
TXN00054,failed,2023-09-05 18:06:45
TXN00054,failed,2023-09-05 18:07:45
TXN00054,failed,2023-09-05 18:08:00
TXN00054,failed,2023-09-05 18:09:00
TXN00054,failed,2023-09-05 18:10:00

For 6:11 PM:
TXN00054,failed,2023-09-05 18:06:45
TXN00054,failed,2023-09-05 18:07:45
TXN00054,failed,2023-09-05 18:08:00
TXN00054,failed,2023-09-05 18:09:00
TXN00054,failed,2023-09-05 18:10:00
TXN00054,failed,2023-09-05 18:11:00
```

I tried implementing this with the following code, but it isn't working as expected:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("StatusCount").getOrCreate()
import spark.implicits._

val currentTimestamp = current_timestamp()

val sourceStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "")
  .option("subscribe", "20230907_3")
  .option("startingOffsets", "earliest")
  .load()

// Parse the CSV payload: txn_id,status,txn_timestamp
val formattedData = sourceStream.selectExpr("CAST(value AS STRING) as message")
  .select(
    split($"message", ",").getItem(0).as("txn_id"),
    split($"message", ",").getItem(1).as("status"),
    split($"message", ",").getItem(2).as("txn_timestamp")
  )
  .withColumn("txn_timestamp", to_timestamp($"txn_timestamp", "yyyy-MM-dd HH:mm:ss"))

// Keep only events from the last 5 minutes, then count per status.
val recentData = formattedData.filter($"txn_timestamp" >= (currentTimestamp - expr("INTERVAL 5 MINUTES")))
val aggregatedData = recentData.groupBy($"status").agg(count($"status").as("count"))

val query = aggregatedData.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .option("truncate", "false")
  .start()

query.awaitTermination()
```

Can anyone help me understand what I'm doing wrong and how to achieve the desired outcome? Thanks in advance!

Note: when I am using **complete mode**, whatever output came from batch one stays in the final count; old events are never removed, and the count just keeps increasing as new records arrive. When I tried **update mode**, it only shows the statuses that received a new update and does not show the other statuses that still fall within the last 5 minutes. Can anyone help so the output combines the behaviour of update and complete?

```
TXN00053,pending,2023-09-05 18:05:45
TXN00054,pending,2023-09-05 18:05:50
TXN00054,pending,2023-09-05 18:06:50
TXN00054,pending,2023-09-05 18:11:00
TXN00054,failed,2023-09-05 18:06:45
TXN00054,accepted,2023-09-05 18:11:00
```

**Expected result from the above data:**

For 6:10 PM:
pending,3
failed,1

For 6:11 PM (18:05:45 and 18:05:50 with status pending are older than 5 minutes, so they shouldn't be part of the aggregation):
pending,2
failed,1
accepted,1
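One possible direction (not the accepted solution from this thread, just a hedged sketch): instead of filtering on current_timestamp(), aggregate over 5-minute event-time windows that slide every minute, so each trigger has its own "last 5 minutes" bucket and old events age out of the window rather than accumulating in the aggregation state. The topic name, schema, and 1-minute trigger are taken from the question; the broker address and the watermark delay are placeholder assumptions.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val spark = SparkSession.builder().appName("SlidingStatusCount").getOrCreate()
import spark.implicits._

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<broker:9092>")   // placeholder broker address
  .option("subscribe", "20230907_3")
  .load()
  .selectExpr("CAST(value AS STRING) AS message")
  .select(
    split($"message", ",").getItem(1).as("status"),
    to_timestamp(split($"message", ",").getItem(2), "yyyy-MM-dd HH:mm:ss").as("txn_timestamp")
  )

// 5-minute windows sliding every minute; the watermark lets Spark drop state for
// windows that are too old to change anymore (10 minutes is an assumed lateness bound).
val counts = events
  .withWatermark("txn_timestamp", "10 minutes")
  .groupBy(window($"txn_timestamp", "5 minutes", "1 minute"), $"status")
  .count()

counts.writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()
  .awaitTermination()
```

With update mode, each trigger prints only the windows whose counts changed, and the console output can be read per window instead of as one ever-growing total.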
Labels:
- Apache Spark
08-05-2023
05:36 AM
I'm having an issue with a Spark-Hive application running on a Kerberos cluster. I receive a javax.security.sasl.SaslException: GSS initiate failed error, which appears to be caused by not finding any Kerberos TGT. Here's the error log:

```
23/08/04 22:56:55 INFO HiveUtils: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
23/08/04 22:56:55 INFO HiveClientImpl: Attempting to login to Kerberos using principal: hdfs01@HDP.COM and keytab: hdfs01.keytab-2ca1f730-bef7-4166-90ce-67317c75c793
23/08/04 22:56:55 INFO UserGroupInformation: Login successful for user hdfs01@HDP.COM using keytab file hdfs01.keytab-2ca1f730-bef7-4166-90ce-67317c75c793
23/08/04 22:56:55 INFO metastore: Trying to connect to metastore with URI thrift://master3.abc.xyz.com:9083
23/08/04 22:56:55 ERROR TSaslTransport: SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
    at org.apac...
```

I am submitting my Spark job as follows:

```
spark-submit \
  --name TestKerberous \
  --num-executors 2 \
  --driver-java-options "-Djava.security.auth.login.config=./key_fin.conf" \
  --driver-java-options "-Dsun.security.krb5.debug=true" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./key_fin.conf" \
  --files=/etc/spark/conf/hive-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/core-site.xml \
  --conf "spark.hadoop.hive.metastore.kerberos.principal=HTTP/_HOST@HDP.COM" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./key.conf" \
  --conf -Djavax.security.auth.useSubjectCredsOnly=false \
  --conf spark.executorEnv.KRB5_CONFIG=/etc/krb5.conf \
  --conf spark.driverEnv.KRB5_CONFIG=/etc/krb5.conf \
  --conf "spark.hadoop.hive.metastore.sasl.enabled=true" \
  --conf "spark.hadoop.hive.security.authorization.enabled=true" \
  --conf "spark.hadoop.hive.metastore.execute.setugi=true" \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --conf spark.home=/usr/hdp/current/spark2-client \
  --conf spark.sql.warehouse.dir=/apps/hive/warehouse \
  --conf spark.sql.catalogImplementation=hive \
  --conf spark.yarn.keytab=/etc/security/keytabs/hdfs01.keytab \
  --conf spark.yarn.principal=hdfs01@HDP.COM \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --master yarn --deploy-mode cluster --driver-cores 2 --driver-memory 2G --executor-cores 2 --executor-memory 2G --supervise \
  --class <CLASS_NAME> \
  <JAR_FILE> \
  "<Hive Jdbc Url>" "thrift://master3.abc.xyz.com:9083" "/apps/hive/warehouse"
```

I would really appreciate it if anyone could help me diagnose what might be going wrong and how to resolve this issue. Thank you in advance for any insights you can provide.
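A hedged diagnostic sketch (not a known fix for this particular cluster): before any metastore call, log in from the keytab explicitly via Hadoop's UserGroupInformation and print which credentials the driver actually holds. The principal and keytab path are the ones from the spark-submit command above; in cluster mode the keytab is shipped by YARN, so the path visible to the driver may differ.

```
// Minimal sketch: explicit Kerberos login from a keytab before any Hive/metastore access.
// Principal and keytab path are taken from the spark-submit command above; adjust as needed.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

val hadoopConf = new Configuration()
hadoopConf.set("hadoop.security.authentication", "kerberos")
UserGroupInformation.setConfiguration(hadoopConf)
UserGroupInformation.loginUserFromKeytab(
  "hdfs01@HDP.COM",
  "/etc/security/keytabs/hdfs01.keytab"
)

// Confirm which user the process is running as and whether it holds Kerberos credentials.
val loginUser = UserGroupInformation.getLoginUser
println(loginUser)
println(s"hasKerberosCredentials = ${loginUser.hasKerberosCredentials}")
```

If the printed user has no Kerberos credentials, the problem is on the login side (keytab distribution or JAAS config) rather than in the metastore settings.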
Labels:
- Apache Hive
- Apache Spark
04-26-2022
08:02 PM
Hi, I am trying to take a simple count of the output of the "ScrollElasticsearchHttp" processor in NiFi, and I am using QueryRecord after that processor. I have created one new property and am using the SQL below:

select count(1) from FLOWFILE

I am expecting the count to be 10000, which is my record count, but it always shows a record.count value of 1. Can someone suggest how I should take a count of this ScrollElasticsearchHttp flow? Thanks!
Labels:
- Apache NiFi
04-01-2022
03:46 AM
Thanks a lot @araujo, it worked. Is there any option to handle this with multiline JSON as well? We are receiving these files from the source in multiline format.
03-31-2022
11:04 PM
I have one JSON file and I have validated it; it is valid JSON. I am creating a Hive table on that file and then selecting data, but it shows this error:

```
Error: java.io.IOException: org.apache.hadoop.hive.serde2.SerDeException: Row is not a valid JSON Object - JSONException: A JSONObject text must end with '}' at 2 [character 3 line 1] (state=,code=0)
```

When I tried to set the property below:

```
ALTER TABLE test_rahul SET SERDEPROPERTIES ("ignore.malformed.json" = "true");
```

it then shows NULL values for all the fields. Can someone kindly help me?

JSON file:

```
{ "buyer": { "legalBusinessName": "test1 Company","organisationIdentifications": [{ "type": "abcd", "identification": "test.bb@tesr" }, { "type": "TXID","identification": "12345678" } ] }, "supplier": { "legalBusinessName": "test Company", "organisationIdentifications": [ { "type":"abcd","identification": "test28@test" } ] }, "paymentRecommendationId": "1234-5678-9876-2212-123456", "excludedRemittanceInformation": [], "recommendedPaymentInstructions": [{ "executionDate": "2022-06-12", "paymentMethod": "aaaa", "remittanceInformation": { "structured": [{ "referredDocumentInformation": [{ "type": "xxx", "number": "12341234", "relatedDate": "2022-06-12", "paymentDueDate": "2022-06-12", "referredDocumentAmount": { "remittedAmount": 2600.5, "duePayableAmount": 3000 } }] }] } }] }
```

Jar added:

```
ADD JAR json-serde-1.3.7-SNAPSHOT-jar-with-dependencies.jar;
```

Create table:

```
CREATE EXTERNAL TABLE IF NOT EXISTS `test`.`testerde11` (
  `buyer` STRUCT<
    `legalBusinessName`:STRING,
    `organisationIdentifications`:STRUCT<`type`:STRING, `identification`:STRING>>,
  `supplier` STRUCT<
    `legalBusinessName`:STRING,
    `organisationIdentifications`:STRUCT<`type`:STRING, `identification`:STRING>>,
  `paymentRecommendationId` STRING,
  `recommendedPaymentInstructions` ARRAY<STRUCT<
    `executionDate`:STRING,
    `paymentMethod`:STRING,
    `remittanceInformation`:STRUCT<
      `structured`:STRUCT<
        `referredDocumentInformation`:STRUCT<
          `type`:STRING,
          `number`:STRING,
          `relatedDate`:STRING,
          `paymentDueDate`:STRING,
          `referredDocumentAmount`:STRUCT<
            `remittedAmount`:DOUBLE,
            `duePayableAmount`:INT>>>>>>)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ("field.delim"=",", "mapping.ts" = "number")
STORED AS TEXTFILE
LOCATION '/user/hdfs/Jsontest/';
```

Error message:

```
select * from test.testerde11;
Error: java.io.IOException: org.apache.hadoop.hive.serde2.SerDeException: Row is not a valid JSON Object - JSONException: A JSONObject text must end with '}' at 2 [character 3 line 1] (state=,code=0)
```

I tried multiple options but nothing worked. Can someone tell me what the issue is here, or whether I need to change the delimiter or add new properties?
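For what it's worth, line-oriented JSON SerDes generally expect exactly one JSON object per line, so a pretty-printed (multiline) file produces precisely this "must end with '}'" error. A hedged sketch of one way to flatten such a file with Spark before pointing the table at it; the input path is hypothetical, and the output path is the table location from the DDL above:

```
// Minimal sketch: rewrite a pretty-printed (multiline) JSON file as one JSON object per line,
// which is what line-oriented Hive JSON SerDes generally expect. Input path is hypothetical.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("FlattenJson").getOrCreate()

val df = spark.read
  .option("multiLine", "true")        // parse the whole file as a single JSON document
  .json("/user/hdfs/Jsontest_raw/")   // hypothetical input path holding the pretty-printed file

df.write
  .mode("overwrite")
  .json("/user/hdfs/Jsontest/")       // table location from the DDL above; now one object per line
```

Note that Spark may reorder struct fields alphabetically on write; since the SerDe maps by field name, that should not affect the table, but it is worth checking on a sample.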
Labels:
- Apache Hive
09-22-2021
12:52 AM
Hi everyone, I have Hive partition folders at an HDFS location, but all the partition folder names are in uppercase, i.e. YEAR=2021/MONTH=07/DAY=31/HOUR=00. When I create the table, Hive takes the partition columns in lowercase, like /year=2021/month=07/day=31/hour=00. Since HDFS is case sensitive and Hive is case insensitive, Hive expects the partition columns in lowercase at the HDFS location, and I am not able to see any partitions in my Hive table. Is there any way to handle this, either by keeping the Hive columns in uppercase or by recursively renaming all the HDFS partition folders to lowercase? I have 8000+ partitions (24 hours x 30 days x 12 months = 8640) for one year, so I am not able to rename every folder manually. Can someone kindly suggest?
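A hedged sketch of one way to do the rename programmatically with the Hadoop FileSystem API, lower-casing only the partition key before the '=' at each level. The base path is hypothetical, and afterwards MSCK REPAIR TABLE (or ALTER TABLE ... ADD PARTITION) would still need to run so Hive registers the partitions.

```
// Minimal sketch: recursively rename partition directories like YEAR=2021/MONTH=07/...
// to year=2021/month=07/... by lower-casing only the key before '='. The base path is
// hypothetical; test on a copy first.
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("LowercasePartitionDirs").getOrCreate()
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

def lowerCasePartitionDirs(dir: Path): Unit = {
  fs.listStatus(dir).filter(_.isDirectory).foreach { status =>
    val name = status.getPath.getName
    val newName = name.split("=", 2) match {
      case Array(key, value) => s"${key.toLowerCase}=$value"  // lowercase the key only
      case _                 => name                          // not a partition dir, keep as-is
    }
    val target = new Path(dir, newName)
    if (newName != name) fs.rename(status.getPath, target)
    lowerCasePartitionDirs(target)                            // recurse into the (possibly renamed) dir
  }
}

lowerCasePartitionDirs(new Path("/data/mytable"))             // hypothetical table location
```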
03-16-2020
05:22 AM
I'm also getting the same issue, but for me, instead of taking my username, it shows the user as Anonymous. Can anyone suggest how I should resolve this issue?