Member since: 06-08-2017
Posts: 1049
Kudos Received: 510
Solutions: 312
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 5632 | 04-15-2020 05:01 PM
 | 2949 | 10-15-2019 08:12 PM
 | 1170 | 10-12-2019 08:29 PM
 | 6075 | 09-21-2019 10:04 AM
 | 1991 | 09-19-2019 07:11 AM
06-30-2019
06:03 PM
@hem lohani Create the column using the withColumn function with a literal value of 12, use the month column as the partitionBy column, and use insertInto to write into the table: df.withColumn("month",lit(12)).write.mode("<append or overwrite>").partitionBy("month").insertInto("<hive_table_name>") (or) using a SQL query: df.createOrReplaceTempView("temp_table")
spark.sql("insert into <partition_table> partition(`month`=12) select * from temp_table") - If this answer helps resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly 🙂
06-30-2019
02:58 AM
@hem lohani Try the syntax below: df.write.mode("<append or overwrite>").partitionBy("<partition_cols>").insertInto("<hive_table_name>")
06-30-2019
02:17 AM
@Sidhartha Bobby To run EXCHANGE PARTITION successfully, your destination table base.customers must not contain the partition that you are exchanging, but the base.customers table already has the partition (source_name=ORACLE) you are trying to exchange. Resolution: delete the existing partition in the destination table and run the exchange partition command again (see the example below), or exchange a partition that does not already exist in the destination table. If you just want to append the data to the destination table, then run an INSERT INTO base.customers selecting from the stg.customers_testcontrol_staging table. From the Hive docs, the constraints for Hive exchange partitions are:
The destination table cannot contain the partition to be exchanged.
The operation fails in the presence of an index.
Exchange partition is not allowed with transactional tables, either as source or destination. Alternatively, use the LOAD DATA or INSERT OVERWRITE commands to move partitions across transactional tables.
The command requires both the source and destination tables to have the same schema. If the schemas are different, the following exception is thrown: "The tables have different schemas. Their partitions cannot be exchanged".
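A minimal HiveQL sketch of the first resolution, using the table and partition names from this thread (treat them as placeholders for your environment; it assumes base.customers is the destination and stg.customers_testcontrol_staging is the source of the exchange):
-- remove the conflicting partition from the destination table
ALTER TABLE base.customers DROP IF EXISTS PARTITION (source_name='ORACLE');
-- then move the partition from the staging table into base.customers
ALTER TABLE base.customers EXCHANGE PARTITION (source_name='ORACLE') WITH TABLE stg.customers_testcontrol_staging;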
06-28-2019
09:32 AM
@Khaja Mohd Use the GetFile processor, as it allows you to keep only files matching specific patterns. Refer to this link for configuring the GetFile processor with a File Filter pattern.
06-28-2019
03:45 AM
@manohar ghanta Option-1: You can call .coalesce(n) on your dataframe (no shuffle will happen) and then use .option("maxRecordsPerFile",n) to control the number of records written to each file. Option-2: spark.sql.shuffle.partitions=n controls the number of shuffle partitions; then df.sort("<col_name>").write... will create exactly the number of files that we set for shuffle.partitions. Option-3: Hive: once the Spark job is done, trigger a Hive insert overwrite job selecting from the same table using sort by/distribute by/cluster by, with all the Hive configurations you mentioned in the question: insert overwrite table <table_name> select * from <table_name> distribute by <col2> sort by <col1>
Option-4: Hive: if you have an ORC table, schedule a concatenate job to run periodically: alter table <table_name> concatenate;
If none of these methods is feasible, then .repartition(n) will be the way to go; it takes extra overhead (a full shuffle), but we end up with ~evenly sized files in HDFS, which boosts performance when reading these files from Hive/Spark. - If this answer helps resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly 🙂
06-28-2019
03:39 AM
@Adarsh R You can try using the InferAvroSchema processor with Schema Output Destination set to flowfile-attribute. The processor then adds an "inferred.avro.schema" attribute to the flowfile; use this attribute in your record-oriented processors. Refer to this and this link for more details regarding usage of, and challenges with, the InferAvroSchema processor.
06-28-2019
03:20 AM
@Piotr Grzegorski Try using the ListHDFS + FetchHDFS processors. You can simulate the MoveHDFS processor with the flow below: ListHDFS //list all the files in the HDFS directory
RouteOnAttribute //use NiFi Expression Language to filter out the required files
FetchHDFS //fetch the files from HDFS
PutHDFS //put the files into the target HDFS directory
DeleteHDFS //delete the files from the source HDFS directory that were pulled by FetchHDFS - If this answer helps resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly 🙂
06-28-2019
03:05 AM
@Prathamesh H It seems you are getting an AccessControlException on the /ranger/audit directory: the directory is owned by the hdfs user, but the hive user is trying to write a .log file into it. Could you change the permissions on this directory as per this link and try to insert data into the Hive table again?
06-27-2019
03:29 AM
@Krishna Murthy In the ConvertRecord processor, define an XML reader and a CSV writer controller service. In the CSV writer controller service, define an Avro schema containing only the fields that you need in the CSV file.
06-26-2019
07:29 PM
1 Kudo
Try the Jolt spec below: [{
"operation": "shift",
"spec": {
"id": "ID",
"nummer": "Nummer",
"table": {
"*": {
"zn": "ArtikelPreise_Pos.[#2].ZeileNr",
"stfflbisart": "ArtikelPreise_Pos.[#2].StaffelBis"
}
}
}
}, {
"operation": "default",
"spec": {
"Default_Kopf": "${VAR_KD}",
"ArtikelPreise_Pos[]": {
"*": {
"Default_Kopf": "${DFT_POS}"
}
}
}
}
] Output: {
"ID" : "177",
"Nummer" : "22",
"ArtikelPreise_Pos" : [ {
"ZeileNr" : 1,
"StaffelBis" : 10,
"Default_Kopf" : "${DFT_POS}"
}, {
"ZeileNr" : 2,
"StaffelBis" : 50,
"Default_Kopf" : "${DFT_POS}"
} ],
"Default_Kopf" : "${VAR_KD}"
} I hope this matches your expected output. - If this answer helps resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly 🙂
06-26-2019
04:36 PM
@srini This is a Hive bug, reported in HIVE-2927. Possible workarounds: Hive >= 1.3: use the replace function in Hive with get_json_object. Example: hive> select get_json_object(replace(jsn,'@',''),"$.date") from (select string('{"name":"jai","@date":"2015-06-15"}')jsn)t;
2015-06-15 (or) hive> select get_json_object(replace(jsn,'@date','date'),"$.date") from (select string('{"name":"jai","@date":"2015-06-15"}')jsn)t;
2015-06-15 Hive <1.3: Use regexp_replace function: hive> select get_json_object(regexp_replace(jsn,'@',''),"$.date") from (select string('{"name":"jai","@date":"2015-06-15"}')jsn)t;
2015-06-15 (or) hive> select get_json_object(regexp_replace(jsn,'@date','date'),"$.date") from (select string('{"name":"jai","@date":"2015-06-15"}')jsn)t;
2015-06-15 --> The final query would be: hive> select get_json_object(jsn,'$.name'),get_json_object(regexp_replace(jsn,'@',''),"$.date") from (select string('{"name":"jai","@date":"2015-06-15"}')jsn)t;
jai 2015-06-15 - If this answer helps resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly 🙂
06-25-2019
06:02 PM
@Jim Barnett I am not able to recreate the same scenario on my end (using Hive 1.2.1). count(distinct key) should generate just one row with the count as its result, but in your case the query is giving results as if it were select key,count(*) from default.dummy_data group by key; ----- Check whether there is any data already in the HDFS directory the table is pointing to; if yes, clear the data from the HDFS directory and recreate the table again. Example: hive> CREATE TABLE default.dummy_data
> AS
> SELECT row_nbr as key
> FROM (
> SELECT row_number() OVER (partition by '1') as row_nbr
> FROM (
> select explode(split(repeat("x,", 1000000-1), ",")) -- 1,000,000 distinct
> ) AS x
> ) AS y; hive> select count(distinct key) c from default.dummy_data;
1000000 hive> select key,count(*)cnt from default.dummy_data group by key order by cnt desc limit 10; --ordering desc and limiting 10 Result: key cnt
10 1
9 1
1000000 1
7 1
6 1
5 1
4 1
3 1
999999 1
1 1
06-25-2019
05:51 PM
1 Kudo
@Rupak Dum 1. Do I need to upload the .sh script to a folder within HDFS? No, there is no need to upload the .sh script to HDFS. If you do upload the script to HDFS, then follow this link to execute a shell script from HDFS. 2. How do I set up the permissions for the script so that it runs successfully? You are running the sqoop import as the root user; in this case you need to change the permissions of the /user directory in HDFS. Refer to this and this link for similar threads. 3. How do I execute the .sh file so that I do not get the permission denied error? Change the permissions of the /user HDFS directory to 700 (or) 777, and then you won't get any permission issues.
06-25-2019
02:00 PM
1 Kudo
@Juhyeon Yun Configure the directory as /user/paxata/job and set Recurse Subdirectories to true; this will list all the files in the subdirectories. A similar question was reported in this HCC thread.
06-24-2019
12:22 AM
1 Kudo
@Bora Özkan One thing I observed in the article is that the author created a Hive external table, but PutHiveStreaming expects an internal (managed) table. Create a Hive internal table: CREATE TABLE
OLYMPICS(CITY STRING,EDITION INT,SPORT STRING,SUB_SPORT STRING,ATHLETE STRING,COUNTRY STRING,GENDER STRING,EVENT STRING,EVENT_GENDER STRING,MEDAL STRING)
CLUSTERED BY (EDITION)INTO 3 BUCKETS
ROW FORMAT DELIMITED
STORED AS ORC
LOCATION '/user/hive/olympics'
TBLPROPERTIES('transactional'='true'); -> The PutHiveStreaming processor expects input data in Avro format. Use the ConvertRecord processor to convert data from other formats (CSV, etc.) to Avro, then feed the Avro data to the PutHiveStreaming processor.
06-20-2019
08:37 PM
@Amrutha K This is a known issue in Spark, reported in Jira SPARK-24260 and not yet resolved. One way of doing this is to execute one query at a time, i.e. after reading the .hql file we can access the array of elements by index (0), (1): val df1=spark.sql(sc.textFile("/user/temp/hive.hql").collect().mkString(" ").split(";")(0)) val df2=spark.sql(sc.textFile("/user/temp/hive.hql").collect().mkString(" ").split(";")(1)) (or) If you just want to execute the queries and see the results on the console, then try this approach: sc.textFile("/user/temp/hive.hql").collect().mkString(" ").split(";").map(x => spark.sql(x).show()) Now we are executing all the queries in the hql script and displaying the results on the console.
06-19-2019
01:52 AM
@Jayashree S Use a RouteOnAttribute processor after the ListS3 processor to filter only the required file and pass it to FetchS3Object. Flow: ListS3
RouteOnAttribute
FetchS3Object (or) If you want to pull the same file from S3 every time, then you can use this flow: GenerateFlowFile //schedule this processor as per your requirements
FetchS3Object //configure the full S3 file path
06-19-2019
01:34 AM
@Karthikeyyan Subbiah I think your attribute is not coming through as a flowfile attribute; if the value is in the content of the flowfile, then the UpdateAttribute processor won't work. We need to use the UpdateRecord processor for that: give the record path as /<attribute_name> and add your logic to get the desired timestamp format.
06-19-2019
01:21 AM
@Bill Miller Try a series of SplitRecord processors to create smaller chunks of files. Follow the approach mentioned in this thread and see if you get any performance improvement with it.
06-19-2019
01:06 AM
@Veera Pavan This job will work fine in Hive, but in Spark follow these steps: write the data to a temporary table first, then select from the temporary table and insert overwrite the final table (see the sketch below). Check this thread regarding a similar case. - If this answer helps resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly 🙂
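A minimal sketch of that two-step pattern as plain SQL (the table names and condition are hypothetical; the statements can be run through spark.sql or in Hive):
-- stage the result into a temporary table first
CREATE TABLE db.final_table_tmp AS SELECT * FROM db.final_table WHERE <your_condition>;
-- then overwrite the final table from the staged copy, so the table is not read and overwritten in the same query
INSERT OVERWRITE TABLE db.final_table SELECT * FROM db.final_table_tmp;
-- clean up the staging table
DROP TABLE db.final_table_tmp;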
06-19-2019
01:00 AM
@Sampath Kumar The Hive timestamp type accepts the format yyyy-MM-dd HH:mm:ss[.SSS]: hive> select timestamp("2019-06-15 15:43:12");
2019-06-15 15:43:12
hive> select timestamp("2019-06-15 15:43:12.988");
2019-06-15 15:43:12.988 hive> select timestamp("2019-06-15T15:43:12")
NULL If you want a timestamp type rather than text-format tables, then you can use the from_unixtime and unix_timestamp functions to remove the "T" from the data (see the example below), and then you can have the timestamp type for all formats. - If this answer helps resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly 🙂
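For example, a minimal sketch of handling the "T" with unix_timestamp and from_unixtime (the literal and pattern are illustrative assumptions; wrap the result in cast(... as timestamp) when writing to a timestamp column):
hive> select from_unixtime(unix_timestamp("2019-06-15T15:43:12","yyyy-MM-dd'T'HH:mm:ss"));
2019-06-15 15:43:12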
06-18-2019
01:27 AM
@Carlo Bustos Could you confirm that the DistributedMapCacheClientService is defined and enabled? Please refer to this and this link for defining the distributed map cache service.
06-18-2019
12:58 AM
1 Kudo
@Carlos Try the spec below: [{
"operation": "shift",
"spec": {
"*": "data.&",
"ID": ["ID", "data.ID"]
}
}, {
"operation": "default",
"spec": {
"dataset": "${dataset:toLower()}",
"date": "${date}"
}
}] Output: {
"ID" : "123",
"data" : {
"ID" : "123",
"Text1" : "aaa",
"Text2" : "aaa",
"Text3" : "aaa"
},
"date" : "${date}",
"dataset" : "${dataset:toLower()}"
}
06-16-2019
11:02 PM
1 Kudo
@Sampath Kumar ALTER TABLE table SET SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss"); works only for text-format and CSV-format tables. If you have another table format such as ORC, then setting the serde properties is not going to work. - Tested by creating a text-format table: Data: 1,2019-06-15T15:43:12
2,2019-06-15T15:43:19 create table i(id int,ts timestamp) row format delimited fields terminated by ',' stored as textfile;
ALTER TABLE i SET SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss");
select * from i;
1 2019-06-15 15:43:12
2 2019-06-15 15:43:19 - In case we have an ORC file with the 2019-06-15T15:43:12 format, altering the serde properties still results in NULL for the timestamp field.
06-15-2019
02:36 PM
1 Kudo
@Jayashree S ListS3 is a stateful processor: once it runs, it stores state and from then on runs incrementally, so if no new files have been added to the S3 directory the processor won't list any files. How to clear state: stop the ListS3 processor, right-click on it, select View State, and clear the state saved in the processor. Then start the ListS3 processor again; it will now list all the files in the S3 directory.
06-13-2019
01:44 AM
@Veerendra Nath Jasthi In UpdateAttribute, add a new attribute ts with the value ${now():format("yyyy_MM_dd_HH_mm_ss_SSS")} Example: the ts attribute will have a value like 2019_06_12_20_42_26_762. Then in the PutHDFS processor configure the directory as /<path>/${ts} (or) You can skip the UpdateAttribute processor and directly use /<path>/${now():format("yyyy_MM_dd_HH_mm_ss_SSS")} as the directory in the PutHDFS processor. This will create a directory in HDFS with the current timestamp value; you can change the timestamp format using NiFi Expression Language.
06-12-2019
08:06 PM
@chauvin christophe You can use the NiFi REST API call /processors/{id}/state/clear-requests to clear the state stored in the processor. You can make the REST API call using an InvokeHTTP processor (or) a Groovy script, triggered from NiFi.
06-12-2019
12:36 AM
@James Willson Try the spec below: [{
"operation": "shift",
"spec": {
"rows": {
"*": {
"value": "[&1].date",
"data": {
"*": "[#3].data"
}
}
}
}
}
] This gives the output you are looking for: [ {
"date" : "00:00 2019-06-03",
"data" : 120
}, {
"date" : "05:00 2019-06-08",
"data" : 98
}, {
"date" : "23:00 2019-06-09",
"data" : 172
} ] - If this answer helps resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly 🙂
06-11-2019
04:35 AM
@vinay p Try the CountText or CalculateRecordStats processors; they will give you what you are looking for. To get one summary flowfile from 3 flowfiles, first merge the 3 flowfiles into one using a MergeRecord/MergeContent processor, then use the CountText/CalculateRecordStats processors to get the summary of the flowfile. Then use the PutEmail processor to send the flowfile as an attachment.
06-11-2019
04:26 AM
@jingyong zou The issue is with the format of the flowfile that is passed to the processor. The ConvertAvroToJSON processor accepts only Avro-format input, but I think you are passing JSON to the processor, which is causing the java.io.IOException: Not a data file error.