Member since
07-29-2020
193
Posts
44
Kudos Received
47
Solutions
My Accepted Solutions
| Title | Views | Posted |
| --- | --- | --- |
|  | 104 | 01-25-2023 12:42 PM |
|  | 77 | 01-18-2023 12:26 PM |
|  | 229 | 01-17-2023 10:41 AM |
|  | 63 | 01-13-2023 06:53 AM |
|  | 143 | 11-29-2022 09:27 AM |
01-26-2023
10:10 AM
Hi, I'm not sure what the problem is with the current setup. Usually, when the first process group finishes and sends the flowfile to the second process group, that flowfile is dropped when the GenerateTableFetch in the second group executes, and new flowfiles are generated from it. Another option, if you don't want any relationship between the first and second process groups through output/input ports and prefer to keep them completely independent with no flowfiles transferred between them, is to use the NiFi REST API to trigger the second group's GenerateTableFetch from the first group via the InvokeHTTP processor. Hope that helps. Thanks
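As a rough sketch of that REST call (my own example; <nifi-host> and <processor-id> are placeholders for your environment), InvokeHTTP could be configured along these lines:
HTTP Method: PUT
Remote URL: https://<nifi-host>:8443/nifi-api/processors/<processor-id>/run-status
Request body (Content-Type application/json): { "revision": { "version": 0 }, "state": "RUNNING" }
Keep in mind the revision version must match the processor's current revision, and a secured instance will also require an authentication token, so treat this as a starting point rather than a drop-in config.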
... View more
01-26-2023
06:22 AM
Hi, You don't need to know a programming language to learn Expression Language. It can help, but it's not required.
... View more
01-25-2023
12:42 PM
Hi, I think the right syntax would be: ${field.value:replace('a', 'i'):replace('e', 'u'):replace(...)} Hope that helps. Thanks
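A quick illustration (my own example value): if field.value is "apple", then ${field.value:replace('a', 'i'):replace('e', 'u')} evaluates to "ipplu", since each replace is applied left to right to the result of the previous one.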
... View more
01-24-2023
08:57 AM
Hi, Try the following:
[
{
"operation": "shift",
"spec": {
"*": {
"*": "[#2].&",
"$": "[#2].product"
}
}
}
]
If that helps, please accept the solution. Thanks
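To illustrate with a made-up input: { "prodA": { "price": 10 }, "prodB": { "price": 20 } } should come out as [ { "price": 10, "product": "prodA" }, { "price": 20, "product": "prodB" } ], i.e. each top-level key becomes an array element with its key name captured in the product field.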
... View more
01-21-2023
08:22 AM
Hi, I don't think you can catch the SQL error that way, in the sense that PutSQL won't report the error message. However, you can use PutDatabaseRecord instead and route its failure relationship to LogMessage, where you can access the error message via the "putdatabaserecord.error" attribute. A better way of capturing errors from any processor (a global catch) is to use the SiteToSiteBulletinReportingTask, as explained here: SiteToSiteBulletinReportingTask If that helps, please accept the solution. Thanks
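For example (hypothetical wording, just to show the idea), the LogMessage processor's Log Message property could be set to something like: Insert failed for ${filename}: ${putdatabaserecord.error} so the database error is logged alongside the flowfile that caused it.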
... View more
01-18-2023
12:26 PM
Hi, I was able to obtain the required result using the following processors:
1- SplitText: this splits each JSON record into its own flowfile.
2- UpdateRecord: this updates the date fields and converts them to the required format using a JSON Record Reader/Writer. The value used to convert the time for each field is: ${field.value:toDate("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"):format("yyyy-MM-dd HH:mm:ss.SSS")}
More info on UpdateRecord: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.7.1/org.apache.nifi.processors.standard.UpdateRecord/additionalDetails.html
Note: the only problem I noticed is that null values get converted to "". Not sure if that will cause you a problem, but you can use ReplaceText or a JSON Jolt transform to convert the values back to null. If you need the records merged back together before inserting into Hive, you can use the MergeRecord processor. If that helps, please accept the solution. Thanks
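A quick illustration with a made-up value: an input of "2023-01-18T09:15:30.123Z" would come out of that expression as "2023-01-18 09:15:30.123".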
... View more
01-17-2023
11:16 AM
Hi, I'm not sure if this is exactly what you are looking for, but it should give you the expected output from the sample you provided:
[
{
"operation": "shift",
"spec": {
"content": {
"*": {
"*": {
"*": {
"$": "error",
"$1": "product",
"$2": "ErrorType",
"@": "q"
}
}
}
}
}
}
]
If that helps, please accept the solution. Thanks
... View more
01-17-2023
10:41 AM
1 Kudo
Yes. Have you ever used Expression Language to set/get flowfile attributes and use them as parameters for other processors? For example, Username and Password on GetSFTP can be set as follows: Username: ${my.sftp.db.username} Password: ${my.sftp.db.password} Both attributes (my.sftp.db.username, my.sftp.db.password) are set once you get this info from the DB. For example, you could use the ExecuteSQLRecord processor with the "SQL Select Query" set to something like: select username, password from myCredentialTable Assuming you use a JSON writer to output the data in JSON format like: { "username": "...", "password": "..." } you can then use EvaluateJsonPath to set the attributes by adding dynamic properties to that processor (see the example below). Hope that helps. If it does, please accept the solution.
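For instance (reconstructing what the screenshot would have shown, so the JsonPath values are illustrative), the EvaluateJsonPath dynamic properties could be: my.sftp.db.username = $.username and my.sftp.db.password = $.password, with the Destination property set to flowfile-attribute.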
... View more
01-16-2023
06:50 AM
Hi, You can read data from the DB using processors like ExecuteSQL or ExecuteSQLRecord. The result can be parsed into flowfile attributes, which you can later use to set the credentials in the SFTP NiFi processor using Expression Language.
... View more
01-13-2023
06:53 AM
I was finally able to figure out the problem. To resolve this issue, it seems the py/jar file specified in "appResource" & "spark.jars" needs to be accessible by all nodes in the cluster. For example, if you have a network path, you can specify it in both attributes as follows: "appResource": "file:////Servername/somefolder/HelloWorld.jar", ... "spark.jars": "file:////Servername/someFolder/HelloWorld.jar", Not sure why that is needed if the job is being submitted to the master. If anybody knows, please help me understand.
... View more
12-16-2022
09:35 AM
Hi, I have a Spark cluster deployed on Windows. I'm trying to submit a simple Spark job using the REST API. The job is just Python code that prints a simple hello world sentence, as follows:
from pyspark.sql import SparkSession
def main(args):
print('hello world')
return 0
if __name__ == '__main__':
main(None)
The URL I'm using to submit the job is http://<Master-IP>:6066/v1/submissions/create with the following POST body:
{
"appResource": "file:../../helloworld.py",
"sparkProperties": {
"spark.executor.memory": "2g",
"spark.master": "spark://<Master IP>:7077",
"spark.app.name": "Spark REST API - Hello world",
"spark.driver.memory": "2g",
"spark.eventLog.enabled": "false",
"spark.driver.cores": "2",
"spark.submit.deployMode": "cluster",
"spark.driver.supervise": "true"
},
"clientSparkVersion": "3.3.1",
"mainClass": "org.apache.spark.deploy.SparkSubmit",
"environmentVariables": {
"SPARK_ENV_LOADED": "1"
},
"action": "CreateSubmissionRequest",
"appArgs": [
"../../helloworld.py", "80"
]
}
After I run this POST using Postman, I get the following response:
{
"action": "CreateSubmissionResponse",
"message": "Driver successfully submitted as driver-20221216112633-0005",
"serverSparkVersion": "3.3.1",
"submissionId": "driver-20221216112633-0005",
"success": true
}
However, when I try to get the job status using http://<Master-IP>:6066/v1/submissions/status/driver-20221216112633-0005 I get driverState: ERROR with a NullPointerException, as follows:
{
"action": "SubmissionStatusResponse",
"driverState": "ERROR",
"message": "Exception from the cluster:\njava.lang.NullPointerException\n\torg.apache.spark.deploy.worker.DriverRunner.downloadUserJar(DriverRunner.scala:158)\n\torg.apache.spark.deploy.worker.DriverRunner.prepareAndRunDriver(DriverRunner.scala:179)\n\torg.apache.spark.deploy.worker.DriverRunner$$anon$2.run(DriverRunner.scala:99)",
"serverSparkVersion": "3.3.1",
"submissionId": "driver-20221216112633-0005",
"success": true,
"workerHostPort": "10.9.8.120:56060",
"workerId": "worker-20221216093629-<IP>-56060"
}
Not sure why I'm getting this error or what it means. Can someone please point me in the right direction, or at least help me troubleshoot this further? Thanks
... View more
Labels:
- Apache Spark
12-12-2022
06:40 AM
Hi, I'm new to Apache Spark, so I'm not sure if this is the best setup; my goal is to create an environment where I can test and evaluate before making a decision. I set up the cluster on Windows using the steps from: https://aamargajbhiye.medium.com/apache-spark-setup-a-multi-node-standalone-cluster-on-windows-63d413296971 The cluster version I'm using is the latest: 3.3.1/Hadoop 3. The master node starts without an issue, and I'm able to register the workers on each worker node using the following command: spark-class org.apache.spark.deploy.worker.Worker spark://<Master-IP>:7077 --host <Worker-IP> When I register a worker, it connects and registers successfully as the message indicates, and I'm able to see both workers in the UI with ALIVE status. Then I tried submitting a simple hello_world Python job using: spark-submit --master spark://<Master-IP>:7077 hello_world.py My hello_world.py application is like this:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Hello World").getOrCreate()
print("Hello From Spark!")
sparkContext=spark.sparkContext
rdd=sparkContext.parallelize([1,2,3])
print(rdd.collect())
When I submit the job, Spark continuously tries to create new executors as if it is retrying, but they all exit with code 1, and I have to kill the job to stop it. When I check the UI and click on a given executor, I see the following in stdout & stderr:
stdout:
22/12/12 08:04:11 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 6544@HOU12-FSRM01
22/12/12 08:04:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/12 08:04:11 INFO SecurityManager: Changing view acls to: vnetadmin
22/12/12 08:04:11 INFO SecurityManager: Changing modify acls to: vnetadmin
22/12/12 08:04:11 INFO SecurityManager: Changing view acls groups to:
22/12/12 08:04:11 INFO SecurityManager: Changing modify acls groups to:
22/12/12 08:04:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vnetadmin); groups with view permissions: Set(); users with modify permissions: Set(vnetadmin); groups with modify permissions: Set()
stderr:
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
....
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
at ...
Caused by: java.io.IOException: Failed to connect to <Master DNS>/<Master IP>:56526
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:288)
.....
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: <Master DNS>/<Master IP>:56526
Caused by: java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
....
Not sure how to fix the error above. I tried opening the port referenced in "Failed to connect to <Master DNS>/<Master IP>:56526" on the master node, but every time it shows a different port. Not sure what else I can do or how to troubleshoot. Any help is appreciated.
... View more
Labels:
- Apache Spark
11-30-2022
06:36 AM
Hi, If you are able to do the Jolt transformation to get the output specified above, then the ForkRecord Record Writer should be set to a JsonRecordSetWriter, configured as follows: the Schema Text property can be set to the following:
{
"type": "record",
"name": "TestObject",
"namespace": "ca.dataedu",
"fields": [{
"name": "NestedKey1",
"type": ["null", "string"],
"default": null
}, {
"name": "NestedKey3",
"type": ["null", "string"],
"default": null
}, {
"name": "NestedKey2",
"type": ["null", "string"],
"default": null
}]
}
This should give you the desired output you specified above. For more information on the JsonRecordSetWriter, please refer to: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-record-serialization-services-nar/1.13.0/org.apache.nifi.json.JsonRecordSetWriter/index.html Hope that helps; if it does, please accept the solution. Thanks
... View more
11-29-2022
09:27 AM
Hi, I think after you split your CSV you need to extract the values of both columns, status and client_id, into attributes and then use them in the ExecuteSQL processor. For that you need to:
1- Convert the record from CSV to JSON format using the ConvertRecord processor.
2- Use EvaluateJsonPath to extract both columns into defined attributes (dynamic properties). Make sure to set the Destination property to "flowfile-attribute".
After that you can reference those attributes in the SQL query as ${status} & ${client_id}, assuming that's what you called the attributes in step 2 (see the sketch below). Another option, if you don't want to use two processors, is the ExtractText processor with a regex to extract each value, but you have to be careful how you define the regex for each value to make sure you are pulling only those values and nothing else. Hope that helps. If that answers your question, please accept the solution. Thanks
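A minimal sketch of step 2 and the query, using my own property and table names: EvaluateJsonPath dynamic properties status = $.status and client_id = $.client_id, then an ExecuteSQL query such as: select * from my_table where client_id = '${client_id}' and status = '${status}'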
... View more
11-28-2022
10:41 AM
No, I don't think I have used the following:
nifi.security.identity.mapping.pattern.dn =
nifi.security.identity.mapping.value.dn =
Have you tried using simple single-user authorization just to see if you can log in? It helps in these cases to start from a simple config and then build up, just to be able to isolate where the issue is. Hope that helps
... View more
11-28-2022
08:21 AM
Can you send me a sample input and your JSON Jolt spec?
... View more
11-28-2022
06:30 AM
Hi, Yes, that is the point of the fork/join enrichment: the original flowfile will wait until both the response and the original meet at the JoinEnrichment.
... View more
11-26-2022
05:49 PM
Hi, I'm not sure if what I'm suggesting is the best solution, so anyone who sees a better one, please advise. You can solve this in two different ways:
1- If you know the schema and don't mind adding a header so you can use the QueryRecord processor, then add the header first (see: https://stackoverflow.com/questions/58707242/how-to-add-headers-to-a-csv-using-apache-nifi ) and then use three different QueryRecord processors, one per value, to query out the different datasets (see the sketch below).
2- If you don't want to add a header, you can do the following:
a. Use the SplitText processor to split out each line.
b. Do a RouteOnContent for each value (student, teacher, class...) to filter the records for each dataset. For example, you would have a RouteOnContent with a dynamic property called "Student" and the search value "(Student)", with the Match Requirement property set to "content must contain match". This will route only the records that contain Student.
c. Use the MergeContent processor to merge the result set back together.
Let me know if you have any questions. If this answers your question, please accept the solution. Thanks
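For option 1, a sketch of one of the QueryRecord queries (the column name "type" is my assumption about the data; adjust it to your real header): a dynamic property named students with the value: SELECT * FROM FLOWFILE WHERE type = 'Student' plus similar properties for teachers and classes, each feeding its own downstream connection.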
... View more
11-25-2022
11:31 AM
1 Kudo
Hi @Green_ After further investigation, I found that the result is coming back blank because we missed a point in the processor description itself: "..The user must specify at least one Record Path, as a dynamic property, pointing to a field of type ARRAY containing RECORD objects..." Since the values in the array are plain values and not records, it's probably not working as expected. When I make the input look like the one below, it works with the path specified:
{
"Key1": [
{
"NestedKey1": "Value1",
"NestedKey2": [
{
"nestedValue": "Value2"
},
{
"nestedValue": "Value3"
},
{
"nestedValue": "Value4"
},
{
"nestedValue": "Value5"
}
],
"NestedKey3": "Value6"
}
]
}
My suggestion - if that works for @Fredi - is to use a Jolt transformation to convert the array into records as seen above (a rough sketch of such a spec is at the end of this post) and then use the ForkRecord processor to achieve the desired result. The schema for the JSON record writer can be as simple as the following:
{
"type": "record",
"name": "TestObject",
"namespace": "ca.dataedu",
"fields": [{
"name": "NestedKey1",
"type": ["null", "string"],
"default": null
}, {
"name": "NestedKey3",
"type": ["null", "string"],
"default": null
}, {
"name": "nestedValue",
"type": ["null", "string"],
"default": null
}]
}
Hope that helps. Thanks
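P.S. A rough, untested sketch of the Jolt spec mentioned above, assuming the original input had Key1 as an array of records whose NestedKey2 was a plain array of strings:
[
{
"operation": "shift",
"spec": {
"Key1": {
"*": {
"NestedKey1": "Key1[&1].NestedKey1",
"NestedKey2": {
"*": "Key1[&2].NestedKey2[&].nestedValue"
},
"NestedKey3": "Key1[&1].NestedKey3"
}
}
}
}
]
This wraps each plain NestedKey2 value in a { "nestedValue": ... } record so ForkRecord can work with it.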
... View more
11-25-2022
09:07 AM
Hi @Green_ @Fredi, I just tried it out of the box with the example mentioned in the documentation: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.17.0/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html It seems to produce the correct result only when you provide the schema for the record writer service. Not sure if that is related, but when I tried without providing a schema in the JSON record writer, it gave me all the values as null!
... View more
11-25-2022
07:55 AM
1 Kudo
Hi, It seems your best out-of-the-box option is the ForkEnrichment / JoinEnrichment processor pair. If each record in the original data had an equivalent record in the response, this would be super easy: you could use the Wrapper or Insertion strategy to enrich your data into something similar to what you expect. However, since you mentioned that successful records won't have an equivalent record in the response, you can take advantage of the SQL join strategy, where you do a SQL join between the two inputs (original & response) using a common ID and generate the desired output. The SQL strategy gives you a lot of flexibility, but you have to be careful with performance. The challenge with the SQL strategy, or any JoinEnrichment strategy, is to have a common link between the two inputs. In your case the only link is the record index, so you might use a JSON Jolt transform to produce such a link. For example, your original data would look like this:
{ "data": [ { "index" : "Record 1", "id": "1234569", "Date": "2022-08-22" }, { "index" : "Record 2", "id": "1234567", "Date": "2022-08-22" }, ...
and your response would look like this after the Jolt transformation:
{ "Index":"Record 2", "Record 2" : "Invalid values for emp info" }, { "Index":"Record 3", "Record 3" : "Invalid values for emp info" } ]
In this case you can do the SQL join enrichment using "index" as the join key (see the sketch below). For more info regarding fork/join enrichment, see the following link: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apache.nifi.processors.standard.JoinEnrichment/additionalDetails.html If you find this helpful, please accept the solution. Thanks
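A sketch of what the SQL join strategy query could look like (JoinEnrichment exposes the two inputs as tables named original and enrichment; the column names follow my example above): SELECT original.*, enrichment.* FROM original LEFT OUTER JOIN enrichment ON original."index" = enrichment."Index" The left outer join keeps the successful records that have no matching row in the response.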
... View more
11-24-2022
03:33 PM
Hi Mohamed, I know the frustration. Honestly, it's been a while and I don't recall how I resolved it, but I remember that when I upgraded to 1.16 it took a few rounds of uninstall/reinstall for it to work correctly. Can you please post what you have in your authorizers.xml and what is in the nifi.properties file regarding the security configuration, like I did above? Also keep in mind that the Initial User Identity is case sensitive, so make sure the identity associated with the certificates in the truststore and keystore and the one you define in the authorizer have the same letter case. Let me know. Thanks
... View more
11-10-2022
12:08 PM
Not sure how big your JSON is or whether it is well formatted across multiple lines. Make sure the Evaluation Mode is set to Line-by-Line; you can also increase the Maximum Buffer Size in case the text being processed is greater than 1 MB. Also, what version of NiFi are you using? There seems to be a bug around this as well, where the flowfile remains in the upstream queue and the overflow error is thrown: https://issues.apache.org/jira/browse/NIFI-10154
... View more
11-10-2022
11:26 AM
Can you share the configuration of the ReplaceText processor? Also, how big is the JSON file?
... View more
11-10-2022
08:14 AM
Hi, I tried the following pattern on the file you sent and it worked: [\x03]+
... View more
11-09-2022
01:09 PM
Can you send me sample JSON data that reproduces the error? The sample you posted seems to be valid, and I'm able to split it.
... View more
11-09-2022
08:37 AM
Hi, I think you are having an issue because you have carriage returns/line feeds (\r\n) in the JSON. Try using a regex replace for the following as well: [\r\n] Hope that helps; if it does, please accept the solution. Thanks
... View more
11-08-2022
12:44 PM
Hi, I'm only able to do that via two processors: 1- JoltTransformJSON: this adds the desired attributes (attr_id & attr_name) to the flowfile JSON using the following spec:
[
{
"operation": "shift",
"spec": {
"#${attr_id}":"attr_id",
"#${attr_name}":"attr_name",
"model": "model",
"cars_drivers": {
"*": {
"id": "cars_drivers[#2].id",
"name": "cars_drivers[#2].name",
"is_processed": "cars_drivers[#2].is_processed"
}
}
}
}
]
2- UpdateRecord processor: once the attributes are added to the JSON, you can update a record's is_processed value only when the id matches attr_id and the name matches attr_name. To do that, set the following properties:
a- Replacement Value Strategy: Literal Value
b- /cars_drivers[*]/id[.=../../../attr_id]/../name[.=../../../attr_name]/../is_processed : 1
Hope that helps; if it does, please accept the solution.
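To illustrate with made-up values: if attr_id is 2 and attr_name is "Bob", only the cars_drivers entry whose id is 2 and whose name is "Bob" gets its is_processed set to 1; all other entries pass through unchanged.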
... View more
- Tags:
- updaterecord
10-20-2022
09:52 AM
Hi, Can you provide examples of the different JSON inputs?
... View more
10-16-2022
08:24 AM
Hi, I think you were close with the last option but did not use the correct syntax for null values: instead of '<> null', use 'is not null', as follows: select * from FLOWFILE where RPATH(fields, '/field') is not null However, when I ran that I got the correct result, but for some reason MapRecord syntax is added to each array element:
[ {
"fields" : [ "MapRecord[{field=field1}]", "MapRecord[{field=field1}]" ]
}, {
"fields" : [ "MapRecord[{field=field2}]", "MapRecord[{field=field2}]" ]
}, {
"fields" : [ "MapRecord[{field=field3}]", "MapRecord[{field=field3}]" ]
} ]
Not sure why this happens, but that should get you close to what you are trying to accomplish. Hope that helps. If it does, please accept the solution. Thanks
... View more