Support Questions

Find answers, ask questions, and share your expertise

How to dump the data from DB2 to Cassandra more faster in nifi


I have a requirement where I need to load data from DB2 to Cassandra using Apache Nifi. My DB2 table has around 40k records which has taken around 15 mins to complete the data dump to cassandra. I have attached 2 images of current nifi flow for this usecase. Observed only 100+ records are read/sec. Could anyone please let me know - how to tune the flow/processors so that we can increase the speed (reduce time) of data dump.

  1. DB2 to Cassandra Nifi Flow - Before Execute script starts
  2. After Execute script started


Sample Data (example):

  • Incoming Data of ExecuteScript: 2-Dec-15,120, Albuquerque
  • Output Data of ExecuteScript: INSERT INTO model.test_data JSON '{ "date": "2015-12-02", "max": "120", "city": "Albuquerque" }



Am attaching execute script where we are preparing insert statements for Cassandra dump.

from import IOUtils
from java.nio.charset import StandardCharsets
from import StreamCallback
import json
import csv
import io
import datetime

class TransformCallback(StreamCallback):
    def _init_(self):
    def process(self,inputStream,outputStream):
        inputdata = IOUtils.toString(inputStream,StandardCharsets.UTF_8)
        text = csv.reader(io.StringIO(inputdata))
        l = []
        for row in text:
            mon = row[0].strip()
            modified_date = str(datetime.datetime.strptime(str(mon), "%d%b%Y").strftime("%Y-%m-%d"))
            row[0] = modified_date
            row[1] = row[1].strip()
            row[2] = row[2].strip()
        values_str = json.dumps(l)
        leng = len(l)
        for i in range(leng):
            obj = json.loads(values_str)[i]  ## obj = dict
            newObj = {
                  "date": obj[0],
                  "max": obj[1],
                  "city": obj[2]
            insert_query = ("INSERT INTO model.test_data JSON '"+json.dumps(newObj , indent=4)+"';").encode('utf-8')


Super Guru

The PutCassandraRecord processor was added for this purpose. If you only have one line of data per flow file, it will be slower than it needs to be; instead you could use MergeContent or MergeRecord to consolidate a number of these rows into a single flow file, then use PutCassandraRecord (with a CSVReader that has a schema with your 3 fields in it), and it will load all the records in one execution.


Hi @Matt Burgess thanks for your quick response.


I have tried using PutCassandraRecord Processor(with CSVReader - defined schema using AvroSchemaRegistry).

Batch Size : 1000













Out of 40K records, only 100 records are getting dumped, after that I am getting server timed out error.











Could you please help me with your suggestion here.


1) What could be the maximum batch size we can define in PutCassandraRecord processor?

2) Do we need to enable any properties/settings to handle this timeout error, as I have noticed currently there are no DB timeout settings available in PutCassandraRecord Processor end.

3) And also could you please let me know is there any other way can we design the flow so that there is no need to define the schema manually (using AvroSchemaRegistry), as in my most of the cases I don't know the full source table's schema?

     In this case do my Db2 and Casandra table columns should be same and in the same order ? If I have different columns names in 

  • DB2 (3 columns) -->  date, maximum, city
  • Cassandra (3 columns) --> date, city, max

How could it be handled?


Thanks in advance