APISender.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. # coding=utf-8
  2. import time
  3. from Service.VivoPushService.push_admin.APIConstants import Constants
  4. from Service.VivoPushService.push_admin.APISenderBase import Base
  5. from Service.VivoPushService.push_admin.APISignUtil import SignUtil
  6. _BROADCAST_TOPIC_MAX = 5
  7. _TOPIC_SPLITTER = ';$;'
  8. class APISender(Base):
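  """
  Push message sending API (send push message class).

  The constructor takes two parameters:
  :param secret: required - APP_SECRET
  :param token: optional - authToken; it must accompany message-sending
      requests for authentication and is obtained by calling the
      authentication interface
  """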
  def get_token(self, app_id, app_key, retry_times=3):
      """
      Request an auth token through the GET_TOKEN request path.

      Builds a signed request from the application credentials and the
      current time, then submits it via ``_try_http_request``.

      :param app_id: application id; sent as ``appId`` and used for signing
      :param app_key: application key; sent as ``appKey`` and used for signing
      :param retry_times: retry count forwarded to ``_try_http_request``;
          defaults to 3, the value that was previously hard-coded here
      :return: whatever ``_try_http_request`` returns for this request
      """
      # Current Unix time in whole milliseconds.
      timestamp = int(round(time.time() * 1000))
      # The signature is computed from app_id, app_key, self.secret and the timestamp.
      sign = SignUtil(app_id, app_key, self.secret).sign_util(timestamp)
      # Named ``payload`` rather than ``get_token`` so the local does not
      # shadow this method's own name.
      payload = {
          'appId': app_id,
          'appKey': app_key,
          'timestamp': timestamp,
          'sign': sign,
      }
      return self._try_http_request(
          Constants.request_path.GET_TOKEN,
          retry_times=retry_times,
          **payload
      )
  def send(self, push_message, retry_times=3):
      """
      Send a single-push message through the PUSH_TO_SINGLE request path.

      :param push_message: message body (request parameter object); a
          mapping that is unpacked into keyword arguments of the request
      :param retry_times: retry count forwarded to ``_try_http_request``
      :return: whatever ``_try_http_request`` returns
      """
      endpoint = Constants.request_path.PUSH_TO_SINGLE
      return self._try_http_request(endpoint, retry_times, **push_message)
  def save_list_payload(self, push_message, retry_times=3):
      """
      Save a group-push message through the SAVE_LIST_PAYLOAD request path.

      :param push_message: message body (request parameter object); a
          mapping that is unpacked into keyword arguments of the request
      :param retry_times: retry count forwarded to ``_try_http_request``
      :return: whatever ``_try_http_request`` returns
      """
      endpoint = Constants.request_path.SAVE_LIST_PAYLOAD
      return self._try_http_request(endpoint, retry_times, **push_message)
  def send_to_list(self, target_push_message, retry_times=3):
      """
      Push a group-push message through the PUSH_TO_LIST request path.

      :param target_push_message: message body (request parameter object);
          a mapping that is unpacked into keyword arguments of the request
      :param retry_times: retry count forwarded to ``_try_http_request``
      :return: whatever ``_try_http_request`` returns
      """
      endpoint = Constants.request_path.PUSH_TO_LIST
      return self._try_http_request(endpoint, retry_times, **target_push_message)
  def send_to_all(self, push_message, retry_times=3):
      """
      Send a push-to-all message through the PUSH_TO_ALL request path.

      :param push_message: message body (request parameter object); a
          mapping that is unpacked into keyword arguments of the request
      :param retry_times: retry count forwarded to ``_try_http_request``
      :return: whatever ``_try_http_request`` returns
      """
      endpoint = Constants.request_path.PUSH_TO_ALL
      return self._try_http_request(endpoint, retry_times, **push_message)
  def send_to_tag(self, push_message, retry_times=3):
      """
      Send a tag-push message through the PUSH_TO_TAG request path.

      :param push_message: message body (request parameter object); a
          mapping that is unpacked into keyword arguments of the request
      :param retry_times: retry count forwarded to ``_try_http_request``
      :return: whatever ``_try_http_request`` returns
      """
      endpoint = Constants.request_path.PUSH_TO_TAG
      return self._try_http_request(endpoint, retry_times, **push_message)
  def get_statistics(self, taskids_message, retry_times=3):
      """
      Issue a request through the GET_STATISTICS request path.

      :param taskids_message: message body (request parameter object); a
          mapping that is unpacked into keyword arguments of the request
          (presumably carries the task ids to query - confirm against callers)
      :param retry_times: retry count forwarded to ``_try_http_request``
      :return: whatever ``_try_http_request`` returns
      """
      endpoint = Constants.request_path.GET_STATISTICS
      # Unlike the sibling methods, a third positional argument is passed:
      # Constants.__HTTP_GET__ (presumably selects the HTTP GET method -
      # confirm in Base._try_http_request).
      http_method = Constants.__HTTP_GET__
      return self._try_http_request(endpoint, retry_times, http_method, **taskids_message)