import redis

from AnsjerPush.config import REDIS_ADDRESS, CONFIG_INFO, CONFIG_US

# Uncomment the next line for local debugging.
# REDIS_ADDRESS = '127.0.0.1'


class RedisObject:
    """Small convenience wrapper around a ``redis.Redis`` client.

    Each instance builds its own connection pool for the requested logical
    database and exposes the client as ``self.CONN``.  The ``*_data`` helpers
    catch errors and report them through their return value instead of
    raising; the remaining methods are thin pass-throughs that let client
    exceptions propagate.
    """

    def __init__(self, db=0):
        """Create the connection pool and client.

        @param db: index of the Redis logical database to use
        """
        pool_kwargs = {'host': REDIS_ADDRESS, 'port': 6379, 'db': db}
        if CONFIG_INFO == CONFIG_US:
            # NOTE(review): credential is hard-coded in source; it should be
            # moved to configuration / a secret store and rotated.
            pool_kwargs['password'] = '012bzaKUhleHc2645465'
        pool = redis.ConnectionPool(**pool_kwargs)
        self.CONN = redis.Redis(connection_pool=pool)

    def set_data(self, key, val, expire=0):
        """Store a string value, optionally with a time-to-live.

        @param key: key name
        @param val: value to store
        @param expire: TTL in seconds; values <= 0 store the key without a TTL
        @return: True on success, False if any error occurred
        """
        try:
            if expire > 0:
                # Write value and TTL in a single command so a failure can
                # never leave the key behind without its expiry.
                self.CONN.set(key, val, ex=expire)
            else:
                self.CONN.set(key, val)
        except Exception as e:
            # Report the failure the same way the sibling helpers do.
            print(repr(e))
            return False
        return True

    def get_data(self, key):
        """Fetch a string value and decode it as UTF-8.

        @param key: key name
        @return: the decoded string, or False when the lookup fails or the
                 stored value is missing/empty
        """
        try:
            val = self.CONN.get(key)
        except Exception as e:
            print(repr(e))
            return False
        if not val:
            return False
        return val.decode('utf-8')

    def del_data(self, key):
        """Delete a key.

        @param key: key name
        @return: True if the command was sent without error, False otherwise
        """
        try:
            self.CONN.delete(key)
        except Exception as e:
            print(repr(e))
            return False
        return True

    def get_size(self):
        """Return the client's ``dbsize()`` result for the selected database."""
        return self.CONN.dbsize()

    def rpush(self, name, val):
        """Append a value to the end of a list.

        @param name: list name
        @param val: value to append
        """
        self.CONN.rpush(name, val)

    def llen(self, name):
        """Return the length of a list.

        @param name: list name
        """
        return self.CONN.llen(name=name)

    def lrange(self, name, start, end):
        """Return the list elements between two indexes.

        @param name: list name
        @param start: start index, 0 is the first element
        @param end: end index, -1 is the last element
        @return: the client's ``lrange`` result (elements are not decoded here)
        """
        return self.CONN.lrange(name, start, end)

    def get_ttl(self, key):
        """Return the remaining time-to-live reported for a key.

        @param key: key name
        @return: the client's ``ttl`` result, or 0 when that result is falsy
                 (None or 0).  NOTE(review): negative sentinel values from
                 Redis pass through unchanged - confirm callers expect that.
        """
        return self.CONN.ttl(key) or 0

    def ltrim(self, name, start, end):
        """Trim a list so that only the elements in the given range remain.

        Elements outside the range are removed.

        @param name: list name
        @param start: start index of the range to keep, 0 is the first element
        @param end: end index of the range to keep, -1 is the last element
        @return: the client's ``ltrim`` result
        """
        return self.CONN.ltrim(name, start, end)

    def lindex(self, name, index=-1):
        """Look up a list element by index.

        @param name: list name
        @param index: element index, -1 (the default) is the last element
        @return: the client's ``lindex`` result (not decoded here)
        """
        return self.CONN.lindex(name, index)

    def close(self):
        """Close the client and release the sockets held by its pool.

        The pool is created by this object and handed to the client, so it is
        disconnected explicitly here rather than left to garbage collection.

        @return: whatever the client's ``close()`` returns
        """
        result = self.CONN.close()
        self.CONN.connection_pool.disconnect()
        return result

    def hset_data(self, key, field, value, expire=0):
        """Store one field of a hash, optionally refreshing the key's TTL.

        @param key: hash key name
        @param field: field name inside the hash
        @param value: value to store
        @param expire: TTL in seconds for the whole hash key; values <= 0
                       leave the TTL untouched
        @return: True on success, False if any error occurred
        """
        try:
            if expire > 0:
                # Send HSET and EXPIRE through one pipeline so the hash is
                # not left without its expiry if the second call fails.
                pipe = self.CONN.pipeline()
                pipe.hset(key, field, value)
                pipe.expire(key, expire)
                pipe.execute()
            else:
                self.CONN.hset(key, field, value)
            return True
        except Exception as e:
            print(f"Redis hset error: {repr(e)}")
            return False

    def hget_data(self, key, field):
        """Fetch one field of a hash and decode it as UTF-8.

        @param key: hash key name
        @param field: field name inside the hash
        @return: the decoded string, or None when the lookup or decoding
                 fails or the stored value is missing/empty
        """
        try:
            val = self.CONN.hget(key, field)
            return val.decode('utf-8') if val else None
        except Exception as e:
            print(f"Redis hget error: {repr(e)}")
            return None