Halo
发布于 2023-03-14 / 120 阅读 / 0 评论 / 0 点赞

redis sub

import redis
import time
import threading


class RedisListener(object):
    def __init__(self, channels: list,
                 host: str, port: int, db: int):
        self.__all_client = []
        self.__t = threading.Thread(target=self.__handler,
                                    args=(channels, host, port, db))
        self.__t.daemon = True
        self.__t.start()

    def register(self, client):
        self.__all_client.append(client)

    def __handler(self, channels: list, host: str, port: int, db: int):
        connection = redis.Redis(
            connection_pool=redis.ConnectionPool(host=host, port=port, db=db))
        pubsub = connection.pubsub()
        pubsub.psubscribe(channels)
        while True:
            msg = pubsub.get_message(timeout=None)
            if msg["type"] == "psubscribe":
                continue
            else:
                self.__send(msg["data"].decode("utf8"))
        connection.close()

    def __send(self, msg: str):
        for client in self.__all_client:
            try:
                client.send(msg)
            except Exception:
                self.__all_client.remove(client)


class MyClient(object):
    def send(self, msg: str):
        print(msg)


if __name__ == "__main__":
    rl = RedisListener(["*"], "10.60.2.114", 6379, 0)
    cli = MyClient()
    rl.register(cli)
    while True:
        time.sleep(0.1)


评论