
How to dump data from DB2 to Cassandra faster in NiFi


Explorer

I have a requirement to load data from DB2 to Cassandra using Apache NiFi. My DB2 table has around 40k records, and the data dump to Cassandra took around 15 minutes to complete. I have attached two images of the current NiFi flow for this use case. I observed that only 100+ records are read per second. Could anyone please let me know how to tune the flow/processors so that we can increase the speed (reduce the time) of the data dump?

  1. DB2 to Cassandra NiFi flow - before ExecuteScript starts
  2. After ExecuteScript starts

 

Sample Data (example):

  • Incoming Data of ExecuteScript: 2-Dec-15,120, Albuquerque
  • Output Data of ExecuteScript: INSERT INTO model.test_data JSON '{ "date": "2015-12-02", "max": "120", "city": "Albuquerque" }'

I am attaching the ExecuteScript code where we prepare the INSERT statements for the Cassandra dump.

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
import csv
import io
import datetime

class TransformCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        inputdata = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        for row in csv.reader(io.StringIO(inputdata)):
            # Incoming row, e.g.: 2-Dec-15,120, Albuquerque
            date, max_temp, city = [field.strip() for field in row]
            # Normalize d-MMM-yy dates (e.g. 2-Dec-15) to ISO format (2015-12-02)
            iso_date = datetime.datetime.strptime(date, "%d-%b-%y").strftime("%Y-%m-%d")
            obj = {"date": iso_date, "max": max_temp, "city": city}
            insert_query = "INSERT INTO model.test_data JSON '" + json.dumps(obj) + "';"
            outputStream.write(insert_query.encode('utf-8'))
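The per-row transform in the script above can be exercised outside NiFi as plain Python. This is a minimal sketch, assuming the incoming CSV uses the d-MMM-yy date format shown in the sample data (`rows_to_inserts` is a hypothetical helper name, not part of the flow):

```python
import csv
import datetime
import io
import json

def rows_to_inserts(csv_text):
    """Convert CSV rows like '2-Dec-15,120, Albuquerque' into
    Cassandra 'INSERT ... JSON' statements, one per row."""
    statements = []
    for row in csv.reader(io.StringIO(csv_text)):
        date, max_temp, city = (field.strip() for field in row)
        # d-MMM-yy (2-Dec-15) -> ISO format (2015-12-02)
        iso_date = datetime.datetime.strptime(date, "%d-%b-%y").strftime("%Y-%m-%d")
        payload = {"date": iso_date, "max": max_temp, "city": city}
        statements.append("INSERT INTO model.test_data JSON '%s';" % json.dumps(payload))
    return statements

print(rows_to_inserts("2-Dec-15,120, Albuquerque")[0])
```

Note that `%b` is locale-dependent; the month abbreviation "Dec" parses under the default C/English locale.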

2 REPLIES

Re: How to dump data from DB2 to Cassandra faster in NiFi

Super Guru

The PutCassandraRecord processor was added for this purpose. If you only have one line of data per flow file, it will be slower than it needs to be; instead you could use MergeContent or MergeRecord to consolidate a number of these rows into a single flow file, then use PutCassandraRecord (with a CSVReader that has a schema with your 3 fields in it), and it will load all the records in one execution.
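As a sketch, the CSVReader's schema for the three sample fields might look like this in an AvroSchemaRegistry (the record name is arbitrary, and all three fields are assumed to be strings since the INSERT JSON quotes every value):

```json
{
  "type": "record",
  "name": "weather",
  "fields": [
    { "name": "date", "type": "string" },
    { "name": "max",  "type": "string" },
    { "name": "city", "type": "string" }
  ]
}
```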


Re: How to dump data from DB2 to Cassandra faster in NiFi

Explorer

Hi @Matt Burgess, thanks for your quick response.

I have tried the PutCassandraRecord processor (with a CSVReader whose schema is defined via an AvroSchemaRegistry).

Batch Size: 1000
PutCassandraRecord.PNG
Out of 40K records, only 100 are getting dumped; after that I get a server timed-out error.

timeouterror.png

Could you please help with your suggestions here:

1) What is the maximum batch size we can define in the PutCassandraRecord processor?

2) Do we need to enable any properties/settings to handle this timeout error? I noticed there are currently no DB timeout settings exposed by the PutCassandraRecord processor.

3) Is there any other way to design the flow so that there is no need to define the schema manually (using AvroSchemaRegistry), since in most of my cases I don't know the full source table's schema?

   In this case, do my DB2 and Cassandra table columns need to be the same and in the same order? For example, if I have different column names in:

  • DB2 (3 columns) -->  date, maximum, city
  • Cassandra (3 columns) --> date, city, max

How could it be handled?
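One option (a sketch, not something confirmed in this thread) is to place a QueryRecord processor in front of PutCassandraRecord and use its Calcite SQL to rename columns so the record fields match the Cassandra table, e.g. mapping the DB2 column "maximum" to the Cassandra column "max":

```sql
-- QueryRecord queries the flow file content as a table named FLOWFILE;
-- identifiers are quoted here since "date" and "max" can clash with keywords
SELECT "date", "maximum" AS "max", "city" FROM FLOWFILE
```

The output record schema then carries the renamed fields, so PutCassandraRecord sees column names that match the target table.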

 

Thanks in advance

 

 
