
NiFi: serial processing with a paused flow file


I would like to build a simple simulator. For instance, I want to consume some messages from Kafka with the timestamps:

t:0,

t:2,

t:5,

t:8

With my custom processor I want the flow file with t=0 to go out immediately, then the processor should wait 2 seconds and emit t=2, then another 3 seconds for t=5, and so on.

 

The problem is that if I send ordered messages, yield and rollback somehow change their order in the connection. The best answer for me would be how to block a flow file for a few seconds, but first I have to check its timestamp and then calculate the difference.
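
In other words, the behaviour I am after is roughly the sketch below (an illustration only, not working code: it assumes the processor is annotated with @TriggerSerially, "kafka.timestamp" is a made-up attribute name, and previousTs, speedUpFactor, and SUCCESS are fields and relationships of the processor):

		// Rough sketch only: assumes @TriggerSerially so a single thread runs onTrigger,
		// timestamps in seconds (as in the example above), a made-up attribute name
		// "kafka.timestamp", and previousTs / speedUpFactor as processor fields.
		FlowFile flowFile = session.get();
		if (flowFile == null) {
			return;
		}
		long currentTs = Long.parseLong(flowFile.getAttribute("kafka.timestamp"));
		if (previousTs != null) {
			// wait out the remaining gap instead of rolling the flow file back
			long waitMillis = (long) (((currentTs - previousTs) * 1000) / speedUpFactor);
			if (waitMillis > 0) {
				try {
					Thread.sleep(waitMillis);
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					session.rollback();
					return;
				}
			}
		}
		previousTs = currentTs;
		session.transfer(flowFile, SUCCESS);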

 

The code in the onTrigger method looks something like this:

 

 

		final FlowFile flowFile = session.get();
		if (flowFile == null) {
			return;
		}

		long current_kafka_message_send = Long.parseLong(flowFile.getAttribute(variableName));
		/*
		 * Transfer to failure if we have already sent a more recently timestamped Kafka record
		 */
		if (previous_kafka_message_send != null && previous_kafka_message_send > current_kafka_message_send) {
			session.transfer(flowFile, FAILURE);
			return;
		}

		if (previous_message_send != null && previous_kafka_message_send != null) {
			// Desired wall-clock gap between emissions, scaled by the replay speed-up factor
			float delta = (current_kafka_message_send - previous_kafka_message_send) / speedUpFactor;
			// Wall-clock time elapsed since the previous flow file was emitted
			long send_diff = onTriggerStartTime - previous_message_send;
			/*
			 * Not enough time has passed yet: put the flow file back and try again later
			 */
			if (send_diff < delta) {
				logger.info("Calculated diff " + send_diff + ", delta " + delta);
				context.yield();
				session.rollback();
				//session.transfer(flowFile, Relationship.SELF);
				return;
			}
		}


		//int send_timestamp = (int) (System.currentTimeMillis() / 1000);
		Map<String, String> attrs = new HashMap<>();
		attrs.put("output", String.valueOf(onTriggerStartTime));


		FlowFile outfile = session.putAllAttributes(flowFile, attrs);
		session.transfer(outfile, SUCCESS);
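
For reference, the snippet assumes processor-level declarations roughly like these (a simplified sketch; the field names are taken from the snippet, the relationship definitions and types are inferred):

		public static final Relationship SUCCESS = new Relationship.Builder()
				.name("success")
				.build();
		public static final Relationship FAILURE = new Relationship.Builder()
				.name("failure")
				.build();

		// Kafka timestamp (attribute value) of the last flow file that was emitted
		private volatile Long previous_kafka_message_send;
		// Wall-clock time (ms) at which the last flow file was emitted
		private volatile Long previous_message_send;
		// Replay speed multiplier; 1.0f means real time
		private volatile float speedUpFactor = 1.0f;

		// onTriggerStartTime is captured at the top of onTrigger:
		// long onTriggerStartTime = System.currentTimeMillis();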