
Record Based Handling of Nested JSON


I am working on ingesting CloudWatch log files into my environment, and we would like to avoid splitting these files: 5 flowfiles become over 1,000 and then over 30,000. We receive hundreds of these logs a day, so splitting and merging is NOT scalable.

This is what the data looks like as it arrives as a flowfile:

Note that a flowfile may contain one 'DATA_MESSAGE' or thousands of them. Likewise, each DATA_MESSAGE may contain one entry in logEvents or thousands.

{
"messageType": "DATA_MESSAGE",
"owner": "8675309",
"logGroup": "/aws-x-xxx-xxx/aws/ec2/syslogs",
"logStream": "8675309/i-03b74f8759ed4a1fd/ip-99-999-99-999.us-gov-east-1.compute.internal/var/log/audit/audit.log",
"subscriptionFilters": ["aws-x-xxx-xxx-kinesis-destination-subscription-SubscriptionFilter-5SO0E3NA2U1Z"],
"logEvents":
[
{
"id": "36209782591764436903363851577240401571437415584279429120",
"timestamp": 1623702807660,
"message": "type=SYSCALL msg=audit(1623702807.456:69260): arch=c000003e syscall=159 success=yes exit=5 a0=7ffd7f714460 a1=0 a2=fffffffffffa15d4 a3=12 items=0 ppid=1 pid=767 auid=4294967295 uid=998 gid=995 euid=998 suid=998 fsuid=998 egid=995 sgid=995 fsgid=995 tty=(none) ses=4294967295 comm=\"chronyd\" exe=\"/usr/sbin/chronyd\" subj=system_u:system_r:chronyd_t:s0 key=\"time-change\""
}, {
"id": "36209782591764436903363851577240401571437415584279429121",
"timestamp": 1623702807660,
"message": "type=PROCTITLE msg=audit(1623702807.456:69260): proctitle=2F7573722F7362696E2F6368726F6E7964002D75006368726F6E79"
}
]
}
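Because a single flowfile can hold thousands of DATA_MESSAGE objects, the first step is reading them one after another without splitting the file. The sketch below assumes (this is not stated above) that the messages are concatenated JSON objects with no enclosing array or commas, optionally separated by whitespace, for example "}{" or "}\n{". It is a standalone Python illustration, not a flowfile processor, and the function name iter_messages is hypothetical. It uses only the standard library's json.JSONDecoder.raw_decode.

```python
import json
from typing import Iterator


def iter_messages(text: str) -> Iterator[dict]:
    """Yield each top-level JSON object from a string of concatenated objects.

    Assumption: objects are back to back, optionally separated by whitespace,
    with no enclosing array and no commas between them.
    """
    decoder = json.JSONDecoder()
    pos = 0
    length = len(text)
    while True:
        # raw_decode does not skip leading whitespace, so skip it here.
        while pos < length and text[pos].isspace():
            pos += 1
        if pos >= length:
            return
        # raw_decode returns the parsed object and the index just past it.
        obj, pos = decoder.raw_decode(text, pos)
        yield obj
```

Each object is decoded in a single pass over the body, so one flowfile with many DATA_MESSAGE objects does not have to be split into one flowfile per message.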

This is what the data should look like for ingest:

[ 
{
"messageType": "DATA_MESSAGE",
"owner": "8675309",
"logGroup": "/aws-x-xxx-xxx/aws/ec2/syslogs",
"logStream": "8675309/i-03b74f8759ed4a1fd/ip-99-999-99-999.us-gov-east-1.compute.internal/var/log/audit/audit.log",
"subscriptionFilters": ["aws-x-xxx-xxx-kinesis-destination-subscription-SubscriptionFilter-5SO0E3NA2U1Z"],
"id": "36209782591764436903363851577240401571437415584279429120",
"timestamp": 1623702807660,
"message": "type=SYSCALL msg=audit(1623702807.456:69260): arch=c000003e syscall=159 success=yes exit=5 a0=7ffd7f714460 a1=0 a2=fffffffffffa15d4 a3=12 items=0 ppid=1 pid=767 auid=4294967295 uid=998 gid=995 euid=998 suid=998 fsuid=998 egid=995 sgid=995 fsgid=995 tty=(none) ses=4294967295 comm=\"chronyd\" exe=\"/usr/sbin/chronyd\" subj=system_u:system_r:chronyd_t:s0 key=\"time-change\""
}, {
"messageType": "DATA_MESSAGE",
"owner": "8675309",
"logGroup": "/aws-x-xxx-xxx/aws/ec2/syslogs",
"logStream": "8675309/i-03b74f8759ed4a1fd/ip-99-999-99-999.us-gov-east-1.compute.internal/var/log/audit/audit.log",
"subscriptionFilters": ["aws-x-xxx-xxx-kinesis-destination-subscription-SubscriptionFilter-5SO0E3NA2U1Z"],
"id": "36209782591764436903363851577240401571437415584279429121",
"timestamp": 1623702807660,
"message": "type=PROCTITLE msg=audit(1623702807.456:69260): proctitle=2F7573722F7362696E2F6368726F6E7964002D75006368726F6E79"
}
]
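The desired output copies every top-level field (messageType, owner, logGroup, logStream, subscriptionFilters) onto each entry of logEvents and then drops the logEvents wrapper. Here is a minimal Python sketch of that mapping, assuming messages are already parsed into dicts. The function names flatten_message and flatten_messages are hypothetical, and this is an illustration of the transformation rather than a recommended ingest pipeline.

```python
from typing import Any, Dict, Iterable, List


def flatten_message(message: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return one flat record per log event, carrying the parent fields."""
    # Every top-level field except the nested array is a "parent" field.
    parent = {k: v for k, v in message.items() if k != "logEvents"}
    records = []
    for event in message.get("logEvents", []):
        # Shallow copy: parent fields first, then id/timestamp/message, which
        # matches the key order shown in the desired output. If an event key
        # collides with a parent key, the event's value wins. Nested values
        # such as the subscriptionFilters list are shared, not duplicated.
        record = dict(parent)
        record.update(event)
        records.append(record)
    return records


def flatten_messages(messages: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flatten any number of DATA_MESSAGE objects into one list of records."""
    return [record for message in messages for record in flatten_message(message)]
```

Serializing the result with json.dumps(flatten_messages(messages)) produces a single JSON array like the one above, regardless of how many messages or log events the input contains.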

What is the most effective way to do this without splitting the flowfiles?

Thanks in advance.