Created on 02-03-2024 12:23 PM - edited 02-03-2024 12:24 PM
Hi,
I'm trying to write my own Python extension that takes an Excel file's bytes and produces one output FlowFile per sheet, rendered as HTML. I'm using a pandas DataFrame for the conversion. I have been looking through the Python extension developer guide, but I can't find anything that points me in the right direction:
The TransformRecord section talks about partitioning FlowFile input, but it seems the input has to be in a NiFi-readable record format (JSON, CSV, Avro, etc.).
With the ExecuteScript processor you could easily generate multiple FlowFiles by passing an array of FlowFiles to the session.transfer method:
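For example, a sketch of that ExecuteScript pattern (in a real ExecuteScript body, `session` and `REL_SUCCESS` are provided by the framework; they are stubbed here as an assumption so the sketch runs stand-alone):

```python
# Stub of the scripting API surface used in ExecuteScript. In NiFi itself,
# `session` and `REL_SUCCESS` come from the framework; these are assumptions
# made only so the pattern can be demonstrated outside NiFi.
class StubSession:
    def __init__(self):
        self.transferred = []

    def create(self):
        # Stand-in for session.create(), which returns a new FlowFile
        return {}

    def transfer(self, flowfiles, relationship):
        # session.transfer accepts a single FlowFile or a list of them
        items = flowfiles if isinstance(flowfiles, list) else [flowfiles]
        self.transferred.extend((ff, relationship) for ff in items)


REL_SUCCESS = "success"
session = StubSession()

# The ExecuteScript pattern: create one FlowFile per sheet, then hand the
# whole list to session.transfer in one call
flowfiles = [session.create() for _ in range(3)]
session.transfer(flowfiles, REL_SUCCESS)
```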
However, neither RecordTransformResult nor FlowFileTransformResult can do that.
Can someone provide me with the code, if applicable?
Thanks
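For reference, the pandas half of the task described above (one HTML document per sheet) is the easy part; the open question is only how to emit one FlowFile per sheet. A sketch, assuming pandas is available; in NiFi the sheets dict would come from pd.read_excel(io.BytesIO(flowfile_contents), sheet_name=None), which returns a dict of DataFrames (and needs openpyxl for .xlsx). The dict is stubbed here with in-memory DataFrames:

```python
import pandas as pd


def sheets_to_html(sheets):
    """Render each sheet (name -> DataFrame) as an HTML table string."""
    return {name: df.to_html(index=False) for name, df in sheets.items()}


# Stand-in for pd.read_excel(io.BytesIO(excel_bytes), sheet_name=None)
sheets = {
    "Sheet1": pd.DataFrame({"Itemid": ["a1"], "price": ["10"]}),
    "Sheet2": pd.DataFrame({"Itemid": ["b2"], "price": ["20"]}),
}

# One HTML string per sheet; each would become one output FlowFile
html_per_sheet = sheets_to_html(sheets)
```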
Created 05-08-2024 08:44 AM
@SAMSAL were you able to figure this out somehow?
Created 05-08-2024 09:01 AM
Hi @AsifArmanRahman ,
No, I have not figured it out, and I don't believe it's possible with the current implementation. May I ask what it is that you are trying to do?
Created 05-09-2024 06:35 AM
I want to achieve something similar: produce multiple FlowFiles based on the sheets in an Excel file. But I also want to go further and split one FlowFile's data based on the type of the data. For example, an array of JSON objects can have a key 'type', and based on its value I want to divide one FlowFile into multiple type-based FlowFiles.
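The JSON-splitting half of that can be expressed without NiFi at all; a stdlib sketch of grouping an array of records by a 'type' key (the key name is taken from the post, the record contents are made up):

```python
import json
from collections import defaultdict

# Example input: an array of JSON objects sharing a 'type' key
payload = json.dumps([
    {"type": "invoice", "id": 1},
    {"type": "receipt", "id": 2},
    {"type": "invoice", "id": 3},
])

# Group the array by the 'type' key; each group would become one FlowFile
groups = defaultdict(list)
for record in json.loads(payload):
    groups[record["type"]].append(record)

# One JSON document per type value
outputs = {t: json.dumps(records) for t, records in groups.items()}
```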
Created on 05-10-2024 10:42 AM - edited 05-10-2024 10:45 AM
Hi,
I found a file, "api/nifiapi/recordtransform.py", which shows how the stream data is partitioned. I did a test, and apparently it is possible to partition the data into several different output FlowFiles:
I made this adaptation:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from abc import ABC, abstractmethod

from nifiapi.properties import ProcessContext
from nifiapi.__jvm__ import JvmHolder


class RecordTransform(ABC):
    # These will be set by the PythonProcessorAdapter when the component is created
    identifier = None
    logger = None

    def __init__(self):
        self.arrayList = JvmHolder.jvm.java.util.ArrayList

    def setContext(self, context):
        self.process_context = ProcessContext(context)

    def transformRecord(self, jsonarray, schema, attributemap):
        # Hard-coded test records standing in for the parsed FlowFile content
        parsed_array = [
            {
                "example": "23122",
                "company": "1htr23",
                "Itemid": "12gersdg3",
                "price": "12wgsww3"
            },
            {
                "example": "13333123",
                "company": "123",
                "Itemid": "123",
                "price": "12www3"
            },
            {
                "example": "12hgerg3",
                "company": "12gdgdg3",
                "Itemid": "12rwerwer3",
                "price": "12wrwwww3"
            }
        ]
        # parsed_array = json.loads(jsonarray)
        results = self.arrayList()
        caching_attribute_map = CachingAttributeMap(attributemap)

        for record in parsed_array:
            output_records = self.transform(self.process_context, record, schema, caching_attribute_map)
            if isinstance(output_records, list):
                for output_record in output_records:
                    results.add(__RecordTransformResult__(output_record, json.dumps(output_record.getRecord())))
            else:
                results.add(__RecordTransformResult__(output_records, json.dumps(output_records.getRecord())))

        return results

    @abstractmethod
    def transform(self, context, record, schema, attributemap):
        pass


class CachingAttributeMap:
    cache = None

    def __init__(self, delegate):
        self.delegate = delegate

    def getAttribute(self, attributeName):
        # Lazily initialize the cache
        if self.cache is None:
            self.cache = {}
        if attributeName in self.cache:
            return self.cache[attributeName]

        value = self.delegate.getAttribute(attributeName)
        self.cache[attributeName] = value
        return value

    def getAttributes(self):
        return self.delegate.getAttributes()


class __RecordTransformResult__:
    class Java:
        implements = ['org.apache.nifi.python.processor.RecordTransformResult']

    def __init__(self, processor_result, recordJson):
        self.processor_result = processor_result
        self.recordJson = recordJson

    def getRecordJson(self):
        return self.recordJson

    def getSchema(self):
        return self.processor_result.schema

    def getRelationship(self):
        return self.processor_result.relationship

    def getPartition(self):
        return self.processor_result.partition


class RecordTransformResult:
    def __init__(self, record=None, schema=None, relationship="success", partition=None):
        self.record = record
        self.schema = schema
        self.relationship = relationship
        self.partition = partition

    def getRecord(self):
        return self.record

    def getSchema(self):
        return self.schema

    def getRelationship(self):
        return self.relationship

    def getPartition(self):
        return self.partition
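To see what the partition field buys you outside NiFi, here is a self-contained toy. RecordTransformResult is re-declared locally (a simplified copy of the pure-Python class in the paste), the 'company' field is borrowed from the test records above, and the grouping loop stands in for what the framework does when it turns each distinct partition value into its own output FlowFile:

```python
from collections import defaultdict


# Simplified local copy of the pure-Python result class from the module above
class RecordTransformResult:
    def __init__(self, record=None, schema=None, relationship="success", partition=None):
        self.record = record
        self.schema = schema
        self.relationship = relationship
        self.partition = partition


# A transform in the style of RecordTransform.transform: one result per
# record, partitioned on the record's 'company' field
def transform(record):
    return RecordTransformResult(record=record, partition={"company": record["company"]})


records = [
    {"company": "acme", "price": "10"},
    {"company": "globex", "price": "20"},
    {"company": "acme", "price": "30"},
]

# Group results by partition value; each group would become one FlowFile
groups = defaultdict(list)
for result in (transform(r) for r in records):
    groups[tuple(sorted(result.partition.items()))].append(result.record)
```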
Created 05-28-2024 05:44 AM
Even with the M3 release, no updates were made for this one either. Two of the custom processors I'm trying to implement rely on this very thing. I really hope it gets attention and becomes available in the M4 release.