Support Questions

Find answers, ask questions, and share your expertise

Insert into works very slow in Impala.

avatar
Explorer

Hi, I am trying to insert mere 10k record batch however its taking very long time, so I reduced my batch to 1000 records yet it took nearly 14 seconds. I don't know whats wrong below is my java code which I tried -
Currently I am able t oinsert 200/sec records however I need to have atleast 10k/sec records.

 

import java.sql.*;

class impala12 {

    private static final String CONNECTION_URL = "jdbc:impala://172.16.8.177:21050" ;
    private static final String sqlStatementCreate = "CREATE TABLE if not exists helloworld (message String) STORED AS PARQUET";
    private static final String sqlStatementInsert = "INSERT INTO helloworld VALUES (\"helloworld\")";

    private static final String sqlCompiledQuery = "INSERT INTO tbl_mindarray (source_ip,destination_ip,protocol_number," +
            "source_port,destination_port,packet,volume,duration,pps,bps,bpp,source_latitude,source_longitude," +
            "source_city,source_country,destination_latitude,destination_longitude ,destination_city ,destination_country ," +
            "ingress_volume ,egress_volume ,ingress_packet ,egress_packet ,source_if_index ,destination_if_index," +
            "source_host,event_date,event_time,_time,flow,year)" + " VALUES" +
            "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
            ;
    //,event_date,event_time
    //,?,?,?,?,?,?,?
    //source_longitude,source_city," +
    //            "source_country,destination_latitude,destination_longitude ,destination_city ,destination_country

    public static void main(String[] args) {
        System.out.println("Impala using Java");

        writeInABatchWithCompiledQuery(100);

        System.out.println("Done");

    }


    private static Connection connectViaDS() throws Exception {
        Connection connection = null;

        Class.forName("com.cloudera.impala.jdbc41.Driver");

        connection = DriverManager.getConnection(CONNECTION_URL);

        return connection;

    }

    private static void writeInABatchWithCompiledQuery(int records) {

        int protocol_no = 233,s_port=20,d_port=34,packet=46,volume=58,duration=39,pps=76,
                bps=65,bpp=89,i_vol=465,e_vol=345,i_pkt=5,e_pkt=54,s_i_ix=654,d_i_ix=444,_time=1000,flow=989;

        int yr = 1951;

        int flag = 0;

        String s_city = "Mumbai",s_country = "India", s_latt = "12.165.34c", s_long = "39.56.32d",
                s_host="motadata",d_latt="29.25.43c",d_long="49.15.26c",d_city="Damouli",d_country="Nepal";

        long e_date= 1275822966, e_time= 1370517366;

        PreparedStatement preparedStatement;

        // int total = 1000000*1000;

        int total = 1000*1000;

        int counter =0;

        Connection connection = null;
        try {
            connection = connectViaDS();

            connection.setAutoCommit(false);

            preparedStatement = connection.prepareStatement(sqlCompiledQuery);


            Timestamp ed = new Timestamp(e_date);
            Timestamp et = new Timestamp(e_time);

            while(counter <total) {

                for (int index = 1; index <= records; index++) {

                    counter++;

                    preparedStatement.setString(1, "s_ip" + String.valueOf(index));
                    //   System.out.println(1);
                    preparedStatement.setString(2, "d_ip" + String.valueOf(index));
                    //   System.out.println(2);
                    preparedStatement.setInt(3, protocol_no + index);
                    //   System.out.println(3);
                    preparedStatement.setInt(4, s_port + index);
                    //  System.out.println(4);
                    preparedStatement.setInt(5, d_port + index);
                    //  System.out.println(5);
                    preparedStatement.setInt(6, packet + index);
                    //  System.out.println(6);
                    preparedStatement.setInt(7, volume + index);
                    //  System.out.println(7);
                    preparedStatement.setInt(8, duration + index);
                    //   System.out.println(8);
                    preparedStatement.setInt(9, pps + index);
                    //  System.out.println(9);
                    preparedStatement.setInt(10, bps + index);
                    //  System.out.println(10);
                    preparedStatement.setInt(11, bpp + index);
                    //   System.out.println(11);
                    preparedStatement.setString(12, s_latt + String.valueOf(index));
                    //  System.out.println(12);
                    preparedStatement.setString(13, s_long + String.valueOf(index));
                    //  System.out.println(13);
                    preparedStatement.setString(14, s_city + String.valueOf(index));
                    //  System.out.println(14);
                    preparedStatement.setString(15, s_country + String.valueOf(index));
                    //  System.out.println(15);
                    preparedStatement.setString(16, d_latt + String.valueOf(index));
                    //   System.out.println(16);
                    preparedStatement.setString(17, d_long + String.valueOf(index));
                    //   System.out.println(17);
                    preparedStatement.setString(18, d_city + String.valueOf(index));
                    //   System.out.println(18);
                    preparedStatement.setString(19, d_country + String.valueOf(index));
                    //   System.out.println(19);
                    preparedStatement.setInt(20, i_vol + index);
                    //   System.out.println(20);
                    preparedStatement.setInt(21, e_vol + index);
                    //   System.out.println(21);
                    preparedStatement.setInt(22, i_pkt + index);
                    //   System.out.println(22);
                    preparedStatement.setInt(23, e_pkt + index);
                    //   System.out.println(23);
                    preparedStatement.setInt(24, s_i_ix + index);
                    //   System.out.println(24);
                    preparedStatement.setInt(25, d_i_ix + index);
                    //    System.out.println(25);
                    preparedStatement.setString(26, s_host + String.valueOf(index));
                    //   System.out.println(26);
                    preparedStatement.setTimestamp(27, ed);
                    //   System.out.println(27);
                    preparedStatement.setTimestamp(28, et);
                    //   System.out.println(28);
                    preparedStatement.setInt(29, _time);
                    //   System.out.println(29);
                    preparedStatement.setInt(30, flow + index);

                    preparedStatement.setInt(31, yr + flag);
                    //   System.out.println(30);
                    // System.out.println(index);
                    preparedStatement.addBatch();

                }
                preparedStatement.executeBatch();

                preparedStatement.clearBatch();

                //connection.commit();

                System.out.println("Counter = "+counter);

                //flag++;


            }





        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

    }

}


I am using latest Cloudera Vm provided as on 17June2019.
Statements are not working I am getting error, as executeBatch() is not supported.
I tried increasing Daemon Limit and Heap but it didn't worked.
I had earlier posted this but I don't know someone reported as spam or something and it got deleted. 
I am reposting it.
Thanks

4 REPLIES 4

avatar
Contributor

Hi Punshi

When you insert into parquet like this through Impala, you will create 10,000 small files in hdfs, each with 100 rows. That won't be an efficient way to write data, and it will be very inefficient to read. 

You should probably review
https://www.cloudera.com/documentation/enterprise/latest/topics/impala_parquet.html#parquet
and
https://www.cloudera.com/documentation/enterprise/latest/topics/impala_perf_cookbook.html
to see other ways to ingest your data.

-Andrew

avatar
Explorer

Yes I had seen the document earlier before posting here, I had batch size of 10,000 which took several minutes to process so I started reducing the batch at the point where I am able to insert the record in one sec, batch size reduced to 100. My question is what changes I have to do to get insertion speed of 10k/sec records, I can increase batch size to 10000 but I won't get the speed.
What are the bare minimum settings required to get these speed ?, this isn't fast as I have insert upto 40k/sec records in mysql from same system but for Impala I believe it works differently and somewhere my lack of understanding/knowledge is part of the problem.
I cannot use INSERT ... SELECT or LOAD as I will be getting live data into variables so I need to use INSERT INTO..

Is my java method of using PreparedStatements correct or there is some other better way.

avatar
Contributor

Where is your data that you want to query with Impala?

 

Many people write their data directly into hdfs. It might be written as a text or csv file.
You can access the files via an external table.
That might be enough for a simple case, but if optimal query performance is required you could create a parquet table by selecting from the external table.

 

If the data is in another database then you could use sqoop to import the data,

 

If the newer data is going to be continually updated in hdfs then you could consider this pattern: https://blog.cloudera.com/blog/2019/03/transparent-hierarchical-storage-management-with-apache-kudu-...

avatar
Explorer

I will be getting data from network via snmp or serial.


Loading data from file is very fast , I did it manually first to check and it worked however when I try to do the same with my java code I get different errors. 

 

I am trying few steps but getting error --

 

1. I wrote 1 miilion records into text file through java.

2. I tried to upload the generate text file into HDFS through my java code.

3. I am executing LOAD query through code.

 

However I am unable to execute step 2-

1. I tried java method of ProcessBuilder(), getRuntime.exec() but it didn't worked. Then I tried searching for some API to do following task but I was getting error-

 

java.io.IOException: No FileSystem for scheme: file
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
	at HdfsWriter.run(impala_crt.java:935)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at impala_crt.writeFile(impala_crt.java:861)
	at impala_crt.main(impala_crt.java:106)


Do you have any solution for this or there is some better method ?