Hello,
I am retrieving CSV content over a JDBC connection using the ExecuteSQLRecord processor, and I would like to add the schema ID to the FlowFile attributes.
I can easily set an attribute with the Avro schema definition of the query result, but I would like NiFi to write the schema to the Confluent Schema Registry, retrieve the schema ID, and add it as a FlowFile attribute.
To achieve this, I set the following properties in ExecuteSQLRecord:
Database Connection Pooling Service | DBCPConnectionPool |
SQL Pre-Query | No value set |
SQL select query | SQLQuery |
SQL Post-Query | No value set |
Max Wait Time | 0 seconds |
Record Writer | CSVRecordSetWriter |
Normalize Table/Column Names | FALSE |
Use Avro Logical Types | TRUE |
Max Rows Per Flow File | fetchSize |
Output Batch Size | 0 |
Fetch Size | fetchSize |
In the CSVRecordSetWriter I have the following settings:
Schema Write Strategy | Confluent Schema Registry Reference |
Schema Cache | No value set |
Schema Access Strategy | Inherit Record Schema |
Schema Registry | ConfluentSchemaRegistry |
Schema Name | ${schema.name} |
Schema Version | |
Schema Branch | |
Schema Text | ${avro.schema} |
And in the ConfluentSchemaRegistry I have:
But when I try to run it, I get the following error:
org.apache.nifi.schema.access.SchemaNotFoundException: Cannot write Confluent Schema registry reference because the schema identifier is not known.
So I have two questions:
- Is there an easy way in NiFi to write a new schema to the Confluent Schema Registry?
- Can I write a new schema using the Inherit Record Schema strategy and retrieve its schema ID?
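To make the first question concrete: outside NiFi, registering a schema and getting its ID back would look roughly like the sketch below, which uses the Confluent Schema Registry REST API (POST /subjects/{subject}/versions). The registry URL, the subject name, and the function names are placeholders for illustration, not values from my flow, and this is only a minimal sketch of the behaviour I am hoping NiFi can do for me.

```python
import json
import urllib.parse
import urllib.request

# Placeholder registry address, assumed for illustration only.
REGISTRY_URL = "http://localhost:8081"


def build_register_request(registry_url, subject, avro_schema_text):
    """Build the POST request that registers a schema under a subject."""
    quoted_subject = urllib.parse.quote(subject, safe="")
    url = f"{registry_url.rstrip('/')}/subjects/{quoted_subject}/versions"
    # The registry expects the Avro schema as a string inside a JSON object,
    # so the schema text ends up JSON-encoded twice.
    body = json.dumps({"schema": avro_schema_text}).encode("utf-8")
    headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def parse_schema_id(response_body):
    """Extract the numeric schema ID from the registry's JSON response."""
    return json.loads(response_body)["id"]


def register_schema(registry_url, subject, avro_schema_text):
    """Send the registration request and return the schema ID."""
    request = build_register_request(registry_url, subject, avro_schema_text)
    with urllib.request.urlopen(request) as response:
        return parse_schema_id(response.read())
```

With a registry running, a call such as register_schema(REGISTRY_URL, "my-subject", avro_schema_text) would return the ID, which is the value I would like to end up in a FlowFile attribute ("my-subject" is again a made-up name).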
Thanks in advance,
Alex