Created 08-22-2018 01:09 PM
I have data pipeline to consume message from Kafka and insert into oracle database
consumeKafka -> JoltTransformJSON -> putDatabaseRecord
The oracle table structure is as follows
CREATE TABLE Persons (
ID NUMBER NOT NULL ENABLE,
LastName varchar(255) NOT NULL,
FirstName varchar(255),
Age int,
CONSTRAINT "Persons_PK" PRIMARY KEY ("ID")
);
To insert a new record into the "Persons" table, we will have to use the nextval function.Json payload does not have the value for ID column. is there any option in putDatabaseRecord processor or any other processor to have seq_person.nextval in the insert statement?
Created 08-22-2018 03:20 PM
In addition to Shu's suggestion, you can use UpdateAttribute with Store State set to "Store state locally". This gives you a little more flexibility over the sequence values: you can set an initial value with the "Stateful Variables Initial Value" property, you can increment by any formula you want by specifying it in the UpdateAttribute user-defined property value using NiFi Expression Language, etc.
If you must get the value from an external database sequence, you could use a ScriptedLookupService with LookupRecord, you'd write a script to query the database sequence and return the value as the lookup value. I've written a blog post on how to do this with Groovy and PostgreSQL, but it can be done in any supported scripting language against any database (you have to "bring your own SQL" in the script).
Created 08-22-2018 01:19 PM
Take a look into nextInt() function in NiFi probably that will serve for your purposes.
Refer to the below link for more details regards to nextInt()
https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html#nextint
-
If the Answer helped to resolve your issue, Click on Accept button below to accept the answer, That would be great help to Community users to find solution quickly for these kind of issues.
Created 08-22-2018 03:55 PM
Thanks for the reply. The requirement for the data pipeline is to have guaranteed data delivery. The nextint() seems to be not guaranteed to be unique across a cluster which will result in unique constraint exception. The solution may not optimal for our use case. Thanks again for your suggestion.
Created 08-22-2018 03:20 PM
In addition to Shu's suggestion, you can use UpdateAttribute with Store State set to "Store state locally". This gives you a little more flexibility over the sequence values: you can set an initial value with the "Stateful Variables Initial Value" property, you can increment by any formula you want by specifying it in the UpdateAttribute user-defined property value using NiFi Expression Language, etc.
If you must get the value from an external database sequence, you could use a ScriptedLookupService with LookupRecord, you'd write a script to query the database sequence and return the value as the lookup value. I've written a blog post on how to do this with Groovy and PostgreSQL, but it can be done in any supported scripting language against any database (you have to "bring your own SQL" in the script).
Created 08-22-2018 03:59 PM
Thanks for the suggestion. I will try the solution in blog post and post my comments.
Created 08-24-2018 05:30 AM
Thanks you very much for the solution.The solution worked like charm.
I applied the DDL in postgres
CREATE SEQUENCE id_seq START 101;
The pipeline generated the sequence.nextvalue in the id column.
[{"studentName":"Foo","Age":"12","address_city":"newyork","address1":"North avenue","zipcode":"123213","id":"101"},{"studentName":"Foo1","Age":"12","address_city":"newyork","address1":"North avenue","zipcode":"123213","id":"102"},{"studentName":"Foo2","Age":"12","address_city":"newyork","address1":"North avenue","zipcode":"123213","id":"103"}]
Thanks again!!!