Member since: 06-26-2015
Posts: 502
Kudos Received: 121
Solutions: 113
My Accepted Solutions
| Title | Views | Posted |
| --- | --- | --- |
|  | 226 | 09-20-2022 03:33 PM |
|  | 749 | 09-19-2022 04:47 PM |
|  | 383 | 09-11-2022 05:01 PM |
|  | 492 | 09-06-2022 02:23 PM |
|  | 599 | 09-06-2022 04:30 AM |
09-01-2022
02:25 AM
Would you be able to save one of these to a file and share it with me?
08-31-2022
06:32 PM
@syntax_ , I believe you have a schema that you can use to parse your Avro data, right? Instead of using ConsumeKafka, use the ConsumeKafkaRecord processor. In that processor, specify a Record Reader of type AvroReader and provide the correct schema so that the reader can properly deserialize your data. If you want to convert the data to JSON, specify a JsonRecordSetWriter as the Record Writer, so that the output flowfiles will be in that format and you'll be able to inspect the content of the queues. Cheers, André
08-30-2022
03:47 PM
@Rgupta7 , Check your flows for flowfiles stuck in queues (e.g. maybe you have connections to stopped processors or dead-end funnels where messages stay indefinitely). Flowfiles are physically stored in bigger files that contain many flowfiles. If even one of those flowfiles is referenced by any queue, the containing file is never removed from disk. Cheers, André
08-30-2022
03:41 PM
@yagoaparecidoti , The opposite command doesn't exist, as far as I know. Querying the database is the only option that I'm aware of. Cheers, André
08-30-2022
06:36 AM
Could you please run the kinit commands for both accounts and share a screenshot showing the command line and the output?
08-30-2022
06:32 AM
The names you listed are the servicePrincipalName. These are different from the userPrincipalName. Could you please check the latter and let me know what they are? Cheers, André
08-30-2022
05:24 AM
@sathish3389 , One way to do this is to use the ExecuteScript processor with a script like the one below. This script will set the dv_sys_id attribute with the content that you want and will also write it to the content of the flowfile:

```python
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json

class PyStreamCallback(StreamCallback):
    def __init__(self):
        self.dv_sys_id = None

    def process(self, inputStream, outputStream):
        # Read the flowfile content and parse it as JSON
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        # Build the dv_sys_id string and write it back as the new content
        self.dv_sys_id = 'dv_sys_id=%s' % ','.join(obj['dv_sys_id'])
        outputStream.write(bytearray(self.dv_sys_id.encode('utf-8', 'ignore')))

flow_file = session.get()
if flow_file is not None:
    callback = PyStreamCallback()
    flow_file = session.write(flow_file, callback)
    # Also expose the same value as a flowfile attribute
    flow_file = session.putAttribute(flow_file, 'dv_sys_id', callback.dv_sys_id)
    session.transfer(flow_file, REL_SUCCESS)
```

Cheers, André
08-30-2022
04:31 AM
@lie , MySQL 8 is not a supported version for CM/CDH 6.3.1 (the version you're using), as you can see here: https://docs.cloudera.com/documentation/enterprise/6/release-notes/topics/rg_database_requirements.html You should pick a version that is compatible with your CDH version. Alternatively, you can upgrade to CDP. Cheers, André
08-29-2022
04:49 PM
@Uday483 , Unfortunately the template option only works for a single domain with LDAP, I'm afraid. One thing you can test is setting "ldap.auth.user.dn.template = {0}". With this, though, clients would have to authenticate with the fully qualified user name rather than just the short name (e.g. alice@domain1.org.com or bob@domain2.org.com). I haven't tested this before, so I'm not 100% sure it will work. Can you use Kerberos authentication instead of LDAP? With Kerberos auth there should be no problems. Cheers, André
08-29-2022
04:43 PM
@Curry5103 , ListenSyslog can receive a very large amount of data and it may be hard for an InvokeHTTP processor to match that throughput. To avoid dropping syslog messages, especially over UDP, you will probably need to provide enough buffering to decouple the receipt of the messages from the InvokeHTTP execution. You can do that in NiFi by increasing the maximum queue size limits, or use something like Kafka, where you can temporarily store the syslog messages while another flow reads from Kafka and calls InvokeHTTP for each of them. Cheers, André
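To illustrate the decoupling idea outside of NiFi, here is a minimal sketch in plain Python (all names and sizes are made up for illustration, this is not NiFi code): a bounded queue separates a fast receiver from a slower sender.

```python
import queue
import threading

# Bounded buffer: the maxsize acts like a NiFi connection's back-pressure limit
buffer = queue.Queue(maxsize=10000)

def receiver():
    # Stands in for ListenSyslog: pushes messages as fast as they arrive
    for i in range(100):
        buffer.put('syslog message %d' % i)
    buffer.put(None)  # sentinel: no more messages

sent = []

def sender():
    # Stands in for InvokeHTTP: drains the buffer at its own pace
    while True:
        msg = buffer.get()
        if msg is None:
            break
        sent.append(msg)

t1 = threading.Thread(target=receiver)
t2 = threading.Thread(target=sender)
t1.start(); t2.start()
t1.join(); t2.join()
print(len(sent))  # all 100 messages delivered despite different speeds
```

Kafka plays the same buffering role, with the added benefit of durable storage if the downstream side is unavailable for a long time.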
08-29-2022
04:34 PM
Not sure what's going on. What's your job doing?
08-29-2022
04:04 PM
@yagoaparecidoti , You can do the reverse with the command "SHOW ROLE GRANT GROUP <group_name>;". I don't think there's a command to do exactly what you need, but you can query the database directly:

```sql
select
  r.ROLE_NAME,
  g.GROUP_NAME
from
  SENTRY_GROUP g
  join SENTRY_ROLE_GROUP_MAP rg on rg.GROUP_ID = g.GROUP_ID
  join SENTRY_ROLE r on r.ROLE_ID = rg.ROLE_ID
order by
  r.ROLE_NAME,
  g.GROUP_NAME;
```

Cheers, André
08-29-2022
03:54 PM
@yagoaparecidoti , Do you know the passwords for the users livy and livy-http? Can you manually kinit with those 2 users from the command line? Can you also check in AD the value of the userPrincipalName property for those two users and share it here? Cheers, André
08-28-2022
04:17 PM
@hegdemahendra , You need to set the time characteristic of the stream for it to work. For example, try setting it to processing time, as shown below:

```java
DataStream<String> matchedStream = patternStream
    .inProcessingTime()
    .process(new PatternProcessFunction<String, String>() {
        @Override
        public void processMatch(Map<String, List<String>> map, Context context,
                                 Collector<String> collector) throws Exception {
            collector.collect(map.get("start").toString());
        }
    });
```

Cheers, André
08-28-2022
03:45 PM
@ramesh0430 , What's the throughput that you are expecting? What are your processor configurations? Cheers, André
08-28-2022
03:44 PM
@Vinay91 , Do you see any errors in the nifi-app.log file? Do you have a subscription with Cloudera Support? This is something the support team could help you quickly resolve. Cheers, André
08-28-2022
03:40 PM
@belka , No, Apache Druid is not currently supported. Cheers, André
08-27-2022
06:05 PM
@learncloud1111 , All vulnerabilities regarding log4j have already been fixed/addressed by Cloudera in CDP 7.1.7 SP1. You should not need to fix anything else on your own. Cheers, André
08-27-2022
06:00 PM
@spserd , Do these errors appear only once in the logs or are they recurring? What's the command line you used to run the job? Cheers, André
08-26-2022
09:59 PM
@Omarb , Initially I thought this was a problem with the CSVRecordSetWriter, but I was mistaken. The issue here is that even though your CSVReader is set to ignore the header line, its Schema Access Strategy is set to "Infer Schema", which causes the reader to consume the first line of the flowfile to infer the schema, even though the other property tells it to ignore it. To avoid this, set the Schema Access Strategy property to "Use 'Schema Text' Property" and provide a schema that matches your flowfile structure. For example:

```json
{
  "type": "record",
  "name": "MyFlowFile",
  "fields": [
    { "name": "col_a", "type": "string" },
    { "name": "col_b", "type": "string" },
    { "name": "col_c", "type": "string" },
    ...
  ]
}
```

This will stop the first line being "consumed" by the reader. Cheers, André
08-26-2022
09:07 PM
@Griggsy , I don't know if there's a way to do exactly that with either JOLT or ReplaceText processors. You can do it with ExecuteScript and the following Python script, though:

```python
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
import re

def to_snake_case(name):
    # Insert an underscore before each interior uppercase letter, then lowercase
    return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()

def convert_keys_to_snake_case(obj):
    if isinstance(obj, list):
        return [convert_keys_to_snake_case(o) for o in obj]
    else:
        return {to_snake_case(k): v for k, v in obj.items()}

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        converted = convert_keys_to_snake_case(json.loads(text))
        outputStream.write(bytearray(json.dumps(converted).encode('utf-8', 'ignore')))

flow_file = session.get()
if flow_file is not None:
    flow_file = session.write(flow_file, PyStreamCallback())
    session.transfer(flow_file, REL_SUCCESS)
```

Cheers, André
08-26-2022
07:42 PM
1 Kudo
@nk20 , You can use the NiFi API to extract the current state of any stateful processor if you need this. See the example below for a ListFile processor:

```shell
access_token=$(curl -k \
  -X POST \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'username=admin&password=supersecret1' \
  "https://localhost:8443/nifi-api/access/token")

curl -k \
  -H "Authorization: Bearer $access_token" \
  "https://localhost:8443/nifi-api/processors/dd24aaec-0182-1000-ffff-ffff9f128d94/state"
```

The response looks like this:

```json
{
  "componentState": {
    "componentId": "dd24aaec-0182-1000-ffff-ffff9f128d94",
    "stateDescription": "After performing a listing of files, the timestamp of the newest file is stored. This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run. Whether the state is stored with a Local or Cluster scope depends on the value of the <Input Directory Location> property.",
    "clusterState": {
      "scope": "CLUSTER",
      "totalEntryCount": 0,
      "state": []
    },
    "localState": {
      "scope": "LOCAL",
      "totalEntryCount": 6,
      "state": [
        {
          "key": "id.0",
          "value": "/tmp/hsperfdata_nifi/129",
          "clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
          "clusterNodeAddress": "nifi1:8443"
        },
        {
          "key": "id.0",
          "value": "/tmp/hsperfdata_nifi/130",
          "clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
          "clusterNodeAddress": "nifi0:8443"
        },
        {
          "key": "listing.timestamp",
          "value": "1661567544049",
          "clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
          "clusterNodeAddress": "nifi1:8443"
        },
        {
          "key": "listing.timestamp",
          "value": "1661567541525",
          "clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
          "clusterNodeAddress": "nifi0:8443"
        },
        {
          "key": "processed.timestamp",
          "value": "1661567544049",
          "clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
          "clusterNodeAddress": "nifi1:8443"
        },
        {
          "key": "processed.timestamp",
          "value": "1661567541525",
          "clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
          "clusterNodeAddress": "nifi0:8443"
        }
      ]
    }
  }
}
```

Cheers, André
08-26-2022
07:27 PM
@FozzieN , You can do exactly what you're trying to achieve using standard NiFi processors and a schema to describe your data structure. Please see the attached flow example, which replicates what you are doing without the need for scripting. This flow uses the following schema, set as a parameter:

```json
{
  "type": "record",
  "name": "MyData",
  "fields": [
    { "name": "source_timestamp", "type": "string" },
    { "name": "signal", "type": "string" },
    { "name": "value", "type": "string" },
    { "name": "fecha_envio", "type": ["null", "string"], "default": null }
  ]
}
```

Cheers, André
08-26-2022
07:06 PM
@FozzieN , Maybe it would help to uncomment the following if in your code:

```python
if len(messages) > 0:
    messages = json.dumps(messages)
    outputStream.write(bytearray(messages.encode('utf-8')))
```

With that if commented out, it will always send a message to Kafka even if there are no messages. Cheers, André
08-26-2022
06:55 PM
@yagoaparecidoti , The problem is that to generate a keytab for a principal you need to know the password for that principal. The HTTP/hostname principal probably already exists in your AD and has some unknown password. Without knowing it, you would have to reset the principal's password to be able to create a keytab for it. And if you reset its password, you will invalidate any existing keytabs for that principal that other services may be using. Cheers, André
08-25-2022
05:07 PM
@Arash , It's likely that your flow is running on multiple nodes and each node is trying to process every file. When different nodes try to write the same file at the same time, you get these errors. You can change your flow to use a List-Fetch pattern: the ListHDFS processor runs only on the primary node, and its output (the list of files) is load-balanced across the nodes so that each node fetches different files with FetchHDFS. Cheers, André
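As a rough illustration of the load-balancing step (plain Python, not NiFi; the file paths and node names are made up), the listing produced on the primary node gets partitioned across the cluster, for example round-robin, so no two nodes fetch the same file:

```python
# Output of the listing step on the primary node (hypothetical paths)
files = ['/data/f%02d' % i for i in range(7)]
nodes = ['nifi0', 'nifi1', 'nifi2']

# Round-robin distribution: entry i goes to node i mod N
assignments = {n: [] for n in nodes}
for i, f in enumerate(files):
    assignments[nodes[i % len(nodes)]].append(f)

print(assignments['nifi0'])  # files fetched by node nifi0
```

Every file appears in exactly one node's list, which is why the concurrent-write errors disappear with this pattern.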
08-25-2022
05:00 PM
@yagoaparecidoti What do you need the keytab with the HTTP principal for? André
08-14-2022
03:58 PM
1 Kudo
@totokogure , The EvaluateJsonPath that you added is extracting the $.hits array and storing it as an attribute. The next processor (SplitJson) does not even use that attribute; it extracts $.hits again from the flowfile content. The EvaluateJsonPath in this flow should be unnecessary. If you connect InvokeHTTP directly to SplitJson, things should work correctly. Cheers, André
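For reference, here is a small sketch (plain Python with made-up sample data, not NiFi code) of what SplitJson does with a JsonPath expression of $.hits: each element of the array becomes its own output flowfile, so no intermediate attribute is needed.

```python
import json

# Sample InvokeHTTP response content (hypothetical)
response = json.dumps({"hits": [{"id": 1}, {"id": 2}, {"id": 3}]})

# SplitJson evaluates $.hits against the flowfile content...
hits = json.loads(response)["hits"]
# ...and emits one flowfile per array element
flowfiles = [json.dumps(h) for h in hits]

print(len(flowfiles))  # 3
```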
08-14-2022
03:51 PM
1 Kudo
Hi @LorencH , I'm of the opinion that if security is a concern (as it should be for any deployment), you should never rely on the permissions that come within the tarball. Your deployment procedure, automated or not, should always extract the files and explicitly "chown" and "chmod" the appropriate files to set the desired ownership and permissions. I don't know of a reason to eliminate the tarball, though. Cheers, André
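As a sketch of that procedure (plain Python; the paths, file names, and mode are illustrative assumptions, not part of any real release): extract the archive, then explicitly set the modes rather than trusting whatever was packed into it.

```python
import os
import stat
import tarfile
import tempfile

workdir = tempfile.mkdtemp()

# Build a tiny stand-in tarball so the sketch is self-contained
payload = os.path.join(workdir, 'conf.properties')
with open(payload, 'w') as f:
    f.write('key=value\n')
archive = os.path.join(workdir, 'release.tar.gz')
with tarfile.open(archive, 'w:gz') as tar:
    tar.add(payload, arcname='app/conf.properties')

# Deployment step: extract, then explicitly chmod. (chown would need
# privileges, e.g. shutil.chown(path, user='nifi'), so it is only noted here.)
dest = os.path.join(workdir, 'deploy')
with tarfile.open(archive) as tar:
    tar.extractall(dest)
conf = os.path.join(dest, 'app', 'conf.properties')
os.chmod(conf, 0o640)  # owner rw, group r, no world access

print(oct(stat.S_IMODE(os.stat(conf).st_mode)))
```

The point is that the post-extract chmod/chown step makes the deployed permissions deterministic regardless of what the archive carried.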
08-11-2022
10:40 PM
@totokogure , I tested this on the same version of NiFi that you're using (1.17.0) and it worked fine for me. The sample content that you provided was split into 2 flowfiles. Cheers, André