Support Questions
Find answers, ask questions, and share your expertise

Update Postgres Using Flink

Update Postgres Using Flink

New Contributor

My dataset named dbData contains set of data. I want to update postgres with that data regularly where the dbData changes regularly on everyday purpose.

 dbData.map(newWrite()).
output(JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername(Utils.properties_fetch("drivername"))
.setDBUrl(Utils.properties_fetch("dbURL"))
.setUsername(Utils.properties_fetch("username"))
.setPassword(Utils.properties_fetch("password"))
.setQuery(Write.updatequery).finish());

My "Write" class looks like the following:

publicclassWriteimplementsMapFunction<Tuple7<String,String,String,String,String,String,String>,Row>
{
staticString updatequery ;privatestaticfinallong serialVersionUID =1L;
publicRow map(Tuple7<String,String,String,String,String,String,String> value)throwsException
{       Row obj =newRow(7);
        obj.setField(0, value.f0); 
        obj.setField(1, value.f1);
        obj.setField(2, value.f2);
        obj.setField(3, value.f3);
        obj.setField(4, value.f4);
        obj.setField(5, value.f5);
        obj.setField(6, value.f6);
Write.updatequery=putdatainDb(obj);
return obj;
}
publicString putdatainDb(Row obj)
{
	String updateQuery="UPDATE dashboard   SET metric_result
          ='"+obj.getField(2)+"' ,metric_executed_on ='"+obj.getField(5)+"' ::date 
           where metric_orgid ='"+obj.getField(6)+"'  and date(metric_from) 
      ='"+obj.getField(3)+"'::date and date(metric_to) ='"+obj.getField(4)+"' ::date  and metric_topic ='"+obj.getField(1)+"' ;";return updateQuery;

}}

In set query I want the query to change every time with the new row so that I can update my database regularly. Suggest some ways to achieve this.