NiFi Expression Language - Begin and end a string

Explorer

Hi there,

I have something I am a bit stuck on. Within my flow, my avro.schema changes because the flow files pass through a few custom processors that modify the data and add new fields. At the point the data comes out of the custom processors it is in JSON format, so I can't re-use the same ExtractAvroMetadata processor. I found that I could use the InferAvroSchema processor and have it set the attribute inferred.avro.schema to what the new schema should be. At that point I can update the avro.schema attribute to be that of the inferred schema... but I found a few issues with the formatting of the InferAvroSchema output. It is very verbose, and Hive won't accept it as part of a CREATE TABLE statement. Because of this I have started to regex out the pieces that I know are not formatted correctly. I have most of it working, but I'm stuck on one item: I don't see where I can begin and end a string match.

Example dummy output snippet, from inferred.avro.schema:

{"type":"record","name":"test_record","fields":[{"name":"bill_id","type":"string", '\"488\"'"},{"name":"congr_district","type":"string", '\"YEA\"'"},{"name":"cq","type":"string", '\"2004\"'"},{"name":"day","type":"string", '\"On Motion to Suspend the Rules and Pass\"'"},{"name":"month","type":"string", '\"4\"'"},{"name":"pull_date","type":"string", '\"2018-02-26\"'"},{"name":"pull_date_normalized","type":"string", '\"2018-01-26 00:02:00.000+0000\"'"},{"name":"question","type":"string", '\"nan\"'"},{"name":"rep_name","type":"string", '\"DEMOCRAT\"'"},{"name":"rep_party","type":"string", '\"OHIO\"'"},{"name":"state","type":"string", '\"24\"'"},{"name":"state_code","type":"string", '\"17\"'"},{"name":"title","type":"string", '\"encrypted_content"'"},{"name":"vote","type":"string", '\"To transfer Federal lands between the Secretary of Agriculture and the Secretary of the Interior\"'"},{"name":"year","type":"string", '\"OCTOBER\"'"}]}}

So you will notice that I have already parsed out the "Schema inferred by..." comment, but I still need to remove the items delimited by the \ characters.

So this:

{"name":"cq","type":"string", '\"2004\"'"},{"name":"day","type":"string", '\"On Motion to Suspend the Rules and Pass\"'"}

Would be this:

{"name":"cq","type":"string"},{"name":"day","type":"string"}

I can't find a good way to mark the start and the stop within the Expression Language. I know for sure that regular Java regex would pick it up as this:

'\\.*?}

Where it would start at the first \ and then go until it finds the first }. I don't see how that can be achieved from NiFi EL. It will accept this format, but it doesn't do anything to the output of the attribute (it remains unchanged). Anyone have any ideas?
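
For reference, the pattern can be sanity-checked in plain Groovy, which uses the same Java regex engine as NiFi EL's replaceAll. This is only a minimal sketch against two of the field entries from the dump above:

// Two of the malformed field entries from the inferred.avro.schema dump
def snippet = '{"name":"cq","type":"string", \'\\"2004\\"\'"},{"name":"day","type":"string", \'\\"On Motion to Suspend the Rules and Pass\\"\'"}'

// The pattern starts at the ", '\" run and lazily consumes up to (but not
// including) the first closing brace, so replacing the match with "" leaves
// a clean {"name":...,"type":...} entry behind
def cleaned = snippet.replaceAll(/, '\\.*?(?=\})/, '')

assert cleaned == '{"name":"cq","type":"string"},{"name":"day","type":"string"}'

One possible reason the EL version is accepted but changes nothing: inside a quoted EL string literal, a backslash generally needs to be escaped again, so a pattern that needs one literal backslash at the regex level may have to be written with four backslashes in the EL argument. That escaping detail is worth trying before abandoning the EL route.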

Thanks,

Chris L


5 REPLIES

Master Guru

Can you post a template, screenshots, and a log?

If you use a record writer with the new schema, it can write to Hive.

I would convert to Avro with the new schema; then you can easily write to ORC for Hive, or to something else.

Explorer

Hi Tim,

Thanks for the quick reply. I can post the template and screenshots, but it's a small part of a rather large process, so let me try to outline just the part that is hanging, and you tell me if that's enough info. Here is the flow:

1. User inputs some info about the database they wish to fetch from. That invokes a process which generates fetch table statements and ultimately sends those statements to an ExecuteSQL processor.

2. Data is output in Avro, and the next step is to gather the avro.schema info from the output. That's where I currently have the ExtractAvroMetadata processor. I do use the RecordReader/Writer, and this is the schema it's currently using.

3. Data moves through a series of Python and Groovy scripts to encrypt the incoming PII (based on user-input attributes) and normalize all DATE fields (again based on user inputs). So the output is encrypted data, plus new date fields named after the original field with a _normalized suffix.

So here is where it gets a bit fuzzy. At this point the schema has changed because I have introduced new fields into the flow file. I can't use the RecordReader/Writer because it refers back to the avro.schema attribute, which doesn't yet know about these new fields. So I am putting an InferAvroSchema before the RecordWriter in an attempt to update avro.schema from inferred.avro.schema so the RecordWriter knows about the new fields. That is the part I am stuck on.

If you think of a better approach or if you know something about the writer service (such as being able to infer the new schema), I am all ears. 🙂

Thanks

Chris L

Explorer

I should also mention that if I just use the inferred.avro.schema with the record reader service, it errors out with the following: "AvroRuntimeException: Not a named type". It appears that the reader service is unable to correctly read the new schema from the output of the InferAvroSchema processor. That is what I tried first, before starting down the regex route.

Master Guru

If you're updating the record structure, you'll need to update the schema as well. It sounds like the data coming out of ExecuteSQL may have different schemas based on which table is specified by the user. If you know all the possible schemas, you could enumerate them in a Schema Registry, where the key is some attribute (such as table name, let's say it's called table.name), and in the reader(s) you can use ${table.name} as the Schema Name property, and "Use Schema Name" as the access strategy. Then you could add "copies" of the input schemas, with the additional fields and any updated types, as output schemas. Those would be the ones used by the readers/writers after all your scripts have run.
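
To make the registry idea concrete, here is a hypothetical "widened copy" of an input schema; the record name, the field selection (borrowed from the dump earlier in the thread), and the added nullable field are illustrative assumptions, not anything from the actual flow. The reader would resolve ${table.name} to the original schema, and the writer would point at a copy like this that already includes the new field:

{"type":"record","name":"votes","fields":[
  {"name":"bill_id","type":"string"},
  {"name":"pull_date","type":"string"},
  {"name":"pull_date_normalized","type":["null","string"]}
]}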

If the schemas truly are dynamic and you just need to add fields, then if the schema is in the avro.schema attribute, you could update it manually using a scripted processor such as ExecuteScript. For example, here is a Groovy script to be used in ExecuteScript for updating the avro.schema attribute by adding 2 extra entries to the "fields" array:

import groovy.json.*
// Grab up to 1000 flow files from the incoming queue
def flowFiles = session.get(1000)
if(!flowFiles) return
session.transfer(flowFiles.collect { flowFile ->
   // Read and parse the current schema from the avro.schema attribute
   def schema = flowFile.getAttribute('avro.schema')
   def json = new JsonSlurper().parseText(schema)
   // Append the two new fields as optional (nullable) strings
   json.fields += ['name': 'processGroupName', 'type': ['null', 'string']]
   json.fields += ['name': 'processGroupId', 'type': ['null', 'string']]
   // putAttribute returns the updated flow file, which collect() gathers for transfer
   session.putAttribute(flowFile, 'avro.schema', JsonOutput.toJson(json))
}, REL_SUCCESS)

The script grabs up to 1000 flow files, then for each one, it reads in the schema from the avro.schema attribute, parses the JSON, then adds processGroupName and processGroupId (both optional strings) to the "fields", then converts the JSON back to a string and updates the avro.schema attribute. Once this is done for all flow files, they are transferred to success.
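
If it helps: to try this, paste the script into ExecuteScript's Script Body property, set Script Engine to Groovy, and route the success relationship on to the record writer step. With this in place, the InferAvroSchema/regex workaround shouldn't be needed.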

Explorer

Yep - that will work nicely. Thanks Matt!