_app.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. # -*- coding: utf-8 -*-
  2. #
  3. # Copyright 2020. Huawei Technologies Co., Ltd. All rights reserved.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import json
  17. import time
  18. import urllib
  19. import urllib.parse
  20. from Service.HuaweiPushService.push_admin import _http
  21. from Service.HuaweiPushService.push_admin import _message_serializer
  22. class App(object):
  23. """application for HW Cloud Message(HCM)"""
  24. JSON_ENCODER = _message_serializer.MessageSerializer()
  25. @classmethod
  26. def _send_to_server(cls, headers, body, url, verify_peer=False):
  27. try:
  28. msg_body = json.dumps(body)
  29. response = _http.post(url, msg_body, headers, verify_peer)
  30. if response.status_code is not 200:
  31. raise ApiCallError('http status code is {0} in send.'.format(response.status_code))
  32. # json text to dict
  33. resp_dict = json.loads(response.text)
  34. return resp_dict
  35. except Exception as e:
  36. raise ApiCallError('caught exception when send. {0}'.format(e))
  37. def __init__(self, appid_at, app_secret_at, appid_push, token_server='https://oauth-login.cloud.huawei.com/oauth2/v3/token',
  38. push_open_url='https://push-api.cloud.huawei.com'):
  39. """class init"""
  40. self.app_id_at = appid_at
  41. self.app_secret_at = app_secret_at
  42. if appid_push is None:
  43. self.appid_push = appid_at
  44. else:
  45. self.appid_push = appid_push
  46. self.token_expired_time = 0
  47. self.access_token = None
  48. self.token_server = token_server
  49. self.push_open_url = push_open_url
  50. self.hw_push_server = self.push_open_url + "/v1/{0}/messages:send"
  51. self.hw_push_topic_sub_server = self.push_open_url + "/v1/{0}/topic:subscribe"
  52. self.hw_push_topic_unsub_server = self.push_open_url + "/v1/{0}/topic:unsubscribe"
  53. self.hw_push_topic_query_server = self.push_open_url + "/v1/{0}/topic:list"
  54. def _refresh_token(self, verify_peer=False):
  55. """refresh access token
  56. :param verify_peer: (optional) Either a boolean, in which case it controls whether we verify
  57. the server's TLS certificate, or a string, in which case it must be a path
  58. to a CA bundle to use. Defaults to ``True``.
  59. """
  60. headers = dict()
  61. headers['Content-Type'] = 'application/x-www-form-urlencoded;charset=utf-8'
  62. params = dict()
  63. params['grant_type'] = 'client_credentials'
  64. params['client_secret'] = self.app_secret_at
  65. params['client_id'] = self.app_id_at
  66. msg_body = urllib.parse.urlencode(params)
  67. try:
  68. response = _http.post(self.token_server, msg_body, headers, verify_peer=verify_peer)
  69. if response.status_code is not 200:
  70. return False, 'http status code is {0} in get access token'.format(response.status_code)
  71. """ json string to directory """
  72. response_body = json.loads(response.text)
  73. self.access_token = response_body.get('access_token')
  74. self.token_expired_time = int(round(time.time() * 1000)) + (int(response_body.get('expires_in')) - 5 * 60) * 1000
  75. return True, None
  76. except Exception as e:
  77. raise ApiCallError(format(repr(e)))
  78. def _is_token_expired(self):
  79. """is access token expired"""
  80. if self.access_token is None:
  81. """ need refresh token """
  82. return True
  83. return int(round(time.time() * 1000)) >= self.token_expired_time
  84. def _update_token(self, verify_peer=False):
  85. """
  86. :param verify_peer: (optional) Either a boolean, in which case it controls whether we verify
  87. the server's TLS certificate, or a string, in which case it must be a path
  88. to a CA bundle to use. Defaults to ``True``.
  89. :return:
  90. """
  91. if self._is_token_expired() is True:
  92. result, reason = self._refresh_token(verify_peer)
  93. if result is False:
  94. raise ApiCallError(reason)
  95. def _create_header(self):
  96. headers = dict()
  97. headers['Content-Type'] = 'application/json;charset=utf-8'
  98. headers['Authorization'] = 'Bearer {0}'.format(self.access_token)
  99. return headers
  100. def send(self, message, validate_only, **kwargs):
  101. """
  102. Sends the given message Huawei Cloud Messaging (HCM)
  103. :param message: JSON format message
  104. :param validate_only: validate message format or not
  105. :param kwargs:
  106. verify_peer: HTTPS server identity verification, use library 'certifi'
  107. :return:
  108. response dict: response body dict
  109. :raise:
  110. ApiCallError: failure reason
  111. """
  112. verify_peer = kwargs['verify_peer']
  113. self._update_token(verify_peer)
  114. headers = self._create_header()
  115. url = self.hw_push_server.format(self.appid_push)
  116. msg_body_dict = dict()
  117. msg_body_dict['validate_only'] = validate_only
  118. msg_body_dict['message'] = App.JSON_ENCODER.default(message)
  119. return App._send_to_server(headers, msg_body_dict, url, verify_peer)
  120. def subscribe_topic(self, topic, token_list):
  121. """
  122. :param topic: The specific topic
  123. :param token_list: The token list to be added
  124. :return:
  125. """
  126. self._update_token()
  127. headers = self._create_header()
  128. url = self.hw_push_topic_sub_server.format(self.appid_push)
  129. msg_body_dict = {'topic': topic, 'tokenArray': token_list}
  130. return App._send_to_server(headers, msg_body_dict, url)
  131. def unsubscribe_topic(self, topic, token_list):
  132. """
  133. :param topic: The specific topic
  134. :param token_list: The token list to be deleted
  135. :return:
  136. """
  137. self._update_token()
  138. headers = self._create_header()
  139. url = self.hw_push_topic_unsub_server.format(self.appid_push)
  140. msg_body_dict = {'topic': topic, 'tokenArray': token_list}
  141. return App._send_to_server(headers, msg_body_dict, url)
  142. def query_subscribe_list(self, token):
  143. """
  144. :param token: The specific token
  145. :return:
  146. """
  147. self._update_token()
  148. headers = self._create_header()
  149. url = self.hw_push_topic_query_server.format(self.appid_push)
  150. msg_body_dict = {'token': token}
  151. return App._send_to_server(headers, msg_body_dict, url)
  152. class ApiCallError(Exception):
  153. """Represents an Exception encountered while invoking the HCM API.
  154. Attributes:
  155. message: A error message string.
  156. detail: Original low-level exception.
  157. """
  158. def __init__(self, message, detail=None):
  159. Exception.__init__(self, message)
  160. self.detail = detail