Member since
11-16-2016
40
Posts
7
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
2768 | 12-19-2016 03:17 PM |
04-28-2017
03:21 PM
It occurred to me that it might be better to just convert the entire flowfile contents to the stringify version rather than isolating the problematic blob first. I attempted this by using the following in the ExecuteScript: var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var flowFile = session.get();
if(flowFile != null) {
// Create a new StreamCallback, passing in a function to define the interface method
flowFile = session.write(flowFile,
new StreamCallback(function(inputStream, outputStream) {
var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(JSON.stringify(text).getBytes(StandardCharsets.UTF_8))
}));
session.transfer(flowFile, REL_SUCCESS);
}
However, the resulting content has been modified where the existing json structure is now masked by the use of escape characters before double quotes for instance. Is there an easy way to convert this object back into a json array? Thanks.
... View more
04-27-2017
09:29 PM
Thank you for the reply Bryan. I'm stripping the data not needing to be converted prior to the ExecuteScript via two SplitContext processors that utilize hardcoded hexadecimal strings to isolate the string (called attributed 'default_payload') needing to be converted via the ExecuteStript: var flowFile = session.get();
if (flowFile !== null) {
var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
flowFile = session.write(flowFile, new StreamCallback(function(inputStream, outputStream) {
var contentObj = flowFile.getAttribute("default_payload");
outputStream.write(JSON.stringify(contentObj ).getBytes(StandardCharsets.UTF_8));
}));
session.transfer(flowFile, REL_SUCCESS);
} If I understand what your saying then the ExecuteScript would need to handled both the splitting and conversion at the same processor as to not write over attributes which would keep it synchronized in the flow? I had tried to use a parent UpdateAttribute to introduce a unique value to the flowfile(s) prior to the split/convertion and other JSON processing, but I am thinking this is to loose of a linkage for this type of processing.
... View more
04-27-2017
07:09 PM
I'm utilizing "SplitAvro" in prior to "ConvertAvroToJSON" to allow blob objects too large to convert via varchar in an SQL stream to be able to be ingested into a flowfile. I use several "SplitContent" to isolate the object needed to be handled specially via a custom "ExecuteScript" which, in turn, utilizes a "JSON.stringify" to get a string which can be handled by any subsequent JSON related processors. I'm wondering if I can utilize a MergeContent or something similar to re-merge the customized string back to the sibling flowfile which contained the other JSON structure? Is the processor dynamic enough to pair back to a separate flowfile using uuid or something similar and the 'Defragment' Merge Strategy. Or could I create a dynamic variable in the JSON structure flowfile which could map back to the corresponding customized string via some type of iterative process? Thank you for any hints or suggestions you can provide. ~Sean
... View more
Labels:
- Labels:
-
Apache NiFi
04-20-2017
04:20 PM
I suppose I should look outside of NiFi for solutions as there doesn't seem to be similar experiences or use cases as I have attempted.
... View more
04-20-2017
02:27 PM
If loading the entire table it the only option then would not providing a downstream processor allow the table to drain without clogging memory or other resources? Which processor would give me the most control on the number of rows/records returned with each iteration? With GenerateTableFetch it appears to be handled with the Partion Size parameter. With the QueryDatabaseTable processor I've experimented with using the Columns to Return
and Max Rows per File parameters without much success. The result of executing the QueryDatabaseTable processor always results in bring the NiFi cluster to a halt requiring one or more restarts to recover, so I'm reluctant to just arbitrarily try things. Any suggestions would greatly be appreciated.
... View more
04-19-2017
04:23 PM
Due to a limitation to the size (and type of data?) that can be cast within the ExecuteSQL processor (when casting a blob to a varchar I have issues if the field size exceeds 32K) I'm looking to utilize the GenerateTableFetch or QueryDatabaseTable processors provided with NiFi to ingest entire tables. QueryDatabaseTable seems promising, but I have run into two issues that has hindered using this approach. First of all the existing table size is way to large to ingest (probably terabytes or more) and I'm only interested in getting the records that are most recent, anyhow. Using a Max Initial Value on the date field seems to be a suitable key to keep track of the state. Is there anyway to inject a starting value other than the implicit key state achieve by ingesting the entire table? I was hoping to possible prime the query by initially hard coding a value for max_value_column in the `initial.maxvalue.{max_value_column}` parameter, but to no avail. Secondly, from what I've read in the forum it seems GenerateTableFetch would be the best option of there are joins required from more than one table. If this is so, could you provide an example of how one might go about implementing this? Thanks, ~Sean
... View more
Labels:
- Labels:
-
Apache NiFi
01-30-2017
08:30 PM
The issue was with the customized bundle utilizing a NiFi 0.5 api where the NiFi running the processor was at 1.0.0.
... View more
01-26-2017
08:02 PM
I realize my question is too vague to assist based on what I've provided. I'm not sure the "thread-dump.txt" is even capturing anything related to the problem. I know the processor is entering the exception part of the custom code, but I cannot get the inherent getLogger() to produce the error logs. (I've tried a standard System.out.println as well, but I'm not sure where the console output would be written to) I've seen examples where the logger is instantiated via "final ComponentLog logger = getLogger()" and other examples that suggest the getLogger() is inherented from the "AbstractProcessor" thus no instantiation is required. What is further confusing that if I try the above I see an no such method errors on the 'org.apache.nifi.logger.ProcessorLog' which I thought was deprecated. Our NiFi is running on HDF Version 2.0.1. Thanks of any assistance you can provide.
... View more
01-24-2017
05:18 PM
Hello, My custom java processor becomes unresponsive after being idle for a while resulting in or causing subsequent flows to fail. I used the ./nifi.sh dump thread-dump.txt to capture the problem and I've attached the resulting file. thread-dump.txt It appears that the dump shows predominately TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject, but I'm not sure how to interpret this issue. This is a continuation of the question asked earlier (https://community.hortonworks.com/questions/79331/debugging-custom-nifi-unresponsive-flows.html#answer-79337), but since I answered as satisfied I'm not sure that it will be monitored. Thanks, ~Sean
... View more
Labels:
- Labels:
-
Apache NiFi
01-23-2017
09:31 PM
Thank you for the prompt reply. I'll your suggestions a try.
... View more