Support Questions
Find answers, ask questions, and share your expertise

Nifi serially processed with paused flow file


Nifi serially processed with paused flow file

New Contributor

I would like to achive, to make simple simulator. For instance I want to consume some messages from kafka with timestamp:





With my custom processor I want to achieve that flow file will go out with time t=0, and than processor will wait 2 seconds and than return t=2, etc..


The problem is that if I send ordered messages yield and rollback somehow change order in the connection. For me would be the best answer how to block flow file for few seconds but I have to check at first what is it timestamp and than calcualte diff.


Code on trigger method looks something:



		final FlowFile flowFile = session.get();
		long current_kafka_message_send = Long.parseLong(flowFile.getAttribute(variableName));
		 * Transfer to failure if we send already more recent timestamped kafka record
		 * */
		if (previous_kafka_message_send != null && previous_kafka_message_send > current_kafka_message_send) {
			session.transfer(flowFile, FAILURE);

		if (previous_message_send != null && previous_kafka_message_send != null) {
			float delta = (current_kafka_message_send - previous_kafka_message_send)/ speedUpFactor;
			long send_diff = onTriggerStartTime - previous_message_send;
			 * Calculate if current_kafka - previous_kafka is less than delta
			 * */
			if (send_diff < (delta)) {"Calculate diff " + send_diff+ "delta" + delta);
				//session.transfer(flowFile, Relationship.SELF);

		//int send_timestamp = (int)(System.currentTimeMillis()/1000);
		Map<String, String> attrs = new HashMap<>();
		attrs.put("output", String.valueOf(onTriggerStartTime));

		FlowFile outfile = session.putAllAttributes(flowFile, attrs);
		session.transfer(outfile, SUCCESS);