Created 05-12-2018 01:07 PM
I have a CSV file of the following format:
1512340821, 26, 576.09, 39824, 989459.009, ... and so on, 35 fields in total
Each of these columns is a long or a double in the Avro schema. I have used the ConvertRecord processor in NiFi, which first applies an Avro schema and then produces JSON data.
My goal is to have the data coming out in JSON format like the following:
{"timestamp":"1512312024","tupvalues":[112.5,0.0,1872296.0,134760.0,0.0,7134.0,19.0,0.0,0.0,3.8136152E7,13703.0,18.0,111.0,37.0,1.38252288E8,1.91762842E9,5.9564032E7,4055040.0,0.0,1.41528269E9,0.0,8.0539648E7,9.5470592E7,0.0,2.0,0.76,0.44,0.0,0.0,2.0,2.0,0.0,0.0,0.0,0.0]}
My original data does not have headers, but I expect the output to be of the form {key, value}, where the key is the timestamp and the values are the remaining columns.
Here is the Avro schema that I put in the Avro schema registry:
{
  "type": "record",
  "namespace": "testavro.schema",
  "name": "test",
  "fields": [
    {
      "type": "double",
      "name": "timestamp"
    },
    {
      "name": "tupvalues",
      "type": {
        "type": "array",
        "items": "double"
      }
    }
  ]
}
I used this website, https://json-schema-validator.herokuapp.com/avro.jsp, to check the schema, and it validates successfully. But when the schema is applied in the registry, the data is not picked up. I get an error of the following order: "Cannot create value [26] of type java.lang.String to object array for field tupvalues".
Any sort of help is appreciated. I am a newbie to Avro schema writing, and I have a feeling that's where I went wrong.
Created 05-12-2018 07:26 PM
The schema is a valid Avro schema, but the problem is that the CSVReader is taking only the second field (a double) and trying to convert it into an array. It does not realize you'd like to convert the remaining column values into an array. This seems like a decent improvement for the CSVReader; I'll think it over and perhaps write an Improvement Jira for it (or feel free to write one yourself).
Since you want JSON as the target, you can use JoltTransformJSON to get the output you want. If it were an arbitrary output format you could use UpdateRecord, but you'd have 35 or so user-defined fields to put the individual column values into the array. In any case, you can use the following schema to parse the columns (whether or not you have a header line; just make sure to skip any header line if you use this schema):
{ "type": "record", "namespace": "testavro.schema", "name": "test", "fields": [ { "name": "timestamp", "type": {"type": "long","logicalType": "timestamp-millis"} }, { "name": "tupvalue0", "type" : "double" }, { "name": "tupvalue1", "type" : "double" }, { "name": "tupvalue2", "type" : "double" }, { "name": "tupvalue3", "type" : "double" }, { "name": "tupvalue4", "type" : "double" }, { "name": "tupvalue5", "type" : "double" }, { "name": "tupvalue6", "type" : "double" }, { "name": "tupvalue7", "type" : "double" }, { "name": "tupvalue8", "type" : "double" }, { "name": "tupvalue9", "type" : "double" }, { "name": "tupvalue10", "type" : "double" }, { "name": "tupvalue11", "type" : "double" }, { "name": "tupvalue12", "type" : "double" }, { "name": "tupvalue13", "type" : "double" }, { "name": "tupvalue14", "type" : "double" }, { "name": "tupvalue15", "type" : "double" }, { "name": "tupvalue16", "type" : "double" }, { "name": "tupvalue17", "type" : "double" }, { "name": "tupvalue18", "type" : "double" }, { "name": "tupvalue19", "type" : "double" }, { "name": "tupvalue20", "type" : "double" }, { "name": "tupvalue21", "type" : "double" }, { "name": "tupvalue22", "type" : "double" }, { "name": "tupvalue23", "type" : "double" }, { "name": "tupvalue24", "type" : "double" }, { "name": "tupvalue25", "type" : "double" }, { "name": "tupvalue26", "type" : "double" }, { "name": "tupvalue27", "type" : "double" }, { "name": "tupvalue28", "type" : "double" }, { "name": "tupvalue29", "type" : "double" }, { "name": "tupvalue30", "type" : "double" }, { "name": "tupvalue31", "type" : "double" }, { "name": "tupvalue32", "type" : "double" }, { "name": "tupvalue33", "type" : "double" }, { "name": "tupvalue34", "type" : "double" } ] }
Your JsonRecordSetWriter can inherit the schema or use the same one from the registry. Now you can send the output to either UpdateRecord or (for this case) JoltTransformJSON; the following is a Chain spec you can use to put the tupvalues into an array:
[ { "operation": "shift", "spec": { "*": { "timestamp": "[&1].timestamp", "tupvalue*": "[&1].tupvalue[&(0,1)]" } } } ]
This should give you the desired output, and it works on single lines of CSV or (more efficiently) on full CSV files.
Created 05-17-2018 12:39 PM
@Matt Burgess I was able to do this and it worked perfectly. However, there is just one small request: the data I finally receive in PutFile is all on one line. I tried to insert newlines after each record ends, but in PublishKafka_0_11 there is a Message Demarcator property where Shift+Ctrl is also not helping my situation.
I figured that it is because the output is one single JSON array:
[{"timestamp":1512.., "tupvalues":[1,2,3,4...]},
{"timestamp":1512.., "tupvalues":[1,2,3,4...]},
{"timestamp":1512.., "tupvalues":[1,2,3,4...]},
{"timestamp":1512.., "tupvalues":[1,2,3,4...]}.....]
The closing square bracket is right at the very end, whereas the required output is somewhat like this:
{"timestamp":"1512312021","tupvalues":[0.8,0.0,18244.0,3176.0,0.0,122.0,11.0,0.0,0.0,100052.0,1783.0,4.0,59.0,1.0,3252224.0,1.8681856E7,2777088.0,999424.0,0.0,524288.0,0.0,487424.0,740352.0,0.0,1.0,0.04,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0]} {"timestamp":"1512312022","tupvalues":[207.8,0.2,3778460.0,309000.0,0.0,22342.0,27.0,0.0,0.0,1.06732936E8,25623.0,36.0,749.0,110.0,3.19459328E8,3.87224371E9,1.17956608E8,7110656.0,0.0,2.87654298E9,0.0,2.0957184E8,2.46372352E8,0.0,3.0,1.95,1.23,0.0,0.0,3.0,6.0,0.0,0.0,0.0,0.0]}
Any suggestions? Do you think SplitJson or SplitRecord should now be introduced?
Created 05-17-2018 01:32 PM
In NiFi 1.7.0 (via NIFI-4456) you will be able to configure your JsonRecordSetWriter to issue one JSON object per line, which will give you the output you want. In this case, with JoltTransformJSON you'd need a ConvertRecord after the fact, since it will issue an array if you pass in an array. As a workaround you might be able to use ReplaceText to remove the first and last square brackets, and change },{ to },\n{ to put each record on its own line.
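A sketch of that workaround with two ReplaceText processors in a row; the property values below are my assumption, so adjust them for your data:

ReplaceText #1 (put each record on its own line):
  Replacement Strategy: Regex Replace
  Evaluation Mode: Entire text
  Search Value: \},\s*\{
  Replacement Value: },{ with an actual newline after the comma (Shift+Enter in the value editor)

ReplaceText #2 (strip the outer array brackets):
  Replacement Strategy: Regex Replace
  Evaluation Mode: Entire text
  Search Value: ^\[|\]$
  Replacement Value: (empty string)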
Created 05-21-2018 04:01 AM
Hi @Matt Burgess, the output is still on one line instead of multiple lines, even though I have tried what you mentioned above. I used ReplaceText with this regex: (\[)(\{\"timestamp\"\:15123[0-9]+),(\"tupvalues\"\:\[([0-9]+\.[a-zA-Z0-9-]+),([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)([0-9]+\.[a-zA-Z0-9-]+)\]\})(\,)
For replacement values I used $2, $3, followed by SplitText to split line by line. But the output is still the same.
I even tried the solution you gave in this post: https://community.hortonworks.com/questions/109064/nifi-replace-text-how-t0-replace-string-with.html
And I tried substituting the expression [\[\]](\{|\}), but this gives me output that has no square brackets at the beginning and none inside the array either. I know it's been almost a week, but I still haven't gotten the hang of it.
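One note on why [\[\]](\{|\}) misbehaves: it matches any square bracket that sits next to a curly brace, which includes the closing bracket of every tupvalues array, so those get stripped as well. Anchoring the match to the start and end of the whole flowfile instead (Evaluation Mode "Entire text", Search Value ^\[|\]$, empty Replacement Value) should remove only the outer array brackets; combined with the },{ to },\n{ replacement above, that should leave one record per line. This is an untested sketch under the same assumptions as the ReplaceText settings earlier.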