2224
Posts
236
Kudos Received
83
Solutions
About
My expertise is not in hadoop but rather online communities, support and social media. Interests include: photography, travel, movies and watching sports.
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 984 | 05-07-2025 11:41 AM | |
| 10391 | 12-04-2023 09:38 AM | |
| 3896 | 06-29-2023 05:42 AM | |
| 3348 | 05-22-2023 07:03 AM | |
| 2321 | 05-22-2023 05:42 AM |
08-16-2023
06:57 AM
I set up the authorizers.xml file as you suggested and it's working perfectly, Thank you very much @MattWho !!
... View more
08-15-2023
06:23 AM
@Tenda Since you are saying you can freely navigate the NiFi UI when in this "stuck" state, NiFi is not stuck as both the UI and processor components all operate within the same JVM. What you circled indicates that at the exact moment (last time browser refreshed) there were 24 active threads out of the 32 configured in the Max Timer Driven Thread pool settings. Milliseconds later that could still be 24 active threads but consumed by different components. The NiFi processors will all show small a small number in the upper right corner if they have an active threads, so step one is determining which processors are holding these 24 threads for a long time. Then looking at those processors and the thread dumps to figure out why those threads are long running. Typically we would see this when external service connections are made which are unstable, network issues, local NiFi repo I/O, NiFi CPU utilization, or long or very frequent GC pauses, or even OOMs. So you have ruled out a few of these so far it sounds. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
... View more
06-30-2023
05:19 AM
@madhs Have you resolved your issue. If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future.
... View more
06-29-2023
11:30 AM
@scheeri FIFO is the default priority of a connection queue; however, if you have more then one concurrent task on a processor, multiple FlowFiles from the source connection can be executed about concurrently. This means that one of those concurrent execution may complete before the other leading to FlowFile no longer being in same order in follow-on connections. If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped. Thank you, Matt
... View more
06-29-2023
06:12 AM
Welcome to the community @cpshah. I see that part 1 was posted in 2016 and I do not believe part 2 was created. That being said, maybe someone will see your post and work on a new article on the topic.
... View more
06-27-2023
01:54 PM
thanks it solved the problem
... View more
06-22-2023
07:44 AM
hi @rki_ / @cjervis , I forgot to ask, today the cluster already has thousands of blocks in hdfs, more than 23 million blocks. after configuring the rack in the cluster, the hdfs will recognize the racks and will start moving the blocks to the racks to increase the availability of the blocks or will i have to rebalance the hdfs?
... View more
06-15-2023
11:41 AM
This simple InvokeScriptedProcessor will look for a FlowFile attribute called "ip_address" and will attempt the reverse lookup and create a new attribute called "host_name" with the resolved value. import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.net.InetAddress
import java.net.UnknownHostException
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
class GroovyProcessor implements Processor {
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
void initialize(ProcessorInitializationContext context) { log = context.logger }
Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
Collection<ValidationResult> validate(ValidationContext context) { null }
PropertyDescriptor getPropertyDescriptor(String name) { null }
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
String getIdentifier() { null }
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
def reverseDnsLookup(String ipAddress) {
try {
InetAddress inetAddress = InetAddress.getByName(ipAddress)
String hostName = inetAddress.getCanonicalHostName()
return hostName
} catch (UnknownHostException e) {
return "Unknown"
}
}
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
Map customAttributes = [:]
flowFiles.each { flowFile ->
String ipAddress = flowFile.getAttribute("ip_address")
if (ipAddress) {
String hostName = reverseDnsLookup(ipAddress)
customAttributes["host_name"] = hostName
flowFile = session.putAllAttributes(flowFile, customAttributes)
}
session.transfer(flowFile, REL_SUCCESS)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
processor = new GroovyProcessor()
... View more
06-14-2023
02:33 PM
Does ExecuteSQL erase some of the attributes that could be used to associate the FlowFiles futher down stream?
... View more