
python extension generate multiple flowfiles from bytes input

Super Guru

Hi,

I'm trying to write my own Python extension that takes an Excel file as bytes and produces an output flowfile for each sheet, rendered as HTML. I'm using a pandas DataFrame for the conversion. I have been looking at the Python extension developer guide, but I can't find anything that points me in the right direction:

https://nifi.apache.org/documentation/nifi-2.0.0-M2/html/python-developer-guide.html#record-transfor...
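For context, the per-sheet conversion itself is straightforward in pandas. A minimal sketch, assuming the flowfile content is the raw .xlsx bytes (the function name here is just for illustration):

import io
import pandas as pd

def sheets_to_html(excel_bytes):
    # sheet_name=None makes read_excel return a dict of {sheet name: DataFrame}
    frames = pd.read_excel(io.BytesIO(excel_bytes), sheet_name=None)
    # Render each sheet as a standalone HTML table
    return {name: df.to_html(index=False) for name, df in frames.items()}

What I can't find is how to emit one output flowfile per entry of that dict from a Python extension.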

The RecordTransform section talks about partitioning the flowfile input, but it seems like the input has to be in a NiFi-readable record format (JSON, CSV, Avro, etc.).

In the ExecuteScript processor you could easily generate multiple flowfiles by passing an array of flowfiles to the session.transfer method:

https://community.cloudera.com/t5/Support-Questions/Split-one-Nifi-flow-file-into-Multiple-flow-file...
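For reference, a minimal Jython sketch of that ExecuteScript pattern (session and REL_SUCCESS are the standard ExecuteScript bindings; the loop bound is a stand-in for "one per sheet"):

flowFile = session.get()
if flowFile is not None:
    outputs = []
    for i in range(3):
        # Each child flowfile inherits attributes from the parent
        child = session.create(flowFile)
        child = session.putAttribute(child, 'sheet.index', str(i))
        outputs.append(child)
    # session.transfer accepts a collection, so all children go out at once
    session.transfer(outputs, REL_SUCCESS)
    session.remove(flowFile)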

However, neither RecordTransformResult nor FlowFileTransformResult can do that.

Can someone provide me with sample code, if applicable?

Thanks



@SAMSAL, were you able to figure this out somehow?

Super Guru

Hi @AsifArmanRahman ,

No, I have not figured it out, and I don't believe it's possible with the current implementation. May I ask what it is that you are trying to do?


I want to achieve something similar: produce multiple flowfiles based on the sheets in an Excel file. But I also want to go further and separate one flowfile's data based on the type of the data. For example, an array of JSON objects may have a key 'type', and based on its value I want to divide one flowfile into multiple type-based flowfiles.
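If the partition mechanism from the developer guide works as documented, a sketch of that type-based split might look like the following (untested; the class name and the assumption that each record arrives as a dict are mine):

from nifiapi.recordtransform import RecordTransform, RecordTransformResult

class PartitionByType(RecordTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.RecordTransform']

    class ProcessorDetails:
        version = '2.0.0-M2'

    def __init__(self, **kwargs):
        super().__init__()

    def transform(self, context, record, schema, attributemap):
        # Records that share the same partition map should land in the same
        # output flowfile, so each distinct 'type' value yields its own flowfile
        return RecordTransformResult(record=record, partition={'type': record.get('type')})

The catch is that this still presupposes the input is in a record-oriented format the configured reader can parse, which is exactly the limitation noted above for raw Excel bytes.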

New Contributor

Hi,

I found the file "api/nifiapi/recordtransform.py", which shows how the stream data is partitioned. I did a test, and apparently it is possible to partition the data into several different output flowfiles.

I made this adaptation:

 
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from abc import ABC, abstractmethod
from nifiapi.properties import ProcessContext
from nifiapi.__jvm__ import JvmHolder

class RecordTransform(ABC):
    # These will be set by the PythonProcessorAdapter when the component is created
    identifier = None
    logger = None

    def __init__(self):
        self.arrayList = JvmHolder.jvm.java.util.ArrayList

    def setContext(self, context):
        self.process_context = ProcessContext(context)

    def transformRecord(self, jsonarray, schema, attributemap):
        # Hard-coded test records for this adaptation; the commented-out line
        # below is where the real incoming record array would be used instead
        parsed_array = [
            {
                "example": "23122",
                "company": "1htr23",
                "Itemid": "12gersdg3",
                "price": "12wgsww3"
            },
            {
                "example": "13333123",
                "company": "123",
                "Itemid": "123",
                "price": "12www3"
            },
            {
                "example": "12hgerg3",
                "company": "12gdgdg3",
                "Itemid": "12rwerwer3",
                "price": "12wrwwww3"
            }
        ]
        # parsed_array = json.dumps(jsonarray)
        results = self.arrayList()
        caching_attribute_map = CachingAttributeMap(attributemap)

        for record in parsed_array:
            output_records = self.transform(self.process_context, record, schema, caching_attribute_map)

            # A transform may return a single result or a list of results;
            # each result is wrapped and added to the Java ArrayList
            if isinstance(output_records, list):
                for output_record in output_records:
                    results.add(__RecordTransformResult__(output_record, json.dumps(output_record.getRecord())))
            else:
                results.add(__RecordTransformResult__(output_records, json.dumps(output_records.getRecord())))

        return results

    @abstractmethod
    def transform(self, context, record, schema, attributemap):
        pass


class CachingAttributeMap:
    cache = None

    def __init__(self, delegate):
        self.delegate = delegate

    def getAttribute(self, attributeName):
        # Lazily initialize cache
        if self.cache is None:
            self.cache = {}
        if attributeName in self.cache:
            return self.cache[attributeName]

        value = self.delegate.getAttribute(attributeName)
        self.cache[attributeName] = value
        return value

    def getAttributes(self):
        return self.delegate.getAttributes()


class __RecordTransformResult__:
    class Java:
        implements = ['org.apache.nifi.python.processor.RecordTransformResult']

    def __init__(self, processor_result, recordJson):
        self.processor_result = processor_result
        self.recordJson = recordJson

    def getRecordJson(self):
        return self.recordJson

    def getSchema(self):
        return self.processor_result.schema

    def getRelationship(self):
        return self.processor_result.relationship

    def getPartition(self):
        return self.processor_result.partition


class RecordTransformResult:

    def __init__(self, record=None, schema=None, relationship="success", partition=None):
        self.record = record
        self.schema = schema
        self.relationship = relationship
        self.partition = partition

    def getRecord(self):
        return self.record

    def getSchema(self):
        return self.schema

    def getRelationship(self):
        return self.relationship

    def getPartition(self):
        return self.partition

Even with the M3 release, no updates were made for this one either. Two of the custom processors I'm trying to implement rely on this very thing. I really hope it'll be brought to attention and made available in the M4 release.