Member since
11-21-2022
02-21-2023
07:30 AM
I used `Correlation Attribute Name`, setting it to `${schema.name}`, and it's working as expected. Quote from the documentation:

> If specified, two FlowFiles will be binned together only if they have the same value for this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.
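
To make the quoted behaviour concrete, here is a minimal plain-Python sketch of binning by a correlation attribute. It is a conceptual model only, not NiFi's implementation: the function `bin_by_attribute`, the dict-based FlowFile stand-ins, and the values `report_a`/`report_b` are all hypothetical, and the attribute name `schema.name` is passed directly rather than through Expression Language.

```python
from collections import defaultdict


def bin_by_attribute(flowfiles, correlation_attribute=None):
    """Group FlowFile-like dicts into bins keyed by one attribute's value."""
    bins = defaultdict(list)
    for ff in flowfiles:
        if correlation_attribute is None:
            # No correlation attribute: everything lands in one bin,
            # in the order it was pulled from the queue.
            key = None
        else:
            key = ff["attributes"].get(correlation_attribute)
        bins[key].append(ff)
    return dict(bins)


# Hypothetical FlowFiles carrying two different schemas.
flowfiles = [
    {"attributes": {"schema.name": "report_a"}, "content": "a1"},
    {"attributes": {"schema.name": "report_b"}, "content": "b1"},
    {"attributes": {"schema.name": "report_a"}, "content": "a2"},
]

bins = bin_by_attribute(flowfiles, "schema.name")
for name, members in bins.items():
    print(name, [ff["content"] for ff in members])
```

With the attribute set, records from `report_a` and `report_b` never share a bin, which is the property needed when one merge step serves several schemas.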
02-17-2023
01:13 PM
My use-case is:

1) Have API credentials
2) Use UpdateAttribute to update (1) schema, (2) s3 bucket/location (my list of reports)
3) Query API endpoint for report
4) API endpoint paginates and gets more records
5) Call MergeRecord
6) Save to s3

Since 3, 4, 5, 6 are all the same, I'm re-using the processors like below (screenshot). My problem is (5) MergeRecord will try to merge different schemas together, which is obviously a problem. How can I restructure this? I'd like to re-use processors as much as possible, but still be able to add more schemas as my needs evolve.
Labels: Apache NiFi
02-16-2023
06:38 AM
I was able to achieve this with `ExecuteScript` using Python (Jython):

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets

flow_file = session.get()
if flow_file is not None:
    # get the flowfile content as json
    stream_content = session.read(flow_file)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_content = json.loads(text_content)
    # close the stream; without this, `putAttribute` will fail
    stream_content.close()
    # get table name and fields
    table_name = json_content["name"]
    fields = json_content["fields"]
    columns = ",".join(col["name"] for col in fields)
    # format the select statement
    select_statement = "select " + columns + " from " + table_name
    # set the select statement as an attr
    flow_file = session.putAttribute(flow_file, "select_statement", select_statement)
    # finalize and pass it on
    session.transfer(flow_file, REL_SUCCESS)
    session.commit()
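
For testing the string-building logic outside NiFi, the core of the script above can be pulled into a plain function. This is a minimal standalone sketch in regular Python 3, not part of the original script: the name `build_select_statement` is hypothetical, and it joins columns with ", " (the script above uses "," without a space).

```python
import json


def build_select_statement(schema_text):
    """Build 'select <cols> from <table>' from an Avro-style schema JSON string."""
    schema = json.loads(schema_text)
    # Column names come from the "name" key of each entry in "fields".
    columns = ", ".join(field["name"] for field in schema["fields"])
    return "select " + columns + " from " + schema["name"]


example_schema = """
{
  "name": "bar_tbl",
  "fields": [
    { "name": "foo1", "type": "string" },
    { "name": "foo2", "type": "string" }
  ]
}
"""

statement = build_select_statement(example_schema)
print(statement)
```

Because the function has no dependency on `session` or Java classes, it can be checked with ordinary unit tests before being pasted back into the processor script.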
02-15-2023
02:00 PM
Figured out the issue. The Java SDK that NiFi uses does not honour "noProxy" flags.
02-15-2023
01:54 PM
I have an Avro schema that I can get from a registry:

{
  "name": "bar_tbl",
  "fields": [
    { "name": "foo1", "type": "string" },
    { "name": "foo2", "type": "string" }
  ]
}

I know how to use `EvaluateJsonPath` to get `name` and `fields`. I'd like to set an attribute with `UpdateAttribute` to equal:

select foo1, foo2 from bar_tbl

to use it later downstream in another processor. How can I achieve this? Or, more directly, I have a REST API that takes a URL-encoded SQL statement. How can I feed the SQL statement to `InvokeHTTP`?
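
For the URL-encoding part of the question, here is a minimal Python 3 sketch of what the encoded statement looks like. The endpoint `https://api.example.com/query` and the query parameter `q` are placeholders, not the real API; inside a Jython-based `ExecuteScript` the Python 2 equivalent would be `urllib.quote`.

```python
from urllib.parse import quote

select_statement = "select foo1, foo2 from bar_tbl"

# safe="" percent-encodes every reserved character, including "/" and ",".
encoded = quote(select_statement, safe="")

# Hypothetical endpoint and parameter name, for illustration only.
url = "https://api.example.com/query?q=" + encoded
print(url)
```

The encoded value could then be stored in a FlowFile attribute and referenced from the URL property of the HTTP processor.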
Labels: Apache NiFi
11-21-2022
07:02 AM
I posted a similar thread here. NiFi doesn't seem to be honouring HTTP_PROXY settings. https://community.cloudera.com/t5/Support-Questions/Nifi-tasks-not-honouring-HTTP-PROXY-settings/m-p/358074#M237742
11-21-2022
06:22 AM
I have an apache/nifi:latest instance spun up inside an Amazon Linux 2 EC2 instance. For reference, see this guide: here

I have a QuerySalesforceObject (ver. 1.18.0) that makes use of StandardOauth2AccessTokenProvider. The OAuth2 provider URL is configured as https://test.salesforce.com/services/oauth2/token

I can curl this URL from the box and from inside the Docker container just fine (I don’t get a timeout).

[root@ip-10-229-18-107 ~]# docker exec -it nifi_container_persistent /bin/sh
printenv | grep -i proxy
HTTPS_PROXY=http://proxy.MY_DOMAIN.com:3128
no_proxy=localhost,127.0.0.1,MY_DOMAIN.com,.amazonaws.com
NO_PROXY=localhost,127.0.0.1, MY_DOMAIN.com,.amazonaws.com
https_proxy=http://proxy.MY_DOMAIN.com:3128
http_proxy=http://proxy.MY_DOMAIN.com:3128
HTTP_PROXY=http://proxy.MY_DOMAIN.com:3128
curl https://test.salesforce.com/services/oauth2/token
{"error":"unsupported_grant_type","error_description":"grant type not supported"}#

But when I run the task, OAuth2 fails with an error:

java.io.UncheckedIOException: OAuth2 access token request failed
Caused by: java.net.SocketTimeoutException: connect timed out

This leads me to believe the proxy settings are not being honored by the class. How can I fix this?
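
One possible angle: the JVM's standard networking stack does not read `HTTP_PROXY`-style environment variables by default; it is configured through system properties such as `https.proxyHost`, `https.proxyPort`, and `http.nonProxyHosts`. The sketch below translates curl-style settings into the corresponding `-D` flags. The function name `jvm_proxy_args` and the `example.com` hosts are hypothetical, and whether a given NiFi component actually honours these properties is an assumption to verify.

```python
from urllib.parse import urlsplit


def jvm_proxy_args(proxy_url, no_proxy=""):
    """Translate curl-style proxy settings into JVM system-property flags."""
    parts = urlsplit(proxy_url)
    if parts.hostname is None or parts.port is None:
        raise ValueError("proxy URL must include a host and a port")
    args = []
    for scheme in ("http", "https"):
        args.append("-D%s.proxyHost=%s" % (scheme, parts.hostname))
        args.append("-D%s.proxyPort=%d" % (scheme, parts.port))
    hosts = []
    for entry in no_proxy.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # no_proxy marks "any subdomain" with a leading dot; the JVM uses "*.".
        hosts.append("*" + entry if entry.startswith(".") else entry)
    if hosts:
        # The JVM's HTTPS handler reuses http.nonProxyHosts; entries are "|"-separated.
        args.append("-Dhttp.nonProxyHosts=" + "|".join(hosts))
    return args


args = jvm_proxy_args(
    "http://proxy.example.com:3128",
    "localhost,127.0.0.1, example.com,.amazonaws.com",
)
for arg in args:
    print(arg)
```

In NiFi 1.x, JVM flags like these are typically added as `java.arg.<n>=...` lines in `conf/bootstrap.conf`. Matching rules for bare domains differ between curl and the JVM, so the generated exclusion list should be checked by hand.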
Labels: Apache NiFi