import org.apache.flink.api.java.tuple.Tuple2 ; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream ; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment ; import org.apache.flink.streaming.connectors.json.JSONParseFlatMap ; import org.apache.flink.util.Collector ; import org.apache.flink.core.fs.FileSystem ; public class LogAnalytics { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final int maxEventDelay = 60; String url = args[0]; DataStream live = env.addSource(new HTTPJSONStream(url,maxEventDelay)); DataStream> ds = live.flatMap(new SelectJSONFlatMap()); //ds.print(); ds.writeAsText("/root/packages/opfinal.txt", FileSystem.WriteMode.OVERWRITE); env.execute("JSON Log Analytics"); } public static class SelectJSONFlatMap extends JSONParseFlatMap> { @Override public void flatMap(FlinkJSONObject value, Collector> out) throws Exception { out.collect(new Tuple2(value.jsonObject, 1)); } } }