Member since: 12-14-2017
Posts: 8
Kudos Received: 0
Solutions: 0
02-26-2018
07:59 PM
Yep - that will work nicely. Thanks Matt!
02-26-2018
04:23 PM
I should also mention that if I just use the inferred.avro.schema attribute with the record reader service, it errors out with an Avro runtime exception: "Not a named type". It appears the reader service is unable to correctly read the new schema from the output of the InferAvroSchema processor. That is what I tried first, before starting down the regex route. (A quick way to sanity-check the schema outside NiFi is sketched below.)
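For anyone who hits the same error: you can feed the attribute value to Avro's own parser and see whether Avro itself accepts it. This is a minimal standalone Groovy sketch, not part of my flow — the schema string is a stand-in for whatever is in inferred.avro.schema, and the Avro version is just an assumption (match it to your NiFi build):

```groovy
@Grab('org.apache.avro:avro:1.8.2')
import org.apache.avro.Schema

// Stand-in for the contents of the inferred.avro.schema attribute.
def schemaJson = '{"type":"record","name":"test_record","fields":[{"name":"bill_id","type":"string"}]}'

try {
    // Schema.Parser performs the same validation the record reader does,
    // so a malformed schema fails here with the same kind of message.
    def schema = new Schema.Parser().parse(schemaJson)
    println "Schema OK: ${schema.fullName}"
} catch (Exception e) {
    println "Schema rejected: ${e.message}"
}
```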
02-26-2018
04:14 PM
Hi Tim, thanks for the quick reply. I can post the template and screenshots, but it's a small part of a rather large process, so let me outline just the part that is hanging and you can tell me if that's enough info. Here is the flow:

1. A user inputs some info about the database they wish to fetch from. That invokes a process which executes a fetch-table statement and ultimately sends those statements to an ExecuteSQL processor.
2. Data is output in Avro, and the next step is to gather the avro.schema info from the output. That's where I currently have the ExtractAvroMetadata processor. I do use the RecordReader/Writer, and this is the schema it currently uses.
3. Data moves through a series of Python and Groovy scripts to encrypt the incoming PII (based on user-input attributes) and normalize all DATE fields (again based on user inputs). So the output is encrypted data plus new date fields named <original field name>_normalized.

Here is where it gets a bit fuzzy. At this point the schema has changed because I have introduced new fields into the flow file. I can't use the RecordReader/Writer as-is because it refers back to avro.schema, which doesn't yet know about these new fields. So I am putting an InferAvroSchema before the RecordWriter in an attempt to update avro.schema from inferred.avro.schema so the RecordWriter knows about the new fields (a rough sketch of that hand-off is below). That is the part I am stuck on. If you can think of a better approach, or if you know something about the writer service (such as being able to infer the new schema), I am all ears. 🙂 Thanks, Chris L
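For context, the hand-off I'm attempting in step 3 looks roughly like this as an ExecuteScript (Groovy) body — a sketch, not my exact script; an UpdateAttribute rule setting avro.schema to ${inferred.avro.schema} would be the simpler equivalent:

```groovy
// ExecuteScript (Groovy) body -- rough sketch of copying the inferred
// schema over avro.schema before the RecordWriter runs.
def flowFile = session.get()
if (flowFile != null) {
    def inferred = flowFile.getAttribute('inferred.avro.schema')
    if (inferred) {
        // Overwrite avro.schema so the downstream RecordWriter
        // picks up the fields added by the custom processors.
        flowFile = session.putAttribute(flowFile, 'avro.schema', inferred)
    }
    session.transfer(flowFile, REL_SUCCESS)
}
```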
02-26-2018
03:55 PM
Hi there, I have something I am a bit stuck on. Within my flow, avro.schema changes because the flow files go through a few custom processors that change the data and add new fields. At the point where the data comes out of the custom processors it's in JSON format, so I can't reuse the same ExtractAvroMetadata processor. I found that I could use the InferAvroSchema processor and have it set the attribute inferred.avro.schema with what the new schema should be. At that point I can update the avro.schema attribute to the inferred one... but I found a few issues with the formatting of inferred.avro.schema. It is very verbose, and Hive won't accept that output as part of a CREATE TABLE statement. Because of this I have started to regex out the pieces that I know are not formatted correctly. I have most of it working, but am stuck on one item: I don't see where I can start and end a string literal. Example dummy output snippet from inferred.avro.schema:

{"type":"record","name":"test_record","fields":[{"name":"bill_id","type":"string", '\"488\"'"},{"name":"congr_district","type":"string", '\"YEA\"'"},{"name":"cq","type":"string", '\"2004\"'"},{"name":"day","type":"string", '\"On Motion to Suspend the Rules and Pass\"'"},{"name":"month","type":"string", '\"4\"'"},{"name":"pull_date","type":"string", '\"2018-02-26\"'"},{"name":"pull_date_normalized","type":"string", '\"2018-01-26 00:02:00.000+0000\"'"},{"name":"question","type":"string", '\"nan\"'"},{"name":"rep_name","type":"string", '\"DEMOCRAT\"'"},{"name":"rep_party","type":"string", '\"OHIO\"'"},{"name":"state","type":"string", '\"24\"'"},{"name":"state_code","type":"string", '\"17\"'"},{"name":"title","type":"string", '\"encrypted_content"'"},{"name":"vote","type":"string", '\"To transfer Federal lands between the Secretary of Agriculture and the Secretary of the Interior\"'"},{"name":"year","type":"string", '\"OCTOBER\"'"}]}}

You will notice that I have already parsed out the "Schema inferred by..." stuff, but I still need to remove the items wrapped in \. So this:

{"name":"cq","type":"string", '\"2004\"'"},{"name":"day","type":"string", '\"On Motion to Suspend the Rules and Pass\"'"}

would become this:

{"name":"cq","type":"string"},{"name":"day","type":"string"}

I can't find a good way to start and stop the match within the Expression Language. I know for sure that regular Java regex would pick it up as:

'\\.*?}

where it starts at the first \ and goes until it finds the first }. I don't see how that can be achieved from NiFi EL: it accepts the format, but does nothing to the output of the attribute (it remains unchanged). A Groovy alternative I am considering is sketched below. Anyone have any ideas? Thanks, Chris L
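Since the EL escaping kept fighting me, one fallback is to do the same cleanup in an ExecuteScript (Groovy), where a slashy string sidesteps the quoting problem. A sketch under the assumption that every junk fragment sits between the type and the closing brace:

```groovy
// ExecuteScript (Groovy) body -- a sketch, not a drop-in. Strips fragments
// like  , '\"2004\"'"  out of the inferred schema attribute.
def flowFile = session.get()
if (flowFile != null) {
    def schema = flowFile.getAttribute('inferred.avro.schema')
    if (schema) {
        // Slashy-string regex: literal  , '  then an optional backslash
        // (\\? also covers the encrypted_content fragment, which is missing
        // one), a quote, anything lazily, then the closing  \"'"  junk.
        def cleaned = schema.replaceAll(/, '\\?".*?\\?"'"/, '')
        flowFile = session.putAttribute(flowFile, 'avro.schema', cleaned)
    }
    session.transfer(flowFile, REL_SUCCESS)
}
```

Note this only removes the quoted-sample fragments; any stray trailing braces (like the extra } at the end of my dummy output) would still need their own fix.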
Labels:
- Apache NiFi
01-16-2018
10:04 PM
@Matt Clarke I was putting "${schema.fingerprint}" within the MergeContent processor. That is likely the issue I am facing here. Thanks for the feedback. Also, thank you for the explanation about the thread pool and cluster info; I will take that into consideration as I move forward. Appreciate the tips! Chris
01-16-2018
08:14 PM
@Matt Clarke Hey Matt, thanks for the quick and detailed response. Below are some answers for you:

1. Currently I have two tables I am testing with, and I have the bins set to 5.
2. I did see some memory issues in the past and have upped the JVM to ~10GB. That seemed to work just fine.
3. I was using the correlation attribute "${avro_schema}_${tablename}". I like your suggestion of using the fingerprint from the Avro metadata and have changed the correlation attribute to ${schema.fingerprint}. One question about that: is the processor smart enough to give a unique attribute for two different tables that contain the same column names? For example, if I have two dimension tables that each have something like ID, Name, Value, am I going to get the same fingerprint for both because their schemas are the same? If yes, should I concat another attribute onto it, as I was doing in the beginning? (A sketch of building a composite correlation attribute is below.)
4. The flow files being merged were tiny, and I have implemented your suggestion of two MergeContent processors. The first one groups bundles of around 20K files and the second one further groups them into chunks of around 64MB. That worked pretty great. Thanks for the solution here.

On your note about the following:

"""Are you putting "${avro_schema}_${tablename}" in the mergeContent processor's Correlation Attribute Name property value? What this property does is resolve the provide EL above to its actual values then checks the incoming Flowfiles for an attribute with that resolved value."""

My assumption with underscoring both attributes was to create a very unique attribute for the processor to merge on. So if I had a schema named "foo" and a table named "bar", my assumption was that it would look for BOTH attributes and then create a "foo_bar" attribute. From the sound of it, I think you are saying the MergeContent processor looks for a single attribute named "foo_bar"... which does make sense to me. That might be why the flow files were failing, since the pre-merged attribute was not present. Am I on the right track with the above? Also, do you have a suggestion on whether a reusable flow template would be better to route these hundred tables through, or whether it would make sense to have 100 unique end-to-end flows (one for each table)? Thanks again, I really appreciate your insights. Chris
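For point 3, the way I'm thinking of guarding against identical fingerprints is to materialize one composite attribute first and then point Correlation Attribute Name at that single attribute name. A sketch in ExecuteScript (Groovy) — an UpdateAttribute rule setting it to ${schema.fingerprint}_${tablename} is the simpler equivalent, and 'merge.correlation' is just an illustrative name:

```groovy
// ExecuteScript (Groovy) body -- build one attribute for MergeContent to
// correlate on, since two tables with identical columns share a fingerprint.
def flowFile = session.get()
if (flowFile != null) {
    def fingerprint = flowFile.getAttribute('schema.fingerprint') ?: 'no-schema'
    def table = flowFile.getAttribute('tablename') ?: 'no-table'
    // Fold the table name in so same-schema tables land in separate bins.
    flowFile = session.putAttribute(flowFile, 'merge.correlation',
            "${fingerprint}_${table}".toString())
    session.transfer(flowFile, REL_SUCCESS)
}
```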
01-16-2018
04:11 PM
Hi all, I am in the process of migrating our current data pipeline into Hadoop from traditional tools (SSIS, Python, etc.) to NiFi. We have several hundred tables that run daily. I like the approach Kylo is taking, and while it doesn't satisfy my needs 100%, I have used a few ideas from looking at their general data templates. I am currently at a stage where I am not entirely sure how to: (1) best optimize this process to run concurrently for ~50 tables at a time, and (2) get MergeContent to work correctly for how I am using it. I am still fairly new to NiFi, so I am looking for someone to help me with best practices, if possible.

Quick background on how the entire flow works:

Step 1 -- I have one process group for each table we are bringing in, categorized by server, database, then table. I was not able to find a great way to create a reusable flow for these, as the controller service for the DB couldn't be set dynamically. Nonetheless, I have a bit of SQL executing to fetch the delta from the source. That then feeds into an InferSchema processor so I can grab the schema and keep it as an attribute. Next the flow goes into an UpdateAttribute processor that picks up some table-specific attributes I have set, such as schema_name, table_name, hdfs_parent_location, etc. All of these attributes are there to help keep the flow unique, and I also use them to build the Hive "create if not exists" and "msck repair" scripts at the end of the process.

Step 2 -- Once that bit of data is set for each source, they all flow into one reusable template, using Input and Output ports. This is where I am a bit concerned that I am doing things correctly. The reusable flow consists of the following:
- AvroToJson
- Custom Groovy script to encrypt all the incoming PII (read from an attribute the user sets)
- SplitJson
- JsonToAvro: sending the flow back to Avro
- MergeContent: merging with bin-packing in Avro format, using ${avro_schema}_${tablename} as the Correlation Attribute, currently set to merge files to around 64MB using 5 bins
- PutHDFS: dynamically creates the fully qualified location from the attributes and partitions data by DAY
- ReplaceText: builds the CREATE statement and MSCK REPAIR TABLE to add any partitions (roughly what this produces is sketched below)
- PutHiveQL: runs the statement

Here are my questions:

1. I am not sure the MergeContent processor is working correctly 100% of the time. I have found that if I use larger tables (1-2MM records) at the same time, it generally fails at the MergeContent step. It seems to have a hard time using the correlation attribute I have set, and I have some trouble getting the file sizes to be around 64MB. For the correlation-attribute issue, the processor fails records (I have failure route back to itself so I can check the queue) and they generally sit in the queue. I can see which table they come from, but when I check the failure queue, all of the attributes that were previously set are gone. I have checked step by step, and they don't get erased until MergeContent runs. So that is a bit strange. For file size, my assumption is the bins are getting full, but I have only two tables running during my test, and MergeContent is set to 64MB minimum group size with a 5-minute wait time. So please tell me if I am missing something (I am sure I am). Also, please let me know if the correlation-attribute problem I am having is typical for how I am using the processor, or if you think it should work. In production, it will be much more than 2 tables running at one time... probably 50 or so.

2. From what I have listed above, does anyone think the flow for each table should be its own end-to-end, or am I on the right track with having this single reusable template that all flows go into? I would assume the thread count would be much lower for the reusable approach, but I do worry a bit about moving that much data through a single group.

Any help or direction would be much appreciated!! Thanks, Chris @Matt Burgess @Matt Clarke
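To make the ReplaceText step in Step 2 concrete, here is roughly the shape of what it produces, shown as an ExecuteScript (Groovy) sketch for clarity. The partition column, storage format, and DDL layout are placeholders, not my exact statements; only schema_name, table_name, and hdfs_parent_location are real attributes from my flow:

```groovy
import org.apache.nifi.processor.io.OutputStreamCallback

// ExecuteScript (Groovy) body -- builds the Hive DDL from flowfile
// attributes, roughly what the ReplaceText step does in my flow.
def flowFile = session.get()
if (flowFile != null) {
    def schema = flowFile.getAttribute('schema_name')
    def table  = flowFile.getAttribute('table_name')
    def path   = flowFile.getAttribute('hdfs_parent_location')

    // Placeholder DDL: partition column and storage format are assumptions.
    def ddl = """\
        CREATE EXTERNAL TABLE IF NOT EXISTS ${schema}.${table}
        PARTITIONED BY (day STRING)
        STORED AS AVRO
        LOCATION '${path}/${table}';
        MSCK REPAIR TABLE ${schema}.${table};""".stripIndent()

    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(ddl.getBytes('UTF-8'))
    } as OutputStreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
}
```

Depending on the Hive processor used, the CREATE and MSCK statements may need to go out as separate flow files rather than one semicolon-joined script.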
Labels:
- Apache NiFi