Member since: 06-26-2015 | Posts: 509 | Kudos Received: 136 | Solutions: 114
08-28-2022
04:17 PM
@hegdemahendra , You need to set the time characteristic of the pattern stream for it to work. For example, try setting it to processing time, as shown below:

DataStream<String> matchedStream = patternStream
    .inProcessingTime()
    .process(new PatternProcessFunction<String, String>() {
        @Override
        public void processMatch(Map<String, List<String>> map, Context context, Collector<String> collector) throws Exception {
            // Emit the events matched by the pattern named "start"
            collector.collect(map.get("start").toString());
        }
    });

Cheers, André
08-28-2022
03:45 PM
@ramesh0430 , What throughput are you expecting? What are your processor configurations? Cheers, André
08-28-2022
03:44 PM
@Vinay91 , Do you see any errors in the nifi-app.log file? Do you have a subscription with Cloudera Support? This is something the support team could help you quickly resolve. Cheers, André
08-28-2022
03:40 PM
@belka , No, Apache Druid is not currently supported. Cheers, André
08-27-2022
06:05 PM
@learncloud1111 , All vulnerabilities regarding log4j have already been fixed/addressed by Cloudera in CDP 7.1.7 SP1. You should not need to fix anything else on your own. Cheers, André
08-27-2022
06:00 PM
@spserd , Do these errors appear only once in the logs or are they recurring? What command line did you use to run the job? Cheers, André
08-26-2022
09:59 PM
@Omarb , Initially I thought this was a problem with the CSVRecordSetWriter, but I was mistaken. The issue is that even though your CSVReader is set to ignore the header line, its Schema Access Strategy is set to "Infer Schema". This causes the reader to consume the first line of the flow file to infer the schema, even though the other property tells it to ignore that line. To avoid this, set the Schema Access Strategy property to "Use 'Schema Text' Property" and provide a schema that matches your flow file structure. For example:

{
  "type": "record",
  "name": "MyFlowFile",
  "fields": [
    { "name": "col_a", "type": "string" },
    { "name": "col_b", "type": "string" },
    { "name": "col_c", "type": "string" },
    ...
  ]
}

This will stop the first line from being "consumed" by the reader.

Cheers, André
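If the CSV has many columns, typing the schema text by hand gets tedious. Here is a minimal sketch that generates it, assuming every column should be typed as a string as in the example above. The helper name `build_schema_text` and the column names are only placeholders for illustration; they are not part of NiFi.

```python
import json

def build_schema_text(record_name, columns):
    """Build an Avro record schema (as JSON text) with one string field per column."""
    schema = {
        "type": "record",
        "name": record_name,
        "fields": [{"name": col, "type": "string"} for col in columns],
    }
    return json.dumps(schema, indent=2)

# Hypothetical column names; replace them with the columns of your own CSV
schema_text = build_schema_text("MyFlowFile", ["col_a", "col_b", "col_c"])
print(schema_text)
```

The printed text can then be pasted into the reader's Schema Text property.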
08-26-2022
09:07 PM
@Griggsy , I don't know if there's a way to do exactly that with either the JOLT or ReplaceText processors. You can do it with ExecuteScript and the following Python script, though:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
import re

def to_snake_case(name):
    # Insert "_" before every uppercase letter except the first character, then lowercase
    return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()

def convert_keys_to_snake_case(obj):
    # Accepts a JSON object or a list of objects; only the top-level keys of each object are renamed
    if isinstance(obj, list):
        return [convert_keys_to_snake_case(o) for o in obj]
    else:
        return {to_snake_case(k): v for k, v in obj.items()}

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the flow file content, convert the keys, and write the result back
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        converted = convert_keys_to_snake_case(json.loads(text))
        outputStream.write(bytearray(json.dumps(converted).encode('utf-8','ignore')))

flow_file = session.get()
if flow_file is not None:
    flow_file = session.write(flow_file, PyStreamCallback())
    session.transfer(flow_file, REL_SUCCESS)

Cheers, André
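The key conversion can be tried outside NiFi with plain Python before wiring it into ExecuteScript. The sketch below reuses the same regular expression; `convert_keys_deep` is a hypothetical variant (not in the script above) that also renames keys of nested objects, in case your JSON is not flat. The sample document is made up for illustration.

```python
import json
import re

def to_snake_case(name):
    # Insert "_" before every uppercase letter except the first character, then lowercase
    return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()

def convert_keys_deep(obj):
    # Hypothetical variant: walks lists and nested objects, leaves other values untouched
    if isinstance(obj, list):
        return [convert_keys_deep(o) for o in obj]
    if isinstance(obj, dict):
        return {to_snake_case(k): convert_keys_deep(v) for k, v in obj.items()}
    return obj

# Made-up sample input
sample = json.loads('{"firstName": "Ana", "homeAddress": {"zipCode": "1000"}, "tags": ["A", "B"]}')
result = convert_keys_deep(sample)
print(json.dumps(result))
```

Note that the regular expression splits before every capital letter, so a key with consecutive capitals such as "HTTPCode" becomes "h_t_t_p_code". If your keys contain acronyms you may want a different pattern.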
08-26-2022
07:42 PM
1 Kudo
@nk20 , You can use the NiFi API to extract the current state of any stateful processor if you need this. See the example below for a ListFile processor:

access_token=$(curl -k \
  -X POST \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'username=admin&password=supersecret1' \
  "https://localhost:8443/nifi-api/access/token")

curl -k \
  -H "Authorization: Bearer $access_token" \
  "https://localhost:8443/nifi-api/processors/dd24aaec-0182-1000-ffff-ffff9f128d94/state"

{
  "componentState": {
    "componentId": "dd24aaec-0182-1000-ffff-ffff9f128d94",
    "stateDescription": "After performing a listing of files, the timestamp of the newest file is stored. This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run. Whether the state is stored with a Local or Cluster scope depends on the value of the <Input Directory Location> property.",
    "clusterState": {
      "scope": "CLUSTER",
      "totalEntryCount": 0,
      "state": []
    },
    "localState": {
      "scope": "LOCAL",
      "totalEntryCount": 6,
      "state": [
        {
          "key": "id.0",
          "value": "/tmp/hsperfdata_nifi/129",
          "clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
          "clusterNodeAddress": "nifi1:8443"
        },
        {
          "key": "id.0",
          "value": "/tmp/hsperfdata_nifi/130",
          "clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
          "clusterNodeAddress": "nifi0:8443"
        },
        {
          "key": "listing.timestamp",
          "value": "1661567544049",
          "clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
          "clusterNodeAddress": "nifi1:8443"
        },
        {
          "key": "listing.timestamp",
          "value": "1661567541525",
          "clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
          "clusterNodeAddress": "nifi0:8443"
        },
        {
          "key": "processed.timestamp",
          "value": "1661567544049",
          "clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
          "clusterNodeAddress": "nifi1:8443"
        },
        {
          "key": "processed.timestamp",
          "value": "1661567541525",
          "clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
          "clusterNodeAddress": "nifi0:8443"
        }
      ]
    }
  }
}

Cheers, André
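Once you have the state response, pulling out a single value per node is straightforward. Below is a minimal sketch that parses a trimmed copy of the response shown above and collects one key per node address. The helper name `state_by_node` is only an illustration, and the sketch assumes the state you need is in `localState`, as it is in this example.

```python
import json

def state_by_node(response, key):
    """Return {clusterNodeAddress: value} for the given key in the local state."""
    entries = response["componentState"]["localState"]["state"]
    return {e["clusterNodeAddress"]: e["value"] for e in entries if e["key"] == key}

# Trimmed copy of the response above (only the listing.timestamp entries)
sample = json.loads("""
{
  "componentState": {
    "componentId": "dd24aaec-0182-1000-ffff-ffff9f128d94",
    "localState": {
      "scope": "LOCAL",
      "state": [
        {"key": "listing.timestamp", "value": "1661567544049",
         "clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
         "clusterNodeAddress": "nifi1:8443"},
        {"key": "listing.timestamp", "value": "1661567541525",
         "clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
         "clusterNodeAddress": "nifi0:8443"}
      ]
    }
  }
}
""")

timestamps = state_by_node(sample, "listing.timestamp")
print(timestamps)
```

Each node keeps its own local state, which is why the same key shows up once per node in the response.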
08-26-2022
07:27 PM
@FozzieN , You can do exactly what you're trying to achieve using standard NiFi processors and a schema to describe your data structure. Please see the attached example flow, which replicates what you are doing without the need for scripting. This flow uses the following schema, set as a parameter:

{
  "type": "record",
  "name": "MyData",
  "fields": [
    { "name": "source_timestamp", "type": "string" },
    { "name": "signal", "type": "string" },
    { "name": "value", "type": "string" },
    { "name": "fecha_envio", "type": ["null", "string"], "default": null }
  ]
}

Cheers, André
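In this schema, fecha_envio is declared as a union of "null" and "string" with a null default, which is what makes it optional, while the other three fields are required strings. A quick way to double-check a schema text before setting it as a parameter is to parse it and list the optional fields. This is only a sketch in plain Python; `nullable_fields` is a hypothetical helper, not something NiFi provides.

```python
import json

SCHEMA_TEXT = """
{
  "type": "record",
  "name": "MyData",
  "fields": [
    { "name": "source_timestamp", "type": "string" },
    { "name": "signal", "type": "string" },
    { "name": "value", "type": "string" },
    { "name": "fecha_envio", "type": ["null", "string"], "default": null }
  ]
}
"""

def nullable_fields(schema_text):
    """Return the names of fields whose type is a union that includes "null"."""
    schema = json.loads(schema_text)  # raises ValueError if the text is not valid JSON
    return [
        f["name"]
        for f in schema["fields"]
        if isinstance(f["type"], list) and "null" in f["type"]
    ]

optional = nullable_fields(SCHEMA_TEXT)
print(optional)
```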