Member since 04-20-2023 | 5 Posts | 0 Kudos Received | 0 Solutions
09-16-2023
04:35 PM
I would do this with a Groovy-based InvokeScriptedProcessor, using this code:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils

class GroovyProcessor implements Processor {
    // Maximum number of FlowFiles pulled from the queue per onTrigger call
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log

    // Boilerplate required by the Processor interface
    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return
            flowFiles.each { flowFile ->
                // Read the incoming FlowFile content and parse it as a JSON array of orders
                List data = null
                session.read(flowFile, {
                    inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
                } as InputStreamCallback)

                // Formatting: one line per order, followed by one line per order item
                List outputData = []
                data.each { order ->
                    outputData.add("${order.orderId} ${order.orderName}")
                    order.orderItems.each { orderItem ->
                        outputData.add("${orderItem.orderItemId} ${orderItem.orderItemName}")
                    }
                }

                // Write the lines to a new FlowFile and drop the original
                FlowFile newFlowFile = session.create()
                newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(outputData.join('\n').getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
                session.transfer(newFlowFile, REL_SUCCESS)
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()

Don't let all that code scare you: the part that does the formatting is only the data.each loop that fills outputData. This is the generated output:
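To try the formatting step on its own, outside NiFi, something like the following standalone Groovy sketch should work. The sample JSON is made up for illustration; only the field names (orderId, orderName, orderItems, orderItemId, orderItemName) are taken from the processor code above.

```groovy
import groovy.json.JsonSlurper

// Hypothetical sample input; only the field names come from the processor code.
String json = '''
[
  {
    "orderId": 1,
    "orderName": "First order",
    "orderItems": [
      {"orderItemId": 10, "orderItemName": "Widget"},
      {"orderItemId": 11, "orderItemName": "Gadget"}
    ]
  }
]
'''

List data = new JsonSlurper().parseText(json) as List

// Same loop as in onTrigger: one line per order, then one line per order item.
List<String> outputData = []
data.each { order ->
    outputData.add("${order.orderId} ${order.orderName}".toString())
    order.orderItems.each { orderItem ->
        outputData.add("${orderItem.orderItemId} ${orderItem.orderItemName}".toString())
    }
}

String result = outputData.join('\n')
println result
```

Running it in groovysh or the Groovy console is a quick way to check the line layout before wiring the script into InvokeScriptedProcessor.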
08-16-2023
06:20 AM
1 Kudo
@janvit04 The pattern you need is this:

${input_date:toDate("MM/dd/yyyy hh:mm:ss"):format("yyyy-MM-dd HH:mm:ss")}

I did this in a test, which you can find here. In this example I have an UpdateAttribute processor with an input attribute called input_date whose string value is "8/6/2023 12:46 am". In the next UpdateAttribute I apply toDate and format. With this setup you may need to adjust the format passed to toDate until it matches your input string. For example, I thought it should be m/d/yyyy, but I got the right output using MM/dd/yyyy.
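A quick way to experiment with patterns outside NiFi is a small Groovy script, assuming the pattern letters behave like Java's SimpleDateFormat. The pattern M/d/yyyy hh:mm a below is my own guess at what fits the sample value "8/6/2023 12:46 am" (no seconds, with an am/pm marker); it is not the pattern from the flow above.

```groovy
import java.text.SimpleDateFormat

// Sample value from the post above.
String inputDate = '8/6/2023 12:46 am'

// Assumed input pattern: month/day without padding, 12-hour clock, am/pm marker.
def parser = new SimpleDateFormat('M/d/yyyy hh:mm a', Locale.ENGLISH)
// Target pattern from the post: 24-hour clock with seconds.
def formatter = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss', Locale.ENGLISH)

// Use the same time zone on both sides so the wall-clock time is preserved.
TimeZone utc = TimeZone.getTimeZone('UTC')
parser.timeZone = utc
formatter.timeZone = utc

String result = formatter.format(parser.parse(inputDate))
println result
```

Note that lowercase m means minutes in these patterns, which is one reason m/d/yyyy does not behave like a month/day pattern. If your NiFi version follows the same pattern semantics, the equivalent toDate argument would be the parser pattern shown here, but I have not verified that in a flow.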
05-11-2023
12:43 AM
Hi @cotopaul, thanks for your reply. I am familiar with Jolt (a JSON-to-JSON transformation library). I have been thinking of adding the required padding using the functions in Jolt and then using the FreeFormTextRecordSetWriter controller service. This service takes the names of the keys in the JSON and writes a file containing only the values. It also keeps the padding added in the previous Jolt transform. I think using 10 UpdateAttribute processors would be tough, and I have multiple fields that need the required padding/empty spaces. Thank you for your answers!
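To make the padding idea concrete, here is a rough sketch in plain Groovy (not Jolt, and not the FreeFormTextRecordSetWriter itself). The record, field names, and column widths are hypothetical; it only shows what right-padding each value to a fixed width produces.

```groovy
// Hypothetical record and column widths, for illustration only.
Map record = [id: '42', name: 'Alice', city: 'Oslo']
Map widths = [id: 6, name: 10, city: 8]

// Right-pad every value with spaces to its column width, then concatenate.
String line = widths.collect { field, width ->
    (record[field] ?: '').toString().padRight(width)
}.join('')

// Brackets make the trailing spaces visible when printed.
println "[${line}]"
```

Each value keeps its trailing spaces, so the resulting line has a fixed length of 24 characters (6 + 10 + 8), which is the property the downstream writer needs to preserve.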