Member since
03-20-2019
15
Posts
4
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1083 | 11-05-2021 06:09 AM |
09-12-2024
04:11 PM
1 Kudo
Trying to bring this back up to see if anyone has a suggestion? Thank you.
... View more
12-19-2023
01:03 PM
I have a nifi flow to read from 1 table and write to another. But when it inserts it thows the error: PutCassandraRecord[id=bf945542-ef3f-1b23-0000-0000463e16ae] Unable to write the records into Cassandra table due to com.datastax.driver.core.exceptions.InvalidQueryException: You must use conditional updates for serializable writes
- Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: You must use conditional updates for serializable writes: {} putCassandraRecord processor: I have tried both serial and local_serial, and none, but all give same error. I am able to run this if i use putCassandraQL but that means i also have to have a EvaluateJsonPath and a ReplaceText to create the insert statement. I am trying to make it more efficient by using the record aware processor. Any help would be appreciated. Thank you!
... View more
Labels:
- Labels:
-
Apache NiFi
11-29-2022
01:36 PM
I found that the schema was defined incorrectly causing an issue. It should be From this "name": "maxApprovedAmount",
"type": "bytes",
"logicalType": "decimal",
"precision": 4,
"scale": 4 to this: "name": "maxApprovedAmount",
"type": {type: "bytes",
"logicalType": "decimal",
"precision": 4,
"scale": 4}
... View more
11-29-2022
09:01 AM
I am still unable to see how to achieve this. If anyone has any suggestions, I am open to it. Thanks!
... View more
11-28-2022
12:24 PM
Hi, There is a kafka producer converting decimals to bytes using this: import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.mapstruct.Mapper;
@Mapper
public class BigDecimalMapper {
public ByteBuffer bigDecimalToByteBuffer(BigDecimal bigDecimal) {
if (bigDecimal == null) return null;
Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
LogicalType type = LogicalTypes.decimal(15, 4);
BigDecimal scaledBigDecimal = bigDecimal.setScale(4, RoundingMode.HALF_UP);
// Schema parameter is not used by Conversions so pass as null
return conversion.toBytes(scaledBigDecimal, null, type);
}
We are using the confluent schema registry and this is the schema: { "type": "record", "namespace": "event", "name": "Event", "fields": [ { "name": "maxApprovedAmount", "type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 4 }] } In Nifi I am using ConsumeKafkaRecord_2_6. using AvroReader and AvroRecordSetWriter. In the reader I am using the schema from confluent. On the writer I am using Inherent Record Schema. When viewing the data in view flow file and selecting formatted, i see the value as: "maxApprovedAmount" : { "bytes" : "\u0001}x@" }, Trouble is how to get the actual value and not this. I have tried multiple changes to my avro writer, and I have read other posts on similar issues but not able to get this resolved. I believe it has to with the lack of support for Logical types. I am even up to write a script in jython to do the conversion if it will work. Any help would be greatly appreciated!
... View more
Labels:
- Labels:
-
NiFi Registry
-
Schema Registry
03-08-2022
05:25 AM
Hi, Thank you for your response. That is helpful. But it turns out I am not allowed to mark the pwd context Parameter as non-sensitive for the reasons you outlined. It sounds like to use the context parameter to call a soap service secured by WS Security I must create a custom processor. I want to ask if you know of any other possible solutions to call the service without compromising the security. Thank you!
... View more
03-07-2022
08:42 PM
Hi, I need to call a soap service that is secured with WS-Security username token authentication and not basic auth. I also need to rely on the Parameter Context value for the id and password. I am using replace text to crate the soap request with the http header. When I try to reference the password it says I cannot due to the fact its sensitive and Sensitive Parameters may only be referenced by Sensitive Properties. I understand basic auth and WS-Security are different types of security. I do not see how to build/populate the security header using my value from the parameter context. Any help would be appreciated. Thank you!
... View more
Labels:
- Labels:
-
Apache NiFi
-
Security
11-05-2021
06:09 AM
1 Kudo
After adding the jar to the consumeKafkaRecord NAR and bouncing the servers, I am no longer receiving the error. Now I am able to authenticate to Kafka using the GSSAPI SASL mechanism.
... View more
11-04-2021
07:43 AM
I am requesting now the nar for the ConsumeKafkaRecord be updated to include my new jar. Not sure if that is the only way for the processor to pick it up.
... View more
11-03-2021
07:28 PM
I am using the ConsumeKafkaRecord_2_6 processor to consume from kafka. I am trying to use a custom LoginModule. I have both a custom LoginModule and a custom CallbackHandler. I put my two classes in a jar then loaded the jar on the Nifi system. When the processor starts, I get this error: Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: com.eek.my.auth.krb.SauKrbLoginModuleWrapper. I am not sure how to add the jar so that the processor will find my classes. The dynamic property I am adding to reference the custom class is sasl.jaas.config. So the value of that looks like this: sasl.jaas.config=com.eek.my.auth.krb.SauKrbLoginModuleWrapper required krbProvider=com.sun.security.auth.module.Krb5LoginModule;
... View more
Labels:
- Labels:
-
Apache Kafka
-
Apache NiFi
-
Kerberos
-
Security