Nifi not able to convert byte buffer avro Decimal Logical type

Contributor

Hi, 

A Kafka producer converts decimals to bytes using this:

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.mapstruct.Mapper;

@Mapper
public class BigDecimalMapper {

  public ByteBuffer bigDecimalToByteBuffer(BigDecimal bigDecimal) {
    if (bigDecimal == null) return null;

    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
    LogicalType type = LogicalTypes.decimal(15, 4);
    BigDecimal scaledBigDecimal = bigDecimal.setScale(4, RoundingMode.HALF_UP);

    // Schema parameter is not used by Conversions so pass as null
    return conversion.toBytes(scaledBigDecimal, null, type);
  }
}
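For reference, here is a minimal JDK-only sketch of the byte layout this mapper produces. It assumes, per the Avro specification, that a decimal on a bytes field is stored as the big-endian two's-complement bytes of the unscaled value. The class name DecimalWireFormat, its helper methods, and the sample value 2500 are hypothetical and chosen only for illustration.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

class DecimalWireFormat {

  // Assumption: same layout as the Avro decimal conversion for a "bytes" field,
  // i.e. the unscaled integer as big-endian two's-complement bytes.
  static byte[] toUnscaledBytes(BigDecimal value, int scale) {
    return value.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
  }

  // Renders bytes as space-separated hex pairs for inspection.
  static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
      sb.append(String.format("%02X ", b));
    }
    return sb.toString().trim();
  }

  public static void main(String[] args) {
    // 2500 at scale 4 has the unscaled value 25000000 (0x017D7840)
    byte[] encoded = toUnscaledBytes(new BigDecimal("2500"), 4);
    System.out.println(toHex(encoded)); // prints 01 7D 78 40
  }
}
```

The scale is not part of the payload, so a consumer can only rebuild the number if its schema carries the decimal logical type with the same scale.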

We are using the Confluent Schema Registry, and this is the schema:

{
  "type": "record",
  "namespace": "event",
  "name": "Event",
  "fields": [
    {
      "name": "maxApprovedAmount",
      "type": "bytes",
      "logicalType": "decimal",
      "precision": 4,
      "scale": 4
    }
  ]
}

In NiFi I am using ConsumeKafkaRecord_2_6 with an AvroReader and an AvroRecordSetWriter. The reader uses the schema from the Confluent Schema Registry, and the writer uses Inherit Record Schema. When I view the flow file content and select the formatted view, I see the value as:

"maxApprovedAmount" : {
  "bytes" : "\u0001}x@"
},

The trouble is how to get the actual decimal value instead of these raw bytes. I have tried multiple changes to my Avro writer and have read other posts on similar issues, but I have not been able to resolve this. I believe it has to do with a lack of support for logical types. I am even willing to write a Jython script to do the conversion if that will work. Any help would be greatly appreciated!
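As a sanity check on what those bytes contain, the following JDK-only sketch decodes the displayed string by hand. It assumes the viewer shows each byte as one ISO-8859-1 character (as Avro's JSON encoding does for bytes) and that the scale is 4, as in the producer code. The class and method names are hypothetical, and this is not how NiFi itself performs the conversion.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;

class DecimalBytesDemo {

  // Interprets the raw bytes as a big-endian two's-complement unscaled integer
  // and applies the scale taken from the schema.
  static BigDecimal fromAvroDecimalBytes(byte[] raw, int scale) {
    return new BigDecimal(new BigInteger(raw), scale);
  }

  public static void main(String[] args) {
    // Assumption: one displayed character per byte, so
    // \u0001 } x @  ->  0x01 0x7D 0x78 0x40
    byte[] raw = "\u0001}x@".getBytes(StandardCharsets.ISO_8859_1);
    System.out.println(fromAvroDecimalBytes(raw, 4)); // prints 2500.0000
  }
}
```

Under those assumptions the payload is a valid decimal, 2500.0000, which suggests the bytes are intact and only the logical type information is missing on the reading side. The value has eight digits, which fits the producer's decimal(15, 4) but not the precision of 4 declared in the schema.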

2 REPLIES

Contributor

I am still unable to see how to achieve this. If anyone has any suggestions, I am open to it. Thanks!

 

Contributor

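I found that the schema was defined incorrectly, which caused the issue. The logicalType, precision, and scale attributes must be nested inside the field's type object rather than placed directly on the field. The field definition should change from this:

  "name": "maxApprovedAmount",
  "type": "bytes",
  "logicalType": "decimal",
  "precision": 4,
  "scale": 4

to this:

  "name": "maxApprovedAmount",
  "type": {
    "type": "bytes",
    "logicalType": "decimal",
    "precision": 4,
    "scale": 4
  }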