1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- import redis
- from redis.connection import SSLConnection
- from AnsjerPush.config import REDIS_ADDRESS, OCI_REDIS_NODE_PRIMARY, OCI_REDIS_NODE_READ, CONFIG_INFO, CONFIG_US
- # 本地调试把注释打开
- # REDIS_ADDRESS = '127.0.0.1'
class RedisObject:
    """Thin convenience wrapper around a redis-py client.

    Outside the US configuration a plain connection to ``REDIS_ADDRESS`` is
    used.  In the US configuration an SSL connection is opened to either the
    primary node (``mode == 'w'``) or the read node (any other ``mode``).

    Attributes:
        POOL: the ``redis.ConnectionPool`` backing this object.
        CONN: the redis client bound to ``POOL``.
    """

    def __init__(self, mode='w', db=0):
        """Create the connection pool and client.

        @param mode: 'w' selects the primary node; any other value selects
                     the read node.  Only consulted in the US configuration.
        @param db: index of the Redis database to use.
        """
        if CONFIG_INFO != CONFIG_US:
            self.POOL = redis.ConnectionPool(host=REDIS_ADDRESS, port=6379, db=db)
            self.CONN = redis.Redis(connection_pool=self.POOL)
            return

        host = OCI_REDIS_NODE_PRIMARY if mode == 'w' else OCI_REDIS_NODE_READ
        self.POOL = redis.ConnectionPool(
            connection_class=SSLConnection, host=host, port=6379, db=db
        )
        # NOTE(review): redis-py appears to ignore host/ssl/ssl_cert_reqs when
        # an explicit connection_pool is supplied, so these keyword arguments
        # probably have no effect -- confirm.  They are kept unchanged here
        # because moving ssl_cert_reqs=None onto the pool would disable
        # certificate verification.
        self.CONN = redis.StrictRedis(
            connection_pool=self.POOL,
            host=host,
            ssl=True,
            ssl_cert_reqs=None,
        )

    def set_data(self, key, val, expire=0):
        """Store ``val`` under ``key``, optionally with a time-to-live.

        The value and its expiry are written with a single SET command so a
        key can never be left behind without the requested TTL.

        @param key: key name
        @param val: value to store
        @param expire: TTL in seconds; 0 (or any non-positive / empty value)
                       means no expiry is set
        @return: True on success, False if any exception was raised
        """
        try:
            if expire and expire > 0:
                self.CONN.set(key, val, ex=expire)
            else:
                self.CONN.set(key, val)
        except Exception as e:
            # Best-effort API: report the failure the same way the sibling
            # accessors do, then signal it through the return value.
            print(repr(e))
            return False
        return True

    def get_data(self, key):
        """Fetch the value stored under ``key`` decoded as UTF-8.

        @param key: key name
        @return: the decoded string, or False when the lookup raised, or
                 when the returned value is falsy (missing key or empty
                 value)
        """
        try:
            val = self.CONN.get(key)
        except Exception as e:
            print(repr(e))
            return False
        if val:
            return val.decode('utf-8')
        return False

    def del_data(self, key):
        """Delete ``key``.

        @param key: key name
        @return: True if the delete call completed without raising (whether
                 or not the key existed), False otherwise
        """
        try:
            self.CONN.delete(key)
        except Exception as e:
            print(repr(e))
            return False
        return True

    def get_size(self):
        """Return the result of DBSIZE for the current connection."""
        return self.CONN.dbsize()

    def rpush(self, name, val):
        """Append ``val`` to the end of the list ``name``."""
        self.CONN.rpush(name, val)

    def llen(self, name):
        """Return the length of the list ``name``."""
        return self.CONN.llen(name=name)

    def lrange(self, name, start, end):
        """Return the elements of list ``name`` between ``start`` and ``end``.

        Passing 0 and -1 retrieves the whole list.
        """
        return self.CONN.lrange(name, start, end)

    def get_ttl(self, key):
        """Return the TTL reported for ``key``, or 0 when the client returns
        a falsy value (e.g. ``None`` or 0)."""
        return self.CONN.ttl(key) or 0

    def ltrim(self, name, start, end):
        """
        Trim a list so that only the elements inside the given range remain;
        every element outside the range is removed.
        @param name: list name
        @param start: start index of the range, 0: first element
        @param end: end index of the range, -1: last element
        @return : result of the client's ltrim call
        """
        return self.CONN.ltrim(name, start, end)

    def lindex(self, name, index=-1):
        """
        Look up a list element by its index.
        @param name: list name
        @param index: position of the element, -1: last element
        @return : result of the client's lindex call (the element itself,
                  not a bool)
        """
        return self.CONN.lindex(name, index)
|