Created 09-18-2020 10:12 AM
I want the JSON split on each "DATA_MESSAGE" record, but note that some records have multiple logEvents, and all of them should be included regardless of how many there are. There are no outer "[ ]" in these flowfiles, so a JsonPath of $ or $.* doesn't work properly.
{
"messageType": "DATA_MESSAGE",
"owner": "540804524916",
"logGroup": "/awg-p-deploy/aws/ec2/syslogs",
"logStream": "540804524916/i-01918034325752bbd/ip-10-170-16-11.xxxxxxx.xxx.org/var/log/audit/audit.log ",
"subscriptionFilters": ["awg-p-deploy-kinesis-destination-subscription-SubscriptionFilter-SOC692HYHJN8"],
"logEvents": [{
"id": "35604816732673909304179019342583609767225770179215556608",
"timestamp": 1596575200322,
"message": "type=SYSCALL msg=audit(1596575200.216:108922): arch=c000003e syscall=159 success=yes exit=5 a0=7ffd2ce779d0 a1=0 a2=a78b7 a3=563d1016b520 items=0 ppid=1 pid=695 auid=4294967295 uid=998 gid=995 euid=998 suid=998 fsuid=998 egid=995 sgid=995 fsgid=995 tty=(none) ses=4294967295 comm=\"chronyd\" exe=\"/usr/sbin/chronyd\" subj=system_u:system_r:chronyd_t:s0 key=\"time-change\""
}
]
} {
"messageType": "DATA_MESSAGE",
"owner": "540804524916",
"logGroup": "/awg-p-deploy/aws/ec2/syslogs",
"logStream": "540804524916/i-01918034325752bbd/ip-10-170-16-11.xxxxxx.xxx.org/var/log/audit/audit.log ",
"subscriptionFilters": ["awg-p-deploy-kinesis-destination-subscription-SubscriptionFilter-SOC692HYHJN8"],
"logEvents": [{
"id": "35604816732673909304179019349237189778599935663793831936",
"timestamp": 1596575200322,
"message": "type=PROCTITLE msg=audit(1596575200.216:108922): proctitle=2F7573722F7362696E2F6368726F6E7964002D75006368726F6E79"
}, {
"id": "35604816732673909304179019349237189778599935663793831937",
"timestamp": 1596575200322,
"message": "type=SYSCALL msg=audit(1596575200.216:108923): arch=c000003e syscall=159 success=yes exit=5 a0=7ffd2ce779e0 a1=0 a2=a78c3 a3=563d1016b520 items=0 ppid=1 pid=695 auid=4294967295 uid=998 gid=995 euid=998 suid=998 fsuid=998 egid=995 sgid=995 fsgid=995 tty=(none) ses=4294967295 comm=\"chronyd\" exe=\"/usr/sbin/chronyd\" subj=system_u:system_r:chronyd_t:s0 key=\"time-change\""
}, {
"id": "35604816732673909304179019349237189778599935663793831938",
"timestamp": 1596575200322,
"message": "type=PROCTITLE msg=audit(1596575200.216:108923): proctitle=2F7573722F7362696E2F6368726F6E7964002D75006368726F6E79"
}, {
"id": "35604816732673909304179019349237189778599935663793831939",
"timestamp": 1596575200322,
"message": "type=SYSCALL msg=audit(1596575200.216:108924): arch=c000003e syscall=159 success=yes exit=5 a0=7ffd2ce77ad0 a1=1 a2=0 a3=563d1016b520 items=0 ppid=1 pid=695 auid=4294967295 uid=998 gid=995 euid=998 suid=998 fsuid=998 egid=995 sgid=995 fsgid=995 tty=(none) ses=4294967295 comm=\"chronyd\" exe=\"/usr/sbin/chronyd\" subj=system_u:system_r:chronyd_t:s0 key=\"time-change\""
}
]
}
Created on 09-19-2020 05:10 AM - edited 09-19-2020 05:12 AM
@Kilynn The solution you are looking for here is QueryRecord with a JSON record reader (JsonTreeReader).
With the QueryRecord processor and its record reader/writer configured, you can click + in QueryRecord to create a new key => value property. In the value you can write a SQL-like query against the records in the flowfile. For example:
DATA_MESSAGE => SELECT logEvents FROM FLOWFILE
This will create all of your message objects as individual flowfiles, routed away from QueryRecord on the "DATA_MESSAGE" relationship. From here you can process each message according to your use case.
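A minimal sketch of that configuration, assuming the stock JsonTreeReader and JsonRecordSetWriter controller services (the dynamic property key becomes an outbound relationship):

QueryRecord
  Record Reader:  JsonTreeReader
  Record Writer:  JsonRecordSetWriter
  Dynamic property (via +):
    DATA_MESSAGE => SELECT logEvents FROM FLOWFILE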
If this answer resolves your issue or allows you to move forward, please choose to ACCEPT this solution and close this topic. If you have further dialogue on this topic please comment here or feel free to private message me. If you have new questions related to your use case please create a separate topic and feel free to tag me in your post.
Thanks,
Steven @ DFHZ
Created 09-21-2020 06:32 AM
I've tried this and I'm closer, but my output doesn't include the parent values. For each logEvent I'd like to include the parent values for:
messageType
owner
logGroup
logStream
subscriptionFilters
Created 09-19-2020 06:40 AM
@Kilynn
There are no outer [ ] around the records, and there is no comma (,) between them either. Can you tell whether there will be a comma between each DATA_MESSAGE record or not? And, if so, can you tell whether you are merging the records using any processor?
Created 09-19-2020 07:49 PM
This is raw input. However, after splitting the flow into several flowfiles I will be inspecting each record for "owner" values that match a half dozen values I am interested in. The plan is to aggregate those flowfiles and send them through in batches that are 10 MB in size or every 15 minutes, whichever comes first.
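That batching requirement maps onto MergeContent; a minimal sketch, assuming the stock processor (only the 10 MB / 15 minute thresholds come from this thread, the rest are defaults):

MergeContent
  Merge Strategy:      Bin-Packing Algorithm
  Minimum Group Size:  10 MB     (bin becomes eligible to merge once it reaches 10 MB)
  Max Bin Age:         15 min    (bin is flushed after 15 minutes regardless of size)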
Created 09-21-2020 08:21 AM
Hi @Kilynn!
First, I used a regex to replace "} {" (with a variable amount of whitespace) with "},{". Then I added square brackets at the beginning and the end ("[" and "]"), which produced valid JSON.
ReplaceText config:
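(The original screenshot didn't survive; here is a plausible ReplaceText configuration based on the description above. The property names are the stock ReplaceText ones, and the regex is an assumption.)

ReplaceText
  Search Value:          \}\s*\{
  Replacement Value:     },{
  Replacement Strategy:  Regex Replace
  Evaluation Mode:       Entire text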
Groovy code:
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
// These callback interfaces must be imported for the 'as' coercions below
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

flowFile = session.get()
if (!flowFile) return
try {
    // Read the entire flowfile content into a string
    def input = ''
    session.read(flowFile, { inputStream ->
        input = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback)

    // Mark the content as JSON for downstream processors
    flowFile = session.putAttribute(flowFile, 'Content-Type', 'application/json')
    flowFile = session.putAttribute(flowFile, 'mime.type', 'application/json')

    // Wrap the (already comma-separated) records in [ ] to form a valid JSON array
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(('[' + input + ']').getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)

    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('Error occurred: {}', e)
    session.transfer(flowFile, REL_FAILURE)
}
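Note that this script buffers the entire flowfile content in memory. For very large flowfiles, the same bracket-wrapping could likely be done streaming with two additional ReplaceText processors using the Prepend and Append replacement strategies, though that variant is untested here.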
Created 09-28-2020 09:58 PM
At the end of the day I used both Steven Matison's and PVVK's solutions. First I used PVVK's to turn my flowfile into valid JSON; then I was able to use the SplitJson processor to break up the flowfiles cleanly. Then, since there were multiple logEvents entries in some of the records, I saved off the header items as attributes, used Steven Matison's method to break down each logEvent, and added the header attributes back to each one for processing.
Thanks all!
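For reference, a sketch of that final flow using stock NiFi processors; the JsonPath expressions are inferred from the sample records above, not taken from the original flow:

ReplaceText        (PVVK's fix: "},{" between records, then wrap in [ ])
SplitJson          JsonPath Expression: $.*           (one flowfile per DATA_MESSAGE record)
EvaluateJsonPath   Destination: flowfile-attribute    (save the header fields, e.g.)
                     messageType          => $.messageType
                     owner                => $.owner
                     logGroup             => $.logGroup
                     logStream            => $.logStream
                     subscriptionFilters  => $.subscriptionFilters
                   (subscriptionFilters is an array, so set Return Type to json)
SplitJson          JsonPath Expression: $.logEvents   (one flowfile per logEvent)

The saved attributes can then be written back into each logEvent, for example with UpdateRecord or a Jolt transform, before further processing.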