Custom sink for Flume-ng receives null events

I am trying to write a custom sink for flume-ng. I looked at the existing sinks and the documentation and coded mine along the same lines. However, the process() method, which should receive the events, always ends up with null. I am doing Event event = channel.take(); but the event is null. I can see in the logs that this method is called multiple times, since the event is still in the channel.

Can someone point me in the right direction?

2 answers

This is the skeleton of the process() function. If delivering the event fails, you roll back the transaction and set the status to BACKOFF. If it does not fail, you commit and set the status to READY. Regardless, you always close the transaction.

    Status status = null;
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    transaction.begin();
    try {
        Event event = channel.take();

        if (event != null && validEvent(event.getBody()) >= 0) {
           // print or otherwise process the event here
        }
        transaction.commit();
        status = Status.READY;
    } catch (Throwable ex) {
        transaction.rollback();
        status = Status.BACKOFF;
        logger.error("Failed to deliver event. Exception follows.", ex);
        throw new EventDeliveryException("Failed to deliver event: " + ex);
    } finally {
        transaction.close();
    }
    return status;

I am sure this will work :).


This is by design. The sink runner polls the sink with null events so that it can make sure that the sink is alive and ready to accept future events. When you receive a null event, make sure that you return Status.BACKOFF, and Flume will wait a bit before retrying.
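To illustrate the null check, here is a minimal sketch that runs without Flume on the classpath. Everything in it is a stand-in I made up for illustration: `Status` mimics Flume's `Sink.Status`, `QueueChannel` is a hypothetical in-memory queue whose take() returns null when empty, and `PrintingSink` is a hypothetical sink. Transaction handling is left out to keep it short; in a real sink you would still begin, commit or roll back, and close the transaction.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch only: none of these types are the real Flume classes.
class NullEventSketch {

    // Stand-in for Flume's Sink.Status.
    enum Status { READY, BACKOFF }

    // Hypothetical channel: take() returns null when nothing is queued.
    static class QueueChannel {
        private final Queue<String> events = new ArrayDeque<>();

        void put(String body) {
            events.add(body);
        }

        String take() {
            return events.poll(); // poll() yields null on an empty queue
        }
    }

    // Hypothetical sink that prints each event body it receives.
    static class PrintingSink {
        private final QueueChannel channel;
        int delivered = 0;

        PrintingSink(QueueChannel channel) {
            this.channel = channel;
        }

        Status process() {
            String event = channel.take();
            if (event == null) {
                // Nothing to deliver: ask the caller to wait before polling again.
                return Status.BACKOFF;
            }
            System.out.println("event: " + event);
            delivered++;
            return Status.READY;
        }
    }

    public static void main(String[] args) {
        QueueChannel channel = new QueueChannel();
        PrintingSink sink = new PrintingSink(channel);

        System.out.println(sink.process()); // channel is empty
        channel.put("hello");
        System.out.println(sink.process()); // one event is available
        System.out.println(sink.process()); // channel is empty again
    }
}
```

The point is only that a null from take() is treated as a normal "nothing to do" case rather than an error, so the sink reports BACKOFF instead of throwing.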
