Member since
06-26-2015
509
Posts
136
Kudos Received
114
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1284 | 09-20-2022 03:33 PM |
| | 3781 | 09-19-2022 04:47 PM |
| | 2247 | 09-11-2022 05:01 PM |
| | 2332 | 09-06-2022 02:23 PM |
| | 3629 | 09-06-2022 04:30 AM |
02-10-2022
03:33 PM
Which version of NiFi are you using?
02-09-2022
09:26 PM
Hi, @Sam2020,

Run the following and try again:

mv \
  /opt/cloudera/parcel-repo/CDH-7.1.7-1.cdh7.1.7.p74.21057765-el7.parcel.sha1 \
  /opt/cloudera/parcel-repo/CDH-7.1.7-1.cdh7.1.7.p74.21057765-el7.parcel.sha
chown cloudera-scm:cloudera-scm /opt/cloudera/parcel-repo/*

Please let me know if this helped.

Cheers,
André
02-09-2022
08:11 PM
1 Kudo
You can use a QueryRecord processor before the PutDatabaseRecord. Add a relationship to the QueryRecord processor with the following associated query:

select
  "field one" as field_one,
  "field two" as field_two,
  "field three" as field_three
from flowfile

In the query above, field names that contain spaces are referenced using double quotes. The alias given to each column is the field name that will be used in the output.

Cheers,
Andre
02-09-2022
07:57 PM
Use a ConvertRecord processor with a JsonTreeReader as the record reader and a CSVRecordSetWriter as the record writer. Configure the CSVRecordSetWriter with the following: HTH, André
02-09-2022
07:49 PM
1 Kudo
The alternative you mentioned, using GenerateFlowFile, is the way to go here.
02-09-2022
07:47 PM
@hjfigueira , I ran a test with your schema and data and it worked without problems for me. I think there's something else in your environment that hasn't been described here that's contributing to the problem. André
02-09-2022
02:46 AM
Hi, @LejlaKM,

Here are some guidelines that should help with performance:

- On the database side, ensure that both tables (source and target) have a primary key enforced by an index, to guarantee uniqueness and good delete performance.
- GenerateTableFetch
  - Columns to Return: specify only the primary key column (if the primary key is a composite key, specify the list of key columns separated by commas).
  - Maximum-value Columns: if the primary key increases monotonically, list the primary key column in this property too. If not, look for an alternative column that increases monotonically and specify it here. Without this, performance can be terrible.
  - Column for Value Partitioning: same as for "Maximum-value Columns".

Please let me know if this helps.

Regards,
André
02-09-2022
12:57 AM
Can you please provide the full configuration of PutDatabaseRecord?
02-08-2022
10:51 PM
2 Kudos
The page Connecting Kafka clients to Data Hub provisioned clusters in the Cloudera documentation explains how to configure the Kafka Console Consumer to connect to a Kafka cluster in DataHub.
In this article, I will expand that explanation for a generic Java client and will show how to configure the client to use Schema Registry for the storage and retrieval of schemas.
A full version of the code in this article can be found in GitHub.
Client initialization
The initialization of the KafkaConsumer or KafkaProducer client is done simply by passing a Properties object to its constructor. The Properties object can be either initialized programmatically or loaded from a properties file.
Example:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1.example.com:9093");
// ... initialize other properties ...
KafkaConsumer<String, Transaction> consumer = new KafkaConsumer<>(props);
The initialization of a KafkaProducer is identical to the above.
This consumer reads a key of type String and a value of type Transaction from each Kafka message. Transaction is an Avro object, which can be transparently serialized and deserialized with the help of the KafkaAvroSerializer and KafkaAvroDeserializer, respectively, as we will see later in this article.
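The properties-file option mentioned above can be sketched with nothing but the standard library. This is a minimal sketch, not the code from the GitHub repository: the class name `ClientConfigLoader` and the temporary file used in `main` are illustrative assumptions.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Properties;

// Hypothetical helper class; the name is not taken from the original article.
public class ClientConfigLoader {

    /** Reads key=value pairs from a standard Java properties file. */
    public static Properties load(Path path) throws IOException {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(path)) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // For the demo, write a small temporary file; a real client would
        // point at its own configuration file instead.
        Path file = Files.createTempFile("client", ".properties");
        Files.write(file, Arrays.asList(
                "bootstrap.servers=kafka-1.example.com:9093",
                "security.protocol=SASL_SSL"));

        Properties props = load(file);
        System.out.println(props.getProperty("bootstrap.servers"));
        Files.delete(file);
    }
}
```

The resulting Properties object can then be passed to the KafkaConsumer or KafkaProducer constructor exactly as in the example above.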
Common properties
The properties listed in this section are common to both the KafkaConsumer and KafkaProducer.
The properties below specify the address of the cluster and the security settings used for authentication.
bootstrap.servers=<kafka-broker-fqdn>:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=<TRUSTSTORE_PATH>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<USERNAME>" password="<PASSWORD>";
The bootstrap.servers property can take a list of broker addresses, separated by commas. Each broker address should always use the broker's fully qualified domain name (FQDN) and include a port number.
If you are running this client in one node of your CDP Public Cloud cluster, the truststore path can be set to /var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks. If you are running the client outside of the cluster, copy the truststore from one of the cluster hosts to the machine where the client will run and specify its location in the configuration.
The username specified in the sasl.jaas.config property must be a CDP Public Cloud user and its password is the Workload Password that the user can set in the CDP Console.
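If you prefer to set these properties programmatically, the sketch below assembles them from a list of brokers and a set of credentials. The class and method names (`CommonClientProperties`, `build`, `jaasConfig`) and the host names, path, and credentials in `main` are hypothetical placeholders; only the property names and values come from the listing above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper; names are illustrative, not part of any Cloudera API.
public class CommonClientProperties {

    /** Builds the connection and security properties shared by consumers and producers. */
    public static Properties build(List<String> brokers, String truststorePath,
                                   String username, String password) {
        Properties props = new Properties();
        // Each entry is expected to be "<broker-fqdn>:<port>".
        props.put("bootstrap.servers", String.join(",", brokers));
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("ssl.truststore.location", truststorePath);
        props.put("sasl.jaas.config", jaasConfig(username, password));
        return props;
    }

    /** Formats the JAAS entry; this sketch does not escape quotes inside the values. */
    static String jaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";";
    }

    public static void main(String[] args) {
        Properties props = build(
                Arrays.asList("kafka-1.example.com:9093", "kafka-2.example.com:9093"),
                "/path/to/truststore.jks",   // placeholder path
                "my-user", "my-workload-password");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

In a real application the password would more likely come from an environment variable or a secrets store than from a literal in the code.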
In order to connect to Schema Registry, you must also set the properties below:
schema.registry.url=<SCHEMA_REGISTRY_ENDPOINT>
schema.registry.auth.username=<USERNAME>
schema.registry.auth.password=<PASSWORD>
If schema.registry.url is not set, the client will not try to connect to Schema Registry.
If this property is set, the username and password ones must also be configured.
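The rule above (credentials are required whenever the URL is set) is easy to check before creating a client. The following is a small sketch of such a check; the class `SchemaRegistrySettingsCheck` and its `validate` method are hypothetical, and treating an empty credential as missing is my own assumption rather than documented client behaviour.

```java
import java.util.Properties;

// Hypothetical pre-flight check; not part of the Kafka or Schema Registry client libraries.
public class SchemaRegistrySettingsCheck {

    private static final String URL = "schema.registry.url";
    private static final String[] CREDENTIALS = {
        "schema.registry.auth.username",
        "schema.registry.auth.password"
    };

    /** Throws IllegalArgumentException if the URL is set but a credential is missing. */
    public static void validate(Properties props) {
        if (props.getProperty(URL) == null) {
            return; // No URL: the client will not try to connect to Schema Registry.
        }
        for (String key : CREDENTIALS) {
            String value = props.getProperty(key);
            // Assumption: an empty value is treated the same as a missing one.
            if (value == null || value.isEmpty()) {
                throw new IllegalArgumentException(
                        "'" + key + "' must be set when '" + URL + "' is set");
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(URL, "<SCHEMA_REGISTRY_ENDPOINT>");
        props.setProperty("schema.registry.auth.username", "<USERNAME>");
        props.setProperty("schema.registry.auth.password", "<PASSWORD>");
        validate(props);
        System.out.println("Schema Registry settings are complete");
    }
}
```

Failing fast here gives a clearer error message than an authentication failure surfacing later, when the first message is serialized or deserialized.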
The Schema Registry Endpoint can be found in the CDP Console, under the Endpoints tab of your DataHub Kafka cluster.
Kafka-Avro Serializer and Deserializer
Cloudera provides Avro-specific serializer and deserializer classes for use with Kafka clients. This makes it really easy to write Avro objects to Kafka and read them later, since the serialization and deserialization are handled transparently.
These classes are Schema Registry-aware. When configured with the Schema Registry service details, the serializer will ensure that the schema used for serializing the messages sent to Kafka is registered in Schema Registry. It will also add the schema's identifier from Schema Registry to the message payload, so that the consumer knows which schema version to fetch.
When the consumer reads those messages, the deserializer will fetch that schema using the identifier from the message and will use the schema to correctly deserialize the payload.
To handle Avro messages correctly using Schema Registry, you must set the value.serializer property of your producers and the value.deserializer property of your consumers to these classes, as shown below:
Producer properties:
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer
Consumer properties:
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer
specific.avro.reader=<true OR false>
The specific.avro.reader property is optional (defaults to false). It specifies whether the deserialized object should be an Avro GenericRecord (false) or an Avro specific object (true).
Other properties
Kafka consumers and producers accept many other properties, as listed in the Apache Kafka documentation. You should complete the configuration of your consumer and producer clients by setting the necessary/appropriate properties for them.
The KafkaConsumer, for example, requires at least one more property to be set:
group.id=<my-consumer-group-name>
Producing Avro messages to Kafka
Once the clients have been configured as described above, producing data to a topic is as simple as instantiating an Avro object and using the producer to send it to Kafka:
Transaction transaction = new Transaction();
// set transaction properties here if required
ProducerRecord<String, Transaction> record = new ProducerRecord<>("my-topic", transaction);
producer.send(record, new ProducerCallback());
Consuming Avro messages from Kafka
Consuming messages from Kafka is as simple as:
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
    ConsumerRecords<String, Transaction> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, Transaction> record : records) {
        Transaction transaction = record.value();
        System.out.println(transaction);
    }
}
Interacting with Schema Registry explicitly
Some applications may need to connect directly to Schema Registry, for example to retrieve or store a schema.
A Schema Registry API client is also provided by Cloudera for these cases. For example, to retrieve the latest version of a schema you can use the following:
Map<String, Object> config = new HashMap<>();
config.put("schema.registry.url", "<SCHEMA_REGISTRY_ENDPOINT>");
config.put("schema.registry.auth.username", "<USERNAME>");
config.put("schema.registry.auth.password", "<PASSWORD>");
String topicName = "my-topic";
SchemaRegistryClient client = new SchemaRegistryClient(config);
try {
    SchemaVersionInfo latest = client.getLatestSchemaVersionInfo(topicName);
    System.out.println(latest.getSchemaText());
} catch (SchemaNotFoundException e) {
    LOG.info("Schema [{}] not found", topicName);
    throw e;
}
02-08-2022
08:02 PM
We don't have an Academic version, but certain products, like the Cloudera Streaming Analytics stack, have a Community Edition. https://docs.cloudera.com/csa-ce/1.6.0/installation/topics/csa-ce-installing-ce.html Regards, André