Created on 11-21-2014 07:58 AM - edited 09-16-2022 02:13 AM
Hi
I have written a morphline that uses the xquery command (see the snippet below). It expects XML as input and indexes a list of fields extracted from it, plus two other fields, "a:name" and "a:version". Note that the data comes from HBase. When the input is not well-formed XML, the indexer throws an exception, which is the correct behaviour. The problem is that the other fields, "a:name" and "a:version", do not get indexed because the field "p:in" contains XML that is not well formed. Also, subsequent inputs into HBase, all of which are valid XML, do not get indexed either: the indexer gets stuck on that exception and never recovers until the bad XML is removed from HBase.
I have tried using try/catch in the XQuery, only to find that the error happens well before the XQuery engine parses the XML (see the stack trace below).
It looks like the SaxonCommand class isn't handling the exception well. Any ideas on how to solve this use case are very welcome. You may be wondering why we are processing XML that is not well formed in the first place: the business use case mandates that we audit these bad XMLs.
Regards,
Ayache
MORPHLINE CONF
morphlines : [
{
id : morphline1
importCommands : ["org.kitesdk.**", "com.ngdata.**"]
commands : [
{
extractHBaseCells {
mappings : [
{
inputColumn : "a:name"
outputField : "serviceName"
type : string
source : value
}
{
inputColumn : "a:version"
outputField : "serviceVersion"
type : string
source : value
}
{
inputColumn : "p:in"
outputField : "_attachment_body"
type : "byte[]"
source : value
}
]
}
}
{
if {
conditions : [
{ equals { _attachment_body : [] } }
]
then : [
{logInfo { format : "no payload..." } }
]
else : [
{
xquery {
languageVersion : "3.0"
fragments : [
{
fragmentPath : "/"
queryString : """
(: All namespace declarations go here :)
(: Extracting all the fields that need indexing :)
try {
let $source := /message/messageData/LegacyMessage/LegacyHeader/SenderDetails/SendingApplication/text() | /message/messageHeader/sendingApplication/text()
let $orgId := /message/messageData/LegacyMessage/LegacyHeader/SenderDetails/ODSID/text() | /message/securityData/organisationId/text()
let $prescriptionId := /message/messageData/*/prescriptionId/text() | /message/messageData/*/UPN/text()
let $transaId := /message/messageHeader/transactionId/text()
(: Returning the list of fields that need to be indexed. These fields are defined in the Solr schema.xml file. :)
return
<fieldsToIndex>
<source>{$source}</source>
<organisationNationalId>{$orgId}</organisationNationalId>
<prescriptionId>{$prescriptionId}</prescriptionId>
<transactionId>{$transaId}</transactionId>
</fieldsToIndex>
}
catch * {
$err:code, $err:value, " module: ",
$err:module, "(", $err:line-number, ",", $err:column-number, ")"
}
"""
}
]
}
}
{ logInfo { format : "Finished processing..." }
}
]
}
}
]
}
]
STACK TRACE
14/11/21 15:37:16 WARN impl.SepConsumer: Error processing a batch of SEP events, the error will be forwarded to HBase for retry
java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: com.ctc.wstx.exc.WstxParsingException: Unexpected close tag </dispenserDetails>; expected </organisation>.
at [row,col {unknown-source}]: [87,27]
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at com.ngdata.sep.impl.SepConsumer.waitOnSepEventCompletion(SepConsumer.java:294)
at com.ngdata.sep.impl.SepConsumer.replicateWALEntry(SepConsumer.java:275)
at org.apache.hadoop.hbase.protobuf.generated.AdminProtos$AdminService$2.callBlockingMethod(AdminProtos.java:20176)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2031)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)
at org.apache.hadoop.hbase.ipc.FifoRpcScheduler$1.run(FifoRpcScheduler.java:74)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: com.ctc.wstx.exc.WstxParsingException: Unexpected close tag </dispenserDetails>; expected </organisation>.
at [row,col {unknown-source}]: [87,27]
at com.ngdata.hbaseindexer.indexer.IndexingEventListener.processEvents(IndexingEventListener.java:102)
at com.ngdata.sep.impl.SepEventExecutor$1.run(SepEventExecutor.java:97)
... 5 more
Caused by: org.kitesdk.morphline.api.MorphlineRuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: com.ctc.wstx.exc.WstxParsingException: Unexpected close tag </dispenserDetails>; expected </organisation>.
at [row,col {unknown-source}]: [87,27]
at org.kitesdk.morphline.base.FaultTolerance.handleException(FaultTolerance.java:73)
at com.ngdata.hbaseindexer.morphline.LocalMorphlineResultToSolrMapper.map(LocalMorphlineResultToSolrMapper.java:245)
at com.ngdata.hbaseindexer.morphline.MorphlineResultToSolrMapper.map(MorphlineResultToSolrMapper.java:145)
at com.ngdata.hbaseindexer.indexer.Indexer$RowBasedIndexer.calculateIndexUpdates(Indexer.java:289)
at com.ngdata.hbaseindexer.indexer.Indexer.indexRowData(Indexer.java:144)
at com.ngdata.hbaseindexer.indexer.IndexingEventListener.processEvents(IndexingEventListener.java:99)
... 6 more
Caused by: org.kitesdk.morphline.api.MorphlineRuntimeException: com.ctc.wstx.exc.WstxParsingException: Unexpected close tag </dispenserDetails>; expected </organisation>.
at [row,col {unknown-source}]: [87,27]
at org.kitesdk.morphline.saxon.SaxonCommand.doProcess(SaxonCommand.java:78)
at org.kitesdk.morphline.stdio.AbstractParser.doProcess(AbstractParser.java:96)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at org.kitesdk.morphline.stdlib.IfThenElseBuilder$IfThenElse.doProcess(IfThenElseBuilder.java:110)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at org.kitesdk.morphline.base.Connector.process(Connector.java:64)
at org.kitesdk.morphline.base.AbstractCommand.doProcess(AbstractCommand.java:181)
at org.kitesdk.morphline.stdlib.ConvertTimestampBuilder$ConvertTimestamp.doProcess(ConvertTimestampBuilder.java:161)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at org.kitesdk.morphline.base.Connector.process(Connector.java:64)
at org.kitesdk.morphline.base.AbstractCommand.doProcess(AbstractCommand.java:181)
at org.kitesdk.morphline.stdlib.ConvertTimestampBuilder$ConvertTimestamp.doProcess(ConvertTimestampBuilder.java:161)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at org.kitesdk.morphline.base.Connector.process(Connector.java:64)
at org.kitesdk.morphline.base.AbstractCommand.doProcess(AbstractCommand.java:181)
at com.ngdata.hbaseindexer.morphline.ExtractHBaseCellsBuilder$ExtractHBaseCells.doProcess(ExtractHBaseCellsBuilder.java:86)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at org.kitesdk.morphline.base.AbstractCommand.doProcess(AbstractCommand.java:181)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at com.ngdata.hbaseindexer.morphline.LocalMorphlineResultToSolrMapper.map(LocalMorphlineResultToSolrMapper.java:239)
... 10 more
Caused by: com.ctc.wstx.exc.WstxParsingException: Unexpected close tag </dispenserDetails>; expected </organisation>.
at [row,col {unknown-source}]: [87,27]
at com.ctc.wstx.sr.StreamScanner.constructWfcException(StreamScanner.java:630)
at com.ctc.wstx.sr.StreamScanner.throwParseError(StreamScanner.java:461)
at com.ctc.wstx.sr.BasicStreamReader.reportWrongEndElem(BasicStreamReader.java:3258)
at com.ctc.wstx.sr.BasicStreamReader.readEndElem(BasicStreamReader.java:3200)
at com.ctc.wstx.sr.BasicStreamReader.nextFromTree(BasicStreamReader.java:2832)
at com.ctc.wstx.sr.BasicStreamReader.next(BasicStreamReader.java:1019)
at org.kitesdk.morphline.saxon.XMLStreamCopier.copy(XMLStreamCopier.java:169)
at org.kitesdk.morphline.saxon.SaxonCommand.parseXmlDocument(SaxonCommand.java:99)
at org.kitesdk.morphline.saxon.XQueryBuilder$XQuery.doProcess2(XQueryBuilder.java:158)
at org.kitesdk.morphline.saxon.SaxonCommand.doProcess(SaxonCommand.java:74)
... 29 more
14/11/21 15:37:16 ERROR impl.SepConsumer: Encountered exceptions on 2 batches (out of 2 total batches)
14/11/21 15:37:16 ERROR ipc.RpcServer: Unexpected throwable object
java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: com.ctc.wstx.exc.WstxParsingException: Unexpected close tag </dispenserDetails>; expected </organisation>.
at [row,col {unknown-source}]: [87,27]
at com.ngdata.sep.impl.SepConsumer.waitOnSepEventCompletion(SepConsumer.java:309)
at com.ngdata.sep.impl.SepConsumer.replicateWALEntry(SepConsumer.java:275)
at org.apache.hadoop.hbase.protobuf.generated.AdminProtos$AdminService$2.callBlockingMethod(AdminProtos.java:20176)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2031)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)
at org.apache.hadoop.hbase.ipc.FifoRpcScheduler$1.run(FifoRpcScheduler.java:74)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: com.ctc.wstx.exc.WstxParsingException: Unexpected close tag </dispenserDetails>; expected </organisation>.
at [row,col {unknown-source}]: [87,27]
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at com.ngdata.sep.impl.SepConsumer.waitOnSepEventCompletion(SepConsumer.java:294)
... 10 more
Caused by: java.lang.RuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: com.ctc.wstx.exc.WstxParsingException: Unexpected close tag </dispenserDetails>; expected </organisation>.
at [row,col {unknown-source}]: [87,27]
at com.ngdata.hbaseindexer.indexer.IndexingEventListener.processEvents(IndexingEventListener.java:102)
at com.ngdata.sep.impl.SepEventExecutor$1.run(SepEventExecutor.java:97)
... 5 more
Caused by: org.kitesdk.morphline.api.MorphlineRuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: com.ctc.wstx.exc.WstxParsingException: Unexpected close tag </dispenserDetails>; expected </organisation>.
at [row,col {unknown-source}]: [87,27]
at org.kitesdk.morphline.base.FaultTolerance.handleException(FaultTolerance.java:73)
at com.ngdata.hbaseindexer.morphline.LocalMorphlineResultToSolrMapper.map(LocalMorphlineResultToSolrMapper.java:245)
at com.ngdata.hbaseindexer.morphline.MorphlineResultToSolrMapper.map(MorphlineResultToSolrMapper.java:145)
at com.ngdata.hbaseindexer.indexer.Indexer$RowBasedIndexer.calculateIndexUpdates(Indexer.java:289)
at com.ngdata.hbaseindexer.indexer.Indexer.indexRowData(Indexer.java:144)
at com.ngdata.hbaseindexer.indexer.IndexingEventListener.processEvents(IndexingEventListener.java:99)
... 6 more
Caused by: org.kitesdk.morphline.api.MorphlineRuntimeException: com.ctc.wstx.exc.WstxParsingException: Unexpected close tag </dispenserDetails>; expected </organisation>.
at [row,col {unknown-source}]: [87,27]
at org.kitesdk.morphline.saxon.SaxonCommand.doProcess(SaxonCommand.java:78)
at org.kitesdk.morphline.stdio.AbstractParser.doProcess(AbstractParser.java:96)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at org.kitesdk.morphline.stdlib.IfThenElseBuilder$IfThenElse.doProcess(IfThenElseBuilder.java:110)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at org.kitesdk.morphline.base.Connector.process(Connector.java:64)
at org.kitesdk.morphline.base.AbstractCommand.doProcess(AbstractCommand.java:181)
at org.kitesdk.morphline.stdlib.ConvertTimestampBuilder$ConvertTimestamp.doProcess(ConvertTimestampBuilder.java:161)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at org.kitesdk.morphline.base.Connector.process(Connector.java:64)
at org.kitesdk.morphline.base.AbstractCommand.doProcess(AbstractCommand.java:181)
at org.kitesdk.morphline.stdlib.ConvertTimestampBuilder$ConvertTimestamp.doProcess(ConvertTimestampBuilder.java:161)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at org.kitesdk.morphline.base.Connector.process(Connector.java:64)
at org.kitesdk.morphline.base.AbstractCommand.doProcess(AbstractCommand.java:181)
at com.ngdata.hbaseindexer.morphline.ExtractHBaseCellsBuilder$ExtractHBaseCells.doProcess(ExtractHBaseCellsBuilder.java:86)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at org.kitesdk.morphline.base.AbstractCommand.doProcess(AbstractCommand.java:181)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at com.ngdata.hbaseindexer.morphline.LocalMorphlineResultToSolrMapper.map(LocalMorphlineResultToSolrMapper.java:239)
... 10 more
Caused by: com.ctc.wstx.exc.WstxParsingException: Unexpected close tag </dispenserDetails>; expected </organisation>.
at [row,col {unknown-source}]: [87,27]
at com.ctc.wstx.sr.StreamScanner.constructWfcException(StreamScanner.java:630)
at com.ctc.wstx.sr.StreamScanner.throwParseError(StreamScanner.java:461)
at com.ctc.wstx.sr.BasicStreamReader.reportWrongEndElem(BasicStreamReader.java:3258)
at com.ctc.wstx.sr.BasicStreamReader.readEndElem(BasicStreamReader.java:3200)
at com.ctc.wstx.sr.BasicStreamReader.nextFromTree(BasicStreamReader.java:2832)
at com.ctc.wstx.sr.BasicStreamReader.next(BasicStreamReader.java:1019)
at org.kitesdk.morphline.saxon.XMLStreamCopier.copy(XMLStreamCopier.java:169)
at org.kitesdk.morphline.saxon.SaxonCommand.parseXmlDocument(SaxonCommand.java:99)
at org.kitesdk.morphline.saxon.XQueryBuilder$XQuery.doProcess2(XQueryBuilder.java:158)
at org.kitesdk.morphline.saxon.SaxonCommand.doProcess(SaxonCommand.java:74)
... 29 more
Created 11-27-2014 01:34 AM
Hi
After investigating this further, I found that the exception is thrown in the Saxon command code when it attempts to parse the malformed XML. I don't think it is right to throw an exception that results in aborting the indexer service.
A workaround is presented below. It uses the java command to parse the XML and catch the SAXParseException, log the error, and set the '_attachment_body' field to indicate that malformed XML was detected, so that the other fields present in the same record still get indexed.
if {
conditions: [
{
java
{
imports: "import java.util.*; import org.xml.sax.*; import javax.xml.parsers.SAXParser; import javax.xml.parsers.SAXParserFactory; import java.io.StringReader; import org.xml.sax.helpers.DefaultHandler; import org.xml.sax.InputSource;"
code: """
// The payload is the raw byte[] extracted from the "p:in" column.
List payload = record.get("_attachment_body");
try {
// Parse the payload with a plain SAX parser purely as a well-formedness check.
SAXParserFactory factory = SAXParserFactory.newInstance();
SAXParser saxParser = factory.newSAXParser();
InputSource is = new InputSource(new StringReader(new String((byte[]) payload.get(0))));
saxParser.parse(is, new DefaultHandler());
} catch (Exception e) {
// Any failure here (including a SAXParseException) is treated as malformed XML.
logger.error("Malformed XML detected", e);
logger.error("payload: {} for record: {}", payload, record);
return true;
}
return false;
"""
}
}
]
then: [
{
translate {
field : _attachment_body
dictionary : {
0 : dummy
}
fallback : "Malformed XML detected" # if no fallback is defined and no match is found then the command fails
}
}
{logInfo {format: "Ignoring none well formed XML..."}}
]
else: [
{
xquery {
fragments: [
{
fragmentPath: "/"
queryString: """
(: All namespace declarations go here :)
(: Extracting all the fields that need indexing :)
let $source := /message/messageData/LegacyMessage/LegacyHeader/SenderDetails/SendingApplication/text() | /message/messageHeader/sendingApplication/text()
let $orgId := /message/messageData/LegacyMessage/LegacyHeader/SenderDetails/ODSID/text() | /message/securityData/organisationId/text()
let $prescriptionId := /message/messageData/*/prescriptionId/text() | /message/messageData/*/UPN/text()
let $transaId := /message/messageHeader/transactionId/text()
(: Returning the list of fields that need to be indexed. These fields are defined in the Solr schema.xml file. :)
return
<fieldsToIndex>
<source>{$source}</source>
<organisationNationalId>{$orgId}</organisationNationalId>
<prescriptionId>{$prescriptionId}</prescriptionId>
<transactionId>{$transaId}</transactionId>
</fieldsToIndex>
"""
}
]
}
}
]
}
}
Created 11-27-2014 02:06 AM
FYI, the tryRules command with the catchExceptions : true parameter handles this kind of scenario more easily. http://kitesdk.org/docs/current/kite-morphlines/morphlinesReferenceGuide.html#/tryRules
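For anyone landing here later, a minimal and untested sketch of how that could look, assuming it replaces the xquery command in the else branch of the original morphline. The query is shortened to a single field for brevity, and the use of setValues to flag the record in the second rule is my own assumption that mirrors the translate workaround above; it is not something confirmed in this thread.

```
{
  tryRules {
    # An exception thrown inside a rule is treated as a failed rule
    # instead of being propagated to the indexer.
    catchExceptions : true
    throwExceptionIfAllRulesFailed : true
    rules : [
      # Rule 1: well-formed XML is parsed and queried as before.
      {
        commands : [
          {
            xquery {
              languageVersion : "3.0"
              fragments : [
                {
                  fragmentPath : "/"
                  queryString : """
                    let $transaId := /message/messageHeader/transactionId/text()
                    return
                      <fieldsToIndex>
                        <transactionId>{$transaId}</transactionId>
                      </fieldsToIndex>
                  """
                }
              ]
            }
          }
        ]
      }
      # Rule 2: tried only if rule 1 failed or threw an exception.
      # Assumption: overwrite the payload with a marker so the record is
      # still indexed with serviceName and serviceVersion.
      {
        commands : [
          { setValues { _attachment_body : "Malformed XML detected" } }
          { logInfo { format : "Ignoring XML that is not well formed..." } }
        ]
      }
    ]
  }
}
```

One caveat with this shape: the second rule runs whenever the first rule fails for any reason, not only on a parse error, so the marker text may be misleading if the XQuery itself fails on a well-formed document.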
Created 11-27-2014 04:08 AM
tryRules will work too, and it is more elegant.
Thanks
Created 05-01-2019 12:27 PM
Can you please share your morphlines.conf? I am stuck in a similar situation.