Support Questions
Find answers, ask questions, and share your expertise

How to prevent SelectHiveQL processor from running at certain time , for instance it must not run at 12:30

New Contributor

I have a flow in which i’m selecting data from database table using SelectHiveQL processor and storing data to azure using PutAzureBlobStorage processor , and now i want to prevent that SelectHiveQL processor from running at certain time , for instance it mu run at 12:30


3 REPLIES 3

Super Guru

You can put an ExecuteScript processor before SelectHiveQL, and use that to check the time. If it must not execute then it can simply return without fetching any FlowFiles from the queue; otherwise you can simply fetch and transfer all available FlowFiles.

New Contributor

i have tried to use my code with executeScript processor but it didn't work , please check and let me know if i'm missing anything or if you have another same code please share it with me ,Will really appreciate:

import java.io

from org.apache.commons.io import IOUtils

from java.nio.charset import StandardCharsets

from org.apache.nifi.processor.io import StreamCallback

from time import sleep


class WriteContentCallback(StreamCallback):


def __init__(self):

pass

def process(self, inputStream,outputStream):

text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)


outputStream.write(bytearray(start(text).encode('utf-8')))


def start():

time_periods = working_periods_table.find_all()

today = datetime.datetime.now().isoweekday()

for day in time_periods:

if day["day"] == today:

start_time = day["work_start_at"]

end_time = day["12:35"]


while True:

if datetime.datetime.now() == end_time:


schedule.every().week.at(end_time).do(record_activity(idle_time=300))


break

else:

sleep(100)


flowFile = session.get()

if flowFile != None:


flowFile=session.write(flowFile, WriteContentCallback())

flowFile = session.putAttribute(flowFile, "filename", "true")

session.transfer(flowFile, REL_SUCCESS)

session.commit()




New Contributor

Thank you @Matt Burgess...

i have tried to use my code with executeScript processor but it didn't work , please check and let me know if i'm missing anything or if you have another same code please share it with me ,Will really appreciate:

import java.io

from org.apache.commons.io import IOUtils

from java.nio.charset import StandardCharsets

from org.apache.nifi.processor.io import StreamCallback

from time import sleep


class WriteContentCallback(StreamCallback):

def __init__(self):

pass

def process(self, inputStream,outputStream):

text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

outputStream.write(bytearray(start(text).encode('utf-8')))

def start():

time_periods = working_periods_table.find_all()

today = datetime.datetime.now().isoweekday()

for day in time_periods:

if day["day"] == today:

start_time = day["work_start_at"]

end_time = day["12:35"]

while True:

if datetime.datetime.now() == end_time:

schedule.every().week.at(end_time).do(record_activity(idle_time=300))

break

else:

sleep(100)


flowFile = session.get()

if flowFile != None:


flowFile=session.write(flowFile, WriteContentCallback())

flowFile = session.putAttribute(flowFile, "filename", "true")

session.transfer(flowFile, REL_SUCCESS)

session.commit()