Member since
07-29-2020
574
Posts
323
Kudos Received
176
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2143 | 12-20-2024 05:49 AM |
| | 2438 | 12-19-2024 08:33 PM |
| | 2189 | 12-19-2024 06:48 AM |
| | 1456 | 12-17-2024 12:56 PM |
| | 2082 | 12-16-2024 04:38 AM |
01-17-2023
10:41 AM
1 Kudo
Yes. Have you ever used expression language to set/get flowfile attributes and use them as parameters for other processors? For example, Username and Password on the GetSFTP processor can be set as follows:

Username: ${my.sftp.db.username}
Password: ${my.sftp.db.password}

Both attributes (my.sftp.db.username, my.sftp.db.password) are set once you get this info from the DB, for example if you use the ExecuteSQLRecord processor where the "SQL select query" is set to something like:

select username, password from myCredentialTable

Assuming you use a JsonRecordSetWriter to output the data in JSON format like:

{ "username": "...", "password": "..." }

you can then use EvaluateJsonPath to set the attributes by adding dynamic properties to that processor, as sketched below.
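A minimal sketch of those EvaluateJsonPath dynamic properties (the attribute names match the example above; adjust them to your flow):

my.sftp.db.username = $.username
my.sftp.db.password = $.password

Make sure the processor's Destination property is set to "flowfile-attribute" so the results land in attributes rather than the flowfile content. Hope that helps. If it does, please accept the solution.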
01-16-2023
06:50 AM
Hi, you can read data from a DB using processors like ExecuteSQL or ExecuteSQLRecord. The result can be parsed into flowfile attributes, which you can use later to set the credentials in the SFTP NiFi processor using expression language.
01-13-2023
06:53 AM
I was finally able to figure out the problem. To resolve this issue, it seems the py/jar file specified in "appResource" & "spark.jars" needs to be accessible by all nodes in the cluster. For example, if you have a network path, you can specify it in both attributes as follows:

"appResource": "file:////Servername/somefolder/HelloWorld.jar",
...
"spark.jars": "file:////Servername/someFolder/HelloWorld.jar",

Not sure why this is necessary if the job is being submitted to the master. If anybody knows, please help me understand.
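For context, a minimal sketch of where the two settings sit in the submission body (the UNC path is the example from above; everything else stays as in the original request):

{
  "appResource": "file:////Servername/somefolder/HelloWorld.jar",
  "sparkProperties": {
    "spark.jars": "file:////Servername/somefolder/HelloWorld.jar",
    ...
  },
  ...
}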
12-16-2022
09:35 AM
Hi, I have a Spark cluster deployed on Windows. I'm trying to submit a simple Spark job using the REST API. The job is just Python code that prints a simple hello world sentence, as follows:

from pyspark.sql import SparkSession
def main(args):
    print('hello world')
    return 0

if __name__ == '__main__':
    main(None)

The URL I'm using to submit the job is: http://<Master-IP>:6066/v1/submissions/create
With the following POST body:

{
"appResource": "file:../../helloworld.py",
"sparkProperties": {
"spark.executor.memory": "2g",
"spark.master": "spark://<Master IP>:7077",
"spark.app.name": "Spark REST API - Hello world",
"spark.driver.memory": "2g",
"spark.eventLog.enabled": "false",
"spark.driver.cores": "2",
"spark.submit.deployMode": "cluster",
"spark.driver.supervise": "true"
},
"clientSparkVersion": "3.3.1",
"mainClass": "org.apache.spark.deploy.SparkSubmit",
"environmentVariables": {
"SPARK_ENV_LOADED": "1"
},
"action": "CreateSubmissionRequest",
"appArgs": [
"../../helloworld.py", "80"
]
}

After I run this POST using Postman, I get the following response:

{
"action": "CreateSubmissionResponse",
"message": "Driver successfully submitted as driver-20221216112633-0005",
"serverSparkVersion": "3.3.1",
"submissionId": "driver-20221216112633-0005",
"success": true
}

However, when I try to get the job status using http://<Master-IP>:6066/v1/submissions/status/driver-20221216112633-0005, I get driverState: ERROR with a NullPointerException, as follows:

{
"action": "SubmissionStatusResponse",
"driverState": "ERROR",
"message": "Exception from the cluster:\njava.lang.NullPointerException\n\torg.apache.spark.deploy.worker.DriverRunner.downloadUserJar(DriverRunner.scala:158)\n\torg.apache.spark.deploy.worker.DriverRunner.prepareAndRunDriver(DriverRunner.scala:179)\n\torg.apache.spark.deploy.worker.DriverRunner$$anon$2.run(DriverRunner.scala:99)",
"serverSparkVersion": "3.3.1",
"submissionId": "driver-20221216112633-0005",
"success": true,
"workerHostPort": "10.9.8.120:56060",
"workerId": "worker-20221216093629-<IP>-56060"
}

Not sure why I'm getting this error and what it means. Can someone please point me in the right direction, or at least help me troubleshoot this further? Thanks
Labels:
- Apache Spark
12-12-2022
06:40 AM
Hi, I'm new to Apache Spark, so I'm not sure if this is the best setup; my goal is to create an environment where I can test and evaluate before making a decision. I set up a cluster on Windows using the steps from: https://aamargajbhiye.medium.com/apache-spark-setup-a-multi-node-standalone-cluster-on-windows-63d413296971
The cluster version I'm using is the latest: 3.3.1/Hadoop 3. The master node starts without an issue, and I'm able to register the workers on each worker node using the following command:

spark-class org.apache.spark.deploy.worker.Worker spark://<Master-IP>:7077 --host <Worker-IP>

When I register a worker, it's able to connect and register successfully, as the message indicates, and I'm able to see both workers in the UI with the ALIVE status. Then I tried submitting a simple hello_world.py job using:

spark-submit --master spark://<Master-IP>:7077 hello_world.py

My hello_world.py application is like this:

from pyspark.sql import SparkSession

spark=SparkSession.builder.appName("Hello World").getOrCreate()
print("Hello From Spark!")
sparkContext=spark.sparkContext
rdd=sparkContext.parallelize([1,2,3])
print(rdd.collect())

What happens when I submit the job is that Spark will continuously try to create different executors, as if it's retrying, but they all exit with code 1, and I have to kill it in order to stop. When I check the UI and click on a given executor, I see the following in the stdout & stderr:

stdout:

22/12/12 08:04:11 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 6544@HOU12-FSRM01
22/12/12 08:04:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/12 08:04:11 INFO SecurityManager: Changing view acls to: vnetadmin
22/12/12 08:04:11 INFO SecurityManager: Changing modify acls to: vnetadmin
22/12/12 08:04:11 INFO SecurityManager: Changing view acls groups to:
22/12/12 08:04:11 INFO SecurityManager: Changing modify acls groups to:
22/12/12 08:04:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vnetadmin); groups with view permissions: Set(); users with modify permissions: Set(vnetadmin); groups with modify permissions: Set()

stderr:

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
....
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
at ...
Caused by: java.io.IOException: Failed to connect to <Master DNS>/<Master IP>:56526
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:288)
.....
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: <Master DNS>/<Master IP>:56526
Caused by: java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
....

Not sure how to fix the error above. I tried opening the referenced port ("Failed to connect to <Master DNS>/<Master IP>:56526") from the master node, but every time it shows a different port. Not sure what else I can do or how to troubleshoot. Any help is appreciated.
Labels:
- Apache Spark
11-30-2022
06:36 AM
Hi, if you are able to do the Jolt transformation to get the output as specified above, then in the ForkRecord processor the Record Writer should be set to a JsonRecordSetWriter, and the writer's Schema Text property can be set to the following:
{
"type": "record",
"name": "TestObject",
"namespace": "ca.dataedu",
"fields": [{
"name": "NestedKey1",
"type": ["null", "string"],
"default": null
}, {
"name": "NestedKey3",
"type": ["null", "string"],
"default": null
}, {
"name": "NestedKey2",
"type": ["null", "string"],
"default": null
}]
}

This should give you the desired output you specified above; a hypothetical record conforming to this schema is sketched below. For more information on the JsonRecordSetWriter please refer to: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-record-serialization-services-nar/1.13.0/org.apache.nifi.json.JsonRecordSetWriter/index.html
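For illustration, a record matching the schema above (the values are placeholders, not from your data):

{
  "NestedKey1": "value1",
  "NestedKey3": "value3",
  "NestedKey2": "value2"
}

Hope that helps. If it does, please accept the solution. Thanks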
11-29-2022
09:27 AM
Hi, I think after you split your CSV you need to extract the values of both columns, status and client_id, into attributes and then use them in the ExecuteSQL processor. For that you need to:
1- Convert the record from CSV to JSON format using the ConvertRecord processor.
2- Use EvaluateJsonPath to extract both columns into defined attributes (dynamic properties). Make sure to set the Destination property to "flowfile-attribute".
After that you can reference those attributes in the SQL query as ${status} & ${client_id}, assuming that's how you named the attributes in step 2; see the sketch after this post.
Another option, if you don't want to use two processors: you can use the ExtractText processor and provide a regex to extract each value, but you have to be careful how you define your regex for each value to make sure you are only pulling those values and nothing else.
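A minimal sketch of step 2 and the query (the table name some_table is hypothetical; the attribute names are the ones defined above):

EvaluateJsonPath dynamic properties:
status = $.status
client_id = $.client_id

ExecuteSQL "SQL select query":
select * from some_table where client_id = '${client_id}' and status = '${status}'

Hope that helps. If that answers your question, please accept the solution. Thanks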
11-28-2022
10:41 AM
No, I don't think I have used the following:

nifi.security.identity.mapping.pattern.dn =
nifi.security.identity.mapping.value.dn =

Have you tried using simple single-user authorization, just to see if you can log in? It helps in these cases to start from a simple config and then build up, to isolate where the issue is.
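For reference, a minimal sketch of how those two properties are typically filled in (the DN pattern here is an assumption; adjust it to match your certificate's actual DN):

nifi.security.identity.mapping.pattern.dn=^CN=(.*?), OU=(.*?)$
nifi.security.identity.mapping.value.dn=$1

This would map a DN like "CN=alice, OU=engineering" to the identity "alice". Hope that helps.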
11-28-2022
08:21 AM
Can you send me a sample input and your Jolt JSON spec?
11-28-2022
06:30 AM
Hi, yes, that is the point of the fork/join enrichment: the original flowfile will wait until both the response and the original meet at the JoinEnrichment processor, as sketched below.
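For reference, a minimal sketch of the typical topology (InvokeHTTP stands in for whatever enrichment step you use):

ForkEnrichment --(original)----------------------------> JoinEnrichment
ForkEnrichment --(enrichment)--> InvokeHTTP (lookup) --> JoinEnrichment

JoinEnrichment correlates the two branches using the enrichment group ID attribute that ForkEnrichment adds, so the original flowfile is held until its enriched counterpart arrives.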