Member since
07-26-2019
68
Posts
30
Kudos Received
10
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 15350 | 01-22-2019 02:40 PM |
 | 2640 | 01-09-2019 03:10 PM |
 | 2192 | 08-30-2018 08:14 PM |
 | 1654 | 08-06-2018 07:58 PM |
 | 4786 | 07-16-2018 07:05 PM |
11-27-2019
10:45 AM
I would consider bundling the rows into larger flowfiles using something like MergeRecord, and then using the PutDatabaseRecord processor, which uses prepared, parameterized SQL and is considerably faster than the RBAR (Row By Agonizing Row) inserts generated by PutSQL. There are faster alternatives, but this may be the simplest change that will improve performance noticeably.
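To illustrate the difference (a sketch only - the sensor_readings table and its columns are hypothetical): PutSQL executes the literal statement carried by each flowfile, one row at a time, while PutDatabaseRecord binds every record in the merged flowfile to a single prepared statement and can execute them as a batch.
-- Row-by-row inserts, one statement per flowfile (the PutSQL pattern):
INSERT INTO sensor_readings (id, reading) VALUES (1, 42.0);
INSERT INTO sensor_readings (id, reading) VALUES (2, 43.5);
-- One parameterized statement prepared once and reused for every record in the merged flowfile (the PutDatabaseRecord pattern):
INSERT INTO sensor_readings (id, reading) VALUES (?, ?);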
... View more
10-21-2019
01:09 PM
Thanks, @HorizonNet - I've updated the article with the correction!
... View more
10-07-2019
12:56 PM
Hi - It looks like this error is being caused by having two persistence providers in the providers.xml. You may need to add the file-based provider properties (xml.providers.flowPersistenceProvider.file-provider.property.Flow Storage Directory and xml.providers.flowPersistenceProvider.file-provider.class) to the nifi.registry.providers.ignored list so that they are not added to the XML file when CM generates it. Here is an article detailing the process: https://community.cloudera.com/t5/Community-Articles/Configuring-the-Git-Persistence-Provider-for-the-NiFi/ta-p/278867
... View more
10-04-2019
02:16 PM
3 Kudos
OBJECTIVE: Step-by-step instructions for enabling the git persistence provider for NiFi Registry in Cloudera Flow Management (CFM) on Cloudera Manager.

OVERVIEW: The default install of early releases of Cloudera Flow Management (CFM) includes only basic configurations for NiFi Registry in Cloudera Manager. This article explains how to reconfigure the NiFi Registry persistence providers to use git instead of the filesystem provider.

PREREQUISITES: CFM (CDF) 1.0.1 or later and CM 5.16 or CM 6.2 or later

CONFIGURING THE GIT PERSISTENCE PROVIDER:

1. Pick a location where NiFi Registry will store the local git repository files. This location has to be accessible to the NiFi Registry service account (typically: nifiregistry). It must also be initialized as a git repository using git init and linked to a remote repository using git remote add origin. (Alternatively, an existing git repository can be cloned to create the folder using git clone.) The substitution variable ${nifi.registry.working.directory} can be used to point to the install location for NiFi Registry (usually /var/lib/nifiregistry/ when NiFi Registry is installed by Cloudera Manager). An example sequence of commands is shown at the end of this article.

2. From the NIFIREGISTRY Configuration tab in Cloudera Manager, search for “flowPersistenceProvider”.

3. Locate the “NiFi Registry Advanced Configuration Snippet (Safety Valve) for staging/providers.xml” Safety Valve configuration.

4. Add the following Name/Value pairs:

Name | Value (example) | Description |
---|---|---|
xml.providers.flowPersistenceProvider.git-provider.class | org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider | Provider Java class for git - should always be org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider |
xml.providers.flowPersistenceProvider.git-provider.property.Flow Storage Directory | ${nifi.registry.working.directory}/git/flow_storage | Local directory for the git project. Must be git-initialized and have a remote defined. Use the https link. |
xml.providers.flowPersistenceProvider.git-provider.property.Remote To Push | origin | Name of the remote to push to - typically "origin" |
xml.providers.flowPersistenceProvider.git-provider.property.Remote Access User | somegitusername | User account for the git remote repository |
xml.providers.flowPersistenceProvider.git-provider.property.Remote Access Password | 1234mygitaccesstoken567 | Personal access token for the GitHub API |

Sample XML:
<property>
<name>xml.providers.flowPersistenceProvider.git-provider.property.Remote Access Password</name>
<value>1234mygitaccesstoken567</value>
<description>Personal access token for Github API</description>
</property>
<property>
<name>xml.providers.flowPersistenceProvider.git-provider.class</name>
<value>org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider</value>
<description>Provider java class for git - should always be org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider</description>
</property>
<property>
<name>xml.providers.flowPersistenceProvider.git-provider.property.Flow Storage Directory</name>
<value>${nifi.registry.working.directory}/git/flow_storage</value>
<description>Local directory for git project. Must be git initialized and have a remote defined. Use https link</description>
</property>
<property>
<name>xml.providers.flowPersistenceProvider.git-provider.property.Remote To Push</name>
<value>origin</value>
<description>Name of remote to push to - typically "origin"</description>
</property>
<property>
<name>xml.providers.flowPersistenceProvider.git-provider.property.Remote Access User</name>
<value>somegitusername</value>
<description>User account for git remote repository</description>
</property>

5. From the NIFIREGISTRY Configuration tab in Cloudera Manager, search for “providers ignored properties”.

6. Locate the “providers ignored properties” (nifi.registry.providers.ignored) configuration.

7. Add the following two properties to the ignored list (this prevents NiFi Registry from adding the default filesystem provider to providers.xml alongside the git provider, which would be invalid):
xml.providers.flowPersistenceProvider.file-provider.property.Flow Storage Directory
xml.providers.flowPersistenceProvider.file-provider.class

CONCLUSION: After making the changes above and restarting the NiFi Registry service, you should be able to enable a registry client in the NiFi Controller Settings, then start version control on the NiFi canvas and see your changes reflected in the remote git repository (such as GitHub). For more information about versioning and dev/ops workflows using the NiFi Flow Registry with git, check out Pierre Villard’s excellent article here.
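For reference, preparing the directory in step 1 might look like the following on the NiFi Registry host (a sketch only; the path assumes the default /var/lib/nifiregistry working directory, and the remote URL is a placeholder):
# Illustrative commands for step 1 - run as root and adjust paths/URL for your environment
mkdir -p /var/lib/nifiregistry/git/flow_storage
cd /var/lib/nifiregistry/git/flow_storage
git init                                                               # initialize the local repository
git remote add origin https://github.com/your-org/your-flow-repo.git   # use the https link to your remote
chown -R nifiregistry: /var/lib/nifiregistry/git                       # make it accessible to the service account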
... View more
09-12-2019
09:01 AM
1 Kudo
OBJECTIVE: Add more useful service-level metrics for NiFi in Cloudera Flow Management (CFM)/Cloudera DataFlow (CDF).

OVERVIEW: The default install of early releases of Cloudera Flow Management (CFM) includes only a few basic metrics for NiFi in the Cloudera Manager dashboards. This article explains how to add several more service- and host-level metrics that improve performance monitoring in Cloudera Manager 5.x or 6.x.

PREREQUISITES: CFM (CDF) 1.0.1 or later and CM 5.16 or CM 6.2 or later

ADDING NiFi METRICS TO CM:

Basic Metrics
1. From the NiFi status page in Cloudera Manager, select “Chart Builder” from the “Charts” menu.
2. Insert one of the following SELECT statements in the query text box:
NiFi Memory: SELECT total_mem_rss_across_nifi_nodes, total_mem_virtual_across_nifi_nodes WHERE category = SERVICE AND entityName = $SERVICENAME
CPU Rates: SELECT cpu_user_rate_across_nifi_nodes, cpu_system_rate_across_nifi_nodes WHERE entityName = $SERVICENAME AND category = SERVICE
NiFi Bytes Written: SELECT write_bytes_rate_across_nifi_nodes, avg(write_bytes_rate_across_nifi_nodes) WHERE entityName = $SERVICENAME AND category = SERVICE
3. Select a chart type (“Line” works well for fluid metrics like memory and CPU).
4. Click “Build Chart”.
5. Update the title (e.g., “CPU Rates”).
6. Select “All Combined” to place all selected metrics on the same chart, or select “All Separate” to add a separate chart for each metric. (NOTE: You may see a syntax error related to the $SERVICENAME variable not being available in the builder outside of a dashboard. This may be ignored and will resolve after saving.)
[Screenshot: new chart before saving]
7. Click “Save” and select the NiFi Status Page dashboard (listed under CDH5 or CDH6, depending upon release).
[Screenshot: CM chart save operation]
8. Open the NiFi Status Page and confirm that your new metric has been added.
[Screenshot: new NiFi metrics charts added]

Stacked Metrics
For some metrics, such as memory utilization, a stacked presentation better represents total usage across the service.
1. For stacked metrics, add the SELECT statement as before, for example:
CPU Rates Stacked: SELECT cpu_user_rate_across_nifi_nodes, cpu_system_rate_across_nifi_nodes WHERE entityName = $SERVICENAME AND category = SERVICE
2. Select “All Combined”.
3. Select a stacked format, such as “Stack Area”.
4. Build the chart.
[Screenshot: CM chart builder, stacked area memory]
5. Save the chart to the appropriate dashboard as above.

CONCLUSION: Once you have added the metrics, experiment with other metric categories, filters, and aggregations to develop a dashboard that suits your needs. For additional metrics reporting capabilities, consider adding a reporting task for DataDog or AppDynamics to push NiFi metrics to one of these general-purpose SIEM/operations tools.
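For example, the stacked-memory chart mentioned at the start of the Stacked Metrics section can reuse the memory metrics from Basic Metrics step 2 (same tsquery form, charted as “Stack Area”):
NiFi Memory Stacked: SELECT total_mem_rss_across_nifi_nodes, total_mem_virtual_across_nifi_nodes WHERE entityName = $SERVICENAME AND category = SERVICE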
... View more
06-10-2019
10:38 PM
9 Kudos
OBJECTIVE: Provide a quick-start guide for using the Jolt language within a NiFi JoltTransform (JoltTransformJSON or JoltTransformRecord).

OVERVIEW: The NiFi JoltTransform uses the powerful Jolt language to parse and reshape JSON. Combined with the NiFi Schema Registry, this gives NiFi the ability to traverse, recurse, transform, and modify nearly any data format that can be described in Avro, using JSON as an intermediary step. Although the language itself is open source, and some documentation is available in the JavaDoc, this article can provide a starting point for understanding basic Jolt operations.

PREREQUISITES: HDF 3.0 or later (NiFi 1.2.0.3 or later)

BASICS OF JOLT:

Simplified Overview
The JoltTransform applies a set of transformations described in a JSON specification to an input JSON document and generates a new output JSON document.

Jolt Specification Overview
A Jolt specification is a JSON structure that contains two root elements:
operation (string): shift, default, sort, cardinality, modify-default-beta, modify-overwrite-beta, modify-define-beta, or remove
spec (JSON): A set of key/value pairs of the form {“input-side search”: “output-side transformation”}
Simple: Select a single Jolt transform type from the drop-down, then type or paste the specification JSON.
Chained: Multiple Jolt specifications can be chained together sequentially in an array of simple specifications.

Stock Transforms
Shift: Reads values or portions of the input JSON tree and adds them to specified locations in the output.
Example: I have a bunch of things in the breadbox that I want to move to the countertop. Let’s move everything in the breadbox to the countertop: Input: {
"breadbox": {
"loaf1": {
"type": "white"
},
"loaf2": {
"type": "wheat"
}
},
"fridge": {
"jar1": {
"contents": "peanut butter"
},
"jar2": {
"contents": "jelly"
}
}
} Spec: [
{
"operation": "shift",
"spec": {
"breadbox": "counterTop"
}
}
] Output: {
"counterTop": {
"loaf1": {
"type": "white"
},
"loaf2": {
"type": "wheat"
}
}
} Default: Non-destructively adds values or arrays of values to the output JSON. Example: I want to slice up loaf1 of bread if it exists. Let’s add an array of slices to loaf1: Input: {
"counterTop": {
"loaf1": {
"type": "white"
},
"loaf2": {
"type": "wheat"
},
"jar1": {
"contents": "peanut butter"
},
"jar2": {
"contents": "jelly"
}
}
} Spec: [
{
"operation": "default",
"spec": {
"counterTop": {
"loaf1": {
"slices": [
"slice1",
"slice2",
"slice3",
"slice4
]
}
}
}
}
]
Output: {
"counterTop" : {
"loaf1" : {
"type" : "white",
"slices" : [ "slice1", "slice2", "slice3", "slice4" ]
},
"loaf2" : {
"type" : "wheat"
},
"jar1" : {
"contents" : "peanut butter"
},
"jar2" : {
"contents" : "jelly"
}
}
} Cardinality: Transforms elements in the input JSON to single values or to arrays (lists) in the output. Example: I have too many slices of bread. No matter how many there are, I just want the first one in the array, but as a single value: Input: {
"counterTop": {
"loaf1": {
"type": "white",
"slices": [
"slice1",
"slice2",
"slice3",
"slice4"
]
},
"loaf2": {
"type": "wheat"
},
"jar1": {
"contents": "peanut butter"
},
"jar2": {
"contents": "jelly"
}
}
} Spec: [
{
"operation": "cardinality",
"spec": {
"counterTop": {
"loaf1": {
"slices": "ONE"
}
}
}
}
] Output: {
"counterTop" : {
"loaf1" : {
"type" : "white",
"slices" : "slice1"
},
"loaf2" : {
"type" : "wheat"
},
"jar1" : {
"contents" : "peanut butter"
},
"jar2" : {
"contents" : "jelly"
}
}
} Remove: Removes elements if found in the input JSON. Example: I don’t really want loaf2 or jar1 (who needs whole wheat bread or peanut butter when you have jelly on plain bread!). Let’s remove loaf2 and jar1: Input: {
"counterTop": {
"loaf1": {
"type": "white",
"slices": "slice1"
},
"loaf2": {
"type": "wheat"
},
"jar1": {
"contents": "peanut butter"
},
"jar2": {
"contents": "jelly"
}
}
} Spec: [
{
"operation": "remove",
"spec": {
"counterTop": {
"loaf2": "",
"jar1": ""
}
}
}
] Output: {
"counterTop" : {
"loaf1" : {
"type" : "white",
"slices" : "slice1"
},
"jar2" : {
"contents" : "jelly"
}
}
} Modify: Writes calculated values to elements in the target JSON. Calculations include basic string and math operations (toLower, toUpper, concat, min/max/abs, toInteger, toDouble, toLong) that can be applied to source JSON values. Example: I really like jelly. Let’s make whatever’s in jar2 ALL CAPS so we can shout about it! Input: {
"counterTop": {
"loaf1": {
"type": "white",
"slices": "slice1"
},
"jar2": {
"contents": "jelly"
}
}
} Spec: [
{
"operation": "modify-overwrite-beta",
"spec": {
"counterTop": {
"jar2": {
"contents": "=toUpper"
}
}
}
}
] Output: {
"counterTop" : {
"loaf1" : {
"type" : "white",
"slices" : "slice1"
},
"jar2" : {
"contents" : "JELLY"
}
}
} Sort: Sorts all of the keys in the input JSON into alphabetical order in the output. Sort cannot be configured beyond this all-or-nothing sort. Example: Let’s sort the ingredients so that the jelly comes first; jelly is more important, and it will be easier to spread that way: Input: {
"counterTop": {
"loaf1": {
"type": "white",
"slices": "slice1"
},
"jar2": {
"contents": "JELLY"
}
}
} Spec: [
{
"operation": "sort"
}
] Output: {
"counterTop" : {
"jar2" : {
"contents" : "JELLY"
},
"loaf1" : {
"slices" : "slice1",
"type" : "white"
}
}
} Custom Transforms: (Custom transforms are out of scope for this tutorial.)
Wildcards and Operators
Input-side (left-hand side): Input-side wildcards retrieve a value or JSON tree from the input JSON.
* (asterisk) The asterisk wildcard traverses and reads each element in the source JSON at the level of the preceding search specification. Typically, the asterisk wildcard will return an array of elements.
Example: Rather than just one source element, such as the breadbox, let’s grab everything, no matter what element it’s in, and put it on the counter: Input: {
"breadbox": {
"loaf1": {
"type": "white"
},
"loaf2": {
"type": "wheat"
}
},
"fridge": {
"jar1": {
"contents": "peanut butter"
},
"jar2": {
"contents": "jelly"
}
}
} Spec: [
{
"operation": "shift",
"spec": {
"*": "counterTop"
}
}
] Output: {
"counterTop" : [ {
"loaf1" : {
"type" : "white"
},
"loaf2" : {
"type" : "wheat"
}
}, {
"jar1" : {
"contents" : "peanut butter"
},
"jar2" : {
"contents" : "jelly"
}
} ]
} The asterisk wildcard can be used with other string characters to parse data within an input JSON element (we’ll use the $ wildcard notation here – see below for an explanation of that) Example: Let’s take a look at the expiration date on the jelly. I am not a stickler for expiration dates, so I just want to check the year: Input: {
"breadbox": {
"loaf1": {
"type": "white"
},
"loaf2": {
"type": "wheat"
}
},
"fridge": {
"jar1": {
"contents": "peanut butter"
},
"jar2": {
"contents": "jelly",
"expiration": "25-APR-2019"
}
}
} Spec: [
{
"operation": "shift",
"spec": {
"fridge": {
"jar2": {
"expiration": {
"*-*-*": {
"$(0,3)": "expiry.year"
}
}
}
}
}
}
] Output: {
"expiry" : {
"year" : "2019"
}
} @ (“at” or arobase) The “at” wildcard traverses backwards up the source JSON and returns the entire tree or value at the specified position. @ or @0 (return the value or tree of the matched key from the input JSON) Example: Let’s say I take a look at the jelly in jar2, and it has spoiled. We can match the "contents" key to toss just the jelly into the garbage, but if the jelly is terribly bad, we can use @ or @0 to throw out everything in the jar, @1 to throw out everything in the fridge, or @2 to toss the whole kitchen into the garbage! Input: {
"breadbox": {
"loaf1": {
"type": "white"
},
"loaf2": {
"type": "wheat"
}
},
"fridge": {
"jar1": {
"contents": "peanut butter"
},
"jar2": {
"contents": "jelly",
"expiration": "25-APR-2019"
}
}
} Spec: [
{
"operation": "shift",
"spec": {
"fridge": {
"jar2": {
"contents": "garbage1",
"@0": "garbage2",
"@1": "garbage3",
"@2": "garbage4"
}
}
}
}
] Output: {
"garbage0" : {
"contents" : "jelly",
"expiration" : "25-APR-2019"
},
"garbage1" : "jelly",
"garbage2" : {
"contents" : "jelly",
"expiration" : "25-APR-2019"
},
"garbage3" : {
"jar1" : {
"contents" : "peanut butter"
},
"jar2" : {
"contents" : "jelly",
"expiration" : "25-APR-2019"
}
},
"garbage4" : {
"breadbox" : {
"loaf1" : {
"type" : "white"
},
"loaf2" : {
"type" : "wheat"
}
},
"fridge" : {
"jar1" : {
"contents" : "peanut butter"
},
"jar2" : {
"contents" : "jelly",
"expiration" : "25-APR-2019"
}
}
}
} Output-side (right-hand side): Output-side wildcards return a single value that can be used in a target JSON key, key path, or value. & (ampersand) The ampersand wildcard traverses backwards up the source JSON tree, beginning at the level of the preceding match. It returns only the key name (not the value or tree). The ampersand can be used in three ways: & or &0 (return the name of the matched key from the input JSON) Example: Let’s look at jar2 more closely, but I only care about what’s in it. We’ll just write the value of “contents” to an output key with the same name (&0): Input: {
"breadbox": {
"loaf1": {
"type": "white"
},
"loaf2": {
"type": "wheat"
}
},
"fridge": {
"jar1": {
"contents": "peanut butter"
},
"jar2": {
"contents": "jelly",
"expiration": "25-APR-2019"
}
}
} Spec: [
{
"operation": "shift",
"spec": {
"fridge": {
"jar2": {
"contents": "&0"
}
}
}
}
] Output: {
"contents" : "jelly"
} &n (walk back up the tree ‘n’ levels and return the key name from the specified level) Example: Since the extra “contents” key is a bit superfluous, let’s just use the name of the parent element (&1) instead: Input: {
"breadbox": {
"loaf1": {
"type": "white"
},
"loaf2": {
"type": "wheat"
}
},
"fridge": {
"jar1": {
"contents": "peanut butter"
},
"jar2": {
"contents": "jelly",
"expiration": "25-APR-2019"
}
}
} Spec: [
{
"operation": "shift",
"spec": {
"fridge": {
"jar2": {
"contents": "&1"
}
}
}
}
] Output: {
"jar2" : "jelly"
} &(n,x) (walk back up the tree ‘n’ levels and return the xth matched portion of the key at that level; 0 returns the whole key) Example: We really just want to know where to look for jelly, no matter what container it’s in. Let’s use the top-level parent’s name instead (&(2,0)): Input: {
"breadbox": {
"loaf1": {
"type": "white"
},
"loaf2": {
"type": "wheat"
}
},
"fridge": {
"jar1": {
"contents": "peanut butter"
},
"jar2": {
"contents": "jelly",
"expiration": "25-APR-2019"
}
}
} Spec: [
{
"operation": "shift",
"spec": {
"fridge": {
"jar2": {
"contents": "&(2,0)"
}
}
}
}
] Output: {
"fridge" : "jelly"
} @ (“at” or arobase) The “at” wildcard traverses backwards up the source JSON and returns the entire tree or value at the specified position. Same functionality as on the input side, above: @, @(n), @(keyName), and @(n,keyName) forms. Example: Let’s see what we have in all the jars in our refrigerator. We want to match everything with a name starting with “jar” (jar*) and return the contents of each element we find (@(0,contents)): Input: {
"breadbox": {
"loaf1": {
"type": "white"
},
"loaf2": {
"type": "wheat"
}
},
"fridge": {
"jar1": {
"contents": "peanut butter"
},
"jar2": {
"contents": "jelly",
"expiration": "25-APR-2019"
}
}
} Spec: [
{
"operation": "shift",
"spec": {
"*": {
"jar*": {
"@(0,contents)": "Things in jars"
}
}
}
}
] Output: {
"Things in jars" : [ "peanut butter", "jelly" ]
} $ (dollar sign) The dollar sign traverses backwards up the source JSON and returns only the name of the key (rather than its value or subtree) at the specified position, so the matched key itself can be written as data in the output. It supports the same positional forms as & and @, above: $, $(n), and $(n,x). Example: Let’s make a list of the jars themselves, regardless of what’s in them: Input: {
"breadbox": {
"loaf1": {
"type": "white"
},
"loaf2": {
"type": "wheat"
}
},
"fridge": {
"jar1": {
"contents": "peanut butter"
},
"jar2": {
"contents": "jelly",
"expiration": "25-APR-2019"
}
}
} Spec: [
{
"operation": "shift",
"spec": {
"*": {
"jar*": {
"$0": "List of jars"
}
}
}
}
] Output: {
"List of jars" : [ "jar1", "jar2" ]
} “Temp” workspace: In a chained Jolt specification, it is possible to create a temporary structure as a workspace within the output JSON. This temporary structure can be useful for making multi-pass transformations or for holding a copy of the original input JSON during destructive transformations. It can then be removed from the output JSON within the same chained specification before the output JSON is produced. For an example, see the JOLT transform for this article: https://community.hortonworks.com/articles/232333/image-data-flow-for-industrial-imaging.html In this example spec, three “shift” operations are chained together. The “particles-orig” element is created to back up the original data in “particles,” then several passes are attempted (one for each possible number of semicolon-delimited values in the “particles” value). When the passes are complete, the successful pass is written to the output as “particles” and the backup is removed with a “remove” operation. Chained Spec: [
{
"operation": "shift",
"spec": {
"particles": ["particles-orig",
"particles-0",
"particles-1",
"particles-2",
"particles-3",
"particles-4"],
"timestamp": "ts",
"*": "&"
}
},
{
"operation": "shift",
"spec": {
"particles-orig": "particles-orig",
"particles-0": {
"*;*;*;*;*": {
"$(0,1)": "tmp.particle1[]",
"$(0,2)": "tmp.particle2[]",
"$(0,3)": "tmp.particle3[]",
"$(0,4)": "tmp.particle4[]",
"$(0,5)": "tmp.particle5[]"
}
},
"particles-1": {
"*;*;*;*": {
"$(0,1)": "tmp.particle1[]",
"$(0,2)": "tmp.particle2[]",
"$(0,3)": "tmp.particle3[]",
"$(0,4)": "tmp.particle4[]"
}
},
"particles-2": {
"*;*;*": {
"$(0,1)": "tmp.particle1[]",
"$(0,2)": "tmp.particle2[]",
"$(0,3)": "tmp.particle3[]"
}
},
"particles-3": {
"*;*": {
"$(0,1)": "tmp.particle1[]",
"$(0,2)": "tmp.particle2[]"
}
},
"particles-4": "tmp.particle1[]",
"*": "&"
}
},
{
"operation": "shift",
"spec": {
"tmp": {
"*": {
"0": {
"*,*,*,*": {
"@(4,runid)": "particles.[#4].runid",
"@(4,ts)": "particles.[#4].ts",
"$(0,1)": "particles.[#4].Xloc",
"$(0,2)": "particles.[#4].Yloc",
"$(0,3)": "particles.[#4].Xdim",
"$(0,4)": "particles.[#4].Ydim"
}
}
}
},
"*": "&"
}
},
{
"operation": "remove",
"spec": {
"particles-orig": ""
}
}
]
REFERENCES:
Jolt transform language on GitHub
Jolt demo site (like JSFiddle, but for Jolt)
Complete Jolt transform readme on GitHub
RELATED POSTS:
Image Data Flow for Industrial Imaging
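Appendix - a minimal chained example: the Chained specification type described above can be illustrated by combining two of the stock operations (shift, then remove) against the breadbox input from the Shift example. This is a sketch only; the first operation moves the breadbox to the counterTop and the second discards loaf2. Spec: [
{
"operation": "shift",
"spec": {
"breadbox": "counterTop"
}
},
{
"operation": "remove",
"spec": {
"counterTop": {
"loaf2": ""
}
}
}
] Output: {
"counterTop" : {
"loaf1" : {
"type" : "white"
}
}
}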
... View more
01-22-2019
02:40 PM
Hi Rajendra - This can happen when the session times out. You can extend the timeout period as described in this article: https://community.hortonworks.com/content/supportkb/167640/error-invalid-sessionhandle-sessionhandle-ecb9eded.html
... View more
01-16-2019
04:23 PM
1 Kudo
Glad that helped! My colleague Greg Keys has a great series here on HCC about extending Atlas that may help you out, too: https://community.hortonworks.com/articles/229421/customizing-atlas-part1-model-governance-traceabil.html https://community.hortonworks.com/articles/231988/customizing-atlas-part2-deep-source-metadata-embed.html
... View more
01-09-2019
03:10 PM
1 Kudo
There are two ways I'd suggest off the top of my head. The simplest might be to add a metadata tag that would contain a small sample in CSV, JSON, or HTML format, and populate it via the API or the Atlas Kafka topics. For example, you could use HDF NiFi to periodically sample each table in Hive, format the data, and populate the attribute. For a more integrated approach, you might consider using the DataPlane DSS data profiler framework to add a "sample" profile that could be stored alongside the other profiler metadata and surfaced in DSS.
... View more
01-04-2019
03:24 AM
11 Kudos
Image Data Flow for Industrial Imaging

OBJECTIVE: Ingest and store manufacturing quality assurance images, measurements, and metadata in a cost-effective and simple-to-retrieve-from platform that can provide analytic capability in the future.

OVERVIEW: In high-speed manufacturing, imaging systems may be used to identify material imperfections, monitor thermal state, or identify when tolerances are exceeded. Many commercially available systems automate measurement and reporting of specific tests, but combining results from multiple instrumentation vendors, longer-term storage, process analytics, and comprehensive auditability require different technology. Using HDF’s NiFi and HDP’s HDFS, Hive or HBase, and Zeppelin, one can build a cost-effective and performant solution to store and retrieve these images, as well as provide a platform for machine learning based on that data. Sample files and code, including the Zeppelin notebook, can be found in this GitHub repository: https://github.com/wcbdata/materials-imaging

PREREQUISITES: HDF 3.0 or later (NiFi 1.2.0.3+), HDP 2.6.5 or later (Hadoop 2.6.3+ and Hive 1.2.1+), Spark 2.1.1.2.6.2.0-205 or later, Zeppelin 0.7.2+

STEPS:

1. Get the files to a filesystem accessible to NiFi. In this case, we are assuming the source system can get the files to a local directory (e.g., via an NFS mount).

2. Ingest the image and data files to long-term storage:
- Use a ListFile processor to scrape the directory. In this example, the collected data files are in a root directory, with each manufacturing run’s image files placed in a separate subdirectory. We’ll use that location to separate out the files later.
- Use a FetchFile to pull the files listed in the flowfile generated by our ListFile. FetchFile can move all the files to an archive directory once they have been read.
- Since we’re using a different path for the images versus the original source data, we’ll split the flow using an UpdateAttribute to store the file type, then route them to two PutHDFS processors to place them appropriately. Note that PutHDFS_images uses the automatically parsed original ${path} to reproduce the source folder structure.

3. Parse the data files to make them available for SQL queries:
- Beginning with only the CSV flowfiles, the ExtractGrok processor is used to pick one field from the second line of the flowfile (skipping the header row). This field is referenced by expression language that sets the schema name we will use to parse the flowfile.
- A RouteOnAttribute processor checks the schema name using a regex to determine whether the flowfile format is one that requires additional processing to parse. In the example, flowfiles identified as the “sem_meta_10083” schema are routed to the processor group “Preprocess-SEM-particle.” This processor group contains the steps for parsing nested arrays within the CSV flowfile.
- Within the “Preprocess-SEM-particle” processor group, the flowfile is parsed using a temporary schema. A temporary schema can be helpful for parsing some sections of a flowfile row (or tuple) while leaving others for later processing.
- The flowfile is split into individual records by a SplitRecord processor. SplitRecord is similar to a SplitJSON or SplitText processor, but it uses NiFi’s record-oriented parsers to identify each record rather than relying strictly on length or linebreaks.
- A JoltTransform uses the powerful JOLT language to parse a section of the CSV file with nested arrays. In this case, a semicolon-delimited array of comma-separated values is reformatted to valid JSON, then split out into separate flowfiles by an EvaluateJSONPath processor. This JOLT transform uses an interesting combination of JOLT wildcards and repeated processing of the same path to handle multiple possible formats.
- Once formatted by a record-oriented processor such as ConvertRecord or SplitRecord, the flowfile can be reformatted easily as Avro, then inserted into a Hive table using a PutHiveStreaming processor. PutHiveStreaming can be configured to ignore extra fields in the source flowfile or target Hive table so that many overlapping formats can be written to a table with a superset of columns in Hive. In this example, the 10083-formatted flowfiles are inserted row by row, and the particle and 10021-formatted flowfiles are inserted in bulk. (A hedged sketch of a suitable target table appears at the end of this post.)

4. Create a simple interface to retrieve individual images for review. The browser-based Zeppelin notebook can natively render images stored in SQL tables or in HDFS:
- The notebook begins with some basic queries to view the data loaded from the imaging subsystems.
- The first example paragraph uses SQL to pull a specific record from the manufacturing run, then looks for the matching file on HDFS by its timestamp.
- The second set of example paragraphs uses an HTML/Angular form to collect the information, then displays the matching image.
- The third set of sample paragraphs demonstrates how to obtain the image via Scala for analysis or display.

RELATED POSTS: JOLT transformation quick reference

FUTURE POSTS:
Storing microscopy data in HDF5/USID schema to make it available for analysis using standard libraries
Applying TensorFlow to microscopy data for image analysis
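Supplementing the PutHiveStreaming step above, here is a hedged sketch of what the target Hive table could look like. The table name, column types, and bucket count are illustrative; the column names follow the JOLT spec in the related quick-reference post; Hive Streaming requires a bucketed, transactional table stored as ORC.
-- Illustrative DDL only; adjust names, types, and bucketing to match your data
CREATE TABLE particles (
runid STRING,
ts STRING,
Xloc DOUBLE,
Yloc DOUBLE,
Xdim DOUBLE,
Ydim DOUBLE
)
CLUSTERED BY (runid) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');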
... View more