Member since: 11-16-2015
Posts: 865
Kudos Received: 615
Solutions: 240
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 131 | 08-09-2019 05:46 PM |
|  | 299 | 07-31-2019 07:56 PM |
|  | 168 | 05-29-2019 04:39 PM |
|  | 112 | 05-24-2019 02:37 AM |
|  | 444 | 05-10-2019 04:27 PM |
12-06-2019
06:57 AM
The PutCassandraRecord processor was added for this purpose. If you only have one line of data per flow file, it will be slower than it needs to be; instead you could use MergeContent or MergeRecord to consolidate a number of these rows into a single flow file, then use PutCassandraRecord (with a CSVReader that has a schema with your 3 fields in it), and it will load all the records in one execution.
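For illustration only (your actual column names and types will differ), the CSVReader's "Schema Text" for three hypothetical fields might look like:
{
  "type": "record",
  "name": "cassandra_row",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "name", "type": "string" },
    { "name": "value", "type": "double" }
  ]
}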
08-09-2019
05:48 PM
You can put an ExecuteScript processor before SelectHiveQL and use that to check the time. If it must not execute, the script can simply return without fetching any FlowFiles from the queue; otherwise it can fetch and transfer all available FlowFiles.
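As a rough sketch (assuming, purely for illustration, that SelectHiveQL should only run between 01:00 and 05:00), the Groovy body of ExecuteScript could look something like:
// Gate FlowFiles based on the current hour; adjust the window to your needs.
def hour = java.time.LocalTime.now().hour
if (hour < 1 || hour >= 5) {
    return // outside the window: leave the FlowFiles queued
}
def flowFiles = session.get(1000) // fetch up to 1000 queued FlowFiles
if (!flowFiles) return
session.transfer(flowFiles, REL_SUCCESS) // pass them downstream to SelectHiveQL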
08-09-2019
05:46 PM
If I am reading your use case correctly, I think you're looking for what the ForkRecord processor does; it allows you to fork a (usually single) record into multiple records based on a Record Path (similar to JSONPath but different syntax and expressiveness), possibly keeping the "root" elements common to each outgoing record.
08-09-2019
05:44 PM
PutDatabaseRecord does not work with Hive (at least with the Apache Hive or HDP Hive JDBC drivers), as it uses methods of the JDBC API that are not supported by the driver. I'm not sure whether the Simba driver works. A workaround for managed ACID tables is to use PutHiveStreaming instead, or if you have Hive 3, you can use PutHive3Streaming. The latter is a lot more flexible in terms of input, and relaxes some other requirements for the target table. For external tables, you could put the files directly into HDFS and create a table atop the target directory. If that "temp table" is to be inserted into a managed Hive table, you can then do that with PutHiveQL (after setting the flow file content to the appropriate INSERT ... SELECT HiveQL statement). Another (less-than-ideal) workaround is to convert individual records to SQL, adjust it to HiveQL syntax as necessary (not always needed, but sometimes), then send individual statements to PutHiveQL. This is very slow and inefficient, though.
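For example (table names here are made up), the flow file content sent to PutHiveQL would be something like:
INSERT INTO TABLE my_managed_table SELECT * FROM my_temp_external_table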
07-31-2019
07:56 PM
If it works in NiFi 1.5 and not in 1.9.2, then the DBCP libraries were probably updated to DBCP 2, which calls isValid() on the connection if a validation query is not set (see https://sourceforge.net/p/jtds/discussion/104388/thread/bbfbcf24/). Have you set the Validation Query property on the DBCPConnectionPool controller service? If not, set it to "SELECT 1" and try again; hopefully that will work. If it doesn't, you can try adding a user-defined property called "validationQuery" and setting it to "SELECT 1"; that should add it as a property on the JDBC connection itself. Another workaround would be to use the official SQL Server driver (although I assume you chose jTDS on purpose).
07-08-2019
02:59 PM
1 Kudo
Can you share the configuration of your HortonworksSchemaRegistry controller service?
07-02-2019
09:42 PM
I don't think there's much you can do in NiFi, unless you can tune parameters in the JDBC URL, or use PartitionRecord to ensure each FlowFile has only one unique value for the partition column, but even then it looks like you'd need some special logic on the PostgreSQL side. I found this article about maybe this being more performant in later or upcoming versions of PostgreSQL, and this one about doing partitioning via child tables, not sure how your table(s) are set up but maybe some of this will help.
07-02-2019
09:37 PM
PutSQL actually doesn't want the semicolon at the end (that's for command-line clients and other tools that allow multiple statements). Do you get the same error if you leave the semicolon off?
06-27-2019
05:32 PM
In the script, you're creating a variable `objList` that (for the first input format) points at the top-level array of objects, so you can call max() directly on that array (I think technically it's a List under the hood). In the second input format, `objList` will be pointing to the top-level object, so you'll need to get the array member `table` out of the object. Update the "def max" line to this:
def max = objList.table.max { Date.parse("yyyyMMddHHmmss", it.elem_stand) }
05-30-2019
02:19 PM
The JsonPath Expression is meant to identify an array, then SplitJson will split each element into its own flow file. Try "$.data" as your JsonPath Expression, and use the "splits" relationship to send things downstream. The "original" relationship will contain the incoming flow file, which doesn't sound like what you want.
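For example, given a hypothetical input like:
{
  "data": [
    { "id": 1, "name": "first" },
    { "id": 2, "name": "second" }
  ]
}
SplitJson with a JsonPath Expression of $.data would send two flow files to "splits", one containing {"id":1,"name":"first"} and the other {"id":2,"name":"second"}.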
05-29-2019
04:39 PM
In nifi-assembly/target you'll find the built system as you mention, including a "conf" folder that contains (among other things) a file called bootstrap.conf. In that file there's a commented-out JVM property to enable attachment by a debugger (the preceding comment line says "Enable Remote Debugging"). When you uncomment that argument and start NiFi, it will listen on port 8000 for a debugger to attach. You can then attach a debugger from your IDE (Eclipse, NetBeans, IntelliJ, etc.). You can change the port and/or set "suspend=y" if you want NiFi to wait until a debugger is attached before continuing startup; the latter is helpful if you are debugging something early in the startup sequence. Otherwise you can wait for NiFi to finish starting up and then attach whenever you like.
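For reference, the relevant lines in conf/bootstrap.conf look roughly like this (the exact text can vary slightly between NiFi versions); remove the leading # from the java.arg.debug line to enable it:
# Enable Remote Debugging
#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000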
05-24-2019
02:52 AM
What does your query look like, what are the results, and what would you like to see in the single row/record?
05-24-2019
02:37 AM
2 Kudos
You were so close! By using the [] syntax it just adds to the outgoing array, but you wanted to associate them with the same index, namely the one matched by the * "above" the fields. Put #2 inside your brackets (#2 is a reference to the array index you're iterating over, "two levels up" from where you are in the spec):
[{
"operation": "shift",
"spec": {
"nummer": "Nummer",
"table": {
"*": {
"zn": "Positionen.[#2].ZeileNr",
"datum": "Positionen.[#2].Datum"
}
}
}
}]
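For example, a hypothetical input like:
{
  "nummer": "4711",
  "table": [
    { "zn": 1, "datum": "2019-05-01" },
    { "zn": 2, "datum": "2019-05-02" }
  ]
}
should come out roughly as:
{
  "Nummer": "4711",
  "Positionen": [
    { "ZeileNr": 1, "Datum": "2019-05-01" },
    { "ZeileNr": 2, "Datum": "2019-05-02" }
  ]
}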
05-10-2019
06:18 PM
1 Kudo
To address your comment below, I missed the part where you want to call the outgoing field "color". Change this line (line 8 of the spec):
"$": "colorsLove[].&2"
to this:
"$": "colorsLove[].color"
05-10-2019
04:27 PM
1 Kudo
This Chain spec will add the hardcoded value 20190905 into the array (after removing empty values):
[
{
"operation": "shift",
"spec": {
"color_*": {
"": "TRASH",
"*": {
"$": "colorsLove[].&2"
}
},
"*": "&"
}
},
{
"operation": "shift",
"spec": {
"colorsLove": {
"*": {
"#20190905": "colorsLove[#2].date",
"*": "colorsLove[#2].&"
}
},
"*": "&"
}
},
{
"operation": "remove",
"spec": {
"TRASH": ""
}
}
]
You should be able to replace "#20190905" with a NiFi Expression Language expression, maybe something like "#${now():toNumber():format('yyyyddMM')}" ... but I didn't try that part.
05-10-2019
03:43 PM
The user-defined properties are available in your ProcessContext like all properties are, but they have a flag marking them as dynamic. Here's a snippet for iterating over the user-defined properties:
context.getProperties().keySet().stream().filter(PropertyDescriptor::isDynamic).forEach()
Fill in the lambda in the forEach() with your logic for handling the properties. (This is taken from my SO answer to the same question.)
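As a hedged Groovy flavor of the same idea (for example in ExecuteScript, where context and log are bound for you), it might look like:
// collect the user-defined (dynamic) properties and log each name/value pair
def dynamicProps = context.properties.keySet().findAll { it.dynamic }
dynamicProps.each { pd ->
    log.info("${pd.name} = ${context.getProperty(pd).value}")
}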
05-07-2019
01:58 PM
What does the generated SQL coming from ConvertJSONToSQL look like? Are the fields correctly uppercased? Does your database lowercase the column names? Did you try setting the "Translate Field Names" property to "true" in ConvertJSONToSQL? Does the case of the table name in the SQL match the case of the table name in the DB? If you're using "dbo.xxxx" as the Table Name property in ConvertJSONToSQL, instead try using just "xxxx" as the Table Name, and setting either Catalog Name or Schema Name (depending on your DB) to "dbo" (or DBO if necessary).
05-07-2019
01:43 PM
Have you tried PutDatabaseRecord? The reader provides the schema, so there is no need to set the sql.args attributes. As of NiFi 1.9 the reader can also infer the datatypes, so you wouldn't have to specify the schema either. If you have an older NiFi, you can try ExecuteSQL -> ConvertRecord (Avro to JSON) -> InferAvroSchema -> PutDatabaseRecord; that's a heavy-handed way of getting the schema inferred.
05-07-2019
01:37 PM
You should be able to use SplitXml -> PutHDFS, using the Split Depth property to specify where to do the tag splitting. Each tag at that depth will be output as a separate flow file which you can send to HDFS via the PutHDFS processor. You may need to use UpdateAttribute to set the filename attribute, which is used by PutHDFS as the target filename.
05-07-2019
01:31 PM
You should be able to use ConvertRecord -> PutHDFS for this. ConvertRecord would use an XMLReader and a CSVRecordSetWriter.
05-07-2019
01:30 PM
1 Kudo
If you are not obtaining keys from the database, not using fragmented transactions, and not rolling back on failure, then you should see the failed flow files in a batch being routed to the failure relationship. If you must configure the processor differently, then the flow files will be treated as a single transaction. In that case, in order to handle individual failures you'll want to not use batches, meaning set PutSQL's Batch Size property to 1.
05-03-2019
03:21 PM
Did you mean ExtractText instead of ReplaceText?
05-02-2019
08:51 PM
1 Kudo
I don't think your desired output is valid JSON, as the root object only has an array in it, not a key/value pair. If you want a key in there (let's call it "root"), the following spec will work in JoltTransformJSON:
[
{
"operation": "shift",
"spec": {
"*": "root.[]"
}
}
]
Otherwise, if you just want to add braces around the array, you can use ReplaceText, replacing the entire content with {$1}.
04-29-2019
01:49 PM
1 Kudo
You can use the SiteToSiteProvenanceReportingTask for this. Filter the reporting task to only emit events at the "certain point" you mention above. Each event has a "timestampMillis" and a "lineageStart" field, so you should be able to route on the difference of the two using QueryRecord, with something like:
SELECT * FROM FLOWFILE WHERE timestampMillis - lineageStart > 60000
This should emit a flow file containing all events for which the associated entity (in this case, the flow file in the system) has been in the flow for over a minute.
04-25-2019
05:35 PM
You'll need to provide the CLOB as an attribute, meaning you've set attributes like sql.args.1.type to 2005 (java.sql.Types.CLOB) and sql.args.1.value to the CLOB value. Then your SQL statement would have a ? parameter, and the CLOB value will be inserted when the statement is prepared. See NIFI-4352 for more information.
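For example (the table and column here are made up), you might set:
sql.args.1.type = 2005
sql.args.1.value = <the CLOB text>
and make the flow file content:
INSERT INTO my_docs (doc_body) VALUES (?)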
04-24-2019
04:44 PM
Once the value is in an attribute, it loses its type information, as all attribute values are treated as strings. So when you use AttributesToJSON, you'll get a string value of the timestamp, not the long value. Instead, try UpdateRecord: you can set/generate the field using a Replacement Value Strategy of "Literal Value", where you can use NiFi Expression Language such as ${now():toNumber()}. Just make sure that the field is in the output schema as a long.
04-24-2019
01:25 PM
You'll want to convert now() into a number for use by PutCassandraRecord, try ${now():toNumber()} instead.
04-23-2019
09:02 PM
Where in your flow do you have the information to populate the STATUS field?
04-23-2019
04:57 PM
It depends on how you are trying to "flatten" the nested XML into CSV fields; you can try UpdateRecord, JoltTransformRecord, etc. Each record-based processor does different transformations, but in the end they can all convert from XML to CSV; you just have to tell the right processor what to do. If you provide more information here (sample input and output, e.g.) I can try to help get you going.
04-19-2019
02:27 AM
The hard part here is that Hive returns STRUCT columns as JSON strings, so even if we can parse the JSON, we've lost the type information. It's possible we can retrieve it from the metadata and (if so) create a nested record from the results. Please feel free to file a Jira for this enhancement.