
Appending mysql table row using spark sql dataframe write method

I am a newbie in Apache Spark SQL. Below is the code of my Spark SQL application and the results of the query.

SparkSession spark = SparkSession.builder().appName("Spark SQL Test").master("local[*]").getOrCreate();      

Properties connectionProperties = new Properties();
connectionProperties.put("driver", "com.mysql.jdbc.Driver");
connectionProperties.put("url", "jdbc:mysql://localhost:3306/test");
connectionProperties.put("user", "root");
connectionProperties.put("password", "password");

Dataset<Row> jdbcDF = spark.read().jdbc(connectionProperties.getProperty("url"), "family", connectionProperties);
jdbcDF.show();
jdbcDF.printSchema();

And the results are

+------+----------+--------+-------+
|EMP_ID|EMP_PASSWD|EMP_NAME|EMP_AGE|
+------+----------+--------+-------+
|  jina|       bbb|  mother|     45|
|joseph|       aaa|  father|     50|
|julian|       ccc|     son|     20|
+------+----------+--------+-------+
root
 |-- EMP_ID: string (nullable = false)
 |-- EMP_PASSWD: string (nullable = false)
 |-- EMP_NAME: string (nullable = false)
 |-- EMP_AGE: integer (nullable = false)

Then I try to append another row to the family table. First I use a SQL command like below:

Dataset<Row> appendSql = spark.sql("INSERT INTO family VALUES('jane' , 'ddd', 'daughter' , 15)");

This gives the result I want.

+------+----------+--------+-------+
|EMP_ID|EMP_PASSWD|EMP_NAME|EMP_AGE|
+------+----------+--------+-------+
|  jane|       ddd|daughter|     15|
|  jina|       bbb|  mother|     45|
|joseph|       aaa|  father|     50|
|julian|       ccc|     son|     20|
+------+----------+--------+-------+

Then I use the write method like below:

List<Object> appendData = Arrays.asList("julia", "eee", "grand mother", new Integer(70));
spark.createDataFrame(appendData, Object.class)
     .write()
     .mode(SaveMode.Append)
     .jdbc(connectionProperties.getProperty("url"), "family", connectionProperties);

But the above line throws the following exception:

java.sql.BatchUpdateException: Field 'EMP_ID' doesn't have a default value
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.Util.getInstance(Util.java:408)
    at com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163)
    at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1778)
    at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262)
    at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:641)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:782)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:782)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Field 'EMP_ID' doesn't have a default value
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3909)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2527)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2484)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
    at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079)
    at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756)
    ... 15 more

I tried to avoid the StructField interface because the column data types are simple. Am I wrong? If so, please advise. Any reply will be deeply appreciated. Best regards.
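For reference, here is my understanding of what the StructField-based alternative would look like, in case avoiding it is the actual problem. This is only a sketch: the explicit StructType (built with DataTypes.createStructField) and the RowFactory-built row are my assumptions about the intended approach, and the class name AppendRowSketch is just for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class AppendRowSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Append Row Sketch").master("local[*]").getOrCreate();

        // Explicit schema whose fields line up with the family table's columns,
        // so the generated INSERT names every column (including EMP_ID).
        StructType schema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("EMP_ID", DataTypes.StringType, false),
                DataTypes.createStructField("EMP_PASSWD", DataTypes.StringType, false),
                DataTypes.createStructField("EMP_NAME", DataTypes.StringType, false),
                DataTypes.createStructField("EMP_AGE", DataTypes.IntegerType, false)
        });

        // One Row whose values match the schema above, field by field.
        List<Row> appendData = Arrays.asList(
                RowFactory.create("julia", "eee", "grand mother", 70));
        Dataset<Row> appendDF = spark.createDataFrame(appendData, schema);

        Properties connectionProperties = new Properties();
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "password");

        appendDF.write().mode(SaveMode.Append)
                .jdbc("jdbc:mysql://localhost:3306/test", "family", connectionProperties);

        spark.stop();
    }
}
```

The key difference from my createDataFrame(appendData, Object.class) call is that the DataFrame's columns are declared explicitly instead of being inferred from a bean class, so they match the table's column names.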