We are considering implementing a custom processor for some complex transformation logic on Kafka streams. The flow is straightforward: input topic -> our processor -> output topics. The processor logic needs to be initialized with data provided through a REST service and/or database records. What is the NiFi way to implement this initialization? Since we are implementing the processor ourselves, we could do a lot in, for example, the init method; however, we would prefer to make reasonable use of the NiFi infrastructure.
Could you please recommend some examples, blogs, etc. for a similar solution?
I am not completely clear what you mean by "needs to be initialized by data".
NiFi Processor components transfer FlowFiles between processors via connections. Those connections can consist of one or more relationships. Relationships are defined by each processor component's code.
There are many stock processors for ingesting data (for example: ListenTCP, ListenHTTP, QueryDatabase*, SelectHive*, etc...).
From an input Processor component, the FlowFile would be routed, if successful, to the "success" relationship. That "success" relationship would be routed via a connection as input to your custom processor. Your custom processor code would then need to pull FlowFile(s) from the inbound connection queue(s), process them, and then place the resulting FlowFile(s) on one or more relationships defined by your processor code, based on the outcome of that processing.
*** Note: Regardless of what you read in the above blogs, keep in mind the following:
1. Do NOT add your custom nar to the default NiFi lib directory. It is advisable to define a custom lib directory in the nifi.properties file just for your custom components. Refer to the Apache NiFi Admin Guide for more detail: https://community.cloudera.com/t5/Community-Articles/Build-Custom-Nifi-Processor/ta-p/244734
2. Avoid building more functionality than needed into a single processor component. It makes reuse in different use cases harder.
Hi @MattWho, we have actually already implemented prototypes of such custom processors, and the basic principles are clear. By processor logic, I did not mean the structure of the processor, such as its relationships. The structure is static in our case. By processor logic, I mean the code that is called in "onTrigger" in "StreamCallback.process". In my case, the logic can be parametrized by some configuration data (this configuration data is provided by a service or database tables). The configuration data is static during the whole processor run and must be provided when the business logic object is instantiated. You can think of it as a lookup that must not happen during processing in "onTrigger", but in "onScheduled" (sorry, I wrote "init" previously, that was not right).
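To make the intent concrete, here is a minimal sketch of that pattern in plain Java, with no NiFi dependency so it can be run on its own. The class names (TransformLogic, ConfiguredProcessorSketch) and the Supplier standing in for the REST or database lookup are hypothetical. The method names only mirror the NiFi lifecycle; in a real processor they would presumably be the methods annotated with @OnScheduled and @OnStopped and the body of the StreamCallback.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical business logic object: built once from configuration, then read-only. */
final class TransformLogic {
    private final Map<String, String> lookup;

    TransformLogic(Map<String, String> config) {
        // Defensive copy so later changes to the source map cannot leak in.
        this.lookup = Collections.unmodifiableMap(new HashMap<>(config));
    }

    String apply(String record) {
        // Map the record through the lookup; unknown records pass through unchanged.
        return lookup.getOrDefault(record, record);
    }
}

/** Stand-in for the custom processor (assumption: not a real NiFi class). */
final class ConfiguredProcessorSketch {
    // Hypothetical source of configuration, e.g. a REST call or a database query.
    private final Supplier<Map<String, String>> configSource;

    // Written when the processor is scheduled, read on every trigger.
    private volatile TransformLogic logic;

    ConfiguredProcessorSketch(Supplier<Map<String, String>> configSource) {
        this.configSource = configSource;
    }

    /** Would carry @OnScheduled in a real processor: load the config once per run. */
    void onScheduled() {
        logic = new TransformLogic(configSource.get());
    }

    /** Would be the per-FlowFile work: no lookup of configuration happens here. */
    String onTrigger(String content) {
        TransformLogic current = logic;
        if (current == null) {
            throw new IllegalStateException("onScheduled has not run");
        }
        return current.apply(content);
    }

    /** Would carry @OnStopped in a real processor: drop the config so the next run reloads it. */
    void onStopped() {
        logic = null;
    }
}
```

With this shape, the configuration is fetched exactly once per start of the processor, and stopping and restarting it is what picks up changed configuration.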