基于消息队列的非阻塞式将日志持久化到数据库
pip install -U https://github.com/xiaodongxiexie/log2db/tree/release
or
git clone https://github.com/xiaodongxiexie/log2db
pip install .
import logging
from log2db import init, Proxy, LoggingMQHandler
# 设置存储数据库链接
# sqlite
Proxy.DBURL = "sqlite:///{}".format("mylogger.db")
#MySQL
Proxy.DBURL = "mysql+pymysql://username:[email protected]:3306/your-database"
# 重建库表
# _ = init(rebuild=True)
# 存在则复用,否则重建
_ = init()
logger = logging.getLogger("log2db")
logger.parent = None
handler = LoggingMQHandler(logging.INFO)
logger.addHandler(handler)
if __name__ == '__main__':
logger.info("this is a test", extra={"user": "none"})
logger.warning("this is a warn %s", "ohuo")
try:
1/0
except:
logger.exception("this is a exception", exc_info=True)
import logging
import redis as _redis
from log2db import init, Proxy, LoggingMQHandler
from log2db.mq import redis
from log2db.mq import activemq
# 使用redis作为消息队列(程序异常崩溃后日志数据不影响)
# redis 设置host, port, password等
redis.RadisMQ.storage = _redis.Redis(host="", port=6379, password=None, decode_responses=True)
LoggingMQHandler.mq = redis.RedisMQ()
# 使用activemq作为消息队列
# LoggingMQHandler.mq = activemq.ActiveMQ()
# 设置存储数据库链接
# sqlite
Proxy.DBURL = "sqlite:///{}".format("mylogger.db")
#MySQL
Proxy.DBURL = "mysql+pymysql://username:[email protected]:3306/your-database"
_ = init()
logger = logging.getLogger("log2db")
logger.parent = None
handler = LoggingMQHandler(logging.INFO)
logger.addHandler(handler)
if __name__ == '__main__':
import random, time
from log2db import Logging
session = _.session
logger.info("this is a test", extra={"user": "none"})
logger.warning("this is a warn %s", "ohuo")
try:
1/0
except:
logger.exception("this is a exception", exc_info=True)
print(session.query(Logging).count())
for i in range(1000):
random.choice([logger.info, logger.warning, logger.exception, logger.debug])(i)
print(session.query(Logging).count())
# time.sleep(3)
print(session.query(Logging).count())