
How to decompress MySQL LONGBLOB data returned by a SELECT


I am using NiFi 1.13.2.
A ZIP file (*.zip) is stored in a LONGBLOB column in a MySQL database. After selecting the file, I want to decompress it, parse the data, and insert it into an Oracle database.
The problem is that the selected file cannot be decompressed.
Neither unpackcontent nor processcontent can decompress it.
Please help me decompress it.
I would also like to know how to convert the "bytes" : "{\r\n\t\" data (see sample.2) into ASCII.

※ I would also like to know how to do this with a Groovy ExecuteScript.

sample.1
SELECT TESTInfo -- LONGBLOB column
FROM TestTable
WHERE 1=1

sample.2

[ {
  "TESTInfo" : {
    "bytes" : "{\r\n\t\"testinfo\" : \r\n\t{\r\n\t\t\"TESTITEM1\" : \"TESTDATA1\",\r\n\t\t\"TESTITEM2\" : \"TESTDATA2\"}"
  }
} ]
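
On the question of turning the "bytes" value into text: a minimal Groovy sketch, assuming the JSON was produced from the Avro output of ExecuteSQL so that each character of the string stands for one byte (ISO-8859-1 style). The helper name `bytesFieldToRaw` and the shortened sample string are hypothetical, not part of the original flow.

```groovy
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

// Hypothetical helper: returns the raw bytes behind a "bytes" field.
// Assumption: each character of the string carries exactly one byte value.
byte[] bytesFieldToRaw(String jsonText, String column) {
    def records = new JsonSlurper().parseText(jsonText)
    String encoded = records[0][column].bytes
    return encoded.getBytes(StandardCharsets.ISO_8859_1)
}

// Shortened sample in the shape of sample.2 (the inner JSON is illustrative)
def sample = '[ { "TESTInfo" : { "bytes" : "{\\r\\n\\t\\"TESTITEM1\\" : \\"TESTDATA1\\"}" } } ]'

byte[] raw = bytesFieldToRaw(sample, 'TESTInfo')

// If the payload is plain text, decode it; the \r\n\t escapes are already real characters
String text = new String(raw, StandardCharsets.US_ASCII)
def inner = new JsonSlurper().parseText(text)
println inner.TESTITEM1
```

If the column really holds a ZIP archive, `raw` would be the archive bytes and would need to be unpacked before decoding as text.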

Below is the flow I have in mind.


1.ExecuteSQL → 2.ExecuteScript (groovy) → 3.EvaluateJsonPath → 4.ExecuteSQL

1.ExecuteSQL: select the LONGBLOB column from the MySQL database.
The compressed file has a folder structure of TEST/UUID/Data.

2.ExecuteScript (groovy): I want to decompress the file with a Groovy script.

3.EvaluateJsonPath: since the data file is in JSON format, the required values are extracted into attributes.

4.ExecuteSQL: build the INSERT query and insert the data.
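
The decompression in step 2 could be sketched with the JDK's `java.util.zip` classes. This is only an illustration under assumptions: the helper name `unzipToText`, the file name `sample.json` under TEST/UUID/Data, and UTF-8 as the text encoding are all hypothetical. Note that ExecuteSQL writes its result as Avro records, so the flow file content after step 1 is not the raw ZIP; the column bytes have to be extracted first.

```groovy
import java.nio.charset.StandardCharsets
import java.util.zip.ZipEntry
import java.util.zip.ZipInputStream
import java.util.zip.ZipOutputStream

// Hypothetical helper: unpack a ZIP held in memory into a map of entry path -> text.
// Assumption: the entries are UTF-8 text files.
Map<String, String> unzipToText(byte[] zipBytes) {
    Map<String, String> files = [:]
    def zin = new ZipInputStream(new ByteArrayInputStream(zipBytes))
    try {
        ZipEntry entry
        while ((entry = zin.nextEntry) != null) {
            if (entry.directory) continue
            // read() returns -1 at the end of the current entry, not of the whole archive
            def content = new ByteArrayOutputStream()
            byte[] buf = new byte[8192]
            int n
            while ((n = zin.read(buf)) != -1) content.write(buf, 0, n)
            files[entry.name] = new String(content.toByteArray(), StandardCharsets.UTF_8)
        }
    } finally {
        zin.close()
    }
    return files
}

// Build a small ZIP in memory to try the helper
def buffer = new ByteArrayOutputStream()
def zout = new ZipOutputStream(buffer)
zout.putNextEntry(new ZipEntry('TEST/UUID/Data/sample.json'))
zout.write('{"TESTITEM1":"TESTDATA1"}'.getBytes(StandardCharsets.UTF_8))
zout.closeEntry()
zout.close()

def files = unzipToText(buffer.toByteArray())
println files
```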

1 REPLY


I changed my approach.
1.GenerateFlowFile (Trigger) → 2.ExecuteScript (groovy) → 3.EvaluateJsonPath → 4.ExecuteSQL

The ExecuteScript content is below.

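import groovy.json.JsonSlurper
import groovy.sql.Sql
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.OutputStreamCallback

def flowFile = session.get()
if (!flowFile) return

// ExecuteScript dynamic properties: "dbcp" (controller service name) and "sqlCmd" (query)
def serviceName = dbcp.value
def sqlCmdString = sqlCmd.value
def jsonSlurper = new JsonSlurper()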

// get controller service lookup from context
def lookup = context.controllerServiceLookup

// search for serviceName in controller services
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == serviceName
}

//Get the service from serviceid
def service = lookup.getControllerService(dbcpServiceId)

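// Connect to service
def conn = service.getConnection()
if (!conn) {
  log.error("Failed to connect to " + serviceName)
  // Route the flow file before returning so it is not left untransferred
  session.transfer(flowFile, REL_FAILURE)
  return
}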

try {
  // flowFile = session.create()
  flowFile = session.write(flowFile, { out ->
    def sql = new Sql(conn)
    // def sqlCmdString = "ExecuteScript attribute info"
    def rows = sql.rows(sqlCmdString)
    // Open question: this is the part I need help with. How do I decompress
    // the value returned by the SELECT and write the JSON file to the
    // outgoing flow file?
  } as OutputStreamCallback)
  session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
  log.error("Scripting error " + sqlCmdString, e)
  session.transfer(flowFile, REL_FAILURE)
} finally {
  // Release the connection; this is important, as it will otherwise block new executions
  conn?.close()
}
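
For the open part of the script (decompressing the selected value and writing the JSON to the flow file), one possible shape is sketched below. It is not a confirmed solution: the helper names `blobToBytes` and `writeFirstEntry` are hypothetical, and it assumes the driver returns the LONGBLOB as either `byte[]` or `java.sql.Blob` and that the archive contains a single JSON file of interest. The sample file name is also made up.

```groovy
import java.sql.Blob
import java.util.zip.ZipEntry
import java.util.zip.ZipInputStream
import java.util.zip.ZipOutputStream
import javax.sql.rowset.serial.SerialBlob

// Hypothetical helper: normalise a LONGBLOB column value to a byte array.
// Assumption: the driver hands back either byte[] or java.sql.Blob.
byte[] blobToBytes(Object value) {
    if (value instanceof byte[]) return (byte[]) value
    if (value instanceof Blob) return value.getBytes(1L, (int) value.length())
    throw new IllegalArgumentException('Unsupported column type: ' + value?.getClass())
}

// Hypothetical helper: copy the first file entry of the ZIP to `out`.
// Returns the entry name, or null when the archive holds no file.
String writeFirstEntry(byte[] zipBytes, OutputStream out) {
    def zin = new ZipInputStream(new ByteArrayInputStream(zipBytes))
    try {
        ZipEntry entry
        while ((entry = zin.nextEntry) != null) {
            if (entry.directory) continue
            byte[] buf = new byte[8192]
            int n
            while ((n = zin.read(buf)) != -1) out.write(buf, 0, n)
            return entry.name
        }
        return null
    } finally {
        zin.close()
    }
}

// Try it without NiFi: zip a JSON string, wrap it as a Blob, and unpack it again
def zipped = new ByteArrayOutputStream()
def zout = new ZipOutputStream(zipped)
zout.putNextEntry(new ZipEntry('TEST/UUID/Data/sample.json'))
zout.write('{"TESTITEM1":"TESTDATA1"}'.getBytes('UTF-8'))
zout.closeEntry()
zout.close()

def result = new ByteArrayOutputStream()
def name = writeFirstEntry(blobToBytes(new SerialBlob(zipped.toByteArray())), result)
println name + ': ' + result.toString('UTF-8')
```

Inside the `session.write` callback, the same helpers could be called with the first row's column value and the callback's `out` stream, for example `writeFirstEntry(blobToBytes(rows[0].TESTInfo), out)`, so that the downstream EvaluateJsonPath receives plain JSON.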