Member since
08-23-2022
16
Posts
0
Kudos Received
0
Solutions
02-23-2024
12:43 PM
Sathish, Did you find the solution to this ?
... View more
02-07-2024
11:34 AM
1 Kudo
@SS-dev Welcome to the Cloudera Community!
As this is an older post, you would have a better chance of receiving a resolution by starting a new thread. This will also be an opportunity to provide details specific to your environment that could aid others in assisting you with a more accurate answer to your question. You can link this thread as a reference in your new post. Thanks.
... View more
12-08-2023
01:45 PM
If the input will always be like your example, I would use Groovy to make the transformation. The following Groovy based InvokeScriptedProcessor should create the output you posted. import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
class GroovyProcessor implements Processor {
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("100")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
void initialize(ProcessorInitializationContext context) {
log = context.logger
}
Set<Relationship> getRelationships() {
Set<Relationship> relationships = new HashSet<>()
relationships.add(REL_FAILURE)
relationships.add(REL_SUCCESS)
return relationships
}
Collection<ValidationResult> validate(ValidationContext context) {
}
PropertyDescriptor getPropertyDescriptor(String name) {
}
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
}
List<PropertyDescriptor> getPropertyDescriptors() {
List<PropertyDescriptor> descriptors = new ArrayList<>()
descriptors.add(BATCH_SIZE)
return Collections.unmodifiableList(descriptors)
}
String getIdentifier() {
}
void onScheduled(ProcessContext context) throws ProcessException {
}
void onUnscheduled(ProcessContext context) throws ProcessException {
}
void onStopped(ProcessContext context) throws ProcessException {
}
void setLogger(ComponentLog logger) {
}
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
flowFiles.each { FlowFile flowFile ->
Map customAttributes = [ "mime.type": "application/json" ]
flowFile = session.write(flowFile, { inputStream, outputStream ->
List<Map> data = jsonSlurper.parse(inputStream)
data = data.collect { Map resouce ->
Map tags = jsonSlurper.parseText("{\"${resouce.Tags}\"}")
[
"Name": tags.Name,
"Owner": tags.Owner,
"ResourceId": resouce.ResourceId,
"Resourcename": resouce.ResourceId.split("/").last(),
"Tags": resouce.Tags
]
}
outputStream.write(JsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.putAllAttributes(flowFile, customAttributes)
session.transfer(flowFile, REL_SUCCESS)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
processor = new GroovyProcessor()
... View more
03-04-2023
09:24 AM
Hi, Please try the following jolt spec: [
{
"operation": "shift",
"spec": {
"rows": {
"*": {
"f": {
"0": {
"v": "[#4].export_time"
},
"1": {
"v": "[#4].account_id"
},
"2": {
"v": "[#4].cost"
}
}
}
}
}
}
] If that helps please accept solution. Thanks
... View more
12-13-2022
07:07 AM
@sathish3389 Routing based on a sensitive value is an unusual use case. I'd love to hear more about this use case. Ultimately the RouteOnAttribute processor expects a boolean NiFi Expression Language Statement. So you want to have a sensitive parameter value that is evaluated against something else (another attribute on the inbound FlowFile) and if true route to a new relationship. Is what you are comparing this sensitive parameter value against also sensitive? If so, how are you protecting it as Attributes on FlowFiles are not sensitive and stored in plaintext. The ability to use Sensitive Parameters in dynamic properties (non password specific component properties) was added via https://issues.apache.org/jira/browse/NIFI-9957 in Apache NiFi 1.17.0. While this change created the foundation for such dynamic Property support for sensitive parameters, individual components need to be updated to utilize this new capability. As you can imagine with well over 300+ components available to NiFi, this is a huge undertaking. So what i see in the apache community are changes based on specific use case requests. I'd recommend creating an Apache NiFi Jira detailing your use case and working with the Apache Community to adopt that use case change to the RouteOnAttribute processor to support dynamic property support for Sensitive parameters. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
... View more
10-28-2022
05:44 AM
1 Kudo
@sathish3389 Define a parameter context and parameter ("parameter_password") for your flow with your password string, define that as sensitive value, then use the parameter in the processor property value : ${http.headers.Authorization:equals(#{parameter_password}) This will hide the password and make it easy to update the password by just updating the parameter.
... View more
10-13-2022
08:05 AM
@sathish3389 Its not entirely clear what you are asking here but I will give it a go. ListenHttp is used to listen to an http port with POST limited capabilities. If you are looking post data to nifi as more of REST API, you may want to check out HandleHttpRequest and HandleHttpResponse, as they have a bit more capability, and some ssl client authentication requirements. They also allow you to program authentication logic before returning the response. To do that. you would build your data flow (after HandleHttpRequest) to look for an authentication (user,password,key,etc) header, validate that and then if valid, continue to HandleHttpResponse with 200 (success). An invalidate authentication header would then go to HandleHttpResponse with 500 (error). An invalid request (wrong path, missing info, etc) could be routed to HandleHttpReponse with 404 (invalid).
... View more
10-13-2022
06:23 AM
Hi, Try the following Jolt spec: [
{
"operation": "shift",
"spec": {
"tags": {
"*": {
"tag": "tags.[#2].Parameter",
"value": "tags.[#2].value"
}
}
}
}
] If you find this helpful please accept solution.
... View more
09-29-2022
08:02 AM
1 Kudo
Are you asking how to import the template into Nifi, or how to drop a template you've already loaded onto the canvas? It is two distinct actions.
... View more
09-20-2022
10:21 AM
Hi, Please try the following spec: [
{
"operation": "shift",
"spec": {
"timestamp": {
"*": {
"@(2,resourceid)": "[&1].resourceid",
"@": "[&1].timestamp"
}
},
"Key": {
"*": {
"@": "[&1].key"
}
},
"data": {
"*": {
"@": "[&1].data"
}
}
}
}
] If you find this helpful, please accept solution. Thanks
... View more