NiFi - BulletinRepository API returns at most 5 bulletins (errors/info/warn) per component

Expert Contributor

Hi,

I have written a ReportingTask for NiFi in which I use 'BulletinRepository.findBulletins(queryProcessor);' to retrieve all bulletins and report metrics on the number of errors/warnings/info messages. The issue is that I receive at most 5 bulletins per component, even though more messages are visible in the Bulletin UI.

Following is how I construct and execute my query:

BulletinRepository repository = context.getBulletinRepository();
final BulletinQuery queryProcessor = new BulletinQuery.Builder()
        .sourceType(ComponentType.PROCESSOR)
        .limit(500)
        .build();
bulletinsList.addAll(repository.findBulletins(queryProcessor));

How do I get ALL the messages from the bulletin repository rather than just 5?

Thanks

Obaid

1 ACCEPTED SOLUTION

Master Guru

Here is your answer:

public class VolatileBulletinRepository implements BulletinRepository {
private static final int CONTROLLER_BUFFER_SIZE = 10;
private static final int COMPONENT_BUFFER_SIZE = 5;

The limit for bulletins returned is set by the component buffer, which is hard coded to 5.
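
To illustrate why a query limit of 500 cannot help here, below is a minimal sketch of a fixed-capacity buffer. It assumes the per-component store behaves like a ring buffer of size COMPONENT_BUFFER_SIZE; BoundedBulletinBuffer is a hypothetical name used only for this illustration and is not NiFi's actual class.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a per-component bulletin buffer; not NiFi's implementation.
class BoundedBulletinBuffer {
    private final int capacity;
    private final Deque<String> messages = new ArrayDeque<>();

    BoundedBulletinBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Adds a message, evicting the oldest one once the buffer is full.
    void add(String message) {
        if (messages.size() == capacity) {
            messages.removeFirst();
        }
        messages.addLast(message);
    }

    // Returns a copy of the retained messages, oldest first.
    List<String> snapshot() {
        return new ArrayList<>(messages);
    }
}
```

With a capacity of 5, adding eight messages leaves only the last five in the snapshot, no matter how large a limit the reader asks for.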

https://github.com/apache/nifi/blob/d838f61291d2582592754a37314911b701c6891b/nifi-nar-bundles/nifi-f...

https://github.com/apache/nifi/blob/7f5eabd603bfc326dadc35590bbe69304e8c90fa/nifi-nar-bundles/nifi-f...

I am not sure where that limit gets set, but it may be set to 5 somewhere.

final BulletinQuery.Builder queryBuilder = new BulletinQuery.Builder()
        .groupIdMatches(query.getGroupId())
        .sourceIdMatches(query.getSourceId())
        .nameMatches(query.getName())
        .messageMatches(query.getMessage())
        .after(query.getAfter())
        .limit(query.getLimit());

// perform the query
final List<Bulletin> results = bulletinRepository.findBulletins(queryBuilder.build());

https://github.com/apache/nifi/blob/2d6bba080f90a45a9f4149f6844f452150ed6bc1/nifi-nar-bundles/nifi-f...

@QueryParam("limit") IntegerParameter limit) throws InterruptedException {

Have you traced it? Maybe 500 is too big.


Expert Contributor

UPDATE: THE FOLLOWING SOLUTION DOES NOT WORK: we always get at most 5 messages.

Thanks @Timothy Spann:

Here is what I found out: a bulletin ID is maintained for each bulletin message, and it is always increasing. By using .after(id) I am able to fetch all messages in repeated calls:

/**
 * Retrieve bulletins from the BulletinRepository.
 * These bulletin messages are used to generate system health metrics (errors/warnings/info).
 *
 * @param context            reporting context that provides the bulletin repository
 * @param componentType      type of component whose bulletins are fetched
 * @param previousBulletinId only bulletins with an ID greater than this are fetched
 * @param maxBulletins       stop querying once at least this many bulletins are collected
 * @return the bulletins collected across all queries
 */
public static List<Bulletin> findBulletins(ReportingContext context, ComponentType componentType, long previousBulletinId, final int maxBulletins) {
    final List<Bulletin> bulletinsList = new ArrayList<>();
    final BulletinRepository repository = context.getBulletinRepository();
    int bulletinsFound;
    do {
        bulletinsFound = 0;
        final BulletinQuery queryProcessor = new BulletinQuery.Builder()
                .sourceType(componentType)
                .after(previousBulletinId)
                .build();
        final List<Bulletin> bulletinsThisQuery = repository.findBulletins(queryProcessor);
        if (bulletinsThisQuery != null && !bulletinsThisQuery.isEmpty()) {
            bulletinsFound = bulletinsThisQuery.size();
            // Advance past the highest ID returned, whatever order the results come back in.
            for (final Bulletin bulletin : bulletinsThisQuery) {
                previousBulletinId = Math.max(previousBulletinId, bulletin.getId());
            }
            bulletinsList.addAll(bulletinsThisQuery);
        }
    } while (bulletinsFound > 0 && bulletinsList.size() < maxBulletins);
    return bulletinsList;
}

Master Guru

Sweet. You should write an article, that's good to know.

Expert Contributor

OK, I have verified that I am actually still getting a total of 5 messages, no matter how many times I call it. Apologies!

So now I need to rethink how I monitor errors. Do you think I would be able to get all error messages through the REST API?

Or should I monitor the logs for errors?

Master Guru

Monitoring the logs and using the REST API are both good options.

Rising Star

This is by design. A bulletin is simply a notification about a passing event. To get access to every event you'd either need to monitor the log files or implement a custom Reporting Task to exfil the bulletins as they occur. The reporting context provided to the task has access to the bulletin repository. We have a similar Reporting Task for sending provenance events [1].

[1] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-...

Expert Contributor

Thanks mgilman:

Yes, that is what I am trying to achieve: a ReportingTask that fetches events from the BulletinRepository. The issue is that it always returns at most 5, so I cannot report all errors (assume a processor has more than 5 errors). Which API should I use to extract all bulletins? Essentially, how do I "exfil the bulletins as they occur"? It would be great if you could elaborate, thanks.

Rising Star

The Reporting Task would follow the same pattern as the REST API. The only difference is that you'll be able to schedule it to run much more frequently than you could poll the REST API. You do need to maintain some local state, specifically the ID of the last bulletin seen. Then in your onTrigger you can call findBulletins starting from that ID, update the last bulletin seen, and repeat in the next onTrigger.

Since we're limited to 5 per component, the task needs to run at least once before a component is able to generate more than 5 bulletins. The REST API follows a similar pattern for the Bulletin Board, but it's not guaranteed to pull back every single bulletin. It is guaranteed to pull back up to the last 5. Once the next bulletin is triggered, the oldest one falls out of scope. Polling the REST API will definitely be slower than running a Reporting Task and will increase the risk of missing one.
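
The "last bulletin seen" bookkeeping described above could be sketched roughly as follows. This is a self-contained illustration, not NiFi code: SimpleBulletin and LastSeenBulletinPoller are hypothetical names, and the findAfter function stands in for a repository query such as findBulletins with an after(id) filter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical minimal bulletin record; NiFi's Bulletin carries more fields.
final class SimpleBulletin {
    final long id;
    final String message;

    SimpleBulletin(long id, String message) {
        this.id = id;
        this.message = message;
    }
}

// Remembers the highest bulletin ID seen so far and reports only newer ones on each run.
class LastSeenBulletinPoller {
    private long lastSeenId = -1L;
    // Assumed stand-in for "find bulletins with an ID greater than the given one".
    private final LongFunction<List<SimpleBulletin>> findAfter;

    LastSeenBulletinPoller(LongFunction<List<SimpleBulletin>> findAfter) {
        this.findAfter = findAfter;
    }

    // Intended to be called once per scheduled run (the onTrigger of a Reporting Task).
    List<SimpleBulletin> pollOnce() {
        final long threshold = lastSeenId;
        final List<SimpleBulletin> fresh = new ArrayList<>();
        for (SimpleBulletin bulletin : findAfter.apply(threshold)) {
            if (bulletin.id > threshold) { // defensive: skip anything already reported
                fresh.add(bulletin);
                lastSeenId = Math.max(lastSeenId, bulletin.id);
            }
        }
        return fresh;
    }

    long getLastSeenId() {
        return lastSeenId;
    }
}
```

Each call returns only bulletins newer than the previous run, so nothing is reported twice. Anything that was evicted from the 5-entry buffer between two runs is still lost, which is why the run frequency matters.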

Alternatively, you can scrape the logs. The bulletins are essentially notifications of log messages.

Expert Contributor

Thanks @mgilman

That was helpful. So for the ReportingTask, how frequently should I run it? Let's say I run it every 10 seconds and we receive 100 error/warn/info bulletins every minute (just as an example). Do you think we might lose messages, or are we guaranteed to receive all bulletins?
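
A rough way to reason about the numbers in this example is sketched below. It assumes the worst case: all 100 bulletins per minute come from a single component, they arrive evenly spaced, and the per-component buffer holds 5 as described earlier in the thread. BulletinLossEstimate is a hypothetical helper written only for this estimate.

```java
// Back-of-the-envelope estimate; all inputs are assumptions taken from the example above.
class BulletinLossEstimate {

    // Bulletins that fall out of the buffer between two runs, in the worst case
    // where every bulletin comes from one component at an even rate.
    static long missedPerRun(long bulletinsPerMinute, long pollSeconds, long bufferSize) {
        long producedPerRun = bulletinsPerMinute * pollSeconds / 60; // integer division, rounds down
        return Math.max(0, producedPerRun - bufferSize);
    }

    // Longest polling interval, in seconds, at which one component cannot overflow its buffer.
    static double maxSafeIntervalSeconds(long bulletinsPerMinute, long bufferSize) {
        return bufferSize * 60.0 / bulletinsPerMinute;
    }
}
```

Under these assumptions, a 10-second schedule sees about 16 bulletins produced per run against a buffer of 5, so roughly 11 would be missed each run, and the interval would need to be about 3 seconds or less to keep up. If the bulletins are spread across many components, the per-component rate is lower and the picture is better.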