Update Postgres Using Flink

My dataset, dbData, contains a set of records that changes every day. I want to update Postgres with that data on a regular basis.

dbData.map(new Write())
      .output(JDBCOutputFormat.buildJDBCOutputFormat()
          .setDrivername(Utils.properties_fetch("drivername"))
          .setDBUrl(Utils.properties_fetch("dbURL"))
          .setUsername(Utils.properties_fetch("username"))
          .setPassword(Utils.properties_fetch("password"))
          .setQuery(Write.updatequery)
          .finish());

My "Write" class looks like the following:

public class Write implements MapFunction<Tuple7<String, String, String, String, String, String, String>, Row>
{
    static String updatequery;
    private static final long serialVersionUID = 1L;

    public Row map(Tuple7<String, String, String, String, String, String, String> value) throws Exception
    {
        Row obj = new Row(7);
        obj.setField(0, value.f0);
        obj.setField(1, value.f1);
        obj.setField(2, value.f2);
        obj.setField(3, value.f3);
        obj.setField(4, value.f4);
        obj.setField(5, value.f5);
        obj.setField(6, value.f6);
        Write.updatequery = putdatainDb(obj);
        return obj;
    }

    public String putdatainDb(Row obj)
    {
        String updateQuery = "UPDATE dashboard SET metric_result ='" + obj.getField(2) + "'"
            + " ,metric_executed_on ='" + obj.getField(5) + "' ::date"
            + " where metric_orgid ='" + obj.getField(6) + "'"
            + " and date(metric_from) ='" + obj.getField(3) + "'::date"
            + " and date(metric_to) ='" + obj.getField(4) + "' ::date"
            + " and metric_topic ='" + obj.getField(1) + "' ;";
        return updateQuery;
    }
}

In setQuery I want the query to change with every new row so that I can update my database regularly. Please suggest some ways to achieve this.
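
One possible direction, offered as a sketch rather than a confirmed fix: the argument to setQuery is evaluated once, when the job is assembled, so a static field that map() fills in later cannot change the statement per row. A common alternative is to keep the SQL text fixed with ? placeholders and let each row supply the values. The plain-Java sketch below shows only that idea. The class DashboardUpdate and its methods are hypothetical names, not part of Flink, and the CAST(? AS date) form is an assumption standing in for the ::date casts in the original query.

```java
/**
 * Sketch only: the SQL text stays fixed and the per-row values travel as
 * parameters. Class and method names are hypothetical, not Flink API.
 */
final class DashboardUpdate {

    /** One fixed statement; each '?' is filled from the row at write time. */
    static final String UPDATE_QUERY =
        "UPDATE dashboard"
        + " SET metric_result = ?, metric_executed_on = CAST(? AS date)"
        + " WHERE metric_orgid = ?"
        + " AND date(metric_from) = CAST(? AS date)"
        + " AND date(metric_to) = CAST(? AS date)"
        + " AND metric_topic = ?";

    /**
     * Reorders the seven input fields (f0..f6) into the six values the
     * placeholders expect, in placeholder order. f0 is not used by the query.
     */
    static Object[] toParameters(String[] f) {
        if (f.length != 7) {
            throw new IllegalArgumentException("expected 7 fields, got " + f.length);
        }
        return new Object[] {
            f[2], // metric_result
            f[5], // metric_executed_on
            f[6], // metric_orgid
            f[3], // metric_from
            f[4], // metric_to
            f[1]  // metric_topic
        };
    }

    /** Counts '?' placeholders so the parameter count can be checked against it. */
    static long placeholderCount(String sql) {
        return sql.chars().filter(c -> c == '?').count();
    }
}
```

In the Flink job, the equivalent would be for Write.map() to return a Row of arity 6 holding these values in this order, and for setQuery to receive the fixed string. As far as I know, JDBCOutputFormat binds each Row field to the placeholder at the same position, but that is worth checking against the Flink version in use. Placeholders also avoid building SQL by string concatenation, which breaks on values that contain quotes.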
