Member since
07-21-2020
29
Posts
5
Kudos Received
3
Solutions
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 407 | 03-21-2022 05:11 AM
 | 2616 | 04-22-2021 12:21 PM
 | 967 | 02-08-2021 11:44 PM
10-04-2022
12:47 PM
Hello, on our system (CDP 7.1.7) we have used the configuration parameter kafka.properties_role_safety_valve to set the attribute sasl.kerberos.principal.to.local.rules, which maps Active Directory principals to entities created in Ranger. In our system the AD users have a prefix, e.g. xjohndoe@SAMPLE.COM maps to a Ranger entity "johndoe". During a spark-submit (over YARN) we also need to pass a principal; however, as there is no such mapping there, we obtain an error saying the unix user "xjohndoe" does not exist. This is true indeed, we need to map it to "johndoe". Is there any possibility to map principals during spark-submit, possibly similar to sasl.kerberos.principal.to.local.rules in Kafka, or any other possibility? Best regards Jaro
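For illustration, the kind of mapping we mean could, in Kerberos auth_to_local syntax, look roughly like this (a sketch only, assuming the prefix is a literal "x" and not checking the realm; on the Hadoop/YARN side the corresponding knob would be hadoop.security.auth_to_local in core-site.xml):

```
RULE:[1:$1](^x.*$)s/^x//
DEFAULT
```

The first rule formats a one-component principal as its short name (xjohndoe), matches names starting with "x", and strips that prefix, so xjohndoe@SAMPLE.COM would map to johndoe.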
05-06-2022
06:25 AM
Hello, is it possible to alter compression.type on a Kafka topic while applications on the topic are running? What hard drive saving factor should be expected for text records like JSON (2 KB size) with compression.type=gzip? In CM I see a global setting. Does it apply to KafkaMirrorMaker only? Are producer and consumer applications somehow affected when the global parameter changes? Best regards Jaro
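For the topic-level part, the override could presumably be applied at runtime with the standard tooling (a sketch, assuming broker access; topic name is a placeholder), e.g. kafka-configs --bootstrap-server broker:9092 --alter --entity-type topics --entity-name mytopic --add-config compression.type=gzip. To get a rough local feeling for the gzip saving factor on JSON-like text, one can compress a batch of sample records:

```shell
# Build 40 similar JSON records and compare raw vs gzip'd size.
# Crude local estimate only: Kafka compresses whole record batches,
# and real payloads are less repetitive than this sample.
for i in $(seq 1 40); do
  printf '{"date_time":"2017-01-29T13:42:03.%03dZ","id":"2004901599000158440%04d","is_success":"TRUE"}\n' "$i" "$i"
done > records.json
gzip -c records.json > records.json.gz
orig=$(wc -c < records.json)
comp=$(wc -c < records.json.gz)
echo "original=${orig} compressed=${comp}"
```

Because the sample records are highly similar, this will overstate the saving; varying payloads compress less.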
Labels: Apache Kafka
03-21-2022
05:11 AM
I have eventually solved the issue. The writer property "Schema Access Strategy" was misconfigured. It must be set to "Use 'Schema Text' Property" to really apply the given schema.
03-21-2022
04:09 AM
Hello, I have a problem with converting JSON to CSV.
I use the ConvertRecord processor with an attached JsonTreeReader and CSVRecordSetWriter. The schema for both might look like this. Attributes in the JSON are optional.
{
"type": "record",
"name": "some_event",
"fields": [
{ "name": "date_time", "type": ["string","null"] },
{ "name": "id", "type": ["string","null"] },
{ "name": "is_success", "type": ["string","null"] },
{ "name": "username", "type": ["string","null"] },
{ "name": "username_type", "type": ["string","null"] }
]
}
For an input JSON like
{
"date_time": "2017-01-29T13:42:03.965Z",
"id": "20049015990001584400001",
"is_success": "TRUE"
}
where username and username_type are not set, the output does not correspond to my expectation.
I get 2022-01-29T13:42:03.965Z|20049015990001584400001|true instead of 2022-01-29T13:42:03.965Z|20049015990001584400001|true||, which should be the right format. Note the separators for the null values at the end of the second record.
Do you have some hints how to solve it? It should be a sort of standard task.
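For reference, the expected rendering (null fields emitted as empty trailing columns) can be sketched outside NiFi like this (a plain illustration of the desired behaviour, not NiFi code):

```shell
# Join the five schema fields with '|', emitting empty strings for missing/null ones.
python3 - <<'EOF'
import json
rec = json.loads('{"date_time":"2017-01-29T13:42:03.965Z","id":"20049015990001584400001","is_success":"TRUE"}')
fields = ["date_time", "id", "is_success", "username", "username_type"]
print("|".join(rec.get(f) or "" for f in fields))
EOF
# prints: 2017-01-29T13:42:03.965Z|20049015990001584400001|TRUE||
```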
The CSVRecordSetWriter is configured like this:
We use
Cloudera Flow Management (CFM) 2.0.4.3 1.11.4.2.0.4.3-1 built 01/06/2021 23:14:00 CET
Best regards Jaro
Labels: Apache NiFi, Cloudera DataFlow (CDF)
03-03-2022
10:54 AM
Hi,
I see in Cloudera Manager, in the Kafka status, the diagram for "Total Bytes Received Across Kafka Brokers". What does it mean exactly? Is it the pure input without replication, or is replication counted in it, so that the pure input would be only e.g. 1/3 for replication factor 3?
Best regards
Jaro
Labels: Apache Kafka, Cloudera Manager
02-10-2022
05:26 AM
Hello, a question regarding application design in NiFi. We have a REST resource which provides data changes since a unix timestamp; the timestamp is passed inside a JSON structure sent as an HTTP POST. The major question is how and where to keep the state of the last unix timestamp, and how this state can be made accessible to operations. We should be able to reset the timestamp to any value to repeat calls. Also, we should be able to set the increment which is added to the timestamp in the next call. Similarly, we have a related issue in a use case which accesses a database. We generate a query in GenerateTableFetch and set the property "Maximum-value Columns". Here the state is maintained by the processor, but it is pretty much unclear how to set the state to a specific starting value. To clear the state is obviously not appropriate. Best regards Jaro
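For the GenerateTableFetch part, the processor state can at least be inspected and cleared through the REST API (a sketch, assuming an unsecured node; the processor id is a placeholder, and I am not aware of an endpoint that sets the state to an arbitrary value):

```shell
# Inspect the component state kept by a processor such as GenerateTableFetch.
curl http://localhost:8080/nifi-api/processors/<processor-id>/state

# Clear the state (the processor must be stopped first).
curl -X POST http://localhost:8080/nifi-api/processors/<processor-id>/state/clear-requests
```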
Labels: Apache NiFi
11-22-2021
03:56 AM
Hello, after some operation in the NiFi GUI (a parameter value change in a parameter context), the GUI got sort of stuck. A controller service shows status disabled in the GUI; however, I cannot enable or delete the controller service, as NiFi claims it is running on a node. Do you have some idea how to get rid of this situation? Best regards Jaro
Tags: NiFi
Labels: Apache NiFi
10-05-2021
10:07 AM
Hi, this sounds promising. Actually, I came across principal mapping rules recently, but was quite unclear about the implications. The fact is, "some_user" is no POSIX or LDAP user whatsoever. It exists only as a certificate. Also, the user identifier in Ranger is like "OU=Dept,O=Company,..."; this is how colleagues of mine have set up the policies. Does your assumption mean that every single client certificate should be backed by a POSIX user? And what if the user is an external party accessing the broker remotely? Best regards Jaro
10-04-2021
02:11 AM
Hello, we observe a lot of annoying messages in the Kafka logs like this:

2021-09-01 17:09:47,435 WARN org.apache.hadoop.security.ShellBasedUnixGroupsMapping: unable to return groups for user OU=Dept,O=Company,C=DE,ST=Germany,CN=some_user
PartialGroupNameException The user name 'OU=Dept,O=Company,C=DE,ST=Germany,CN=some_user' is not found.
id: OU=Dept,O=Company,C=DE,ST=Germany,CN=some_user: no such user
id: OU=Dept,O=Company,C=DE,ST=Germany,CN=some_user: no such user
at org.apache.hadoop.security.ShellBasedUnixGroupsMapping.resolvePartialGroupNames(ShellBasedUnixGroupsMapping.java:291)
at org.apache.hadoop.security.ShellBasedUnixGroupsMapping.getUnixGroups(ShellBasedUnixGroupsMapping.java:215)
at org.apache.hadoop.security.ShellBasedUnixGroupsMapping.getGroupsSet(ShellBasedUnixGroupsMapping.java:123)
at org.apache.hadoop.security.Groups$GroupCacheLoader.fetchGroupSet(Groups.java:413)
at org.apache.hadoop.security.Groups$GroupCacheLoader.load(Groups.java:351)
at org.apache.hadoop.security.Groups$GroupCacheLoader.load(Groups.java:300)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
at com.google.common.cache.LocalCache.get(LocalCache.java:3953)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3976)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4960)
at org.apache.hadoop.security.Groups.getGroupInternal(Groups.java:258)
at org.apache.hadoop.security.Groups.getGroupsSet(Groups.java:230)
at org.apache.hadoop.security.UserGroupInformation.getGroupsSet(UserGroupInformation.java:1760)
at org.apache.hadoop.security.UserGroupInformation.getGroupNames(UserGroupInformation.java:1726)
at org.apache.ranger.audit.provider.MiscUtil.getGroupsForRequestUser(MiscUtil.java:587)
at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.authorize(RangerKafkaAuthorizer.java:155)
at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.authorize(RangerKafkaAuthorizer.java:135)
at kafka.security.authorizer.AuthorizerWrapper$$anonfun$authorize$1.apply(AuthorizerWrapper.scala:52)
at kafka.security.authorizer.AuthorizerWrapper$$anonfun$authorize$1.apply(AuthorizerWrapper.scala:50)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.security.authorizer.AuthorizerWrapper.authorize(AuthorizerWrapper.scala:50)
at kafka.server.KafkaApis.filterAuthorized(KafkaApis.scala:2775)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:639)
at kafka.server.KafkaApis.handle(KafkaApis.scala:128)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
at java.lang.Thread.run(Thread.java:748)

OU=Dept,O=Company,C=DE,ST=Germany,CN=some_user (values changed) is a self-signed certificate used as a client certificate for a TLS based Kafka connection. The certificate exists, the name matches, and it was created accordingly in Ranger. The connection actually works. We still don't understand where these warnings come from. Cloudera Enterprise 7.2.4 Best Regards Jaro
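One direction worth checking (an assumption on our side, not a confirmed fix): the warning comes from ShellBasedUnixGroupsMapping shelling out to `id` for a certificate DN that is no OS user. Hadoop ships alternative group mapping implementations, so a safety-valve entry of this kind might silence the shell lookups (placement in the Kafka service configuration would need to be verified):

```
hadoop.security.group.mapping=org.apache.hadoop.security.NullGroupsMapping
```

NullGroupsMapping simply returns an empty group list instead of invoking `id`, which should be acceptable when authorization is done purely on the user/DN in Ranger.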
Labels: Apache Kafka
05-13-2021
09:17 AM
Hello, I am struggling with a dependency issue in Spark now. Being new to Spark, I hope there is a simple remedy. The question is: is there any mechanism to separate the dependencies of the Spark engine from the dependencies of an application? Example: the latest version of spark-core_2.12 (3.1.1, March 2021) depends on hadoop-client (3.3.0, March 2020), which itself depends on hadoop-common (3.3.0, July 2020), which finally depends on an ancient version of gson (2.2.4, May 2013). You can easily find many other examples, e.g. commons-codec, protobuf-java, ... So what if your application, basically a library developed outside Spark, depends on the latest (no longer compatible) version of gson, 2.8.6? My obviously naive approach to start a Spark application ends in runtime incompatibility clashes (e.g. with gson). Best regards Jaro
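Two mechanisms I would investigate (hedged, not verified against this CDP version): spark-submit accepts --conf spark.driver.userClassPathFirst=true and --conf spark.executor.userClassPathFirst=true (documented as experimental), and alternatively the application fat jar can relocate its own copy of gson with the maven-shade-plugin, roughly:

```xml
<!-- pom.xml fragment: relocate gson inside the application fat jar so it
     cannot clash with the gson 2.2.4 on the Spark/Hadoop classpath -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.google.gson</pattern>
            <shadedPattern>myapp.shaded.com.google.gson</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

The shaded prefix "myapp.shaded" is an arbitrary placeholder; relocation rewrites the bytecode references, so both gson versions can coexist.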
Tags: dependencies, Spark
Labels: Apache Hadoop, Apache Spark
04-22-2021
10:21 AM
Hello, while attempting to start ListenHTTP (port=10000) on a fresh cluster, I obtain the following error. Do you know what to change to get it to work? ListenHTTP[id=6222ab5d-0177-1000-ffff-ffffaeaba266] Failed to properly initialize Processor. If still scheduled to run, NiFi will attempt to initialize and run the Processor again after the 'Administrative Yield Duration' has elapsed. Failure is due to java.io.IOException: Failed to bind to 0.0.0.0/0.0.0.0:10000: java.io.IOException: Failed to bind to 0.0.0.0/0.0.0.0:10000 Cloudera Flow Management (CFM) 2.0.4.0 1.11.4.2.0.4.0-80 built 09/27/2020 09:52:49 CEST Tagged nifi-1.11.4-RC1 Best regards Jaro
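"Failed to bind" usually means the port is already taken on that node (our assumption here; note that 10000 is, for example, the default HiveServer2 port). A quick check from the NiFi host could look like:

```shell
# Show which process, if any, is already listening on port 10000.
sudo ss -ltnp 'sport = :10000'
# or, on hosts without ss:
sudo netstat -ltnp | grep ':10000 '
```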
Labels: Apache NiFi
04-12-2021
01:44 AM
Hello, we see the following error on an UpdateAttribute processor: Failed to persist list of Peers due to java.io.IOException: All Partitions have been blacklisted due to failures when attempting to update. If the Write-Ahead Log is able to perform a checkpoint, this issue may resolve itself. Otherwise, manual intervention will be required.; if restarted and the nodes specified at the RPG are down, may be unable to transfer data until communications with those nodes are restored A similar issue was reported some years ago: https://community.cloudera.com/t5/Support-Questions/Caused-by-java-io-IOException-All-Partitions-have-been/td-p/230571 but in our case the processor only updates a sequence number, thus the pattern does not apply. How can we recover from this situation? Cloudera Flow Management (CFM) 2.0.4.0 1.11.4.2.0.4.0-80 built 09/27/2020 09:52:49 CEST Best regards Jaro
Labels: Apache NiFi
02-10-2021
08:29 AM
1 Kudo
Hello, I am trying to work out an automated deployment procedure in NiFi (no UI usage, just the CLI toolkit or alternatives). Let us assume you have a NiFi flow under load which needs to be updated. There is some data in the in-between connection queues. What happens when you try to update the flow? Will some data be lost, or will the update be refused? Is there any solution for a procedure like this: 1) stop the data receiving processor 2) check if the success connection queue is empty 3) apply steps 1 and 2 to the next processor until the last processor is stopped 4) update the flow, ignoring any data in failure or error connection queues. Do you think this is a reasonable approach? Is there something ready to use? Best regards Jaro
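The stop-and-drain steps above can be sketched with the NiFi Toolkit CLI (command names are from the toolkit; the URL and process-group id are placeholders, and draining would have to be polled per connection rather than per processor):

```shell
# Steps 1-3: stop the process group and poll its status until the queues drain.
nifi pg-stop -u http://localhost:8080 -pgid <pg-id>
nifi pg-status -u http://localhost:8080 -pgid <pg-id>

# Step 4: deploy the new flow version, e.g. from a NiFi Registry.
nifi pg-change-version -u http://localhost:8080 -pgid <pg-id>
```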
Labels: Apache NiFi
02-08-2021
11:44 PM
1 Kudo
OK, I got it. I missed a parameter. It should be: nifi set-param -u http://localhost:8080 -pcid 828f0882-0177-1000-aabd-d680a070c5c5 -pn sample.URL -pv newvalue
02-08-2021
12:27 PM
Hello, I have tried to use the NiFi CLI toolkit (NiFi 1.11.4) to set a parameter like this; however, I can't see any effect. What am I doing wrong?

#> nifi get-param-context -u http://localhost:8080 -pcid 828f0882-0177-1000-aabd-d680a070c5c5
#   Name            Value           Sensitive   Description
-   -------------   -------------   ---------   -----------
1   sample.Passwd   ********        true
2   sample.Schema   SAMPLE001       false
3   sample.URL      jdbc://sample   false
4   sample.User     dbsample        false

#> nifi set-param -u http://localhost:8080 -pcid 828f0882-0177-1000-aabd-d680a070c5c5 -pn sample.URL newvalue
Waiting for update request to complete...

#> nifi get-param-context -u http://localhost:8080 -pcid 828f0882-0177-1000-aabd-d680a070c5c5
#   Name            Value           Sensitive   Description
-   -------------   -------------   ---------   -----------
1   sample.Passwd   ********        true
2   sample.Schema   SAMPLE001       false
3   sample.URL      jdbc://sample   false
4   sample.User     dbsample        false

Best regards Jaro
Labels: Apache NiFi
02-07-2021
11:55 AM
Hello, I have a question regarding the concept of parameter contexts. The admin guide says: "A Process Group can only be assigned one Parameter Context, while a given Parameter Context can be assigned to multiple Process Groups." OK, let's assume you have an internal Kafka or database, and some external system you connect to. You might want to have a parameter "group" for the internal Kafka connection, and another parameter context for each external system connection. However, your flow, clearly a part of a process group, reads data from the external system and stores data internally. Thus you actually need 2 connections in the flow, which means 2 parameter contexts, which is not possible. To solve that, you assign the connections (controller services) and the parameters to the parent group. Finally, for connections which are common to many flows, like your Kafka, databases, and HttpSSL, you land in the parent NiFi Flow, having a parameter context which contains a bunch of pretty unrelated parameters (for Kafka, databases, ...). What I see is that there is hardly a possibility to reuse parameter contexts, as they cannot be combined, and thus you have a 1:1 relationship between parameter context and group, just like for variables assigned to a process group. Did I miss something? What is actually the big advantage of parameters over variables? Best regards Jaro
Labels: Apache NiFi
12-04-2020
05:09 AM
Hello, does the nipyapi package work with Windows 10 (64-bit)? I have tried to install nipyapi on two machines, one with WinPython (manual installation of downloaded packages), another with Anaconda (installation with pip). Both failed, obviously due to a number of missing obscure dependencies like libxml2. I have only a little knowledge, but the installation looked like tough stuff under Windows 10. Is nipyapi recommended at all, or are there better alternatives? Best regards Jaro
Labels: Apache NiFi
09-24-2020
12:59 PM
Hi @MattWho, we have actually already implemented prototypes of such custom processors, and the basic principles are clear. By processor logic, I did not mean the structure of the processor, like relationships; the structure is static in our case. By processor logic, I refer to the code which is called in "onTrigger" in "StreamCallback.process". In my case, the logic can be parametrized by some configuration data (this configuration data is provided by a service or database tables). The configuration data is static during the whole processor run and must be provided during instantiation of the business logic object. You might see it as a lookup, which must not happen during the processing in "onTrigger", but in "onScheduled" (sorry, I wrote "init" previously, that was not right).
Best regards Jaro
Tags: apache nifi
09-24-2020
03:58 AM
Hello, we are considering implementing a custom processor for some complex transformation logic on Kafka streams. The flow is straightforward: input topic -> our processor -> output topics. The processor logic needs to be initialized with data provided through a REST service and/or database records. What is the NiFi way to implement the initialization? As we implement the processor ourselves, we can do lots of stuff in e.g. the init method; however, we would prefer to use the NiFi infrastructure reasonably. Could you please recommend some examples, blogs, etc. for a similar solution? Best regards Jaro
Labels: Apache NiFi
09-15-2020
09:51 AM
1 Kudo
Actually, both replies can be considered valid. I confirmed the one which better fits my use case.
09-08-2020
12:26 AM
1 Kudo
"It sounds like your testing solution is exceeding the inbound capabilities of the flow tuning (nifi config, processor/queue config)" Correct assessment. It turned out that the pipeline was not properly sized for the amount of data, which led to back-pressure in the ingest component.
08-28-2020
08:12 AM
Well, EvaluateJsonPath sounds promising, but it looks like EvaluateJsonPath is somehow limited. It is unable to extract JSON objects, only single attributes, which makes it impossible to use for slightly more complex data. Let's suppose this. Input: { whatever known or unknown JSON nested structures here } Desired output: { "nifi-filename": "2c8a7845-8567-4eed-97ee-4dcd1c18668e", "nifi-content": { whatever known or unknown JSON structures or sub-structures of the input message here } }
Tags: apache nifi
08-27-2020
02:13 AM
Hello, I have been puzzling for some time over whether there is a possibility to refer to both the attributes and the content of a flow file and to transform the content by combining both. Example: assume a flow file with uuid=2c8a7845-8567-4eed-97ee-4dcd1c18668e and the JSON content { "data":"some business data" } The result of the transformation should be content like { "nifi-filename": "2c8a7845-8567-4eed-97ee-4dcd1c18668e", "nifi-content": { "data": "some business data" } } How can I achieve that? Best regards Jaro
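Outside NiFi, the transformation itself is trivial; a scripted processor could do the equivalent of the following (a plain illustration of the desired result, assuming the uuid arrives as a flow file attribute, not NiFi API code):

```shell
# Wrap the original content under "nifi-content" and add the uuid as "nifi-filename".
python3 - <<'EOF'
import json
uuid = "2c8a7845-8567-4eed-97ee-4dcd1c18668e"          # flow file attribute
content = json.loads('{"data": "some business data"}')  # flow file content
print(json.dumps({"nifi-filename": uuid, "nifi-content": content}))
EOF
```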
Labels: Apache NiFi
08-14-2020
04:51 AM
Hello, I tested ListenHTTP on a single node under some load generated by JMeter. I observed that for one or two client threads ListenHTTP works fine; however, having 10 client threads leads to 60% of requests being refused with 503 Service Unavailable. Is there any parameter which helps to overcome this issue? Also, the component does not indicate any problems in the GUI. Best regards Jaro
Tags: listenhttp, NiFi
Labels: Apache NiFi
07-21-2020
04:08 AM
1 Kudo
Hi, being pretty new to NiFi, I am struggling to extract interface monitoring data. Assume a very simple ingest flow like: HttpListener -> KafkaPublisher How can I get throughput information, i.e. the number of records put to a topic (e.g.) every minute? There is NiFi Summary -> Processors -> Status History, which might be useful. Can the statistics be accessed programmatically, and how? Best regards Jaro
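The data behind Status History is also exposed over the REST API (a sketch; the processor id is a placeholder and the node is assumed unsecured):

```shell
# Per-processor status history, the same series shown in Summary -> Status History.
curl http://localhost:8080/nifi-api/flow/processors/<processor-id>/status/history
```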
Labels: Apache NiFi