Created 12-19-2018 01:34 PM
We have a file in HDFS that contains multiple JSON events, each with a different schema, and we want to batch process that file. The underlying schemas differ: one event can have 10 fields, another can have 8 fields with a nested structure, and another can have 5 nested fields, each with its own underlying fields. In other words, the schema is not fixed.
What is the best strategy for processing events in this scenario? We are open to any batch-processing tool, such as Spark or Hive. The end goal is to give these events a structured format so that we can analyze them by combining them with other Hive tables/datasets.
Created 12-19-2018 07:11 PM
You can do this in numerous ways.
1. With Hive, you could:
A. Use the built-in JSON functions with some conditional logic (if, isnull, etc.) to build a superset of all fields (ugly!)
B. Define the table with a single string column and build a Java UDF
C. Define the table with a single string column and use a Python script via TRANSFORM
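Option C can be sketched as a small Python script for Hive's TRANSFORM clause. This is a minimal sketch under several assumptions: the table has one string column holding one raw JSON event per row, and the script name, the `--stream` flag and the chosen envelope fields are all hypothetical. Hive streams each input row to the script on stdin and reads tab-separated columns back from stdout. The output shape therefore stays fixed even though each event's `eventDataMap` differs.

```python
import json
import sys

# Hypothetical envelope fields; pick whichever top-level fields your events share.
ENVELOPE_FIELDS = ("eventFamily", "eventName", "eventTimestamp")


def to_row(raw_line):
    """Turn one raw JSON event into a list of column values for Hive."""
    event = json.loads(raw_line)
    columns = []
    for field in ENVELOPE_FIELDS:
        value = event.get(field)
        # \N is how Hive's default text SerDe represents NULL.
        columns.append("\\N" if value is None else str(value))
    # Keep the schema-varying part as one compact JSON string column.
    data = event.get("eventDataMap") or {}
    columns.append(json.dumps(data, separators=(",", ":")))
    return columns


def stream(lines, out):
    """Read raw JSON lines and write tab-separated rows, as TRANSFORM expects."""
    for line in lines:
        line = line.strip()
        if line:
            out.write("\t".join(to_row(line)) + "\n")


if __name__ == "__main__" and "--stream" in sys.argv[1:]:
    # --stream is a hypothetical flag so the file can be imported or tested
    # without blocking on stdin; Hive would invoke the script with the flag.
    stream(sys.stdin, sys.stdout)
```

In Hive this would be registered with `ADD FILE extract_envelope.py;` and called as `SELECT TRANSFORM(json_line) USING 'python extract_envelope.py --stream' AS (event_family, event_name, event_ts, event_data) FROM raw_events;`. The file, table and column names here are placeholders. The `event_data` column can later be queried with `get_json_object`.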
As an aside, the built-in JSON SerDe is pretty basic. I have used the alternative at https://github.com/rcongiu/Hive-JSON-Serde, which can do name mapping and some other useful things. But I don't think that, on its own, it would let you define a table that handles this case nicely.
2. With Spark or PySpark, you can do pretty much whatever you want.
If you are comfortable with Spark, I would recommend using it to pre-process the data into a common format. If you are more comfortable with Hive, use the TRANSFORM statement with a Python script.
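One possible common format is a narrow (long) layout: a few fixed envelope columns plus one row per leaf value of `eventDataMap`. The sketch below is plain Python under that assumption, and the row layout and function names are hypothetical. In Spark, the same per-record function could be applied with something like `flatMap` over the lines of the input file.

```python
import json


def flatten(value, prefix=""):
    """Yield (path, leaf) pairs for every leaf in a nested JSON value."""
    if isinstance(value, dict):
        for key, child in value.items():
            yield from flatten(child, f"{prefix}.{key}" if prefix else key)
    elif isinstance(value, list):
        for index, child in enumerate(value):
            yield from flatten(child, f"{prefix}[{index}]")
    else:
        yield prefix, value


def leaf_to_text(leaf):
    """Store every leaf as text so one column type fits all event types."""
    if leaf is None or isinstance(leaf, str):
        return leaf
    return json.dumps(leaf)  # keeps JSON spelling, e.g. true, 5


def to_common_rows(raw_json):
    """Convert one raw event into rows of a fixed, schema-independent shape."""
    event = json.loads(raw_json)
    envelope = (event.get("eventFamily"), event.get("eventName"), event.get("eventTimestamp"))
    # Each row: (family, name, timestamp, path, value-as-text).
    return [
        envelope + (path, leaf_to_text(leaf))
        for path, leaf in flatten(event.get("eventDataMap") or {})
    ]
```

The trade-off is that the table schema never changes when a new event type arrives. In exchange, queries have to filter or pivot on the `path` column to get wide, typed columns back.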
Created 12-21-2018 02:30 PM
@jbarnett Thanks for your response!
I have already used a JSON SerDe on Hive tables in another scenario, where the underlying JSON schema was fixed and did not change.
The TRANSFORM function helps apply some transformation or logic over a fixed set of fields in a Hive SELECT statement. We can certainly write Python code and pass some fields to TRANSFORM, but that field set needs to be fixed. Correct me if I am wrong.
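On the fixed-field-set point: the AS clause of TRANSFORM does fix the output columns. However, the input can be the whole raw JSON line, so the logic for each event type can live inside the script. This is a minimal sketch assuming three of the sample event types shown in this post. The extractor functions, the `EXTRACTORS` table and the choice of an "entity id" field per type are hypothetical illustrations, not a recommendation for these specific events.

```python
import json


def _agent_id(data):
    return data.get("CMAgent")


def _user_id(data):
    user = (data.get("ucaEvent") or {}).get("user") or {}
    return user.get("userId")


def _group_id(data):
    group = (data.get("ucaEvent") or {}).get("group") or {}
    return group.get("groupId")


# Hypothetical mapping from eventName to code that knows that event's layout.
# A new event type only needs a new entry here, not a new output column.
EXTRACTORS = {
    "24ACW": _agent_id,
    "USERP": _user_id,
    "GROUP_MEMBER": _group_id,
}


def to_fixed_columns(raw_json):
    """Return (eventName, entity_id, eventDataMap as JSON) for any event type."""
    event = json.loads(raw_json)
    name = event.get("eventName")
    data = event.get("eventDataMap") or {}
    extractor = EXTRACTORS.get(name)
    # Unknown event types still yield a row; entity_id is simply None.
    entity_id = extractor(data) if extractor else None
    return name, entity_id, json.dumps(data, separators=(",", ":"))
```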
I have pasted some sample events below. Consider 250+ types of these events, where each event type has a different underlying schema (different nested structures, numbers of fields, and data type formats), and a new event type can arrive on the fly. Each event file lands in the raw zone of my data lake and contains 1000 events (the defined batch size). In the processed zone, I want to process these events and store them in a structured way; that is the solution I am looking for. I can break the problem into the steps below:
1. How should I store these events in the processed zone?
2. How should I process the events into a structured form, given the dynamic nature of each event's underlying JSON schema?
There may be more steps, but for now I can think of only these two. I am open to any tool/utility/technology for processing these events.
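For question 1, one common layout is to partition the processed zone by event type using Hive-style `event_family=.../event_name=...` directories. Alongside that, you can record a fingerprint of each event type's structure, so a new or changed schema gets noticed instead of being silently mixed in. This is an assumption, not the only option. The directory names, `batch_id` and all function names below are hypothetical.

```python
import hashlib
import json


def key_paths(value, prefix=""):
    """Return the set of key paths (structure only, no values) in a JSON value."""
    paths = set()
    if isinstance(value, dict):
        for key, child in value.items():
            path = f"{prefix}.{key}" if prefix else key
            paths.add(path)
            paths |= key_paths(child, path)
    elif isinstance(value, list):
        for child in value:
            paths |= key_paths(child, prefix + "[]")
    return paths


def schema_fingerprint(event):
    """Hash the structure of eventDataMap so events with the same shape match."""
    paths = sorted(key_paths(event.get("eventDataMap") or {}))
    return hashlib.sha256("\n".join(paths).encode("utf-8")).hexdigest()[:12]


def partition_path(event, batch_id):
    """Build a hypothetical processed-zone path: one directory per event type."""
    family = event.get("eventFamily") or "unknown"
    name = event.get("eventName") or "unknown"
    return f"processed/event_family={family}/event_name={name}/batch={batch_id}"


def group_batch(raw_lines, batch_id):
    """Group a raw batch by target partition and collect fingerprints per type."""
    groups = {}
    fingerprints = {}
    for line in raw_lines:
        event = json.loads(line)
        groups.setdefault(partition_path(event, batch_id), []).append(event)
        key = (event.get("eventFamily"), event.get("eventName"))
        fingerprints.setdefault(key, set()).add(schema_fingerprint(event))
    return groups, fingerprints
```

Events whose fingerprint is new for their type could be routed to a review step before a typed table or view is defined for them. The fingerprint only covers key structure, so a field that is sometimes `null` and sometimes an object will produce two fingerprints.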
Sample Events:
{"eventFamily":"CCELITE_RESD","eventName":"TIME22","eventVersion":"8.0","eventTimestamp":"1158511980000","snapinTimestamp":"1184729563007","sourceType":"CCElite","sourceId":"CM70","sourceVersion":"7.0","snapinId":"analytCollectorElite","snapinVersion":"3.2.0.0.1097","sequenceNumber":337,"eventDataMap":{"TZOffsetHours":"6","Year":"16","Hour":"13","CMTime":"1","TimeTag":"1158511980000","Minute":"53","DaylightSavings":"1","TZOffset":"0","TZOffsetMinutes":"0","DayOfYear":"81"},"collectedTime":1184729563007,"publishedTime":1184729563015}
{"eventFamily":"CCELITE_RESD","eventName":"24ACW","eventVersion":"9.0","eventTimestamp":"1158590110000","snapinTimestamp":"1184729573010","sourceType":"CCElite","sourceId":"CM70","sourceVersion":"7.0","snapinId":"analytCollectorElite","snapinVersion":"3.2.0.1097","sequenceNumber":348,"eventDataMap":{"Hold":"0","Reconnect":"0","CMTime":"1","CMAgent":"5550181","UCIDPresent":"0","Direction":"0","WaitObserve":"0","TimeTag":"1158590110000","UCIDTimestamp":"0","Position":"5550181","UCIDCallSeqNo":"0","MeasuredInternal":"0","ExternalCall":"0","KeyboardDialed":"0","UCIDNetworkID":"0"},"collectedTime":1184729573010,"publishedTime":1184729573016}
{"eventFamily":"UCA_MIN_EVENTS_PR","eventName":"USERP","eventCategory":"PUMP_UPSI","eventAction":"PUMP_UPSI","eventVersion":null,"sourceType":"UCA","sourceId":"UCA","sourceVersion":"EDD.wf.auras.com","snapinId":"AdminDataCollector","snapinVersion":"3.3.0.701301","sequenceNumber":null,"eventTimestamp":"1532080250298","matchEventField":null,"siteId":null,"siteName":null,"eventDataMap":{"ucaEvent":{"subscriptionId":"CeanaAdcQueue_10_134_44_196","user":{"userId":"analyt","tenant":null,"userName":"analyt","firstName":"analyt","lastName":"analyt","displayName":"analyt","attributes":{"Language":["English","French"],"Channel":["Voice"],"Service":["CRTAccnt"],"Location":["Inhouse"]},"role":"SUPERVISOR","supervisorId":null,"supervisorFirstName":null,"supervisorLastName":null,"supervisorDisplayName":null,"title":null}}}}
{"eventFamily":"UCA_MIN_EVENTS_PR","eventName":"GROUP_MEMBER","eventCategory":"PUMP_UPSI","eventAction":"PUMP_UPSI","eventVersion":null,"sourceType":"UCA","sourceId":"UCA","sourceVersion":"rre4481.sazab.com","snapinId":"AdminDataCollector","snapinVersion":"3.2.1.61101","sequenceNumber":null,"eventTimestamp":null,"matchEventField":null,"siteId":"SITEIDXYZ","siteName":"SITENAMEDUB","eventDataMap":{"ucaEvent":{"subscriptionId":"CeanaAdcQueue_10_134_44_93","group":{"tenant":null,"groupId":"10020","name":"groupROI","type":"USER"}}}}
{"eventFamily":"WORK_EVENTS_PR","eventName":"CONVERSATION_INTERACTION","eventVersion":"1","eventCategory":"REALTIME","eventAction":"WRITE","sourceType":"UCM","sourceId":"WFEDP42118V","sourceVersion":"3.4","snapinId":"UIDataCollector","snapinVersion":"3.4","eventTimestamp":"1532080290098","eventDataMap":{"ucmEvent":{"subscriptionId":"EANA_LIVE","notificationType":"CONVERSATION_INTERACTION","delta":[],"WORK":{"Id":"WORK_ID_0","CONTACT":{"Id":"CONTACT_ID_A_0","conversationInteraction":{"Id":"RESOURCE_ID_B_0","providerId":"CM3456","resourceId":"RESOURCE_ID_B_0","channelTypeId":"Voice","accountId":"ACCOUNT_0","conversationId":"CONTACT_ID_A_0","activityCode":"null","transferredServiceId":"null","routePointId":"8344568","activityCodeEffectiveDT":"null","previousActivityCode":"null","previousActivityCodeEffectiveDT":"null","afterContactWorkEnabled":"null","offeredService":{"serviceName":"Ceana Service","attributes":{"Language":["French"],"Channel":["Voice"],"Service":["CRTAccnt"],"Location":["Inhouse"]},"priority":5,"serviceID":"Channel.Voice|Language.French|Location.Inhouse|Service.CRTAccnt"},"requestedService":null,"interactionType":"POCALLED","interactionTypeDT":"1792080290098","previousInteractionType":"null","previousInteractionTypeDT":"null","interactionTypeEndDT":"null","version":"1","createDT":"1532080290098","state":"ALERTNG","stateEffectiveDT":"1534580290098","previousState":"DEF","previousStateEffectiveDT":"null","stateReason":"DEF","stateReasonEffectiveDT":"1534580290098","previousStateReason":"DEF","previousStateReasonEffectiveDT":"null","isRoutedCall":"true","isExternal":"false"}}}}}}
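Although the `eventDataMap` contents differ, the samples above share a set of top-level fields. This suggests a fixed envelope plus a variable payload. Below is a small sketch for finding those shared keys across a batch, assuming one JSON event per line; the function name is hypothetical.

```python
import json


def shared_top_level_keys(raw_lines):
    """Return the top-level keys present in every event, in first-seen order."""
    events = [json.loads(line) for line in raw_lines if line.strip()]
    if not events:
        return []
    shared = set(events[0])
    for event in events[1:]:
        shared &= set(event)
    # Preserve the key order of the first event for readability.
    return [key for key in events[0] if key in shared]
```

Applied to the five samples, this keeps ten keys: `eventFamily`, `eventName`, `eventVersion`, `eventTimestamp`, `sourceType`, `sourceId`, `sourceVersion`, `snapinId`, `snapinVersion` and `eventDataMap`. `sequenceNumber`, for instance, drops out because the last sample does not have it.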
Thanks
Anish