
NiFi not able to convert ByteBuffer Avro decimal logical type

Contributor

Hi, 

There is a Kafka producer converting decimals to bytes using this:

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.mapstruct.Mapper;

@Mapper
public class BigDecimalMapper {

  // Encodes the unscaled two's-complement value of the BigDecimal,
  // which is how Avro represents the decimal logical type over bytes.
  public ByteBuffer bigDecimalToByteBuffer(BigDecimal bigDecimal) {
    if (bigDecimal == null) return null;

    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
    LogicalType type = LogicalTypes.decimal(15, 4);
    BigDecimal scaledBigDecimal = bigDecimal.setScale(4, RoundingMode.HALF_UP);

    // The Schema parameter is not used by DecimalConversion for bytes, so pass null
    return conversion.toBytes(scaledBigDecimal, null, type);
  }
}

We are using the Confluent Schema Registry, and this is the schema:

{
  "type": "record",
  "namespace": "event",
  "name": "Event",
  "fields": [
    {
      "name": "maxApprovedAmount",
      "type": "bytes",
      "logicalType": "decimal",
      "precision": 4,
      "scale": 4
    }
  ]
}

In NiFi I am using ConsumeKafkaRecord_2_6 with an AvroReader and an AvroRecordSetWriter.
In the reader I am using the schema from Confluent. On the writer I am using Inherit Record Schema. When viewing the data in the flow file content viewer and selecting "formatted", I see the value as:

 
"maxApprovedAmount" : {
"bytes" : "\u0001}x@"
},

The trouble is how to get the actual decimal value instead of this. I have tried multiple changes to my Avro writer, and I have read other posts on similar issues, but I have not been able to get this resolved. I believe it has to do with the lack of support for logical types. I am even open to writing a script in Jython to do the conversion if that will work. Any help would be greatly appreciated!
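
In case it helps anyone looking at this, here is a minimal sketch in plain Java of the reverse of the producer's mapping, assuming the same decimal(15, 4) logical type as the producer code above (the class name is just for illustration). The same logic could be adapted into a script if a scripted conversion turns out to be needed.

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;

public class BigDecimalUnmapper {

  // Reverse of the producer's mapping: the raw bytes hold the unscaled
  // two's-complement value, and the logical type supplies the scale.
  public BigDecimal byteBufferToBigDecimal(ByteBuffer buffer) {
    if (buffer == null) return null;

    Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
    LogicalType type = LogicalTypes.decimal(15, 4);

    // As with toBytes, DecimalConversion ignores the Schema argument for bytes
    return conversion.fromBytes(buffer, null, type);
  }
}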

2 Replies

Contributor

I am still unable to see how to achieve this. If anyone has any suggestions, I am open to it. Thanks!

 

Contributor

I found that the schema was defined incorrectly, which was causing the issue. The logical type attributes need to sit inside the field's type definition.

It should change from this:

  "name": "maxApprovedAmount",
  "type": "bytes",
  "logicalType": "decimal",
  "precision": 4,
  "scale": 4

to this:

  "name": "maxApprovedAmount",
  "type": {
    "type": "bytes",
    "logicalType": "decimal",
    "precision": 4,
    "scale": 4
  }