Преглед на файлове

检查苹果证书接口

locky преди 2 месеца
родител
ревизия
24f7eca42f
променени са 2 файла, в които са добавени 111 реда и са изтрити 27 реда
  1. 45 0
      Controller/InitController.py
  2. 66 27
      Object/S3Email.py

+ 45 - 0
Controller/InitController.py

@@ -3,14 +3,21 @@
 # @Time      : 2023/4/11 17:26
 import json
 import logging
+import os
 import time
 
+import apns2
+from _ssl import SSLError
 from django.http import HttpResponse
 from django.views import View
+from AnsjerPush.config import CONFIG_INFO
 
 from Model.models import Device_Info, SceneLog, EquipmentInfo1
 from Object.RedisObject import RedisObject
 from Object.ResponseObject import ResponseObject
+from AnsjerPush.config import APP_BUNDLE_DICT, APNS_MODE, BASE_DIR, APNS_CONFIG, DATA_PUSH_EVENT_TYPE_LIST
+from Object.S3Email import S3Email
+from Service.CommonService import CommonService
 
 ERROR_INFO_LOGGER = logging.getLogger('error_info')
 
@@ -30,6 +37,8 @@ class InitView(View):
     def validation(self, request_dict, operation):
         if operation == 'health-check':  # 负载均衡器健康检测接口
             return self.health_check(request_dict)
+        if operation == 'checkApnsPem':  # 检查🍎推送证书
+            return self.check_apns_pem()
         elif operation == 'oci_redis_test':
             return self.oci_redis_test(request_dict)
         elif operation == 'oci_redis_test_2':
@@ -49,6 +58,42 @@ class InitView(View):
                 '健康检查接口异常,error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
             return HttpResponse(repr(e), status=500)
 
+    @staticmethod
+    def check_apns_pem():
+        response = ResponseObject()
+        try:
+            server_id = os.environ.get('ServerID', 0)
+            n_time = int(time.time())
+            uid, msg_title, msg_text, launch_image, event_type, channel, nickname, token_val = \
+                '', '', '', '', 1, '', '', ''
+            app_bundle_id_list = [
+                'com.ansjer.zccloud', 'com.ansjer.loocamccloud', 'com.cloudlife.commissionf', 'com.ansjer.customizeda',
+                'com.ansjer.customizedb', 'com.ansjer.customizedc', 'com.ansjer.customizedd', 'com.ansjer.customizede'
+            ]
+            for app_bundle_id in app_bundle_id_list:
+                pem_path = os.path.join(BASE_DIR, APNS_CONFIG[app_bundle_id]['pem_path'])
+                cli = apns2.APNSClient(mode=APNS_MODE, client_cert=pem_path)
+                alert = apns2.PayloadAlert(title=msg_title, body=msg_text, launch_image=launch_image)
+                # 跳转类型,1:推送消息,2:系统消息,3:音视频通话消息
+                jump_type = CommonService.get_jump_type(event_type)
+                push_data = {
+                    'alert': msg_text, 'msg': '', 'sound': '', 'zpush': '1', 'uid': uid, 'channel': channel,
+                    'received_at': n_time, 'event_time': n_time, 'event_type': event_type, 'nickname': nickname,
+                    'image_url': launch_image, 'jump_type': jump_type
+                }
+                sound = 'call_phone.mp3' if event_type in DATA_PUSH_EVENT_TYPE_LIST else 'default'
+                payload = apns2.Payload(alert=alert, custom=push_data, sound=sound, category='myCategory',
+                                        mutable_content=True)
+                n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW)
+                try:
+                    res = cli.push(n=n, device_token=token_val, topic=app_bundle_id)
+                except SSLError as e:
+                    email_content = '{}服第{}台服务器苹果推送证书过期: {}'.format(CONFIG_INFO, server_id, pem_path)
+                    S3Email().faEmail(email_content, 'servers@ansjer.com')
+            return response.json(0)
+        except Exception as e:
+            return response.json(0, 'error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
     @staticmethod
     def oci_redis_test(request_dict):
         try:

+ 66 - 27
Object/S3Email.py

@@ -1,45 +1,84 @@
-# @Author    : Rocky
-# @File      : S3Email.py
-# @Time      : 2023/8/11 10:01
-import smtplib
 import email.utils
+import smtplib
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
 
-SENDER = 'rdpublic@ansjer.com'
-SENDER_NAME = 'rdpublic@ansjer.com'
-USERNAME_SMTP = 'AKIA2E67UIMD6MOSFKXW'  # 带有邮件权限的 IAM 帐号
-PASSWORD_SMTP = 'BHuQ6EQTtFK4qh46o9omO9ZzO3NXzjk/JCWLXnVFmqzM'  # 带有邮件权限的 IAM 密码
-HOST = 'email-smtp.us-east-1.amazonaws.com'
-PORT = 587
+import boto3
+
+AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY = 'AKIA2E67UIMD45Y3HL53', 'ckYLg4Lo9ZXJIcJEAKkzf2rWvs8Xth1FCjqiAqUw'
 
 
 class S3Email:
 
-    @staticmethod
-    def send_email(email_content, username):
-        body_text = (email_content)
-        body_html = """<html>
+    def faEmail(self, sys_msg_text, username):
+        SENDER = 'rdpublic@ansjer.com'  # 邮箱名
+        SENDERNAME = 'rdpublic@ansjer.com'
+        USERNAME_SMTP = 'AKIA2E67UIMD6MOSFKXW'  # 带有邮件权限的 IAM 帐号
+        PASSWORD_SMTP = 'BHuQ6EQTtFK4qh46o9omO9ZzO3NXzjk/JCWLXnVFmqzM'  # 带有邮件权限的 IAM 密码
+        PORT = '587'
+        HOST = 'email-smtp.us-east-1.amazonaws.com'
+        SUBJECT = sys_msg_text
+        BODY_TEXT = (sys_msg_text
+                     )
+        BODY_HTML = """<html>
         <head></head>
         <body>
-            <h1>{}<h1>
+          <h1>{}<h1>
         </body>
         </html>
-        """.format(email_content)
+                    """.format(sys_msg_text)
 
         msg = MIMEMultipart('alternative')
-        msg['Subject'] = email_content
-        msg['From'] = email.utils.formataddr((SENDER_NAME, SENDER))
+        msg['Subject'] = SUBJECT
+        msg['From'] = email.utils.formataddr((SENDERNAME, SENDER))
         msg['To'] = username
-        part1 = MIMEText(body_text, 'plain')
-        part2 = MIMEText(body_html, 'html')
+        part1 = MIMEText(BODY_TEXT, 'plain')
+        part2 = MIMEText(BODY_HTML, 'html')
         msg.attach(part1)
         msg.attach(part2)
 
-        server = smtplib.SMTP(HOST, PORT)
-        server.ehlo()
-        server.starttls()
-        server.ehlo()
-        server.login(USERNAME_SMTP, PASSWORD_SMTP)
-        server.sendmail(SENDER, username, msg.as_string())
-        server.close()
+        try:
+            server = smtplib.SMTP(HOST, PORT)
+            server.ehlo()
+            server.starttls()
+            server.ehlo()
+            server.login(USERNAME_SMTP, PASSWORD_SMTP)
+            server.sendmail(SENDER, username, msg.as_string())
+            server.close()
+        except Exception as e:
+            print("Error: ", e)
+        else:
+            print("Email sent!")
+
+    @staticmethod
+    def send_email(subject, data, username):
+        """
+        发送亚马逊SES电子邮件
+        @param subject: 标题
+        @param data: 内容
+        @param username: 发送到邮箱
+        @return: 成功 | 失败
+        """
+        client = boto3.client('ses', region_name='us-east-1',
+                              aws_access_key_id=AWS_ACCESS_KEY_ID[1],
+                              aws_secret_access_key=AWS_SECRET_ACCESS_KEY[1])
+        # 构建邮件请求
+        response = client.send_email(
+            Source='rdpublic@ansjer.com',  # 发件人地址
+            Destination={
+                'ToAddresses': [
+                    username  # 收件人地址
+                ]
+            },
+            Message={
+                'Subject': {
+                    'Data': subject  # 邮件主题
+                },
+                'Body': {
+                    'Html': {
+                        'Data': data  # 邮件正文的HTML内容
+                    }
+                }
+            }
+        )
+        return response['ResponseMetadata']['HTTPStatusCode'] == 200