I have a requirement where I need to load data from DB2 to Cassandra using Apache Nifi. My DB2 table has around 40k records which has taken around 15 mins to complete the data dump to cassandra. I have attached 2 images of current nifi flow for this usecase. Observed only 100+ records are read/sec. Could anyone please let me know - how to tune the flow/processors so that we can increase the speed (reduce time) of data dump.
Sample Data (example):
- Incoming Data of ExecuteScript: 2-Dec-15,120, Albuquerque
- Output Data of ExecuteScript: INSERT INTO model.test_data JSON '{ "date": "2015-12-02", "max": "120", "city": "Albuquerque" }
Am attaching execute script where we are preparing insert statements for Cassandra dump.
import java.io from org.apache.commons.io import IOUtils from java.nio.charset import StandardCharsets from org.apache.nifi.processor.io import StreamCallback import json import csv import io import datetime class TransformCallback(StreamCallback): def _init_(self): pass def process(self,inputStream,outputStream): inputdata = IOUtils.toString(inputStream,StandardCharsets.UTF_8) text = csv.reader(io.StringIO(inputdata)) l = [] for row in text: mon = row[0].strip() modified_date = str(datetime.datetime.strptime(str(mon), "%d%b%Y").strftime("%Y-%m-%d")) row[0] = modified_date row[1] = row[1].strip() row[2] = row[2].strip() l.append(row) values_str = json.dumps(l) leng = len(l) for i in range(leng): obj = json.loads(values_str)[i] ## obj = dict newObj = { "date": obj[0], "max": obj[1], "city": obj[2] } insert_query = ("INSERT INTO model.test_data JSON '"+json.dumps(newObj , indent=4)+"';").encode('utf-8') outputStream.write(bytearray(insert_query))