Support Questions

Find answers, ask questions, and share your expertise
Announcements
Check out our newest addition to the community, the Cloudera Data Analytics (CDA) group hub.

how to release mysql long blob data select

New Contributor

I am using nifi 1.13.2 version.
longblob in mysql db (*. zip) After selecting the file, decompress it, parse the data, and insert it to oracle db.
We have a problem.
the select file cannot be decompressed.
unpackcontent and processcontent can not be released.
Please help me how to decompress
Also, I would like to know how to convert the bytes" : "{\r\n\t\" data into ACSII.

※ I also want to know Groovy ExecuteScript.

sample.1
SELECT TESTInfo (longblob)
FROM TestTable
WHERE 1=1

sample.2

[ {
? "TESTInfo" : {
? ? "bytes" : "{\r\n\t\"testinfo\" : \r\n\t{\r\n\t\t\"TESTITEM1\" : \"TESTDATA1\",\r\n\t\t\"TESTITEM2\" : \"TESTDATA2\"}"
? }
} ]

Below is the flow I think.


1.ExecuteSQL → 2.ExecuteScript (groovy) →?3.EvaluateJsonPath → 4.ExecuteSQL

1.ExecuteSQL:select the longblob column from mysqldb.
The compressed file
It has a folder structure of TEST/UUID/Data.

2.ExecuteScript (groovy) :i want to decompress with groovy script.

3.?EvaluateJsonPath :Since the data file is in Json format, the required value is extracted with attribute information.

4.ExecuteSQL :create insert query and insert data.

1 REPLY 1

New Contributor

I changed my mind.
1.GenerateFlowFile (Triger)→ 2.ExecuteScript (groovy) → 3.EvaluateJsonPath → 4.ExecuteSQL

ExecuteScript content is below

def flowFile = session.get()
if(!flowFile) return

// Executescript attributes
def serviceName = dbcp.value
def sqlCmdString = sqlCmd.value
def jsonSlurper = new JsonSlurper()

// get controller service lookup from context
def lookup = context.controllerServiceLookup

// search for serviceName in controller services
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == serviceName
}

//Get the service from serviceid
def service = lookup.getControllerService(dbcpServiceId)

// Connect to service
def conn = service.getConnection()
if (!conn) {
  log.error("Failed to connect to " + serviceName)
  return;
}

try {
//flowFile = session.create()
flowFile = session.write(flowFile, {out ->
def sql = new Sql(conn)
  if (!sql) {
    log.error("Failed to get SQL connection")
    return
  }
//def sqlCmdString = "ExecuteScript Attributs info"
def rows = sql.rows(sqlCmdString)
"""

    i want to know the part that decompresses select return value and transfers json file to lower flow file.

"""

} as OutputStreamCallback)
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    conn?.close()
    log.error("Scripting error " + sqlCmd, e)
    session.transfer(flowFile, REL_FAILURE)
}
// Release connection, this is important as it will otherwise block new executions
conn?.close()
Take a Tour of the Community
Don't have an account?
Your experience may be limited. Sign in to explore more.