Member since 06-16-2020
			
      
55 Posts · 14 Kudos Received · 5 Solutions

My Accepted Solutions
| Title | Views | Posted | 
|---|---|---|
|  | 1814 | 10-23-2024 11:21 AM |
|  | 1649 | 10-22-2024 07:59 AM |
|  | 1639 | 10-22-2024 07:37 AM |
|  | 1029 | 10-21-2024 09:25 AM |
|  | 3024 | 06-16-2023 07:23 AM |
			
    
	
		
		
10-22-2024 07:59 AM · 1 Kudo

@RanjitMohapatra NiFi expects the Jolt specification to be provided as an array of objects when the transformation is set to Chain. This is typical with JoltTransformJSON because a chain can apply multiple transformations in sequence. Also, if you open your JoltTransformJSON processor and click the Advanced option, it brings up a Jolt spec tester that is consistent with how NiFi will handle the transformation.

The solution is simply to wrap your spec in an array:

[
  {
    "operation": "shift",
    "spec": {
      "ts": "ts",
      "sourceDeviceId": "source.deviceId",
      "sourceName": "source.name",
      "sourceType": "source.type",
      "locationLat": "location.lat",
      "locationLon": "location.lon",
      "locationId": "location.id",
      "recordDate": "recordDate",
      "_class": "_class",
      "status": "status",
      "power": "power",
      "current": "current",
      "intensity": "intensity",
      "voltage": "voltage"
    }
  }
]

Please accept the solution!
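To illustrate what that shift does, here is a small plain-Python sketch of the same mapping (the field values below are placeholders; NiFi applies the Jolt spec itself, this is only to show the shape of the output):

# Placeholder input record using the flat field names from the spec above.
flat = {
    "ts": 1729600000,
    "sourceDeviceId": "dev-01",
    "sourceName": "meter-a",
    "sourceType": "sensor",
    "locationLat": 48.1,
    "locationLon": 11.6,
    "locationId": "loc-7",
    "status": "OK",
}

# Shape produced by the shift: the source* and location* keys
# become nested "source" and "location" objects.
nested = {
    "ts": flat["ts"],
    "source": {
        "deviceId": flat["sourceDeviceId"],
        "name": flat["sourceName"],
        "type": flat["sourceType"],
    },
    "location": {
        "lat": flat["locationLat"],
        "lon": flat["locationLon"],
        "id": flat["locationId"],
    },
    "status": flat["status"],
}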
						
					
10-22-2024 07:37 AM · 2 Kudos

@rajivswe_2k7 I would avoid putting a lot of data in flowfile attributes due to memory concerns. What you could do instead is change the InvokeHTTP property called "Response Generation Required" from False to True.

With that set, no matter what the status code of the HTTP response is, the response is routed to the Response relationship with the full response body in the flowfile's content and the response status code in the flowfile's attributes. You can then use a RouteOnAttribute processor to filter for certain response codes, and you will have the full response in the content of the flowfile where it should be 🙂

Please accept this solution if you find it helpful!
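For example (the property name error_5xx here is just a placeholder), a RouteOnAttribute routing property set to ${invokehttp.status.code:ge(500)} would send server-error responses down their own relationship, since InvokeHTTP writes the response status code to the invokehttp.status.code attribute.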
						
					
10-21-2024 09:34 AM

@nifier What does your configuration for the FetchS3 processor look like? I would make sure you're pointing to the correct region in the FetchS3 processor and that your AWSCredentialsService, or the credentials set directly on the processor, are configured correctly.
						
					
10-21-2024 09:25 AM · 1 Kudo

@xtd Based on your requirements, it looks like you have some unneeded processors. Here is what my flow currently looks like:

1. ExecuteSQL: returns all the rows in the source table in Avro format. If you look at the attributes, by default it writes executesql.row.count, which you can use as your source row count, so part of the left side of your flow can be removed.
2. PutDatabaseRecord: populates the target table with all the records. Only on success does the flow continue to the next processor.
3. CalculateRecordStats: creates a new attribute called record.count that counts the number of rows in the full record.
4. UpdateAttribute: prepares for the PutSQL step by creating the corresponding attributes.
5. PutSQL: inserts a new row into the counts table with input_count and output_count (a rough configuration sketch for steps 4 and 5 is at the end of this post).

With the record-oriented processors, the whole count-comparison paradigm is somewhat unnecessary, because populating the target table either succeeds or fails: if it succeeds you can assume the row counts match, and if it fails you can assume nothing was populated. Still, the flow above gives you the counts either way 🙂

Please accept this solution if it helped!
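As a rough configuration sketch of steps 4 and 5 (the attribute names input.count and output.count and the table name row_counts are placeholders I am introducing here, not part of the original flow, and this assumes PutSQL's SQL Statement property is used rather than SQL in the flowfile content):

UpdateAttribute properties:
    input.count  = ${executesql.row.count}
    output.count = ${record.count}

PutSQL SQL Statement:
    INSERT INTO row_counts (input_count, output_count)
    VALUES (${input.count}, ${output.count})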
						
					
10-21-2024 08:03 AM

@nifier I emulated the same setup as you with a test S3 bucket. Using an access key ID and secret access key that had full access to all of S3, I was able to receive all the objects (including the ones under nested prefixes) from that bucket.

A couple of questions I want to follow up with:

1. In your ListS3 processor configuration, do you have anything set for Prefix or Delimiter? I just want to make sure, because that could be filtering out some of the files/directories coming from S3.
2. What are the IAM role and the bucket policy for the bucket you are trying to consume from? Are you certain that the role you are using can access all the objects in the bucket?
						
					
10-19-2024 06:46 PM

@AndreyDE Is one flowfile going into the SplitText processor and coming out as 10,000 flowfiles? How big is the flowfile going into SplitText? Or is the source of the pipeline recursively fetching all the objects in your S3 bucket? I need to know a little bit more about the input going into SplitText.
						
					
10-18-2024 10:43 AM · 2 Kudos

Hi @AndreyDE, what's your input into the SplitText processor? I used your example and I'm getting valid output. Make sure whatever feeds SplitText is not re-reading the same file over and over again, and if you are using GenerateFlowFile, make sure its run schedule isn't set to 0 sec, because that will keep emitting a stream of flowfiles. Please accept this solution if it's correct, thanks!
						
					
10-16-2024 11:51 AM

@SAMSAL Thanks so much. Here are my further findings.

My current directory structure is a TransformOS-nar folder containing TransformOpenskyStates.py, NAR-INF/, and META-INF/. I tried a bunch of different ways to zip this up and put the .nar in the corresponding NIFI_HOME/lib directory. Here is each test case, run on macOS. Please ignore the naming conventions, as I was trying to go through this quickly 😊

Test Case 1: normal zip. Failed.

zip -r archive_name.zip TransformOpenskyStates.py NAR-INF/ META-INF/
cp archive_name.zip ../Downloads/nifi-2.0.0-M4/lib/archive_name.nar

Logs after restarting NiFi:

2024-10-15 15:52:07,787 ERROR [main] org.apache.nifi.web.server.JettyServer Failed to start web server... shutting down.
java.lang.NullPointerException: Cannot invoke "org.apache.nifi.nar.ExtensionMapping.size()" because "extensionMapping" is null
    at org.apache.nifi.documentation.DocGenerator.generate(DocGenerator.java:64)
    at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:833)
    at org.apache.nifi.NiFi.<init>(NiFi.java:172)
    at org.apache.nifi.NiFi.<init>(NiFi.java:83)
    at org.apache.nifi.NiFi.main(NiFi.java:332)

Test Case 2: zip excluding macOS metadata files. Failed.

zip -r archive_name.zip TransformOpenskyStates.py NAR-INF/ META-INF/ -x "*.DS_Store" -x "__MACOSX/*"
cp archive_name.zip ../Downloads/nifi-2.0.0-M4/lib/archive_name.nar

Restarting NiFi produced the same NullPointerException as Test Case 1 (ExtensionMapping.size() on a null extensionMapping in DocGenerator.generate).

Test Case 3: normal zip of the outer folder. Failed.

zip -r test.zip TransformOS-nar/
cp test.zip ../Downloads/nifi-2.0.0-M4/lib/archive_name.nar

Logs after restarting NiFi:

java.lang.NullPointerException: Cannot invoke "java.util.jar.Manifest.getMainAttributes()" because "manifest" is null
    at org.apache.nifi.nar.NarUnpacker.createBundleCoordinate(NarUnpacker.java:258)
    at org.apache.nifi.nar.NarUnpacker.unpackNars(NarUnpacker.java:140)
    at org.apache.nifi.nar.NarUnpacker.unpackNars(NarUnpacker.java:90)
    at org.apache.nifi.nar.NarUnpacker.unpackNars(NarUnpacker.java:84)
    at org.apache.nifi.nar.NarUnpacker.unpackNars(NarUnpacker.java:75)
    at org.apache.nifi.NiFi.<init>(NiFi.java:142)
    at org.apache.nifi.NiFi.<init>(NiFi.java:83)
    at org.apache.nifi.NiFi.main(NiFi.java:332)

Test Case 4: zip of the outer folder with exclusions. Failed.

zip -r archive_name.zip TransformOS-nar/ -x "*.DS_Store" -x "__MACOSX/*"
cp archive_name.zip ../Downloads/nifi-2.0.0-M4/lib/archive_name.nar

Restarting NiFi produced the same NullPointerException as Test Case 3 (Manifest.getMainAttributes() on a null manifest in NarUnpacker.createBundleCoordinate).

Test Case 5: running the zip command inside the TransformOS-nar directory with *. Passed!

zip -r archive_name.zip *
cp archive_name.zip ../Downloads/nifi-2.0.0-M4/lib/archive_name.nar

Now you are probably wondering what the difference between Test Case 1 and Test Case 5 is. Here is the output of each zip command.

Zip command output for Test Case 1:

drewnicolette@MacBook-Pro-2:~/TransformOS-nar $ zip -r archive_name.zip TransformOpenskyStates.py NAR-INF/ META-INF/
  adding: TransformOpenskyStates.py (deflated 60%)
  adding: NAR-INF/ (stored 0%)
  adding: NAR-INF/.DS_Store (deflated 96%)
  adding: NAR-INF/bundled-dependencies/ (stored 0%)
  adding: META-INF/ (stored 0%)
  adding: META-INF/MANIFEST.MF (deflated 13%)
  adding: META-INF/.DS_Store (deflated 97%)

Zip command output for Test Case 5:

drewnicolette@MacBook-Pro-2:~/TransformOS-nar $ zip -r archive_name.zip *
  adding: META-INF/ (stored 0%)
  adding: META-INF/MANIFEST.MF (deflated 13%)
  adding: META-INF/.DS_Store (deflated 97%)
  adding: NAR-INF/ (stored 0%)
  adding: NAR-INF/.DS_Store (deflated 96%)
  adding: NAR-INF/bundled-dependencies/ (stored 0%)
  adding: TransformOpenskyStates.py (deflated 60%)

Both commands zip up the same files, and the byte counts of the two .zip files are identical.

Solution: the order in which the files are zipped matters. In Test Case 1 the zip command added TransformOpenskyStates.py first, while in Test Case 5 it added META-INF/ first. I believe the NiFi code expects META-INF/ to come first? @MattWho, do you know anything about this?
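If the ordering really is the issue, one way to control it is to build the archive programmatically so the manifest entry is always written first. This is only a sketch based on the directory layout above (whether NiFi actually requires META-INF/ to come first is still the open question):

import os
import zipfile

def build_nar(src_dir: str, nar_path: str) -> None:
    """Zip src_dir into a NAR, writing META-INF/MANIFEST.MF first and
    skipping macOS metadata (.DS_Store files and __MACOSX directories)."""
    with zipfile.ZipFile(nar_path, "w", zipfile.ZIP_DEFLATED) as nar:
        # Add the manifest first so it is the earliest entry in the archive.
        nar.write(os.path.join(src_dir, "META-INF", "MANIFEST.MF"),
                  arcname="META-INF/MANIFEST.MF")
        for root, _dirs, files in os.walk(src_dir):
            for name in files:
                full = os.path.join(root, name)
                arcname = os.path.relpath(full, src_dir).replace(os.sep, "/")
                if name == ".DS_Store" or "__MACOSX" in arcname:
                    continue
                if arcname == "META-INF/MANIFEST.MF":
                    continue  # already added above
                nar.write(full, arcname=arcname)
        # zipfile does not add empty directories automatically; add the
        # (empty) bundled-dependencies directory entry explicitly if needed.
        if not any(n.startswith("NAR-INF/bundled-dependencies/")
                   for n in nar.namelist()):
            nar.writestr("NAR-INF/bundled-dependencies/", "")

# Example usage (paths are placeholders matching the layout above):
# build_nar("TransformOS-nar", "archive_name.nar")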
						
					
10-14-2024 12:57 PM

Hi SAMSAL,

Thanks for the recommendation. I tried using just the Python extension on its own and it worked. However, when I compile the NAR again it still doesn't show up. Can you try it with the .py file I am pasting below? Thanks!

Filename: TransformOpenskyStates.py

from nifiapi.flowfiletransform import (
    FlowFileTransform,
    FlowFileTransformResult
)
from nifiapi.properties import ProcessContext
import json
FIELD_MAP = [
    "icao24", "callsign", "origin_country", "time_position", "last_contact",
    "longitude", "latitude", "baro_altitude", "on_ground", "velocity",
    "true_track", "vertical_rate", "sensors", "geo_altitude", "squawk",
    "spi", "position_source"
]
RETURN_SCHEMA = [
    "icao24", "callsign", "origin_country",
    "reporting_time", "time_position", "last_contact",
    "longitude", "latitude", "on_ground"
]
class TransformOpenskyStates(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = '''
        Transform the data returned by the OpenSky Network API.
        '''
        tags = ["opensky", "transform", "tutorial"]
        dependencies = []
    def __init__(self, **kwargs):
        super().__init__()
    def transform(
        self, context: ProcessContext, flow_file
    ) -> FlowFileTransformResult:
        '''
        Parameters:
            context (ProcessContext)
            flow_file
        Returns:
            FlowFileTransformResult
        '''
        contents = json.loads(flow_file.getContentsAsBytes())
        def sanitize_value(value):
            if isinstance(value, str):
                return value.strip()
            return value
        states = []
        for record in contents["states"]:
            record = dict(zip(FIELD_MAP, record))
            record["reporting_time"] = contents["time"]
            # Choose only fields listed in the RETURN_SCHEMA
            sanitized = {}
            for key, value in record.items():
                if key not in RETURN_SCHEMA:
                    continue
                sanitized[key] = sanitize_value(value)
            states.append(sanitized)
        return FlowFileTransformResult(
            "success",
            contents=json.dumps(states)
        )    
						
					
10-12-2024 11:11 AM

@SAMSAL - Can you help out with this?
						
					