How to write to Kafka from the Python logging module?

I have a large complex application that makes heavy use of the Python logging module.

I need to start getting these logs into our Kafka cluster, and I need to make sure that I am not changing the data along the way.

For me, the ideal solution would be to create a new handler for Kafka and let the logs go to both the old logging solution and Kafka for a while. Then, eventually, I would disable the old handlers and send only to Kafka.

However, I do not see any Kafka logging handlers, only Kafka clients. Adding a Kafka client would mean tracking down every current logging call and adding a separate call to the new Kafka client next to it. Getting identical results that way would be difficult.
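The handler approach avoids touching call sites because handlers attach to loggers, and records from module loggers propagate up to the root logger. A minimal sketch of that, assuming a hypothetical in-memory `ListHandler` as a stand-in for a real Kafka handler, shows that an "old" and a "new" handler on the root logger receive the same formatted messages without any change to the logging calls:

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical stand-in for a Kafka handler: stores formatted records in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(self.format(record))


def attach_side_by_side():
    """Attach two handlers to the root logger and log through module loggers."""
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)

    old_handler = ListHandler()  # plays the role of the existing handler
    new_handler = ListHandler()  # plays the role of the new Kafka handler
    root.addHandler(old_handler)
    root.addHandler(new_handler)

    # Existing call sites stay as they are: these loggers propagate to the root.
    logging.getLogger("app.billing").info("invoice %s created", 42)
    logging.getLogger("app.auth").warning("login failed for %s", "alice")

    root.removeHandler(old_handler)
    root.removeHandler(new_handler)
    return old_handler.messages, new_handler.messages


if __name__ == "__main__":
    old, new = attach_side_by_side()
    print(old == new)  # True: both handlers saw identical formatted messages
```

Removing the old handler later is then a one-line `removeHandler` call rather than an edit to every logging statement.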

3 answers

The implementation of the handler is very simple. In fact, setting up the environment took longer than implementing the handler.

The handler constructor accepts an optional `key` argument. If it is provided, all messages are sent to the single partition selected by that key. If it is not provided, messages are distributed across the partitions in round-robin fashion.

I have not tested this much, but it is so simple that I do not see what could go wrong. I hope it helps.
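To make the keyed vs. key-less distinction described above concrete, here is a toy model of partition choice. It is only an illustration under a stated assumption: `zlib.crc32` stands in for the real hash (kafka-python's default partitioner uses murmur2), so the actual partition numbers Kafka picks will differ. The point it shows is that the same key always lands on the same partition, while key-less messages rotate through partitions.

```python
import itertools
import zlib


class PartitionChooser:
    """Toy illustration of keyed vs. round-robin partition selection.

    Assumption: zlib.crc32 is a stand-in hash, not Kafka's real partitioner.
    """

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._round_robin = itertools.cycle(range(num_partitions))

    def choose(self, key=None):
        if key is None:
            # No key: hand out partitions in turn.
            return next(self._round_robin)
        # With a key (bytes): the same key always maps to the same partition.
        return zlib.crc32(key) % self.num_partitions


if __name__ == "__main__":
    chooser = PartitionChooser(3)
    print([chooser.choose() for _ in range(5)])         # [0, 1, 2, 0, 1]
    print({chooser.choose(b"key1") for _ in range(5)})  # a set with one partition
```

A consequence worth keeping in mind: with a fixed key, every log record goes to one partition, which preserves ordering but puts the whole log stream on a single partition.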

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer,KeyedProducer
import logging,sys

class KafkaLoggingHandler(logging.Handler):

    def __init__(self, host, port, topic, key=None):
        logging.Handler.__init__(self)
        self.kafka_client = KafkaClient(host, port)
        self.key = key
        if key is None:
            self.producer = SimpleProducer(self.kafka_client, topic)
        else:
            self.producer = KeyedProducer(self.kafka_client, topic)

    def emit(self, record):
        #drop kafka-python's own records (the 'kafka' logger and its
        #children such as 'kafka.conn') to avoid infinite recursion
        if record.name == 'kafka' or record.name.startswith('kafka.'):
            return
        try:
            #use default formatting
            msg = self.format(record)
            #produce message
            if self.key is None:
                self.producer.send_messages(msg)
            else:
                self.producer.send(self.key, msg)
        except Exception:
            #report the failure through logging's standard error hook
            self.handleError(record)

    def close(self):
        self.producer.stop()
        logging.Handler.close(self)

kh = KafkaLoggingHandler("localhost", 9092, "test_log")
#OR
#kh = KafkaLoggingHandler("localhost", 9092, "test_log", "key1")

logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
logger.addHandler(kh)
logger.info("The %s boxing wizards jump %s", 5, "quickly")
logger.debug("The quick brown %s jumps over the lazy %s", "fox",  "dog")
try:
    import math
    math.exp(1000)
except OverflowError:
    logger.exception("Problem with %s", "math.exp")

P.S. The handler uses this Kafka client: https://github.com/mumrah/kafka-python
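The recursion guard in `emit` matters because kafka-python logs through its own loggers, which are children of `kafka` (for example `kafka.conn`). A comparison against `'kafka'` alone misses those children, and a substring test would also drop unrelated loggers whose names merely contain `kafka.`. A minimal sketch of the check as a reusable `logging.Filter` (the class name `DropKafkaRecords` is hypothetical):

```python
import logging


class DropKafkaRecords(logging.Filter):
    """Reject records from the 'kafka' logger and all of its child loggers."""

    def filter(self, record):
        name = record.name
        # Exact match or dotted child; 'mykafka.tools' is deliberately kept.
        return not (name == "kafka" or name.startswith("kafka."))


def make_record(name):
    """Test helper: build a bare LogRecord for the given logger name."""
    return logging.LogRecord(name, logging.INFO, "example.py", 0, "msg", None, None)


if __name__ == "__main__":
    f = DropKafkaRecords()
    for name in ("kafka", "kafka.conn", "mykafka.tools", "app"):
        print(name, f.filter(make_record(name)))
```

Attached with `kh.addFilter(DropKafkaRecords())`, the filter runs in `Handler.handle` before `emit` is called, so the check no longer has to live inside `emit`.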


The code above relies on kafka-python classes that have since been deprecated:

SimpleProducer (deprecated) --> KafkaProducer
SimpleConsumer (deprecated) --> KafkaConsumer
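Both old send styles map onto a single `KafkaProducer.send(topic, value=..., key=...)` call. A minimal sketch, assuming a producer created without key/value serializers (so bytes are passed explicitly); `send_log_line` is a hypothetical helper name:

```python
def send_log_line(producer, topic, message, key=None):
    """Send one formatted log line using KafkaProducer.send().

    Assumes `producer` is a kafka.KafkaProducer built WITHOUT key/value
    serializers, so key and value are encoded to bytes here.
    """
    value = message.encode("utf-8")
    encoded_key = key.encode("utf-8") if key is not None else None
    # One call covers both old cases: key=None takes the place of
    # SimpleProducer.send_messages(msg); a key takes the place of
    # KeyedProducer.send(key, msg).
    return producer.send(topic, value=value, key=encoded_key)
```

With a running broker, usage would be along the lines of `producer = KafkaProducer(bootstrap_servers=['localhost:9092'])`, then `send_log_line(producer, 'test_log', msg, key='key1')`, and `producer.flush()` before exiting. `send()` is asynchronous and returns a future, so nothing is guaranteed to be delivered until it is flushed.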

The version below uses Kafka 1.0.0 and kafka-python 1.4.2, together with logstash.


tester.py:

# -*- coding: utf-8 -*-
"""Module to test out logging to kafka."""

import logging

from utils.kafka_handler import KafkaHandler


def run_it(logger=None):
    """Run the actual connections."""
    # fall back to this module's logger if none was passed in
    if logger is None:
        logger = logging.getLogger(__name__)
    # enable the debug logger if you want to see ALL of the lines
    #logging.basicConfig(level=logging.DEBUG)
    logger.setLevel(logging.DEBUG)

    kh = KafkaHandler(['localhost:9092'], 'sebtest')
    logger.addHandler(kh)

    logger.info("I'm a little logger, short and stout")
    logger.debug("Don't tase me bro!")

    # detach the handler, then flush and close its producer
    logger.removeHandler(kh)
    kh.close()


if __name__ == "__main__":
    run_it()

utils/kafka_handler.py:

# -*- coding: utf-8 -*-
"""Module to provide kafka handlers for internal logging facility."""

import json
import logging

from kafka import KafkaProducer


class KafkaHandler(logging.Handler):
    """Class to instantiate the kafka logging facility."""

    def __init__(self, hostlist, topic='corp_it_testing', tls=None):
        """Initialize an instance of the kafka handler (tls is currently unused)."""
        logging.Handler.__init__(self)
        self.producer = KafkaProducer(bootstrap_servers=hostlist,
                                      value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                      linger_ms=10)
        self.topic = topic

    def emit(self, record):
        """Emit the provided record to the kafka_client producer."""
        # drop kafka-python's own records ('kafka' and children such as
        # 'kafka.conn') to avoid infinite recursion
        if record.name == 'kafka' or record.name.startswith('kafka.'):
            return

        try:
            # apply the logger formatter
            msg = self.format(record)
            self.producer.send(self.topic, {'message': msg})
            # wait up to 1 s for delivery; this trades batching for promptness
            self.flush(timeout=1.0)
        except Exception:
            self.handleError(record)

    def flush(self, timeout=None):
        """Flush the objects."""
        self.producer.flush(timeout=timeout)

    def close(self):
        """Close the producer and clean up."""
        self.acquire()
        try:
            if self.producer:
                self.producer.close()

            logging.Handler.close(self)
        finally:
            self.release()
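The handler sends only the formatted text under `'message'`. If the consumer (logstash or anything else) should see the level and logger name as separate fields, one option is to build the payload from the `LogRecord` itself. A minimal sketch; `record_to_payload` is a hypothetical helper, and the field names other than `message` are assumptions rather than anything logstash requires:

```python
import json
import logging


def record_to_payload(record, formatter=None):
    """Hypothetical helper: turn a LogRecord into a JSON-ready dict.

    Keeps the formatted text under 'message' (as the handler above does)
    and adds a few standard LogRecord attributes next to it.
    """
    formatter = formatter or logging.Formatter()  # default format is "%(message)s"
    return {
        "message": formatter.format(record),
        "level": record.levelname,
        "logger": record.name,
        "created": record.created,  # seconds since the epoch, as a float
    }


if __name__ == "__main__":
    rec = logging.LogRecord("app.billing", logging.WARNING, "example.py", 10,
                            "invoice %s overdue", (42,), None)
    print(json.dumps(record_to_payload(rec)))
```

Inside `KafkaHandler.emit`, the send would then become `self.producer.send(self.topic, record_to_payload(record, self.formatter))`; the existing JSON `value_serializer` handles the dict unchanged. Because the original formatted message is still included verbatim, nothing is lost compared with the current payload.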

I used your code for the KafkaHandler, but I have a problem with Kafka when consuming the data: the special characters cannot be read as UTF-8.

Could you give any suggestions on this issue, please?

Thanks in advance.
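One thing worth checking for this UTF-8 problem: the handler's `value_serializer` uses `json.dumps` with its default `ensure_ascii=True`, so non-ASCII characters are written as `\uXXXX` escapes, and the consumer has to decode the bytes as UTF-8 and then parse the JSON to get the original text back. A minimal sketch of both sides in plain Python (the function names are hypothetical; the serializer body matches the handler's lambda):

```python
import json


def serialize(value):
    """Same as the handler's value_serializer: JSON text encoded as UTF-8 bytes."""
    return json.dumps(value).encode("utf-8")


def serialize_readable(value):
    """Variant that writes non-ASCII characters as real UTF-8 instead of \\u escapes."""
    return json.dumps(value, ensure_ascii=False).encode("utf-8")


def deserialize(raw):
    """Consumer side: decode the bytes as UTF-8, then parse the JSON."""
    return json.loads(raw.decode("utf-8"))


if __name__ == "__main__":
    payload = {"message": "Zürich café"}
    print(serialize(payload))           # b'{"message": "Z\\u00fcrich caf\\u00e9"}'
    print(serialize_readable(payload))  # raw UTF-8 bytes for the accented letters
    print(deserialize(serialize(payload)) == payload)  # True
```

On the consuming side, kafka-python accepts the same logic as `KafkaConsumer(..., value_deserializer=lambda m: json.loads(m.decode('utf-8')))`. Either serializer round-trips correctly through `deserialize`; `ensure_ascii=False` only matters if a consumer displays the raw bytes without parsing the JSON.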
