"""Django views backing the Alexa account-linking (OAuth2) and smart-home bridge.

The views render the login pages, exchange authorization codes for tokens,
discover cameras / switches on the regional back-end servers and ask a device
to start or stop pushing its RTSP stream.
"""
import json
import logging
import shlex
import subprocess
import time
from datetime import datetime
from urllib.parse import urlencode

import requests
from django.db.models import F
from django.http import JsonResponse
from django.shortcuts import render
from django.views.generic import TemplateView

from azoauth.config import *
from model.models import UserModel, UidRtspModel, AlexaAuthModel, UserCountModel, SwitchModel
from object.RedisObject import RedisObject
from object.ResObject import ResObject
from object.ResponseObject import ResponseObject
from object.tkObject import tkObject
from service.CommonService import CommonService

logger = logging.getLogger('django')

# Upper bound (seconds) for every outbound HTTP call; without it a stalled
# upstream server would block the worker indefinitely.
_HTTP_TIMEOUT = 10
# Upper bound (seconds) for one pushtool invocation.
_PUSHTOOL_TIMEOUT = 10


def _login_context(request_dict, skill_name):
    """Build the template context shared by all login pages.

    The OAuth parameters are copied verbatim from ``request_dict`` (missing
    keys become empty strings) and ``skill_name`` is attached.
    """
    return {
        'state': request_dict.get("state", ''),
        'client_id': request_dict.get("client_id", ''),
        'response_type': request_dict.get("response_type", ''),
        'scope': request_dict.get("scope", ''),
        'redirect_uri': request_dict.get("redirect_uri", ''),
        'skill_name': skill_name,
    }


def _find_user_by_access_token(access_token):
    """Return the UserModel row owning ``access_token``, or None.

    An empty / missing token is rejected before querying: filtering on
    ``None`` or ``''`` would match rows whose token column is NULL / empty.
    """
    if not access_token:
        return None
    return UserModel.objects.filter(access_token=access_token).first()


def _rebind_alexa_auth(userID, access_token, skill_name):
    """Re-attach the newest AlexaAuthModel row for this token to ``userID``.

    All rows of the user and all rows matching the token/skill are deleted and
    a single row carrying the newest row's values is re-created.

    Returns the dict of the row that was re-created, or None when no row
    matched ``access_token`` / ``skill_name``.
    """
    alexa_auth_qs = AlexaAuthModel.objects.filter(token=access_token, skill_name=skill_name).order_by('-addTime')
    latest = alexa_auth_qs.values().first()
    if latest is None:
        return None
    AlexaAuthModel.objects.filter(userID=userID).delete()
    alexa_auth_qs.delete()
    AlexaAuthModel.objects.create(
        userID=userID,
        access_token=latest['access_token'],
        refresh_token=latest['refresh_token'],
        token=latest['token'],
        expiresTime=latest['expiresTime'],
        addTime=latest['addTime'],
        updTime=latest['updTime'],
        alexa_region=latest['alexa_region'],
        skill_name=skill_name,
    )
    return latest


def _run_pushtool(command):
    """Run a pushtool command line and return ``(stdout, stderr)`` as text.

    The command string is tokenised with ``shlex.split`` and executed without
    a shell, so shell metacharacters inside a UID / password are passed as
    plain data. ``subprocess.run`` kills the child when the timeout expires.

    Raises:
        ValueError: the command line cannot be tokenised (unbalanced quotes).
        OSError: the executable cannot be started.
        subprocess.TimeoutExpired: the tool did not finish in time.
    """
    completed = subprocess.run(
        shlex.split(command),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=_PUSHTOOL_TIMEOUT,
    )
    # The pipes deliver bytes; decode leniently so odd output cannot fail the call.
    return completed.stdout.decode(errors='replace'), completed.stderr.decode(errors='replace')


def _try_pushtool(command):
    """Run pushtool, log its output and return True; return False on any failure."""
    logger.info('command=>{command}'.format(command=command))
    try:
        out, err = _run_pushtool(command)
    except Exception as e:
        logger.info('调用pushtool异常: {}'.format(repr(e)))
        return False
    logger.info('back0---- {}'.format(out))
    logger.info('back1---- {}'.format(err))
    return True


class authView(TemplateView):
    """Login page of the "zosi smart" skill."""

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = json.loads(request.body.decode('utf-8'))
        return self.validate(request_dict)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.GET
        return self.validate(request_dict)

    def validate(self, request_dict):
        """Render ``login.html`` with the OAuth parameters of the request."""
        logger.info('------oa2/auth请求参数: {}------'.format(request_dict))
        context = _login_context(request_dict, 'zosi smart')
        # The template is rendered without a request object, as before.
        return render(request=None, template_name="login.html", context=context)


# Anlapus login
class authAnlapusView(TemplateView):
    """Login page of the "Anlapus" skill."""

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = json.loads(request.body.decode('utf-8'))
        return self.validate(request_dict)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.GET
        return self.validate(request_dict)

    def validate(self, request_dict):
        """Render ``login_anlapus.html`` with the OAuth parameters of the request."""
        context = _login_context(request_dict, 'Anlapus')
        return render(request=None, template_name="login_anlapus.html", context=context)


# loocam login
class authLoocamView(TemplateView):
    """Login page of the "loocam" skill."""

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = json.loads(request.body.decode('utf-8'))
        return self.validate(request_dict)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.GET
        return self.validate(request_dict)

    def validate(self, request_dict):
        """Render ``login_loocam.html`` with the OAuth parameters of the request."""
        context = _login_context(request_dict, 'loocam')
        logger.info('loocam请求登录网页,参数为{}'.format(context))
        return render(request=None, template_name="login_loocam.html", context=context)


class loginHandleView(TemplateView):
    """Check the submitted credentials and hand out an authorization code."""

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = json.loads(request.body.decode('utf-8'))
        return self.validate(request_dict)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.GET
        return self.validate(request_dict)

    def validate(self, request_dict):
        """Authenticate against the US server (then EU) and return the redirect URI.

        On success the user's row is created / updated with a fresh code, the
        monthly per-skill counter is incremented and the response carries
        ``redirect_uri`` extended with ``code`` and ``state``.
        """
        response = ResObject()
        user = request_dict.get("user", '')
        pwd = request_dict.get("pwd", '')
        state = request_dict.get("state", '')
        redirect_uri = request_dict.get("redirect_uri", '')
        skill_name = request_dict.get("skill_name", '')

        # Never write the plaintext password to the log.
        loggable_params = {key: value for key, value in request_dict.items() if key != 'pwd'}
        logger.info('------开始认证登录------参数:{}'.format(loggable_params))

        # Ask the US server first.
        region_code = 'US'
        auth_request_url = '{SERVER_PREFIX}/oalexa/auth'.format(SERVER_PREFIX=SERVER_PREFIX)
        requests_data = {'userName': user, 'userPwd': pwd}
        res = requests.post(url=auth_request_url, data=requests_data, timeout=_HTTP_TIMEOUT)
        if res.status_code != 200:
            return response.json(10, res={'错误': '请求响应异常'})
        res_json = res.json()
        logger.info('美洲服务器响应: {}'.format(res_json))

        # Retry on the EU server when the US server answers 104 or 111, or
        # when the account exists but belongs to the EU region.
        result_code = res_json['result_code']
        if result_code in (104, 111) or (result_code == 0 and res_json['result']['region_code'] == 'EU'):
            auth_request_url = '{SERVER_PREFIX}/oalexa/auth'.format(SERVER_PREFIX=SERVER_PREFIX_EU)
            res = requests.post(url=auth_request_url, data=requests_data, timeout=_HTTP_TIMEOUT)
            if res.status_code != 200:
                return response.json(10, res={'错误': '请求响应异常'})
            res_json = res.json()
            logger.info('欧洲服务器响应: {}'.format(res_json))
            if res_json['result_code'] == 0:
                region_code = 'EU'

        if res_json['result_code'] != 0:
            return response.json(10, res={'msg': 'error'}, extra={'msg': res_json['reason']})

        nowTime = int(time.time())
        code = CommonService.encrypt_data(32)
        userID = res_json['result']['userID']
        user_qs = UserModel.objects.filter(userID=userID)
        if user_qs.exists():
            user_qs.update(region_code=region_code, code=code, updTime=nowTime)
        else:
            UserModel.objects.create(userID=userID, region_code=region_code, code=code, addTime=nowTime,
                                     updTime=nowTime)

        # Current year and month, e.g. "202401".
        year_month = str(time.strftime('%Y%m', time.localtime(nowTime)))
        # Increment inside the database so concurrent logins cannot lose an update.
        incremented = UserCountModel.objects.filter(skill_name=skill_name, year_month=year_month) \
            .update(amount=F('amount') + 1)
        if not incremented:
            UserCountModel.objects.create(skill_name=skill_name, year_month=year_month, amount=1)

        # URL-encode the parameters and respect a query string already present.
        separator = '&' if '?' in redirect_uri else '?'
        redirect_uri += separator + urlencode({'code': code, 'state': state})
        return response.json(0, res=redirect_uri)


class oa2TokenView(TemplateView):
    """Token endpoint: exchange a code or refresh token for a new token pair."""

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.POST
        return self.validate(request_dict)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.GET
        return self.validate(request_dict)

    def validate(self, request_dict):
        """Issue a fresh access / refresh token pair.

        Lookup order: ``code`` column; then ``user_authorization_code`` when
        ``grant_type`` is ``authorization_code``, otherwise ``refresh_token``.
        Empty credentials are never used for a lookup, because filtering on
        ``None`` would match every row whose column is NULL.
        """
        # TODO: also validate client_id.
        code = request_dict.get("code", None)
        grant_type = request_dict.get("grant_type", None)  # refresh_token, authorization_code
        refresh_token = request_dict.get("refresh_token", None)
        logger.info('请求获取令牌接口参数:{}'.format(request_dict))

        lookups = []
        if code:
            lookups.append({'code': code})
        if grant_type == 'authorization_code':
            if code:
                lookups.append({'user_authorization_code': code})
        elif refresh_token:
            lookups.append({'refresh_token': refresh_token})

        user_qs = None
        for lookup in lookups:
            candidate_qs = UserModel.objects.filter(**lookup)
            if candidate_qs.exists():
                user_qs = candidate_qs
                break

        if user_qs is None:
            res_json = {'msg': 'code not exists'}
            logger.info('请求获取令牌接口响应:{}'.format(res_json))
            return JsonResponse(res_json)

        access_token = CommonService.encrypt_data(randomlength=32)
        refresh_token = CommonService.encrypt_data(randomlength=32)
        is_update = user_qs.update(access_token=access_token, refresh_token=refresh_token)
        if not is_update:
            logger.info({'msg': 'error'})
            return JsonResponse({'msg': 'error'})

        res_json = {
            "access_token": access_token,
            "token_type": "bearer",
            "expires_in": 3600,
            "refresh_token": refresh_token,
        }
        logger.info('请求获取令牌接口响应:{}'.format(res_json))
        return JsonResponse(res_json)


def runSendRtspMsg_thread(UID, PWD, MSG):
    """Run ``./pushtool UID PWD MSG 1``; return True on success, False on failure."""
    command = "./pushtool {UID} {PWD} {MSG} 1".format(UID=UID, PWD=PWD, MSG=MSG)
    return _try_pushtool(command)


class oa2RtspStartView(TemplateView):
    """Ask a camera to start (or stop) pushing its stream and describe the stream."""

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.POST
        return self.validate(request_dict)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.GET
        return self.validate(request_dict)

    def validate(self, request_dict):
        """Send the start / stop command for device ``id`` and return its stream data.

        With ``st == 1`` only the stop command is sent. Otherwise the start
        command is published through MQTT (production, then test server on
        result code 10044) and pushtool is used when MQTT did not succeed.
        """
        st = request_dict.get("st", 0)
        uid = request_dict.get("id", '')
        access_token = request_dict.get("access_token", '')
        skill_name = request_dict.get("skill_name", 'zosi smart')

        if _find_user_by_access_token(access_token) is None:
            return JsonResponse({'错误': '用户数据不存在'})

        # NOTE(review): the device is looked up by uid only; nothing here checks
        # that it belongs to the user owning access_token - confirm intended.
        device = UidRtspModel.objects.filter(uid=uid) \
            .values('user_id', 'uid', 'nick', 'rtsp_url', 'password').first()
        if device is None:
            return JsonResponse({'错误': 'uid数据不存在'})

        # Region of the user recorded on the device row; 'US' when unknown.
        region = 'US'
        owner = UserModel.objects.filter(userID=device['user_id']).values('region_code').first()
        if owner is not None:
            region = owner['region_code']

        UID = device['uid']
        nick = device['nick']
        PWD = device['password']
        stream_name = device['rtsp_url']
        channel = '0'
        if '_' in UID:  # multi-channel device: "<uid>_<channel digit>"
            channel = UID[-1:]
            UID = UID[:-2]

        # Pick the back-end domain from the user's region.
        domain_name = SERVER_PREFIX_EU if region == 'EU' else SERVER_PREFIX
        RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA[region]
        MSG = '{}://{}:8554/{}'.format(RTSP_PREFIX, RESP_SERVER_DOMAIN, stream_name)

        logger.info('------{} 开始向设备下发推流指令------'.format(skill_name))
        # TODO: the commands below should eventually be sent asynchronously.
        if int(st) == 1:
            send_flag = self.runSendStop(UID, PWD, MSG)
            logger.info('----------send_flag---st=1-----------------')
            if send_flag:
                return JsonResponse({'msg': 'stop yes', 'code': 0})
            return JsonResponse({'msg': 'stop no', 'code': 0})

        # pushtool command line, used only when MQTT delivery does not succeed.
        command = "./pushtool {UID} {PWD} {MSG} 1 {channel}".format(UID=UID, PWD=PWD, MSG=MSG, channel=channel)

        # Publish through MQTT; enable: 1 = start pushing, 0 = stop pushing.
        requests_data = {'UID': UID, 'rtsp': MSG, 'enable': '1'}
        delivered = False
        url = '{}/iot/requestPublishMessage'.format(domain_name)
        r = requests.post(url, requests_data, timeout=_HTTP_TIMEOUT)
        if r.status_code == 200:
            res = r.json()
            logger.info('请求MQTT发布消息参数:{},result_code: {}'.format(requests_data, res['result_code']))
            if res['result_code'] == 0:
                logger.info('请求MQTT下发指令成功---正式服')
                delivered = True
            elif res['result_code'] == 10044:
                # Result code 10044: retry on the test server.
                url = '{}/iot/requestPublishMessage'.format(SERVER_PREFIX_TEST)
                r = requests.post(url, requests_data, timeout=_HTTP_TIMEOUT)
                if r.status_code == 200 and r.json()['result_code'] == 0:
                    logger.info('请求MQTT下发指令成功---测试服')
                    delivered = True
        if not delivered:
            # Fall back to pushtool to notify the device.
            self.runSendRtspMsg(logger, region, command)

        # Address the stream is pulled from.
        rtsp_uri = '{}://{}:443/{}'.format(RTSP_PREFIX, RESP_SERVER_DOMAIN, stream_name)
        stop_time = int(time.time()) + 2 * 60
        # The format ends in "Z" (UTC), so the time must be rendered in UTC.
        expirationTime = time.strftime('%Y-%m-%dT%H:%MZ', time.gmtime(stop_time))
        res_json = {
            'uid': UID,
            'pwd': PWD,
            'msg': MSG,
            'uri': rtsp_uri,
            'endpointId': uid,
            'friendlyName': nick,
            'manufacturerName': skill_name,
            'expirationTime': expirationTime,
            'description': 'Camera connected via {}'.format(skill_name),
            # NOTE(review): 'ACC' may be a typo for 'AAC' - confirm with the consumer.
            'audioCodecs': 'ACC',
            'videoCodecs': 'H264',
            'protocols': ['RTSP'],
            'idleTimeoutSeconds': 5,
            'modelName': 'P1425-LE',
            'authorizationTypes': ['NONE'],
            'manufacturerId': 'zosi-ACCC8E5E7513',
            'resolutions': {'width': 640, 'height': 360},
        }
        logger.info('------------返回控制摄像头的信息---------------: {}'.format(res_json))
        return JsonResponse(res_json, safe=False)

    def runReqRtspMsg(self, UID, PWD, MSG):
        """Forward the start command to the local helper service on port 5000."""
        # Pass the values as params so they are URL-encoded.
        query = {'UID': UID, 'MSG': MSG, 'CMD': 1, 'PWD': PWD}
        res = requests.get(url='http://localhost:5000/', params=query, timeout=_HTTP_TIMEOUT)
        logger.info(res)
        return True

    # Makes the camera push its stream to the address contained in the command.
    def runSendRtspMsg(self, logger, region, command):
        """Deliver a pushtool ``command``.

        For region ``'CN'`` the command is relayed to the CN server, which is
        asked to run pushtool itself. For any other region pushtool is run
        locally and the CN relay is tried when the local run fails.
        """
        logger.info('------------推流指令: {}---------------'.format(command))
        if region == 'CN':
            logger.info('------------国内发送推流指令---------------')
            # NOTE(review): plain-HTTP relay carrying the device password - confirm acceptable.
            url = "http://52.83.252.41:7880/alexa/command?command={command}".format(command=command)
            try:
                requests.get(url=url, timeout=10)
            except Exception as e:
                logger.info('请求国内服务器调用pushtool异常: {}'.format(repr(e)))
            return

        logger.info('------------国外发送推流指令---------------')
        try:
            out, err = _run_pushtool(command)
            logger.info('back: {}'.format(out + err))
        except Exception as e:
            logger.info('调用pushtool异常: {}'.format(repr(e)))
            # The local call failed; try the CN relay instead.
            self.runSendRtspMsg(logger, 'CN', command)

    def runSendStop(self, UID, PWD, MSG):
        """Run ``./pushtool UID PWD MSG 0``; return True on success, False on failure."""
        command = "./pushtool {UID} {PWD} {MSG} 0".format(UID=UID, PWD=PWD, MSG=MSG)
        return _try_pushtool(command)


class oa2DiscoveryDevice(TemplateView):
    """Discover the cameras of the user owning the access token."""

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.POST
        return self.validate(request_dict)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.GET
        return self.validate(request_dict)

    def validate(self, request_dict):
        """Fetch the user's devices from the regional server and return them.

        Each device is stored / refreshed in UidRtspModel, linked to the user
        and described by one endpoint dict in the returned JSON list.
        """
        skill_name = request_dict.get("skill_name", 'zosi smart')
        access_token = request_dict.get("access_token", None)
        logger.info('--------{} 开始搜索设备--------'.format(skill_name))

        response = ResObject()
        user = _find_user_by_access_token(access_token)
        if user is None:
            return response.json(500, res={'msg': '用户数据不存在'})
        userID = user.userID
        region_code = user.region_code
        logger.info('搜索设备的用户id:{},地区:{}'.format(userID, region_code))

        # Keep a single AlexaAuthModel row, bound to this user.
        _rebind_alexa_auth(userID, access_token, skill_name)

        # Ask the regional server for the device list.
        domain_name = SERVER_PREFIX_EU if region_code == 'EU' else SERVER_PREFIX
        auth_request_url = '{}/oalexa/discoveryuid'.format(domain_name)
        requests_data = {'sid': 'admin', 'sst': 'admin', 'alexa_user_id': userID}
        res = requests.post(url=auth_request_url, data=requests_data, timeout=_HTTP_TIMEOUT)
        if res.status_code != 200:
            return response.json(500, res={'msg': 'discover device error'})
        res_json = res.json()
        logger.info('{}服务器搜索设备响应:{}'.format(region_code, res_json))
        if res_json['result_code'] != 0:
            return response.json(500, res={'msg': 'discover device error'})

        uid_arr = res_json['result']['uid_arr']
        rtko = tkObject(rank=1)
        now_time = int(time.time())
        user.uid_rtsp.clear()
        endpoints = []
        uid_rtsp_id_list = []
        for uid_a in uid_arr:
            uid = uid_a['uid']
            nick = uid_a['nick']
            rtsp_url = rtko.encrypt(data=uid)
            try:
                uid_rtsp = UidRtspModel.objects.get(uid=uid)
            except UidRtspModel.DoesNotExist:
                uid_rtsp = UidRtspModel.objects.create(user_id=userID, uid=uid, nick=nick, region=region_code,
                                                       password=uid_a['password'], rtsp_url=rtsp_url,
                                                       addTime=now_time, updTime=now_time)
            else:
                uid_rtsp.nick = nick
                uid_rtsp.region = region_code
                uid_rtsp.password = uid_a['password']
                uid_rtsp.user_id = userID
                uid_rtsp.save()
            uid_rtsp_id_list.append(uid_rtsp.id)

            rtsp_domain = RESP_SERVER_DOMAIN_DATA[region_code]
            rtsp_uri = '{}://{}:443/{}'.format(RTSP_PREFIX, rtsp_domain, rtsp_url)
            endpoints.append({
                'uri': rtsp_uri,
                'endpointId': uid,
                'friendlyName': nick,
                'manufacturerName': skill_name,
                'description': 'Camera connected via {}'.format(skill_name),
                'protocols': ['RTSP'],
                'audioCodecs': ['ACC'],
                'videoCodecs': ['H264'],
                'modelName': 'P1425-LE',
                'authorizationTypes': ['NONE'],
                'manufacturerId': 'zosi-ACCC8E5E7513',
                'resolutions': [{'width': 640, 'height': 360}],
            })
        user.uid_rtsp.add(*uid_rtsp_id_list)
        logger.info('搜索设备返回值: {}'.format(endpoints))
        return JsonResponse(endpoints, safe=False)


class oa2DiscoverySwitch(TemplateView):
    """Discover the smart plugs of the user owning the access token."""

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.POST
        return self.validate(request_dict)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.GET
        return self.validate(request_dict)

    def validate(self, request_dict):
        """Fetch the user's switches (production, then test server) and return them.

        Each switch is stored / refreshed in SwitchModel and described by one
        endpoint dict in the returned JSON list.
        """
        skill_name = request_dict.get("skill_name", 'loocam')
        access_token = request_dict.get("access_token", None)
        logger.info('--------{} 开始搜索设备--------'.format(skill_name))

        user = _find_user_by_access_token(access_token)
        if user is None:
            return JsonResponse({'错误': '用户数据不存在'})
        response = ResObject()
        userID = user.userID
        region_code = user.region_code
        logger.info('userID: {},region_code:{}'.format(userID, region_code))

        # Refresh the event-gateway credentials bound to this user.
        rebound = _rebind_alexa_auth(userID, access_token, skill_name)
        if rebound is not None:
            logger.info('update_event_access_token')
            logger.info(rebound['token'])

        domain_name = SERVER_PREFIX_EU if region_code == 'EU' else SERVER_PREFIX
        auth_request_url = '{}/oalexa/discoveryswitch'.format(domain_name)
        requests_data = {'sid': 'admin', 'sst': 'admin', 'alexa_user_id': userID}
        res = requests.post(url=auth_request_url, data=requests_data, timeout=_HTTP_TIMEOUT)
        res_json = res.json()
        logger.info('{}正式服务器响应: {}'.format(region_code, res_json))

        # The production server reported an error: try the test server as well.
        if res_json['result_code'] != 0:
            auth_request_url = '{}/oalexa/discoveryswitch'.format(SERVER_PREFIX_TEST)
            res = requests.post(url=auth_request_url, data=requests_data, timeout=_HTTP_TIMEOUT)
            res_json = res.json()
            logger.info('请求服务器url: {}'.format(auth_request_url))
            logger.info('服务器响应: {}'.format(res_json))
            if res_json['result_code'] != 0:
                return response.json(0, res={'msg': 'error'})

        switch_list = res_json['result']['switch_list']
        now_time = int(time.time())
        endpoints = []
        for switch in switch_list:
            serial_number = switch['uid']
            nick = switch['nick']
            switch_info_qs = SwitchModel.objects.filter(serial_number=serial_number, userID=userID)
            if switch_info_qs.exists():
                switch_info_qs.update(nick=nick, region=switch['region'], updTime=now_time)
            else:
                SwitchModel.objects.create(serial_number=serial_number, nick=nick, region=switch['region'],
                                           addTime=now_time, updTime=now_time, userID=userID)
            endpoints.append({
                'endpointId': serial_number,
                'friendlyName': nick,
                'manufacturerName': skill_name,
                'description': 'Plug connected via {}'.format(skill_name),
                'modelName': 'P1425-LE',
                'manufacturerId': 'zosi-ACCC8E5E7513',
            })
        logger.info('搜索设备返回值: {}'.format(endpoints))
        return JsonResponse(endpoints, safe=False)


class powerController(TemplateView):
    """Turn a smart plug on or off."""

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.POST
        return self.power_controller(request_dict)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        request_dict = request.GET
        return self.power_controller(request_dict)

    def power_controller(self, request_dict):
        """Forward ``power_controller`` for ``serial_number`` to the regional server.

        Returns ``{'result_code': '0'}`` on success, otherwise a dict with
        ``result_code`` ``'500'`` and an error text.
        """
        serial_number = request_dict.get('serial_number', '')
        access_token = request_dict.get('access_token', '')
        skill_name = request_dict.get('skill_name', 'loocam')
        power_controller = request_dict.get('power_controller', '')
        if not all([serial_number, access_token, skill_name]):
            return JsonResponse({'result_code': '500', '错误': '缺少参数'})

        user = _find_user_by_access_token(access_token)
        if user is None:
            return JsonResponse({'result_code': '500', '错误': '用户数据不存在'})
        userID = user.userID

        switch = SwitchModel.objects.filter(serial_number=serial_number, userID=userID) \
            .values('nick', 'region').first()
        if switch is None:
            return JsonResponse({'result_code': '500', '错误': 'serial_number数据不存在'})
        logger.info('{} 控制插座 {}'.format(skill_name, serial_number))

        # Ask the server of the switch's region to publish the MQTT message.
        region = switch['region']
        if region == 'EU':
            domain_name = SERVER_PREFIX_EU
        elif region == 'CN':
            domain_name = SERVER_PREFIX_TEST
        else:
            domain_name = SERVER_PREFIX
        url = '{}/api/loocam/open/socket/alexa-socket-switch'.format(domain_name)
        # power_controller: TurnOn, TurnOff
        requests_data = {'serial_number': serial_number, 'power_controller': power_controller}
        r = requests.post(url, requests_data, timeout=_HTTP_TIMEOUT)
        if r.status_code != 200:
            return JsonResponse({'result_code': '500', '错误': '请求服务器响应异常'})
        res = r.json()
        logger.info('{}服务器返回状态: {}'.format(region, res))
        if res['result_code'] != 0:
            return JsonResponse({'result_code': '500', '错误': '请求MQTT下发指令失败'})
        logger.info('请求MQTT下发指令成功')
        return JsonResponse({'result_code': '0'})


class VesseTest(TemplateView):
    """Thin proxy around three WeChat Work (qyapi.weixin.qq.com) endpoints."""

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, operation, request)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, operation, request)

    def validation(self, request_dict, operation, request):
        """Dispatch on ``operation``; unknown operations get an error response."""
        response = ResponseObject()
        if operation == 'get-token':  # fetch the access token
            return self.get_token(response)
        if operation == 'get-user-id':  # fetch the user id
            return self.get_user_id(request_dict, response, request)
        if operation == 'get-user-info':  # fetch the user details
            return self.get_user_info(request_dict, response, request)
        # A view must always return a response object.
        return response.json(444, 'error: operation')

    def get_token(self, response):
        """Request an access token from WeChat Work and return it."""
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration / a secret store.
        data = {
            'corpid': 'ww467ec1685e8262e6',
            'corpsecret': 'IeUoaQ-0hEhEduCQq1zyfVXjfeZpMsThK1nklszRzUY'
        }
        try:
            token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
            token_response = requests.get(token_url, params=data, timeout=_HTTP_TIMEOUT)
            if token_response.status_code != 200:
                return response.json(token_response.status_code, token_response.content)
            access_token = token_response.json().get("access_token")
            if access_token:
                return response.json(0, {'access_token': access_token})
            return response.json(0, "Failed to get AccessToken.")
        except requests.exceptions.RequestException as e:
            logger.info(repr(e))
            return response.json(500, repr(e))

    def get_user_id(self, request_dict, response, request):
        """Resolve ``code`` to a WeChat Work ``userid``."""
        access_token = request_dict.get('access_token', None)
        code = request_dict.get('code', None)
        if not all([access_token, code]):
            return response.json(444, 'error: access_token, code')
        data = {
            'access_token': access_token,
            'code': code
        }
        try:
            token_url = 'https://qyapi.weixin.qq.com/cgi-bin/auth/getuserinfo'
            token_response = requests.get(token_url, params=data, timeout=_HTTP_TIMEOUT)
            if token_response.status_code != 200:
                return response.json(token_response.status_code, token_response.content)
            data = token_response.json()
            userid = data.get("userid")
            if not userid:
                # HTTP 200 without a userid: report it instead of returning None.
                return response.json(500, 'Failed to get userid: {}'.format(data))
            return response.json(0, {'userid': userid})
        except Exception as e:
            logger.info(repr(e))
            return response.json(500, repr(e))

    def get_user_info(self, request_dict, response, request):
        """Fetch name, position and status of ``userid`` from WeChat Work."""
        access_token = request_dict.get('access_token', None)
        userid = request_dict.get('userid', None)
        if not all([access_token, userid]):
            return response.json(444, 'error: access_token, userid')
        data = {
            'access_token': access_token,
            'userid': userid
        }
        try:
            token_url = 'https://qyapi.weixin.qq.com/cgi-bin/user/get'
            token_response = requests.get(token_url, params=data, timeout=_HTTP_TIMEOUT)
            if token_response.status_code != 200:
                return response.json(token_response.status_code, token_response.content)
            data = token_response.json()
            userid = data.get("userid")
            if not userid:
                # HTTP 200 without a userid: report it instead of returning None.
                return response.json(500, 'Failed to get userid: {}'.format(data))
            url_data = {
                'userid': userid,
                'name': data.get('name'),
                'position': data.get('position'),
                'status': data.get('status')
            }
            return response.json(0, {'data': url_data})
        except Exception as e:
            logger.info(repr(e))
            return response.json(500, repr(e))