
Apache Pig load only New files using LOAD Statement


Contributor

Hi all,

I want to load only new files using the Apache Pig LOAD statement. I have 25 files in an HDFS directory, and on other days I upload a few more files to the same directory. Now I want Pig to load only the files that have not been loaded yet (the new files).

How could I achieve this?

 

Thanks,

Jay.


Re: Apache Pig load only New files using LOAD Statement

New Contributor

I also have a related question: why reload the whole folder when only a few files have changed? Can anyone answer?


Re: Apache Pig load only New files using LOAD Statement

Contributor

Hi @pateljay .

 

A - You can use a shell script to identify newly added files in the directory and load only those.

 

B - Or, after the first Pig load completes, move the loaded files into another directory (or delete them directly if they are not required).

C - Or write Python or Java code that identifies new files by their timestamps and loads those files with Pig.
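A minimal sketch of the shell-script approach (A), assuming new files land in a local staging directory before being pushed to HDFS; the directory names, the marker file, and the Pig invocation below are hypothetical:

```shell
#!/bin/sh
# Sketch: track the time of the previous load with a marker file and
# select only files newer than the marker. All paths are hypothetical.
DATA_DIR=./incoming     # directory that receives new files
MARKER=./.last_load     # records when the previous load happened

# Demo setup: create the directory and one sample data file
mkdir -p "$DATA_DIR"
echo "sample record" > "$DATA_DIR/part-0001.txt"

# On the first run, create the marker far in the past so every file counts as new
[ -f "$MARKER" ] || touch -t 197001010000 "$MARKER"

# Collect files modified after the previous load
NEW_FILES=$(find "$DATA_DIR" -type f -newer "$MARKER")

if [ -n "$NEW_FILES" ]; then
    echo "New files to load:"
    echo "$NEW_FILES"
    # Hypothetical Pig call on just the new batch, e.g. after copying the
    # new files to a dedicated HDFS directory:
    # pig -param INPUT=/staging/new_batch load_new.pig
    touch "$MARKER"     # remember this load time for the next run
else
    echo "Nothing new to load"
fi
```

On HDFS itself the same idea works by parsing modification timestamps from `hdfs dfs -ls`, or by moving already-loaded files out of the input directory as in option B.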

 

This link may be helpful:

https://stackoverflow.com/questions/12630584/load-multiple-files-in-pig

 

Thanks

HadoopHelp


Re: Apache Pig load only New files using LOAD Statement

Contributor

@HadoopHelp 

 

As I haven't used shell scripts much yet, could you please provide a sample shell script for that?


Re: Apache Pig load only New files using LOAD Statement

Contributor

Hi @pateljay .

 

I am sharing dummy Python code. It picks up new files from one directory and moves them to another directory.

 

 

import os
import shutil
import time

# Files modified within this window count as "new".
# Note: despite the name, this is set to 120 seconds for testing.
SECONDS_IN_DAY = 120

# Windows paths should be raw strings (or use doubled backslashes)
# to avoid invalid escape sequences like \R and \D.
src = r"C:\Users\Ramesh.kumar\Desktop\SourceData"
dst = r"C:\Users\Ramesh.kumar\Desktop\newDataIdentified"
data = r"C:\Users\Ramesh.kumar\Desktop\MoveToLinux"

now = time.time()
before = now - SECONDS_IN_DAY
print("cut-off timestamp:", before)

def last_mod_time(fname):
    """Return the last modification time of a file."""
    return os.path.getmtime(fname)

for fname in os.listdir(src):
    src_fname = os.path.join(src, fname)
    data_fname = os.path.join(data, fname)
    if last_mod_time(src_fname) > before:   # file is newer than the cut-off
        dst_fname = os.path.join(dst, fname)
        print("copying new file to staging:", fname)
        shutil.copy(src_fname, dst_fname)    # copy into the staging directory
        shutil.move(dst_fname, data_fname)   # then move it to the final directory
        print("new file has been moved:", fname)


 

Note: if you are using Hue or any job scheduler, you can achieve this easily.
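For example, with cron as the scheduler, the whole pipeline script could run nightly; the script path and schedule below are hypothetical:

```shell
# Hypothetical crontab entry: run the load pipeline every night at 02:00
0 2 * * * /home/hadoop/scripts/load_new_files.sh >> /var/log/load_new_files.log 2>&1
```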

 

A dummy shell script:

 

echo "=============================== BANGALORE: CCDA Job ==============================="
echo "Spark job is going to start"
spark-submit python1.py
echo "Spark job has been completed"
echo "Now the Pig query is going to run"
pig -f ccd_cit_py.pig
echo "Pig query completed"
echo "Now going to create the Hive stage table"
hive -f ccda_pig_hive_cit1.sql
echo "Hive stage table created"
echo "Now going to create the final table"
hive -f ccda_pig_hive_cit_final.sql
echo "Final table has been created"
echo "Demography table is going to be created"
hive -f DemoGrapghy.sql
echo "Demography table has been created"
echo "HRK_CCDFile table is going to be created"
echo "HRK_CCDFile table has been created"
echo "All tables are created in Hive"
echo "Now going to delete the Pig input path from HDFS"
hdfs dfs -rmr /user/root/BigDataTest/CCDACIT/CCDAPYOUTPUTDATA/A
echo "Pig input path deleted"
echo "Now going to delete the Pig output path from HDFS"

 

Note: you need to adapt the script to your requirements.

Reference link:

https://unix.stackexchange.com/questions/24952/script-to-monitor-folder-for-new-files

 

 

Thanks

HadoopHelp
