Member since: 07-29-2020
Posts: 193
Kudos Received: 44
Solutions: 47
My Accepted Solutions
| Title | Views | Posted |
| --- | --- | --- |
| | 105 | 01-25-2023 12:42 PM |
| | 77 | 01-18-2023 12:26 PM |
| | 229 | 01-17-2023 10:41 AM |
| | 63 | 01-13-2023 06:53 AM |
| | 143 | 11-29-2022 09:27 AM |
01-26-2023
10:10 AM
Hi, I'm not sure what the problem is with the current setup. Usually, when the first process group finishes and sends the flowfile to the second process group, that flowfile is dropped when the "GenerateTableFetch" in the second group executes, and new flowfiles are generated based on it. Another option, if you don't want any relationship between the first and second process groups through output/input ports and want them completely independent (no flowfiles transferred between them), is to use the NiFi REST API to trigger the second group's GenerateTableFetch from the first group using the InvokeHTTP processor, as sketched below. Hope that helps. Thanks
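A minimal sketch of that REST-API call (assuming an unsecured NiFi; the host and processor ID below are hypothetical placeholders):

import requests

# Hypothetical values: point these at your NiFi host and the ID of the
# GenerateTableFetch processor in the second process group.
NIFI = "http://localhost:8080/nifi-api"
PROCESSOR_ID = "0184a3f4-0000-0000-0000-000000000000"

# The run-status endpoint requires the processor's current revision,
# so fetch the processor entity first.
proc = requests.get(NIFI + "/processors/" + PROCESSOR_ID).json()

# Ask NiFi to schedule (start) the processor.
requests.put(
    NIFI + "/processors/" + PROCESSOR_ID + "/run-status",
    json={"revision": proc["revision"], "state": "RUNNING"},
)

InvokeHTTP can issue the same PUT by setting the HTTP Method, the Remote URL, and a JSON body built from the fetched revision.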
01-26-2023
06:22 AM
Hi, You don't need to know a programming language to learn Expression Language; it can help, but it's not required. For example, ${filename:toUpper()} simply reads the flowfile's filename attribute and uppercases it.
01-24-2023
08:57 AM
Hi, Try the following:
[
{
"operation": "shift",
"spec": {
"*": {
"*": "[#2].&",
"$": "[#2].product"
}
}
}
]
If that helps, please accept the solution. Thanks
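To illustrate (with a hypothetical input, since the original sample isn't shown here), the spec turns each top-level key into an array element and records the key itself as "product":

Input:
{
  "apple": { "price": 1, "qty": 2 },
  "banana": { "price": 3, "qty": 5 }
}

Output:
[
  { "price": 1, "qty": 2, "product": "apple" },
  { "price": 3, "qty": 5, "product": "banana" }
]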
01-21-2023
08:22 AM
Hi, I don't think you can catch the SQL error, in the sense that PutSQL won't report the error. However, you can use PutDatabaseRecord instead and route its failure relationship to LogMessage, where you can access the error message using the "putdatabaserecord.error" attribute. A better way of capturing errors from any processor (a global catch) is to use the SiteToSiteBulletinReportingTask, as explained here: SiteToSiteBulletinReportingTask If that helps, please accept the solution. Thanks
01-18-2023
12:26 PM
Hi, I was able to obtain the required result using the following processors:
1- SplitText: splits each JSON record into its own flowfile.
2- UpdateRecord: updates the date fields and converts them to the required format using a JSON record reader/writer. The value used to convert the time for each field is: ${field.value:toDate("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"):format("yyyy-MM-dd HH:mm:ss.SSS")}
More info on UpdateRecord: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.7.1/org.apache.nifi.processors.standard.UpdateRecord/additionalDetails.html
Note: the only problem I noticed is that null values get converted to "". Not sure if that will cause you a problem, but you can use ReplaceText or a Jolt transform to convert the values back to null. If you need the records merged back together before inserting into Hive, you can use the MergeRecord processor. If that helps, please accept the solution. Thanks
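For reference, a minimal Python sketch of the same conversion the Expression Language above performs (the sample timestamp is hypothetical):

from datetime import datetime

raw = "2023-01-18T09:15:27.123Z"  # hypothetical sample value

# Parse the source format, then re-emit without the 'T' and 'Z'.
parsed = datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S.%fZ")
print(parsed.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])  # -> 2023-01-18 09:15:27.123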
01-17-2023
11:16 AM
Hi, I'm not sure if you are looking for exactly this, but the following should give you the expected output from the sample you provided:
[
{
"operation": "shift",
"spec": {
"content": {
"*": {
"*": {
"*": {
"$": "error",
"$1": "product",
"$2": "ErrorType",
"@": "q"
}
}
}
}
}
}
]
If that helps, please accept the solution. Thanks
01-13-2023
06:53 AM
I was finally able to figure out the problem. To resolve this issue, it seems the py/jar file specified in "appResource" & "spark.jars" needs to be accessible by all nodes in the cluster. For example, if you have a network path, you can specify it in both attributes as follows: "appResource": "file:////Servername/somefolder/HelloWorld.jar", ... "spark.jars": "file:////Servername/someFolder/HelloWorld.jar", Not sure why, given that the job is being submitted to the master. If anybody knows, please help me understand.
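For context, those fields belong to the standalone master's REST submission endpoint; a minimal sketch of such a request (the master host and main class are hypothetical; the REST server listens on port 6066 when spark.master.rest.enabled is true):

import requests

body = {
    "action": "CreateSubmissionRequest",
    "clientSparkVersion": "3.3.1",
    "appResource": "file:////Servername/somefolder/HelloWorld.jar",
    "mainClass": "HelloWorld",  # hypothetical entry-point class
    "appArgs": [],
    "environmentVariables": {},
    "sparkProperties": {
        "spark.app.name": "HelloWorld",
        "spark.master": "spark://<Master-IP>:7077",
        "spark.submit.deployMode": "cluster",
        # Network path visible to every node, not just the submitting machine.
        "spark.jars": "file:////Servername/someFolder/HelloWorld.jar",
    },
}

resp = requests.post("http://<Master-IP>:6066/v1/submissions/create", json=body)
print(resp.json())

As for the why: in cluster deploy mode the driver is launched on one of the workers, and that worker (not the machine that submitted the request) has to fetch the jar itself, which is presumably why a path visible only to the master isn't enough.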
12-15-2022
05:16 AM
@Bello, as this is an older post, you would have a better chance of receiving a resolution by starting a new thread. This will also be an opportunity to provide details specific to your environment that could aid others in assisting you with a more accurate answer to your question. You can link this thread as a reference in your new post.
12-12-2022
06:40 AM
Hi, I'm new to Apache Spark, so I'm not sure if this is the best setup; my goal is to create an environment where I can test and evaluate before making a decision. I set up the cluster on Windows using the steps from: https://aamargajbhiye.medium.com/apache-spark-setup-a-multi-node-standalone-cluster-on-windows-63d413296971 The cluster version I'm using is the latest: 3.3.1 / Hadoop 3. The master node starts without an issue, and I'm able to register the workers on each worker node using the following command: spark-class org.apache.spark.deploy.worker.Worker spark://<Master-IP>:7077 --host <Worker-IP> When I register a worker, it connects and registers successfully, as the message indicates, and I'm able to see both workers in the UI with the ALIVE status. Then I tried submitting a simple hello_world py job using: spark-submit --master spark://<Master-IP>:7077 hello_world.py My hello_world.py application is like this:

from pyspark.sql import SparkSession  # import added so the script is self-contained

spark = SparkSession.builder.appName("Hello World").getOrCreate()
print("Hello From Spark!")
sparkContext = spark.sparkContext
rdd = sparkContext.parallelize([1, 2, 3])
print(rdd.collect())

What happens when I submit the job is that Spark continuously tries to create different executors, as if it's retrying, but they all exit with code 1, and I have to kill the job to stop it. When I check the UI and click on a given executor, I see the following in stdout & stderr:

stdout:
22/12/12 08:04:11 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 6544@HOU12-FSRM01
22/12/12 08:04:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/12 08:04:11 INFO SecurityManager: Changing view acls to: vnetadmin
22/12/12 08:04:11 INFO SecurityManager: Changing modify acls to: vnetadmin
22/12/12 08:04:11 INFO SecurityManager: Changing view acls groups to:
22/12/12 08:04:11 INFO SecurityManager: Changing modify acls groups to:
22/12/12 08:04:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vnetadmin); groups with view permissions: Set(); users with modify permissions: Set(vnetadmin); groups with modify permissions: Set()

stderr:
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
....
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
at ...
Caused by: java.io.IOException: Failed to connect to <Master DNS>/<Master IP>:56526
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:288)
.....
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: <Master DNS>/<Master IP>:56526
Caused by: java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
....
Not sure how to fix the error above. I tried opening the port referenced in "Failed to connect to <Master DNS>/<Master IP>:56526" on the master node, but every time it shows a different port. Not sure what else I can do or how to troubleshoot. Any help is appreciated.
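One avenue that may be worth checking (an assumption, not a confirmed fix): the random port in the error is the driver's RPC endpoint that executors connect back to, and it can be pinned in the application so a firewall rule becomes possible:

from pyspark.sql import SparkSession

# Assumption: fixing the driver's otherwise-random RPC port so the
# executors' connect-back target is predictable and can be opened in
# the Windows firewall. The port and host below are hypothetical.
spark = (
    SparkSession.builder
    .appName("Hello World")
    .config("spark.driver.port", "50100")
    .config("spark.driver.host", "<Driver-IP>")
    .getOrCreate()
)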
Labels:
- Apache Spark
12-03-2022
09:35 AM
I am dealing with a Kafka dataset where multiple types of message data are coming in. Sample data:

Event type 1:
{ "type": "record", "name": "Dispatch_Accepted", "namespace": "accepted.avro", "fields": [ { "name": "John", "type": "string", "doc": "Name of the user account" }, { "name": "email", "type": "string", "doc": "The email of the user logging message on the blog" }, { "name": "timestamp", "type": "long", "doc": "time in seconds" } ], "doc:": "A basic schema of Dispatch_Rejected" }

Event type 2:
{ "type": "record", "name": "Dispatch_Rejected", "namespace": "rejected.avro", "fields": [ { "name": "Merry", "type": "string", "doc": "Name of the user" }, { "name": "email", "type": "string", "doc": "The email of the user logging message on the blog" }, { "name": "timestamp", "type": "long", "doc": "time in seconds" } ], "doc:": "A basic schema Rejected data" }

The schema of the data is validated against the Confluent Schema Registry (working fine). I need to filter on the schema name (Dispatch_Rejected and Dispatch_Accepted) and create two separate data files for each, so I am using the QueryRecord processor with the queries below:

<Dispatch_Rejected> = SELECT * FROM FLOWFILE WHERE name='Dispatch_Rejected'
<Dispatch_Accepted> = SELECT * FROM FLOWFILE WHERE name='Dispatch_Accepted'

This is not working; it can't identify the schema name. The controller service is working fine.
1- How can I pick the schema name from the controller service?
2- Should I assign the value ${schema.name} to another variable <My_schema> and write the SELECT statement like:
<Dispatch_Rejected> = SELECT * FROM FLOWFILE WHERE My_Schema.name='Dispatch_Rejected'
<Dispatch_Accepted> = SELECT * FROM FLOWFILE WHERE My_Schema.name='Dispatch_Accepted'

Summary: I want to filter the data based on eventType and create separate data files. Please help.
12-01-2022
02:33 AM
Hi, thanks for the details. Unfortunately, it is not working; I get an empty array [] as output. I tried it with both extract and split modes, and I applied the schema text property as suggested, with "NestedKey" and "nestedValue" as names. Neither gives me an output. Meanwhile, I have achieved what I wanted using SplitContent and then another Jolt processor. Of course, it would be more elegant if I could make it work with ForkRecord.
11-29-2022
09:27 AM
Hi, I think after you split your CSV you need to extract the values of both columns, status and client_id, into attributes and then use them in the ExecuteSQL processor. For that you need to:
1- Convert the record from CSV to JSON format using the ConvertRecord processor.
2- Use EvaluateJsonPath to extract both columns into defined attributes (dynamic properties), e.g. status -> $.status and client_id -> $.client_id. Make sure to set the Destination property to "flowfile-attribute".
After that you can reference those attributes in the SQL query as ${status} & ${client_id}, assuming that's what you called the attributes in step 2. Another option, if you don't want to use two processors, is the ExtractText processor with a regex for each value, but you have to be careful how you define each regex to make sure you are only pulling those values and nothing else. Hope that helps. If that answers your question, please accept the solution. Thanks
11-28-2022
08:51 PM
@SAMSAL Thank you for your help.
11-28-2022
12:01 PM
@Mohamed_Shaaban I recommend starting a new community question with the details specific to your setup. This allows the community to address/assist with your specific setup versus comparing your issue to what was shared in this post. Thanks, Matt
11-10-2022
12:17 PM
I think everything works fine; the JSON flowfiles are the responses of InvokeHTTP requests to an API. I receive more than 5000 flowfiles, each containing 200 records. Problems happen only with 4 files that contain the ETX symbol.
11-08-2022
12:44 PM
Hi, I'm only able to do that via two processors:
1- JoltTransformJSON: adds the desired attributes (attr_id & attr_name) to the flowfile JSON using the following spec:
[
{
"operation": "shift",
"spec": {
"#${attr_id}":"attr_id",
"#${attr_name}":"attr_name",
"model": "model",
"cars_drivers": {
"*": {
"id": "cars_drivers[#2].id",
"name": "cars_drivers[#2].name",
"is_processed": "cars_drivers[#2].is_processed"
}
}
}
}
]
2- UpdateRecord: once the attributes are added to the JSON, you can update the is_processed value only for the records whose id matches attr_id and whose name matches attr_name. To do that, set the following properties:
a- Replacement Value Strategy: Literal Value
b- /cars_drivers[*]/id[.=../../../attr_id]/../name[.=../../../attr_name]/../is_processed : 1
Hope that helps; if it does, please accept the solution.
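To illustrate step 1 (the input shape is hypothetical, inferred from the spec), with attributes attr_id = 1 and attr_name = bob:

Input:
{ "model": "sedan", "cars_drivers": [ { "id": 1, "name": "bob", "is_processed": 0 }, { "id": 2, "name": "amy", "is_processed": 0 } ] }

Output (the attribute values are injected as literals via the # prefix):
{ "attr_id": "1", "attr_name": "bob", "model": "sedan", "cars_drivers": [ { "id": 1, "name": "bob", "is_processed": 0 }, { "id": 2, "name": "amy", "is_processed": 0 } ] }

Step 2 then sets is_processed to 1 only on the first driver, where both id and name match.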
Tags:
- updaterecord
10-27-2022
06:20 AM
Thanks for the reply, it worked 🙂 Also, I got the expected output and didn't see the `MapRecord` as displayed in your output. Thanks!
10-25-2022
03:28 AM
Hello bro @SAMSAL, were you able to successfully control the NiFi component using Postman? If yes, would it be okay to share your solution? I'm currently encountering the same issue that you had. Thanks in advance.
10-23-2022
03:55 AM
Did anyone solve this issue? I'm currently trying to solve a similar issue via the NiFi API (Postman).
10-20-2022
08:57 PM
Input 1
{ "npi_code": "NEW", "cifa_category": "XM", "o9_category": "Accessories", "equipment_type": "AC", "equipment_sub_type": "CASEPROT", "model_manf": "137181-1050", "model_comcast": "Samsung Galaxy A51 Presidio Grip", "memory": "", "color": "Black", "os": "", "upc": 849000000000, "sku": 849000000000, "cifa_id": 273271, "manufacturer_part": "137181-1050", "description": "Speck Samsung Galaxy A51 Presidio Grip - Black", "uom": "", "manufacturer": "SPK", "serialized": "N", "equip_id_type": "ICCID", "pre_inserted_sim": "", "sim_type": "", "sim_form": "", "base_feature_tier": "", "market_attractiveness": "", "memory_tier": "", "size_tier": "", "planning_item": 848709087461, "stat_item": "Samsung Galaxy A51 Presidio Grip", "model_mem": "Samsung Galaxy A51 Presidio Grip", "model_mem_color": "Samsung Galaxy A51 Presidio Grip", "hero_status": "", "launch_date": "2020-05-08", "dummy_item_creation_date": "", "npi_announcement_date": "", "pre_order_date": "", "ga_date": "", "landed_at_launch_date": "", "planned_end_of_purchase": "", "planned_end_of_replen": "", "planned_end_of_forecast": "", "independence_date": "", "disable_date": "2021-04-29", "accessory_retail_end_of_life": "" }

Input 2
{ "Item": [ { "npi_code": "NEW", "cifa_category": "XM", "o9_category": "Accessories", "equipment_type": "AC", "equipment_sub_type": "CASEPROT", "model_manf": "137181-1050", "model_comcast": "Samsung Galaxy A51 Presidio Grip", "memory": "", "color": "Black", "os": "", "upc": 849000000000, "sku": 849000000000, "cifa_id": 273271, "manufacturer_part": "137181-1050", "description": "Speck Samsung Galaxy A51 Presidio Grip - Black", "uom": "", "manufacturer": "SPK", "serialized": "N", "equip_id_type": "ICCID", "pre_inserted_sim": "", "sim_type": "", "sim_form": "", "base_feature_tier": "", "market_attractiveness": "", "memory_tier": "", "size_tier": "", "planning_item": 848709087461, "stat_item": "Samsung Galaxy A51 Presidio Grip", "model_mem": "Samsung Galaxy A51 Presidio Grip", "model_mem_color": "Samsung Galaxy A51 Presidio Grip", "hero_status": "", "launch_date": "2020-05-08", "dummy_item_creation_date": "", "npi_announcement_date": "", "pre_order_date": "", "ga_date": "", "landed_at_launch_date": "", "planned_end_of_purchase": "", "planned_end_of_replen": "", "planned_end_of_forecast": "", "independence_date": "", "disable_date": "2021-04-29", "accessory_retail_end_of_life": "" }, { "npi_code": "NEW", "cifa_category": "XM", "o9_category": "Accessories", "equipment_type": "AC", "equipment_sub_type": "CASEPROT", "model_manf": "137181-1050", "model_comcast": "Samsung Galaxy A51 Presidio Grip", "memory": "", "color": "Black", "os": "", "upc": 849000000000, "sku": 849000000000, "cifa_id": 273271, "manufacturer_part": "137181-1050", "description": "Speck Samsung Galaxy A51 Presidio Grip - Black", "uom": "", "manufacturer": "SPK", "serialized": "N", "equip_id_type": "ICCID", "pre_inserted_sim": "", "sim_type": "", "sim_form": "", "base_feature_tier": "", "market_attractiveness": "", "memory_tier": "", "size_tier": "", "planning_item": 848709087461, "stat_item": "Samsung Galaxy A51 Presidio Grip", "model_mem": "Samsung Galaxy A51 Presidio Grip", "model_mem_color": "Samsung Galaxy A51 Presidio Grip", "hero_status": "", "launch_date": "2020-05-08", "dummy_item_creation_date": "", "npi_announcement_date": "", "pre_order_date": "", "ga_date": "", "landed_at_launch_date": "", "planned_end_of_purchase": "", "planned_end_of_replen": "", "planned_end_of_forecast": "", "independence_date": "", "disable_date": "2021-04-29", "accessory_retail_end_of_life": "" } ] }
10-13-2022
08:52 AM
Thanks @SAMSAL, I was tracking that previous post, but since it was so old I was hoping newer versions of NiFi could address this. This is such a simple thing with XML and the EvaluateXPath processor; I am just surprised that we can't do something similar with JSON. Oh well, 2 processors it is.
10-13-2022
06:23 AM
Hi, Try the following Jolt spec:
[
{
"operation": "shift",
"spec": {
"tags": {
"*": {
"tag": "tags.[#2].Parameter",
"value": "tags.[#2].value"
}
}
}
}
]
If you find this helpful, please accept the solution.
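As an illustration (the input is hypothetical, inferred from the spec), this renames each tag entry's "tag" field to "Parameter":

Input:
{ "tags": [ { "tag": "temperature", "value": 21 }, { "tag": "pressure", "value": 5 } ] }

Output:
{ "tags": [ { "Parameter": "temperature", "value": 21 }, { "Parameter": "pressure", "value": 5 } ] }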
10-03-2022
11:59 PM
There should be two controllers configured: DistributedMapCacheClientService and DistributedMapCacheServer. The client is, eh, the client; the server is the backend for your cache. Without the server, the client has nowhere to send a flowfile.
09-28-2022
05:10 PM
Hi, Based on the link below, it doesn't seem like you can access the variable registry directly from an ExecuteScript processor. You can either use the API, as in the article referenced in the link below, to retrieve the value, or just use an UpdateAttribute processor to read the variable's value with Expression Language and store it in a flowfile attribute before passing it to the script, where it's easy to retrieve the incoming flowfile's attribute. https://community.cloudera.com/t5/Support-Questions/NiFi-Is-it-possible-to-access-Processor-Group-variables/m-p/204283 If that helps, please accept the solution. Thanks
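As a minimal sketch of that last step (using ExecuteScript's Jython engine rather than Groovy; the attribute name my.var is hypothetical, set by the upstream UpdateAttribute):

# ExecuteScript (Script Engine: python). NiFi binds `session`, `log`,
# REL_SUCCESS, and REL_FAILURE into the script's scope.
flowFile = session.get()
if flowFile is not None:
    # Read the attribute that UpdateAttribute populated from the variable.
    my_value = flowFile.getAttribute('my.var')  # hypothetical attribute name
    log.info('variable value received as attribute: %s' % my_value)
    session.transfer(flowFile, REL_SUCCESS)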
09-28-2022
04:47 AM
Thank you all. I eventually evaluated the JSON path to extract the URL. My mind was astray, as I was using a complex solution for a simple problem.
09-27-2022
11:26 AM
Brilliant! Exactly what I was looking for. Although it seems a little peculiar to me that we need to rely on a Jolt transform for this operation and not the UpdateRecord processor, particularly since NiFi makes a point of discussing arrays and maps in the documentation. Thanks for the Jolt transform; I spent a lot of time trying to get one to work and couldn't quite figure it out. Now I see what I was doing wrong.
09-26-2022
07:08 AM
Hello, you can configure it so that just one flowfile is handled inside a specific process group. There is a config option on the process group called Process Group FlowFile Concurrency, for which you can set the value Single FlowFile per Node. After PutKudu, you destroy the flowfile or route it out of the process group; then the next flowfile is released to enter the process group. In your case the flow would look like: ListFile processor -> process group (handles FetchFile, the data transformation, and PutKudu).
09-23-2022
10:51 AM
I don't think there is an out-of-the-box processor that supports such a thing. However, you can work around it by using the ExecuteSQL processor instead, since it allows you to return the stored proc output in Avro format in a new flowfile, based on whatever your select statement is in ExecuteSQL's SQL Select Query property. Since this generates a new flowfile, the assumption here is that you don't care about the original flowfile. Before going further and giving you an example of how to do it: do you want to preserve the original flowfile, and were you thinking of adding the stored proc output as an attribute?
09-20-2022
10:21 AM
Hi, Please try the following spec:
[
{
"operation": "shift",
"spec": {
"timestamp": {
"*": {
"@(2,resourceid)": "[&1].resourceid",
"@": "[&1].timestamp"
}
},
"Key": {
"*": {
"@": "[&1].key"
}
},
"data": {
"*": {
"@": "[&1].data"
}
}
}
}
]
If you find this helpful, please accept the solution. Thanks
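To illustrate (the input is hypothetical, inferred from the spec), the parallel arrays are zipped into one object per index, with the top-level resourceid copied into each:

Input:
{
  "resourceid": "sensor-1",
  "timestamp": [100, 200],
  "Key": ["a", "b"],
  "data": [10, 20]
}

Output:
[
  { "resourceid": "sensor-1", "timestamp": 100, "key": "a", "data": 10 },
  { "resourceid": "sensor-1", "timestamp": 200, "key": "b", "data": 20 }
]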