Member since
06-30-2025
9
Posts
1
Kudos Received
0
Solutions
09-23-2025
06:43 PM
Hello @MattWho , Thank you so much. Very clear details and usefuls
... View more
08-25-2025
05:20 AM
@HoangNguyen Keep in mind that the Apache NiFi Variable Registry no longer exist in Apache NiFi 2.x releases and there is no more development of the Apache NIFi 1.x versions. NiFi Parameter Contexts, which were introduced in later versions of Apache NiFi 1.x, provides similar capability going forward and should be used instead of the variable registry. You'll be forced to transition to Parameter Contexts in order to move to Apache NiFi 2.x. versions. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
07-07-2025
03:45 AM
Hello Folks, I’ve the issue with JoinEnrichment Processor The ORIGINAL flowfile looks like below: {
"ticketserialnumber" : "TSN00000012",
"tranheaderid" : "TH00000012",
"entrymethodid" : "EM6",
"deviceid" : "DEV22",
"prodid" : 1,
"isbetrejectedbytrader" : true,
"isexchangeticket" : false,
"terdisplayid" : "TER41",
"createddate" : "2025-06-17 22:24:00.0",
"requestid" : "REQ00000012",
"userdisplayid" : "USR863",
"cartid" : "CART00000012",
"transactiontype" : 1
}, {
"ticketserialnumber" : "TSN0000001",
"tranheaderid" : "TH001",
"entrymethodid" : "EM001",
"deviceid" : "DEV001",
"prodid" : 1,
"isbetrejectedbytrader" : false,
"isexchangeticket" : false,
"terdisplayid" : "TERM001",
"createddate" : "2025-06-17 22:10:00.0",
"requestid" : "REQ001",
"userdisplayid" : "USER001",
"cartid" : "CART001",
"transactiontype" : 1
}, And the ENRICHMENT Flowfile is looks like below: {
"tsn" : "TSN00000008",
"uuid_schema" : "VREQ00000005",
"event_date" : "2025-06-17 22:26:53.610071",
"terminalid" : "TER25",
"userid" : "USR244",
"locationid" : "LOC001",
"cartid" : "CART00000008",
"tranheaderid" : "VTH00000005",
"from_table" : "vbt",
"winningamount" : "399.09",
"rebatereclaim" : null,
"validationtypeid" : "VALD"
}, {
"tsn" : "TSN00000008",
"uuid_schema" : "VREQ00000005",
"event_date" : "2025-06-17 22:26:53.610071",
"terminalid" : "TER25",
"userid" : "USR244",
"locationid" : "LOC001",
"cartid" : "CART00000008",
"tranheaderid" : "VTH00000005",
"from_table" : "vbt",
"winningamount" : "399.09",
"rebatereclaim" : null,
"validationtypeid" : "VALD"
}, The setting with JoinEnricment processor is the Join with SQL SELECT
o.entrymethodid
,o.prodid
,o.isbetrejectedbytrader
,o.ticketserialnumber as ticketserialnumber
,o.requestid as uuid_schema
,1 as transactiontype
--,(o.createddate + INTERVAL '8' HOUR) as event_date -- Sửa cú pháp INTERVAL
,TIMESTAMPADD(HOUR, 8, CAST(o.createddate AS TIMESTAMP)) AS event_date
,o.terdisplayid as terminalid
,o.userdisplayid as userid
,e.locationid as locationid
,o.cartid
,o.tranheaderid
FROM original o
INNER JOIN enrichment e ON o.terdisplayid = e.terminalid
WHERE o.prodid IN (1, 2, 3, 4, 5, 6) AND o.transactiontype = 1
UNION ALL
SELECT
o.entrymethodid
,o.prodid
,o.isbetrejectedbytrader
,e.tsn as ticketserialnumber
,o.requestid as uuid_schema
,3 as transactiontype
,e.event_date as event_date
,e.terminalid as terminalid
,e.userid as userid
,e.locationid as locationid
,e.cartid
,e.tranheaderid
FROM original o
INNER JOIN enrichment e ON o.ticketserialnumber = e.tsn
WHERE
e.from_table = 'vbt'
AND o.prodid = 1
AND ((COALESCE(CAST(e.winningamount AS DECIMAL(10, 2)), 0.0) > 0.0) OR (COALESCE(CAST(e.rebatereclaim AS DECIMAL(10, 2)), 0.0) > 0.0))
AND o.transactiontype = 3
UNION ALL
SELECT
o.entrymethodid
,o.prodid
,o.isbetrejectedbytrader
,e.tsn as ticketserialnumber
,e.uuid_schema as uuid_schema
,3 as transactiontype
,e.event_date as event_date
,e.terminalid as terminalid
,e.userid as userid
,e.locationid as locationid
,e.cartid
,e.tranheaderid
FROM original o
INNER JOIN enrichment e ON o.ticketserialnumber = e.tsn
WHERE
e.from_table = 'vbt'
AND o.prodid IN (2, 3, 4)
AND ((COALESCE(cast(e.winningamount as decimal(10, 2)), 0.0) > 0.0) AND e.validationtypeid = 'VALD')
AND o.transactiontype = 3
UNION ALL
SELECT
o.entrymethodid
,o.prodid
,o.isbetrejectedbytrader
,e.tsn as ticketserialnumber
,e.uuid_schema as uuid_schema
,3 as transactiontype
,e.event_Date as event_date
,e.terminalid as terminalid
,e.userid as userid
,e.locationid as locationid
,e.cartid
,e.tranheaderid
FROM original o
INNER JOIN enrichment e ON o.ticketserialnumber = e.tsn
WHERE
e.from_table = 'vbt'
AND o.prodid IN (5, 6)
AND (COALESCE(cast(e.winningamount as decimal(10, 2)), 0) > 0 AND e.validationtypeid = 'VALD')
AND o.transactiontype = 3
UNION ALL
SELECT
o.entrymethodid
,o.prodid
,o.isbetrejectedbytrader
,e.tsn as ticketserialnumber
,o.requestid as uuid_schema
,2 as transactiontype
,e.event_date as event_date
,e.terminalid as terminalid
,e.userid as userid
,e.locationid as locationid
,e.cartid
,e.tranheaderid
FROM original o
INNER JOIN enrichment e ON o.ticketserialnumber = e.tsn
WHERE
e.from_table = 'cbt'
AND o.prodid = 1
AND o.transactiontype = 5
UNION ALL
SELECT
o.entrymethodid
,o.prodid
,o.isbetrejectedbytrader
,e.tsn as ticketserialnumber
,o.requestid as uuid_schema
,2 as transactiontype
,e.event_date as event_date
,e.terminalid as terminalid
,e.userid as userid
,e.locationid as locationid
,e.cartid
,e.tranheaderid
FROM original o
INNER JOIN enrichment e ON o.ticketserialnumber = e.tsn
WHERE e.from_table = 'cbt'
AND o.prodid IN (2, 3, 4)
AND o.transactiontype = 5
UNION ALL
SELECT
o.entrymethodid
,o.prodid
,o.isbetrejectedbytrader
,e.tsn as ticketserialnumber
,o.requestid as uuid_schema
,2 as transactiontype
,e.event_date as event_date
,e.terminalid as terminalid
,e.userid as userid
,e.locationid as locationid
,e.cartid
,e.tranheaderid
FROM original o
INNER JOIN enrichment e ON o.ticketserialnumber = e.tsn
WHERE
e.from_table = 'cbt'
AND o.prodid IN (5, 6)
AND o.isbetrejectedbytrader = FALSE
AND o.transactiontype = 5
/* Start Here */
UNION ALL
SELECT
o.entrymethodid
,o.prodid
,o.isbetrejectedbytrader
,e.tsn as tsn
,e.uuid_schema as uuid_schema
,3 as transactiontype
,e.event_date as event_date
,e.terminalid as terminalid
,e.userid as userid
,e.locationid as locationid
,e.cartid as cartid
,e.tranheaderid as tranheaderid
FROM original o
JOIN enrichment e ON o.ticketserialnumber = e.tsn
WHERE
(e.from_table = 'vbt')
AND o.prodid IN (5, 6)
AND e.validationtypeid = 'RFND'
AND o.transactiontype = 3 If I put the SQL from /Start Here/ the Join Enrichment will show the error as below JoinEnrichment[id=e3cb63a3-0197-1000-9980-b2fb1481e3ad] Failed to join 'original' FlowFile FlowFile[filename=5a8e2b97-533f-425b-a5aa-40d2753c80f9] and 'enrichment' FlowFile FlowFile[filename=5a8e2b97-533f-425b-a5aa-40d2753c80f9]; routing to failure: java.lang.NumberFormatException: Character v is neither a decimal digit number, decimal point, nor "e" notation exponential mark. I have debug but I can’t find any problem with the SQL. If I remove it the SQL from /Start Here/ the JoinEnrichment can run but if I put it into The JoinEnrichment Processor will pop up the error. Can you please help on this issue?
... View more
Labels:
- Labels:
-
Apache NiFi
07-03-2025
06:36 AM
@HoangNguyen There isn't an existing processor included with Apache NiFi capable of performing an UNION ALL against the contents of multiple FlowFiles. The JoinEnrichment is the only processor that can modify the contents of one FlowFile using the contents of another, but that only handles two FlowFiles (original FlowFile and enrichment FlowFile) in a single execution. The other record orientated processor all perform actions against an individual record in a FlowFile. You may need to develop your own custom processor for such a task. Something like the MergeRecord processor that bins like FlowFiles and then performs a UNION ALL on those binned FlowFiles. You could also raise a Jira in Apache NiFi (https://issues.apache.org/jira/browse/NIFI) asking for a processor that can perform such an operation and maybe someone would attempt to build it if their us enough Apache Community interest. You could also explore what Cloudera offers to its customers in terms of professional services that could help with building custom processors for Cloudera Flow Management offerings based off Apache NiFi. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more
07-02-2025
08:31 AM
@HoangNguyen All the ForkEnrichment processor does is add two specific FlowFile Attributes to each FlowFile it outputs: The JoinEnrichment processor depends on receiving two FlowFiles with Matching "enrichment.group.ids" and one with "enrichment.role" = ORIGINAL and other FlowFile with "enrichment.id" = ENRICHMENT. So you can do something like this for example: In the above you above you fork the staring FlowFile and then join that first Enrichment, then you use ForkEnrichment again to generate the needed FlowFile attributes for the second Join enrichment operation. Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
... View more