Member since: 11-16-2015
Posts: 892
Kudos Received: 650
Solutions: 245
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 5686 | 02-22-2024 12:38 PM
 | 1389 | 02-02-2023 07:07 AM
 | 3095 | 12-07-2021 09:19 AM
 | 4209 | 03-20-2020 12:34 PM
 | 14178 | 01-27-2020 07:57 AM
11-15-2018
03:44 PM
1 Kudo
Yep, just updated the answer with another script that puts the max in an attribute. Cheers!
11-14-2018
10:52 PM
If your incoming data is in Avro format with an embedded schema (i.e., what you would need in order to use the Hive 1 version, PutHiveStreaming), then you can add a Controller Service of type "AvroReader". Configure it to Use Embedded Schema (as the Schema Access Strategy), then Apply and Enable it. Then go back to PutHive3Streaming and select that service as your Record Reader. If you have data in other formats (JSON, XML, CSV, etc.) you'd need to specify the schema for that data some other way (it is not embedded in the file as it is with Avro). See this blog for more details on Record Readers/Writers in NiFi. They may take a little getting used to (and to configure), but they are very flexible and powerful, and worth getting familiar with.
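For formats without an embedded schema, one common approach (just a sketch; the record and field names here are hypothetical) is to set the reader's Schema Access Strategy to "Use 'Schema Text' Property" and paste an Avro-style schema into the Schema Text property, e.g.:

```json
{
  "namespace": "nifi",
  "name": "myRecord",
  "type": "record",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"}
  ]
}
```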
11-14-2018
09:44 PM
If you are using HDP 3.x, then you are using Hive 3. Apache NiFi 1.7+ has Hive 3 versions of the Hive processors, so you will want to use PutHive3Streaming instead of PutHiveStreaming. Having said that, the Apache NiFi distribution does not include the Hive 3 NAR by default (due to its size). HDF 3.2 NiFi does include the Hive 3 NAR, so perhaps try that instead of the Apache NiFi distribution. Otherwise you can download the Apache NiFi 1.7.1 Hive 3 NAR here. I recommend using HDF NiFi with HDP rather than Apache NiFi, as HDF ships with HDP-specific Hive JARs, whereas Apache NiFi uses Apache Hive JARs. Often the two are interchangeable, but there are times when HDP Hive is not compatible with the corresponding Apache Hive version, and in those cases you'll want the HDP Hive JARs, which are bundled with HDF NiFi.
11-14-2018
03:46 PM
1 Kudo
Another possibility is FlattenJson, but your destination table would have to have the same naming convention (based on your choice of the Separator property in FlattenJson) in order for the INSERT to be successful. JoltTransformJSON won't automatically flatten things, you'd need to know your schema in order to write the spec, but then you'd have more control over what the output field names would be. ExecuteScript could be a last resort; Groovy has very good JSON-handling capabilities.
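As a rough illustration of that last option (a minimal sketch only, not tied to your actual schema; the one-level-deep input shape and the underscore separator are assumptions, and error handling is omitted), an ExecuteScript Groovy body that flattens nested objects might look like:

```groovy
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.nifi.processor.io.StreamCallback

def flowFile = session.get()
if (!flowFile) return
flowFile = session.write(flowFile, { inputStream, outputStream ->
    def json = new JsonSlurper().parse(inputStream)
    def flat = [:]
    // Copy top-level scalars as-is; prefix one level of nested map entries with "<parent>_"
    json.each { k, v ->
        if (v instanceof Map) {
            v.each { nk, nv -> flat["${k}_${nk}".toString()] = nv }
        } else {
            flat[k] = v
        }
    }
    outputStream.write(JsonOutput.toJson(flat).bytes)
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
```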
11-14-2018
03:31 PM
1 Kudo
I updated my answer with a scripting alternative. If you find it useful, please take the time to "Accept" the answer, thanks!
11-14-2018
02:42 PM
2 Kudos
You can use QueryRecord for this. Ensure your JSONReader's schema has the geaendertAm_ABAS field marked as a timestamp type (not a string), such as:

```json
{
  "namespace": "nifi",
  "name": "ABAS",
  "type": "record",
  "fields": [
    {"name": "ID", "type": "int"},
    {"name": "geaendertAm_ABAS", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
```

Then you can add a user-defined property (let's call it "max") to QueryRecord with the value:

```sql
SELECT MAX(geaendertAm_ABAS) FROM FLOWFILE
```

Your JSONRecordSetWriter will need a schema with just that field:

```json
{
  "namespace": "nifi",
  "name": "ABAS",
  "type": "record",
  "fields": [
    {"name": "geaendertAm_ABAS", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
```

Once you click the Apply button on QueryRecord, you will be able to create a connection from QueryRecord called "max" and connect it to the next downstream processor.

As an alternative, here is a Groovy script for use in an ExecuteScript processor; note that it is very specific to your input:

```groovy
import org.apache.nifi.processor.io.StreamCallback

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        // Parse the incoming JSON array and find the entry with the latest timestamp
        def objList = new groovy.json.JsonSlurper().parse(inputStream)
        def max = objList.max { Date.parse("yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'", it.geaendertAm_ABAS) }
        // Replace the content with a single JSON object containing only the max timestamp
        def maxOutput = "{\"geaendertAm_ABAS\": \"${max.geaendertAm_ABAS}\"}"
        outputStream.write(maxOutput.bytes)
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error("Error while determining max", e)
    session.transfer(flowFile, REL_FAILURE)
}
```

If you instead want the max in an attribute, you can use something like:

```groovy
def flowFile = session.get()
if (!flowFile) return
try {
    def inputStream = session.read(flowFile)
    // Parse the incoming JSON array and find the entry with the latest timestamp
    def objList = new groovy.json.JsonSlurper().parse(inputStream)
    def max = objList.max { Date.parse("yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'", it.geaendertAm_ABAS) }
    inputStream.close()
    // Put the max timestamp into an attribute instead of replacing the content
    flowFile = session.putAttribute(flowFile, 'MAX_geaendertAm_ABAS', max.geaendertAm_ABAS.toString())
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error("Error while determining max", e)
    session.transfer(flowFile, REL_FAILURE)
}
```
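For reference, both scripts assume the flow file content is a JSON array of objects whose geaendertAm_ABAS values are strings in that yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z' format; a hypothetical input (values made up) would look like:

```json
[
  {"ID": 1, "geaendertAm_ABAS": "2018-11-01T08:15:30.0000000Z"},
  {"ID": 2, "geaendertAm_ABAS": "2018-11-14T12:45:00.0000000Z"}
]
```

With that input, the first script would replace the content with {"geaendertAm_ABAS": "2018-11-14T12:45:00.0000000Z"}, and the second would put that same value in the MAX_geaendertAm_ABAS attribute.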
11-05-2018
09:36 PM
1 Kudo
That error message is misleading; Jython doesn't put the "real" error message in the getMessage() method of its exception(s), see this issue. You've got a couple of things going on here. The first is that you can't use the bang character (!) for a boolean NOT in Python; use "not" instead. After that, if the file doesn't exist, the flow file is transferred to REL_SUCCESS in your first "if" statement, and then, since "exists" is never set to True, the script tries to transfer the same flow file again. Even though it's the same relationship, the framework notes that the flow file has already been marked for transfer and will complain if it is transferred again (even to the same relationship), since that usually indicates a logic bug. I'm not exactly following what you're trying to do here; please feel free to elaborate and I can help get you where you need to go.
11-01-2018
08:28 PM
Since your CSV headers match the column names exactly, try setting Translate Field Names to false.
10-30-2018
03:45 PM
1 Kudo
Try the following chain spec (it uses Modify-overwrite instead of Shift):

```json
[
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "effectiveStart": "=toString",
      "effectiveEnd": "=toString"
    }
  }
]
```

That will preserve all other fields, but change the values of effectiveStart/effectiveEnd to strings.
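For example, with a hypothetical input like {"id": 1, "effectiveStart": 1538352000000, "effectiveEnd": 1540944000000}, the output would be {"id": 1, "effectiveStart": "1538352000000", "effectiveEnd": "1540944000000"} (the "id" field here is just an illustration of an untouched sibling field; the numeric values are made up).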
10-30-2018
03:43 PM
Do you need the numbers converted to strings as-is, or formatted as timestamp strings, or something else?