Created 08-08-2019 01:30 PM
I have a flow in which i’m selecting data from database table using SelectHiveQL processor and storing data to azure using PutAzureBlobStorage processor , and now i want to prevent that SelectHiveQL processor from running at certain time , for instance it mu run at 12:30
Created 08-09-2019 05:48 PM
You can put an ExecuteScript processor before SelectHiveQL, and use that to check the time. If it must not execute then it can simply return without fetching any FlowFiles from the queue; otherwise you can simply fetch and transfer all available FlowFiles.
Created 08-12-2019 01:30 PM
i have tried to use my code with executeScript processor but it didn't work , please check and let me know if i'm missing anything or if you have another same code please share it with me ,Will really appreciate:
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from time import sleep
class WriteContentCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream,outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(bytearray(start(text).encode('utf-8')))
def start():
time_periods = working_periods_table.find_all()
today = datetime.datetime.now().isoweekday()
for day in time_periods:
if day["day"] == today:
start_time = day["work_start_at"]
end_time = day["12:35"]
while True:
if datetime.datetime.now() == end_time:
schedule.every().week.at(end_time).do(record_activity(idle_time=300))
break
else:
sleep(100)
flowFile = session.get()
if flowFile != None:
flowFile=session.write(flowFile, WriteContentCallback())
flowFile = session.putAttribute(flowFile, "filename", "true")
session.transfer(flowFile, REL_SUCCESS)
session.commit()
Created 08-12-2019 04:20 PM
Thank you @Matt Burgess...
i have tried to use my code with executeScript processor but it didn't work , please check and let me know if i'm missing anything or if you have another same code please share it with me ,Will really appreciate:
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from time import sleep
class WriteContentCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream,outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(bytearray(start(text).encode('utf-8')))
def start():
time_periods = working_periods_table.find_all()
today = datetime.datetime.now().isoweekday()
for day in time_periods:
if day["day"] == today:
start_time = day["work_start_at"]
end_time = day["12:35"]
while True:
if datetime.datetime.now() == end_time:
schedule.every().week.at(end_time).do(record_activity(idle_time=300))
break
else:
sleep(100)
flowFile = session.get()
if flowFile != None:
flowFile=session.write(flowFile, WriteContentCallback())
flowFile = session.putAttribute(flowFile, "filename", "true")
session.transfer(flowFile, REL_SUCCESS)
session.commit()