Member since 06-16-2020
55 Posts
14 Kudos Received
5 Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 1004 | 10-23-2024 11:21 AM |
 | 964 | 10-22-2024 07:59 AM |
 | 943 | 10-22-2024 07:37 AM |
 | 570 | 10-21-2024 09:25 AM |
 | 2371 | 06-16-2023 07:23 AM |
10-22-2024
07:59 AM
1 Kudo
@RanjitMohapatra NiFi expects the Jolt specification to be provided as an array of objects when the transformation is set to Chain. This is typical for JoltTransformJSON because Chain supports multiple transformations applied in sequence.

Also, if you open your JoltTransformJSON processor and click the Advanced option, it brings up a Jolt spec tester that is consistent with how NiFi will handle the transformation.

The solution is just wrapping the spec in an array:

[
  {
    "operation": "shift",
    "spec": {
      "ts": "ts",
      "sourceDeviceId": "source.deviceId",
      "sourceName": "source.name",
      "sourceType": "source.type",
      "locationLat": "location.lat",
      "locationLon": "location.lon",
      "locationId": "location.id",
      "recordDate": "recordDate",
      "_class": "_class",
      "status": "status",
      "power": "power",
      "current": "current",
      "intensity": "intensity",
      "voltage": "voltage"
    }
  }
]

Please accept the solution!
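To make the "wrap it in an array" fix concrete, here is a minimal Python sketch. It is not Jolt itself: `to_chain_spec` and `apply_flat_shift` are hypothetical helpers, the sample input values are made up, and the shift emulation only handles flat keys mapped to dotted paths (no wildcards or arrays).

```python
import json
from typing import Any


def to_chain_spec(spec: Any) -> list:
    """Wrap a single Jolt operation object in a list; return an existing list unchanged."""
    if isinstance(spec, list):
        return spec
    if isinstance(spec, dict):
        return [spec]
    raise TypeError("A Jolt spec must be a JSON object or an array of objects")


def apply_flat_shift(record: dict, shift_spec: dict) -> dict:
    """Simplified stand-in for a Jolt shift: copy top-level keys to dotted output paths."""
    output: dict = {}
    for source_key, target_path in shift_spec.items():
        if source_key not in record:
            continue
        *parents, leaf = target_path.split(".")
        node = output
        for part in parents:
            # Create intermediate objects such as "source" or "location" on demand.
            node = node.setdefault(part, {})
        node[leaf] = record[source_key]
    return output


# A single operation object, i.e. the form that is missing the outer array.
single_operation = {
    "operation": "shift",
    "spec": {"ts": "ts", "sourceDeviceId": "source.deviceId", "locationLat": "location.lat"},
}
chain = to_chain_spec(single_operation)

# Hypothetical input record, for illustration only.
sample = {"ts": 1729600000, "sourceDeviceId": "dev-1", "locationLat": 12.5}
result = apply_flat_shift(sample, chain[0]["spec"])
print(json.dumps(chain, indent=2))
print(json.dumps(result))
```

The Advanced spec tester in the processor remains the better place to check real specs, since it runs the actual Jolt library.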
10-22-2024
07:37 AM
2 Kudos
@rajivswe_2k7 I would avoid putting a lot of data in the flowfile attributes due to memory concerns. However, what you could do is change the InvokeHTTP property called "Response Generation Required" from False to True.

With that set, no matter what the status code of the HTTP response is, the flowfile is automatically routed to the Response relationship, with the full response in the flowfile's content and the response status code in a flowfile attribute.

You could then use a RouteOnAttribute processor to filter for certain response codes, and you will have your full response in the content of the flowfile where it should be 🙂

Please accept this solution if you find it to be helpful!
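As a rough illustration of the routing decision that RouteOnAttribute would make, here is a small Python sketch. It assumes the status code arrives in the `invokehttp.status.code` attribute that InvokeHTTP writes; the route names (`success`, `client_error`, `server_error`, `unmatched`) are hypothetical and would be whatever you name your RouteOnAttribute properties.

```python
def route_by_status(attributes: dict) -> str:
    """Pick a route name from the HTTP status code stored in the flowfile attributes."""
    code_text = attributes.get("invokehttp.status.code", "")
    if not code_text.isdigit():
        # Missing or malformed status code: nothing to match on.
        return "unmatched"
    code = int(code_text)
    if 200 <= code < 300:
        return "success"
    if 400 <= code < 500:
        return "client_error"
    if 500 <= code < 600:
        return "server_error"
    return "unmatched"


# Attributes are strings in NiFi, so the sample values are strings too.
for sample in ({"invokehttp.status.code": "200"},
               {"invokehttp.status.code": "404"},
               {"invokehttp.status.code": "503"},
               {}):
    print(sample, "->", route_by_status(sample))
```

Only the small status attribute is inspected here; the response body stays in the flowfile content, which is the point of the approach above.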
10-21-2024
09:34 AM
@nifier What does your configuration for the FetchS3 processor look like? I would make sure you're pointing to the correct region in the FetchS3 processor, and that your AWSCredentialsService or the credentials in the processor are set correctly.
10-21-2024
09:25 AM
1 Kudo
@xtd --- Based on your requirements, it looks like there are some unneeded processors in your flow. Here is what my flow currently looks like -

1. ExecuteSQL
This returns all the rows in the source table in Avro format. By default it also writes an attribute called executesql.row.count, which you can use as your source row count. So part of the left side of your flow can be removed.

2. PutDatabaseRecord
This populates the target table with all the records. The flowfile only goes to the next processor on success.

3. CalculateRecordStats
This creates a new attribute called record.count that holds the number of rows in the full record set -

4. UpdateAttribute
This prepares for the PutSQL and creates the corresponding attributes -

5. PutSQL
This inserts a new row into the table with input_count and output_count -

With the record-oriented processors, comparing the counts is largely unnecessary, because populating the target table either succeeds or fails. If it succeeds, you can assume the row count will be the same; if it fails, you can assume nothing was populated. However, the above is an approach to get the counts either way 🙂

Please accept this solution if it helped!
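The UpdateAttribute step can be sketched in Python as below. This is only an illustration: it assumes PutSQL's parameter convention of `sql.args.N.type` and `sql.args.N.value` attributes, and the table name `row_count_audit` and the `counts.match` attribute are hypothetical. The two source attributes, `executesql.row.count` and `record.count`, are the ones named in the steps above.

```python
JDBC_INTEGER = 4  # numeric value of java.sql.Types.INTEGER

# Hypothetical audit table; input_count and output_count are the columns from step 5.
INSERT_STATEMENT = "INSERT INTO row_count_audit (input_count, output_count) VALUES (?, ?)"


def build_putsql_attributes(attributes: dict) -> dict:
    """Derive the parameter attributes for the INSERT from the two count attributes."""
    input_count = int(attributes["executesql.row.count"])
    output_count = int(attributes["record.count"])
    return {
        "sql.args.1.type": str(JDBC_INTEGER),
        "sql.args.1.value": str(input_count),
        "sql.args.2.type": str(JDBC_INTEGER),
        "sql.args.2.value": str(output_count),
        # Hypothetical convenience flag for routing or alerting on a mismatch.
        "counts.match": str(input_count == output_count).lower(),
    }


flowfile_attributes = {"executesql.row.count": "250", "record.count": "250"}
print(INSERT_STATEMENT)
print(build_putsql_attributes(flowfile_attributes))
```

In the actual flow the same values would be set with Expression Language in UpdateAttribute rather than in code.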
10-21-2024
08:03 AM
@nifier - I emulated the same setup as you with a test S3 bucket. Using an access key ID and secret access key that had full access to all of S3, I was able to receive all the objects (including nested ones) from that bucket.

A couple of questions I want to follow up with ...

1. In your ListS3 processor configuration, do you have anything set for prefix or delimiter? I just want to make sure, because that could be filtering out some files/directories coming from S3.

2. What is your IAM role, and what is the policy on the bucket you are trying to consume from? Are you certain that the role you are using has access to all the objects in the bucket?
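To show why a prefix or delimiter can hide objects, here is a pure-Python emulation of S3 list semantics. It is a sketch only: `list_objects` is a hypothetical helper, the keys are made up, and it does not model how ListS3 itself handles the grouped prefixes.

```python
def list_objects(keys, prefix="", delimiter=""):
    """Emulate an S3 listing: return (object keys, common prefixes) for a prefix/delimiter."""
    contents = []
    common_prefixes = []
    for key in sorted(keys):
        if not key.startswith(prefix):
            continue  # the prefix filters out everything outside it
        remainder = key[len(prefix):]
        if delimiter and delimiter in remainder:
            # Keys below a delimiter are rolled up into one "folder" entry.
            rolled = prefix + remainder.split(delimiter, 1)[0] + delimiter
            if rolled not in common_prefixes:
                common_prefixes.append(rolled)
        else:
            contents.append(key)
    return contents, common_prefixes


# Hypothetical bucket contents.
bucket_keys = ["a.csv", "data/b.csv", "data/2024/c.csv", "logs/d.log"]

print(list_objects(bucket_keys))                               # no filtering
print(list_objects(bucket_keys, prefix="data/"))               # only keys under data/
print(list_objects(bucket_keys, prefix="data/", delimiter="/"))  # nested keys rolled up
```

With both a prefix and a delimiter set, only `data/b.csv` comes back as an object in this example, which is the kind of filtering question 1 is asking about.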
10-19-2024
06:46 PM
@AndreyDE Is one flowfile going into the SplitText processor and coming out as 10000 flowfiles? How big is the flowfile going into the SplitText processor? Or is the source of the pipeline recursively getting all objects in your S3 bucket? I need to know a little bit more about the input going into SplitText.
10-18-2024
10:43 AM
2 Kudos
Hi @AndreyDE, what's your input into the SplitText processor? I used your example and I am getting valid output -

Make sure the flow feeding SplitText is not re-reading the same file over and over again. Also, if you are using GenerateFlowFile, make sure the scheduling isn't set to 0 sec, because it will keep outputting flowfiles continuously.

Please accept this solution if it's correct, thanks!
10-16-2024
11:51 AM
@SAMSAL Thanks so much - here are my further findings...

Here is my current directory structure -

I tried a bunch of different ways to zip this and put the .nar in the corresponding NIFI_HOME/lib directory. Here is each test case, run on my Mac OS machine. Please ignore the naming conventions as I was trying to go through this quickly 😊

Test Case 1: Normal zip
Failed

zip -r archive_name.zip TransformOpenskyStates.py NAR-INF/ META-INF/
cp archive_name.zip ../Downloads/nifi-2.0.0-M4/lib/archive_name.nar

Restarting NiFi logs:

2024-10-15 15:52:07,787 ERROR [main] org.apache.nifi.web.server.JettyServer Failed to start web server... shutting down.
java.lang.NullPointerException: Cannot invoke "org.apache.nifi.nar.ExtensionMapping.size()" because "extensionMapping" is null
    at org.apache.nifi.documentation.DocGenerator.generate(DocGenerator.java:64)
    at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:833)
    at org.apache.nifi.NiFi.<init>(NiFi.java:172)
    at org.apache.nifi.NiFi.<init>(NiFi.java:83)
    at org.apache.nifi.NiFi.main(NiFi.java:332)

----

Test Case 2: Excluding Mac OS metadata files in Zip Command
Failed

zip -r archive_name.zip TransformOpenskyStates.py NAR-INF/ META-INF/ -x "*.DS_Store" -x "__MACOSX/*"
cp archive_name.zip ../Downloads/nifi-2.0.0-M4/lib/archive_name.nar

Restarting NiFi logs:

2024-10-15 15:52:07,787 ERROR [main] org.apache.nifi.web.server.JettyServer Failed to start web server... shutting down.
java.lang.NullPointerException: Cannot invoke "org.apache.nifi.nar.ExtensionMapping.size()" because "extensionMapping" is null
    at org.apache.nifi.documentation.DocGenerator.generate(DocGenerator.java:64)
    at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:833)
    at org.apache.nifi.NiFi.<init>(NiFi.java:172)
    at org.apache.nifi.NiFi.<init>(NiFi.java:83)
    at org.apache.nifi.NiFi.main(NiFi.java:332)

----

Test Case 3: Normal Zip Command Outer Folder
Failed

zip -r test.zip TransformOS-nar/
cp test.zip ../Downloads/nifi-2.0.0-M4/lib/archive_name.nar

Restarting NiFi logs:

java.lang.NullPointerException: Cannot invoke "java.util.jar.Manifest.getMainAttributes()" because "manifest" is null
    at org.apache.nifi.nar.NarUnpacker.createBundleCoordinate(NarUnpacker.java:258)
    at org.apache.nifi.nar.NarUnpacker.unpackNars(NarUnpacker.java:140)
    at org.apache.nifi.nar.NarUnpacker.unpackNars(NarUnpacker.java:90).
    at org.apache.nifi.nar.NarUnpacker.unpackNars(NarUnpacker.java:84)
    at org.apache.nifi.nar.NarUnpacker.unpackNars(NarUnpacker.java:75)
    at org.apache.nifi.NiFi.<init>(NiFi.java:142)
    at org.apache.nifi.NiFi.<init>(NiFi.java:83)
    at org.apache.nifi.NiFi.main(NiFi.java:332)

----

Test Case 4: Normal Zip Command Outer Folder with exclusions
Failed

zip -r archive_name.zip TransformOS-nar/ -x "*.DS_Store" -x "__MACOSX/*"
cp archive_name.zip ../Downloads/nifi-2.0.0-M4/lib/archive_name.nar

Restarting NiFi logs:

java.lang.NullPointerException: Cannot invoke "java.util.jar.Manifest.getMainAttributes()" because "manifest" is null
    at org.apache.nifi.nar.NarUnpacker.createBundleCoordinate(NarUnpacker.java:258)
    at org.apache.nifi.nar.NarUnpacker.unpackNars(NarUnpacker.java:140)
    at org.apache.nifi.nar.NarUnpacker.unpackNars(NarUnpacker.java:90).
    at org.apache.nifi.nar.NarUnpacker.unpackNars(NarUnpacker.java:84)
    at org.apache.nifi.nar.NarUnpacker.unpackNars(NarUnpacker.java:75)
    at org.apache.nifi.NiFi.<init>(NiFi.java:142)
    at org.apache.nifi.NiFi.<init>(NiFi.java:83)
    at org.apache.nifi.NiFi.main(NiFi.java:332)

----

Test Case 5: Running Zip Command inside TransformOS-nar directory with *
Passed!!

zip -r archive_name.zip *
cp archive_name.zip ../Downloads/nifi-2.0.0-M4/lib/archive_name.nar

--------------------------------------------

Now you are probably wondering what the difference between Test Case 1 and Test Case 5 is... To go into further detail, here's the output of each zip command -

Zip command output for Test Case 1:

drewnicolette@MacBook-Pro-2:~/TransformOS-nar $ zip -r archive_name.zip TransformOpenskyStates.py NAR-INF/ META-INF/
  adding: TransformOpenskyStates.py (deflated 60%)
  adding: NAR-INF/ (stored 0%)
  adding: NAR-INF/.DS_Store (deflated 96%)
  adding: NAR-INF/bundled-dependencies/ (stored 0%)
  adding: META-INF/ (stored 0%)
  adding: META-INF/MANIFEST.MF (deflated 13%)
  adding: META-INF/.DS_Store (deflated 97%)

Zip command output for Test Case 5:

drewnicolette@MacBook-Pro-2:~/TransformOS-nar $ zip -r archive_name.zip *
  adding: META-INF/ (stored 0%)
  adding: META-INF/MANIFEST.MF (deflated 13%)
  adding: META-INF/.DS_Store (deflated 97%)
  adding: NAR-INF/ (stored 0%)
  adding: NAR-INF/.DS_Store (deflated 96%)
  adding: NAR-INF/bundled-dependencies/ (stored 0%)
  adding: TransformOpenskyStates.py (deflated 60%)

It seems like both commands zip up the same files, and the byte counts of the two .zip files are the same! Please see screenshot below!

Solution: Order matters when zipping up the file! If you look above at Test Case 1, when I ran the zip command it added TransformOpenskyStates.py first, while in Test Case 5 it added META-INF/ first. I believe the NiFi code is expecting META-INF/ first??

@MattWho - Do you know anything about this?
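If entry order is indeed the cause, one way to avoid depending on shell glob order is to build the archive with explicit ordering. The Python sketch below is only an illustration of that hypothesis, not a confirmed NiFi requirement: `build_nar` and `entry_order` are hypothetical helpers that write `META-INF/` and `META-INF/MANIFEST.MF` first and skip `.DS_Store` files.

```python
import os
import zipfile


def entry_order(rel_path: str) -> tuple:
    """Sort key: META-INF/ first, then its MANIFEST.MF, then other META-INF entries, then the rest."""
    if rel_path == "META-INF/":
        return (0, rel_path)
    if rel_path == "META-INF/MANIFEST.MF":
        return (1, rel_path)
    if rel_path.startswith("META-INF/"):
        return (2, rel_path)
    return (3, rel_path)


def build_nar(source_dir: str, nar_path: str) -> list:
    """Zip the contents of source_dir into nar_path and return the entry names in archive order."""
    entries = []
    for root, dirs, files in os.walk(source_dir):
        for name in dirs:
            full = os.path.join(root, name)
            rel = os.path.relpath(full, source_dir).replace(os.sep, "/") + "/"
            entries.append((rel, full))
        for name in files:
            if name == ".DS_Store":
                continue  # macOS metadata, not part of the NAR
            full = os.path.join(root, name)
            rel = os.path.relpath(full, source_dir).replace(os.sep, "/")
            entries.append((rel, full))
    entries.sort(key=lambda entry: entry_order(entry[0]))
    with zipfile.ZipFile(nar_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for rel, full in entries:
            archive.write(full, rel)  # directories are stored as empty "name/" entries
    return [rel for rel, _ in entries]
```

For example, `build_nar("TransformOS-nar", "archive_name.nar")` would be run from the parent directory, with the output path kept outside the source directory so the archive does not include itself.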
10-14-2024
12:57 PM
Hi SAMSAL, Thanks for the recommendation. I tried just doing the python extension and it worked. However, when I compile the nar again it still doesn't show up. Can you try it with this .py file I am pasting below? Thanks!

Filename - TransformOpenskyStates.py

from nifiapi.flowfiletransform import (
    FlowFileTransform,
    FlowFileTransformResult
)
from nifiapi.properties import ProcessContext
import json

FIELD_MAP = [
    "icao24", "callsign", "origin_country", "time_position", "last_contact",
    "longitude", "latitude", "baro_altitude", "on_ground", "velocity",
    "true_track", "vertical_rate", "sensors", "geo_altitude", "squawk",
    "spi", "position_source"
]

RETURN_SCHEMA = [
    "icao24", "callsign", "origin_country",
    "reporting_time", "time_position", "last_contact",
    "longitude", "latitude", "on_ground"
]


class TransformOpenskyStates(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = '''
        Transform the data returned by the OpenSky Network API.
        '''
        tags = ["opensky", "transform", "tutorial"]
        dependencies = []

    def __init__(self, **kwargs):
        super().__init__()

    def transform(
        self, context: ProcessContext, flow_file
    ) -> FlowFileTransformResult:
        '''
        Parameters:
            context (ProcessContext)
            flow_file
        Returns:
            FlowFileTransformResult
        '''
        contents = json.loads(flow_file.getContentsAsBytes())

        def sanitize_value(value):
            if isinstance(value, str):
                return value.strip()
            return value

        states = []
        for record in contents["states"]:
            record = dict(zip(FIELD_MAP, record))
            record["reporting_time"] = contents["time"]
            # Choose only fields listed in the RETURN_SCHEMA
            sanitized = {}
            for key, value in record.items():
                if key not in RETURN_SCHEMA:
                    continue
                sanitized[key] = sanitize_value(value)
            states.append(sanitized)

        return FlowFileTransformResult(
            "success",
            contents=json.dumps(states)
        )
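To rule out the Python logic itself while debugging the NAR packaging, the transformation can be exercised outside NiFi. The sketch below repeats the same mapping without the `nifiapi` imports; `transform_states` is a hypothetical standalone helper and the sample payload is made-up data shaped like the `states` and `time` fields the processor reads.

```python
import json

FIELD_MAP = [
    "icao24", "callsign", "origin_country", "time_position", "last_contact",
    "longitude", "latitude", "baro_altitude", "on_ground", "velocity",
    "true_track", "vertical_rate", "sensors", "geo_altitude", "squawk",
    "spi", "position_source",
]

RETURN_SCHEMA = [
    "icao24", "callsign", "origin_country",
    "reporting_time", "time_position", "last_contact",
    "longitude", "latitude", "on_ground",
]


def transform_states(contents: dict) -> list:
    """Map each positional state row to named fields and keep only RETURN_SCHEMA keys."""
    states = []
    for raw in contents["states"]:
        record = dict(zip(FIELD_MAP, raw))
        record["reporting_time"] = contents["time"]
        states.append({
            key: value.strip() if isinstance(value, str) else value
            for key, value in record.items()
            if key in RETURN_SCHEMA
        })
    return states


# Made-up sample payload, one state row with 17 positional fields.
sample = {
    "time": 1728930000,
    "states": [[
        "abc123", "TEST123 ", "Exampleland", 1728929990, 1728929995,
        -73.5, 40.7, 1000.0, False, 200.0,
        90.0, 0.0, None, 1100.0, "1234",
        False, 0,
    ]],
}
print(json.dumps(transform_states(sample)))
```

If this runs cleanly, a processor that still fails to appear is more likely a packaging or loading problem than a problem in the transform code.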
10-12-2024
11:11 AM
@SAMSAL - Can you help out with this?