Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Trying to use SplitJson with an unusual JSON flowfile

avatar
Contributor

I want the json split on the "DATA_MESSAGE", but note that some of them have multiple logEvents and should be included regardless of the number of them.  There are no outer "[ ]" in these flowfiles so $ or $.* doesn't work properly.

{

    "messageType": "DATA_MESSAGE",

    "owner": "540804524916",

    "logGroup": "/awg-p-deploy/aws/ec2/syslogs",

    "logStream": "540804524916/i-01918034325752bbd/ip-10-170-16-11.xxxxxxx.xxx.org/var/log/audit/audit.log ",

    "subscriptionFilters": ["awg-p-deploy-kinesis-destination-subscription-SubscriptionFilter-SOC692HYHJN8"],

    "logEvents": [{

            "id": "35604816732673909304179019342583609767225770179215556608",

            "timestamp": 1596575200322,

            "message": "type=SYSCALL msg=audit(1596575200.216:108922): arch=c000003e syscall=159 success=yes exit=5 a0=7ffd2ce779d0 a1=0 a2=a78b7 a3=563d1016b520 items=0 ppid=1 pid=695 auid=4294967295 uid=998 gid=995 euid=998 suid=998 fsuid=998 egid=995 sgid=995 fsgid=995 tty=(none) ses=4294967295 comm=\"chronyd\" exe=\"/usr/sbin/chronyd\" subj=system_u:system_r:chronyd_t:s0 key=\"time-change\""

        }

    ]

} {

    "messageType": "DATA_MESSAGE",

    "owner": "540804524916",

    "logGroup": "/awg-p-deploy/aws/ec2/syslogs",

    "logStream": "540804524916/i-01918034325752bbd/ip-10-170-16-11.xxxxxx.xxx.org/var/log/audit/audit.log ",

    "subscriptionFilters": ["awg-p-deploy-kinesis-destination-subscription-SubscriptionFilter-SOC692HYHJN8"],

    "logEvents": [{

            "id": "35604816732673909304179019349237189778599935663793831936",

            "timestamp": 1596575200322,

            "message": "type=PROCTITLE msg=audit(1596575200.216:108922): proctitle=2F7573722F7362696E2F6368726F6E7964002D75006368726F6E79"

        }, {

            "id": "35604816732673909304179019349237189778599935663793831937",

            "timestamp": 1596575200322,

            "message": "type=SYSCALL msg=audit(1596575200.216:108923): arch=c000003e syscall=159 success=yes exit=5 a0=7ffd2ce779e0 a1=0 a2=a78c3 a3=563d1016b520 items=0 ppid=1 pid=695 auid=4294967295 uid=998 gid=995 euid=998 suid=998 fsuid=998 egid=995 sgid=995 fsgid=995 tty=(none) ses=4294967295 comm=\"chronyd\" exe=\"/usr/sbin/chronyd\" subj=system_u:system_r:chronyd_t:s0 key=\"time-change\""

        }, {

            "id": "35604816732673909304179019349237189778599935663793831938",

            "timestamp": 1596575200322,

            "message": "type=PROCTITLE msg=audit(1596575200.216:108923): proctitle=2F7573722F7362696E2F6368726F6E7964002D75006368726F6E79"

        }, {

            "id": "35604816732673909304179019349237189778599935663793831939",

            "timestamp": 1596575200322,

            "message": "type=SYSCALL msg=audit(1596575200.216:108924): arch=c000003e syscall=159 success=yes exit=5 a0=7ffd2ce77ad0 a1=1 a2=0 a3=563d1016b520 items=0 ppid=1 pid=695 auid=4294967295 uid=998 gid=995 euid=998 suid=998 fsuid=998 egid=995 sgid=995 fsgid=995 tty=(none) ses=4294967295 comm=\"chronyd\" exe=\"/usr/sbin/chronyd\" subj=system_u:system_r:chronyd_t:s0 key=\"time-change\""

        }

    ]

}

1 ACCEPTED SOLUTION

avatar
Contributor

At the end of the day I used Steve Matison and 'PVVK's solutions.  First I used PVVK's to modify my flowfile into valid html, then I successfully was able to use SplitJson processor to break up the flowfiles cleanly.  Then since there were multiple 'logEvent' entries in some of the records I saved off the header items as attributes and then used SteveMatison's method to break down each logEvent and added the header attributes back to it for processing.

 

Thanks all!

View solution in original post

6 REPLIES 6

avatar
Super Guru

@Kilynn   The solution you are looking for here is QueryRecord using JsonRecordReader.

 

With this QueryRecord processor and record reader/writers configured, you can click + in QueryRecord to create a new key => value property.  In the value you can write a SQL like query against the results in the flowfile.  For example:

 

DATA_MESSAGE => SELECT logEvents FROM FLOWFILE 

 

This will create all of your message objects as individual flowfles directed away from QueryRecord for the relationship "DATA_MESSAGE".  From here you can process each message according to your use case.

 

 

If this answer resolves your issue or allows you to move forward, please choose to ACCEPT this solution and close this topic. If you have further dialogue on this topic please comment here or feel free to private message me. If you have new questions related to your Use Case please create separate topic and feel free to tag me in your post.  

 

Thanks,


Steven @ DFHZ

avatar
Contributor

I've tried this and I'm closer but my output doesn't include the parent values.  For each LogEvent I'd like to add the parent values for:

messageType

owner

logGroup

logStream

subscriptionFilters

avatar
Expert Contributor

@Kilynn 
There are no [] or {}, but, there is no comma(,) between them either. Can you tell if there will be comma between each DATA_MESSAGE record or not? And, if yes, can you tell if you are merging the records using any processor?

avatar
Contributor

This is raw input, however after splitting the flow into several flow files I will be inspecting each record for "owner" values that match a half dozen values that I am interested in.  The plan is to aggregate those values flow files and send through batches that are 10 mb in size or ever 15 minutes.  Whichever comes first. 

avatar
Expert Contributor

Hi @Kilynn !
Firstly, I tried to use regex to replace "}       {" (variable space) with "},{". Then, I tried adding square brackets at the beginning and the end ( "[" and "]" ). Then, I got a valid Json.

Screenshot from 2020-09-21 20-42-16.png

ReplaceText config:
Screenshot from 2020-09-21 20-48-09.png

Groovy code:

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

flowFile = session.get()
if (!flowFile) return

try {
def input = '';
session.read(flowFile, {inputStream ->
input = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback);

flowFile = session.putAttribute(flowFile,'Content-Type','application/json');
flowFile = session.putAttribute(flowFile,'mime.type','application/json');

flowFile = session.write(flowFile, {
outputStream ->
outputStream.write(('['+input+']').toString().getBytes(StandardCharsets.UTF_8))
}as OutputStreamCallback);

session.transfer(flowFile, REL_SUCCESS);
} catch (e) {
log.error('Error Occured,{}', e)
session.transfer(flowFile, REL_FAILURE);
}

avatar
Contributor

At the end of the day I used Steve Matison and 'PVVK's solutions.  First I used PVVK's to modify my flowfile into valid html, then I successfully was able to use SplitJson processor to break up the flowfiles cleanly.  Then since there were multiple 'logEvent' entries in some of the records I saved off the header items as attributes and then used SteveMatison's method to break down each logEvent and added the header attributes back to it for processing.

 

Thanks all!