#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved. @AUTHOR: ASJRD018 @NAME: langer @software: PyCharm @DATE: 2019/6/4 11:44 @Version: python3.6 @MODIFY DECORD:ansjer dev @file: LangArea.py @Contact: chanjunkai@163.com """ import json import requests from django.db.models import Min, Q from django.http import HttpResponse from django.views.generic import TemplateView from django.views.decorators.csrf import csrf_exempt from django.utils.decorators import method_decorator from object.ResponseObject import ResponseObject from django.contrib.auth.hashers import make_password, check_password from model.models import UserModel, LangSetModel, LangAreaModel, LangValModel, LangKeyModel, LangKeyClassModel, \ SearchToolBlock, SearchToolKeyModel, LangLocationModel import time from object.TokenObject import TokenObject from django.http import StreamingHttpResponse class LangAreaView(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation', None) request_dict = json.loads(request.body.decode('utf-8')) return self.validate(request_dict, operation) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET operation = kwargs.get('operation', None) return self.validate(request_dict, operation) def validate(self, request_dict, operation): response = ResponseObject() token = request_dict.get('token', None) tko = TokenObject(token) if tko.code == 0: userID = tko.userID user_qs = UserModel.objects.filter(id=userID, username='admin') if not user_qs.exists(): return response.json(403) if operation == 'add': return self.do_add(request_dict, response) elif operation == 'update': return self.do_update(request_dict, response) elif operation == 'delete': return self.do_delete(request_dict, response) elif operation == 'query': return self.do_query(request_dict, response, userID) elif operation == 'export': return self.do_export(request_dict, response, userID) elif operation == 'AITranslation': return self.ai_translation(request_dict, response) elif operation == 'AITranslationTest': return self.ai_translation_test(request_dict, response) else: return response.json(414) else: return response.json(tko.code) def do_add(self, request_dict, response): lang = request_dict.get('lang', None) if lang: nowTime = int(time.time()) create_dict = { 'lang': lang, 'addTime': nowTime, 'updTime': nowTime } try: LangAreaModel.objects.create(**create_dict) except Exception as e: return response.json(404, repr(e)) else: return response.json(0) else: return response.json(414) def do_update(self, request_dict, response): lang = request_dict.get('lang', None) id = request_dict.get('id', None) nowTime = int(time.time()) update_dict = { 'lang': lang, 'updTime': nowTime } try: LangAreaModel.objects.filter(id=id).update(**update_dict) except Exception as e: return response.json(404, repr(e)) else: return response.json(0) def do_delete(self, request_dict, response): id = request_dict.get('id', None) try: LangAreaModel.objects.filter(id=id).delete() except Exception as e: return response.json(404, repr(e)) else: return response.json(0) def do_query(self, request_dict, response, userID): user_qs = UserModel.objects.filter(id=userID, username='admin') if not user_qs.exists(): return response.json(403) la_qs = LangAreaModel.objects.filter().values('lang', 'id') return response.json(0, list(la_qs)) def do_export(self, request_dict, response, userID): print("进来了") id = request_dict.get('id', None) type = request_dict.get('type', None) # key_list = LangKeyModel.objects.filter().values_list('word_key', flat=True) print('id = ' + id) langType = (1, 3, 5, 7) if type == 'avss': langType = (2, 3, 6, 7) elif type == 'link_ios' or type == 'link_android': langType = (4, 5, 6, 7) en_qs = LangKeyModel.objects.filter(langvalmodel__la__id=20, type__in=langType).values('word_key', 'langvalmodel__word_val') en_kv = {} for e in en_qs: en_kv[e['word_key']] = e['langvalmodel__word_val'] content = '' if type == 'ios' or type == 'link_ios' or type == 'flutter': res_qs = LangKeyModel.objects.filter(langvalmodel__la__id=id, type__in=langType). \ values('word_key', 'langvalmodel__word_val') res = {} for r in res_qs: print(r['langvalmodel__word_val']) if r['langvalmodel__word_val']: res[r['word_key']] = r['langvalmodel__word_val'] elif en_kv[r['word_key']]: res[r['word_key']] = en_kv[r['word_key']] for l in res: content_val = res[l].replace('"', '\'') if type == 'ios' or type == 'link_ios': content = content + '"' + l + '"="' + content_val + '";\n' else: content = content + '"' + l + '":"' + content_val + '",\n' if type == 'flutter': content = '{\n' + content + '}' elif type == 'android' or type == 'link_android': res_qs = LangKeyModel.objects.filter(langvalmodel__la__id=id, type__in=langType). \ values('word_key', 'langvalmodel__word_val').order_by('addTime') res = {} for r in res_qs: if r['langvalmodel__word_val']: res[r['word_key']] = r['langvalmodel__word_val'] elif en_kv[r['word_key']]: res[r['word_key']] = en_kv[r['word_key']] for l in res: lk = l lk = lk.replace('.', '_') lk = lk.replace('\\n', '_') lk = lk.replace('“', '_') lk = lk.replace('”', '_') lk = lk.replace(' ', '_') lk = lk.replace('\\\'', '_') lk = lk.replace('\'', '_') lk = lk.replace('\\', '_') lk = lk.replace('!', '_') lk = lk.replace('’', '_') lk = lk.replace('...', '_') lk = lk.replace('…', '_') lk = lk.replace(':', '_') lk = lk.replace(':', '_') lk = lk.replace('(', '_') lk = lk.replace(')', '_') lk = lk.replace('?', '_') lk = lk.replace('?', '_') lk = lk.replace(',', '_') lk = lk.replace('-', '_') lk = lk.replace('%', '_') lk = lk.replace(';', '_') lk = lk.replace('‘', '_') lk = lk.replace('&', '&') if lk[0].isdigit(): lk = lk.replace(lk[0], '_') # 正则表达式 # re.sub("[a-z]+", "$", "123abc78ab9cdeg00t8") content_val = res[l].replace("'", "\\'") content_val = content_val.replace('"', '\\"') content_val = content_val.replace('&', '&') content = content + '' + content_val + '\n' elif type == 'avss': content = self.do_download_avss(id) elif type == 'new_searchTool': content = self.do_download_SearchTool(id, 0) elif type == 'old_searchTool': content = self.do_download_SearchTool(id, 1) response = StreamingHttpResponse(content) response['Content-Type'] = 'application/octet-stream' response['Content-Disposition'] = 'attachment;filename="lang.txt"' return response @classmethod def ai_translation(cls, request_dict, response): lang_id = request_dict.get('lang_id', None) if not lang_id: return response.json(444) lang_id = int(lang_id) # 英文不翻译 if lang_id == 20: return response.json(0) try: # 查询语种 lang_area_qs = LangAreaModel.objects.filter(id=lang_id).values('lang') if not lang_area_qs.exists(): return response.json(444) lang = lang_area_qs[0]['lang'] # 查询英文不为空,并根据lk_id去重 lang_val_qs = LangValModel.objects.filter(~Q(word_val=''), Q(la_id=20)).values('lk_id').\ annotate(word_val=Min('word_val')).order_by('lk_id') if not lang_val_qs.exists(): return response.json(444) bulk = [] now_time = int(time.time()) # 根据lk_id查询有无对应语种,没有则调用API翻译并保存数据 for lang_val in lang_val_qs: lk_id = lang_val['lk_id'] lang_qs = LangValModel.objects.filter(la_id=lang_id, lk_id=lk_id).values('word_val') if not lang_qs.exists(): # 翻译内容不存在,进行AI翻译 word_val_en = lang_val['word_val'] word_val = cls.deepseek_translation(lang, word_val_en) lang_val_model = LangValModel( lk_id=lk_id, la_id=lang_id, word_val=word_val, addTime=now_time, updTime=now_time, status=1 ) bulk.append(lang_val_model) else: # 翻译内容为空,进行AI翻译 if lang_qs[0]['word_val'] == '': word_val_en = lang_val['word_val'] word_val = cls.deepseek_translation(lang, word_val_en) lang_qs.update(word_val=word_val, addTime=now_time, updTime=now_time, status=1) LangValModel.objects.bulk_create(bulk) return response.json(0) except Exception as e: return response.json(10, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @classmethod def ai_translation_test(cls, request_dict, response): lang = request_dict.get('lang') word_val_en = request_dict.get('word_val_en') if not all([lang, word_val_en]): return response.json(444) try: word_val = cls.deepseek_translation(lang, word_val_en) return response.json(0, word_val) except Exception as e: return response.json(10, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def deepseek_translation(lang, word_val_en): url = "https://maas-cn-southwest-2.modelarts-maas.com/v1/infers/271c9332-4aa6-4ff5-95b3-0cf8bd94c394/v1/chat/completions" # API地址 api_key = "obPL2x_aJKJpm11JLz1GFKTfXcwF_2ra4bdogC1pbFgV7GobDRW3AD6iHPxCvDZqM87buJ3xyT6-bl1xiDV6aQ" # 把yourApiKey替换成已获取的API Key # Send request. headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}' } content = '请将以下英文内容翻译成{},忽略转义符和占位符,直接返回翻译好的内容即可,无需写上注释:{}'.format(lang, word_val_en) data = { "model": "DeepSeek-V3", # 模型名称 "max_tokens": 1024, # 最大输出token数 "messages": [ {"role": "system", "content": "你是一名翻译家"}, {"role": "user", "content": content} ], # 是否开启流式推理, 默认为False, 表示不开启流式推理 "stream": False, # 在流式输出时是否展示使用的token数目。只有当stream为True时改参数才会生效。 # "stream_options": { "include_usage": True }, # 控制采样随机性的浮点数,值较低时模型更具确定性,值较高时模型更具创造性。"0"表示贪婪取样。默认为0.6。 "temperature": 0.6 } content = '' # 延迟请求 time.sleep(1.1) try: response = requests.post(url, headers=headers, data=json.dumps(data), verify=False, timeout=60) text = json.loads(response.text) choices = text.get('choices') if choices is not None: content = choices[0]['message']['content'] return content except Exception as e: return content def do_download_avss(self, id): content = '' en_result = self.get_area_language(20) target_result = self.get_area_language(id) if target_result: content += '\n' # keys = result.keys() keys = en_result.keys() for key in keys: content += '\n' content += key content += '\n' messages = None if target_result.__contains__(key): messages = target_result[key] else: messages = en_result[key] messageKeys = en_result[key].keys() for messageKey in messageKeys: message = None if messages.__contains__(messageKey): message = messages[messageKey] else: message = en_result[key][messageKey] # message = messages[messageKey] content += '\n' content += '' content += message['source'] content += '\n' content += '' translation = message['translation'] # if translation is None or translation == '': # content += '23333' # else: content += translation content += '\n' locations = message['locations'] print(message) for location in locations: if location['filename'] != '': content += '\n' content += '\n' content += '\n' content += '' return content else: print('none') return content def get_area_language(self, id): result = {} lkc_qs = LangKeyModel.objects.filter(langvalmodel__la__id=id, type__in=(2, 3, 6, 7)).values('word_key', 'langkeyclassmodel__clazz__name', 'langvalmodel__word_val', 'langlocationmodel__filename', 'langlocationmodel__line').order_by( 'langkeyclassmodel__clazz__name', 'word_key') for lkc in lkc_qs: # print(lkc) name = lkc['langkeyclassmodel__clazz__name'] source = lkc['word_key'] localtion = {'filename': lkc['langlocationmodel__filename'], 'line': lkc['langlocationmodel__line']} hasClass = result.__contains__(name) if not hasClass: result[name] = { source: {'source': source, 'translation': lkc['langvalmodel__word_val'], 'locations': [localtion]}} else: hasKey = result[name].__contains__(source) if not hasKey: result[name][source] = {'source': source, 'translation': lkc['langvalmodel__word_val'], 'locations': [localtion]} else: if localtion not in result[name][source]['locations']: result[name][source]['locations'].append(localtion) print(result) return result def get_area_language_search(self, id): result = {} lkc_qs = LangKeyModel.objects.filter(langvalmodel__la__id=id, project__id=4).values('word_key', 'searchtoolkeymodel__bk__name', 'langvalmodel__word_val', 'langlocationmodel__filename', 'langlocationmodel__line') for lkc in lkc_qs: # print(lkc) name = lkc['searchtoolkeymodel__bk__name'] hasClass = result.__contains__(name) if hasClass is False: result[name] = {} source = lkc['word_key'] if not result[name].__contains__(source): result[name][source] = {} message = result[name][source] message['source'] = source[(len(name) + 1):] message['translation'] = lkc['langvalmodel__word_val'] # location locations = [] locations.append({'filename': lkc['langlocationmodel__filename'], 'line': lkc['langlocationmodel__line']}) message['locations'] = locations print(result) return result def get_avss_language(self, id): if id == 18: # 简体中文 return 'zh_CN' elif id == 19: # 繁体中文 return 'zh_HK' elif id == 20: # 英文 return 'en_US' elif id == 21: # 俄语 return 'ru' elif id == 22: # 德语 return 'de' elif id == 23: # 波兰语 return 'pl' elif id == 24: # 法语 return 'fr' elif id == 25: # 西班牙语 return 'es' elif id == 26: # 日语 return 'ja' elif id == 27: # 阿拉伯语 return 'ar' elif id == 29: # 意大利语 return 'it' elif id == 30: # 葡萄牙语 return 'pt' elif id == 31: # 荷兰语 return 'nl' elif id == 32: # 越南语 return 'vi' else: return 'en' def do_download_SearchTool(self, id, searchtype): content = '' en_result = self.get_area_language_search(20) print(en_result) target_result = self.get_area_language_search(id) if target_result: content += '\n' # keys = result.keys() keys = en_result.keys() for key in keys: content += '\n' content += key content += '\n' messages = None if target_result.__contains__(key): messages = target_result[key] else: messages = en_result[key] messageKeys = en_result[key].keys() for messageKey in messageKeys: message = None if messages.__contains__(messageKey): message = messages[messageKey] else: message = en_result[key][messageKey] # message = messages[messageKey] content += '\n' content += '' content += message['source'] content += '\n' content += '' translation = message['translation'] # if translation is None or translation == '': # content += '23333' # else: content += translation content += '\n' locations = message['locations'] print(message) for location in locations: if location['filename'] != '': content += '\n' content += '\n' content += '\n' content += '' return content else: print('none') return content class exportLangView(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = json.loads(request.body.decode('utf-8')) return self.validate(request_dict) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET return self.validate(request_dict) def validate(self, request_dict): # 得到即将下载文件的路径和名称 response = StreamingHttpResponse('123l4kjlkfjlksadjlfksdajlf') response['Content-Type'] = 'application/octet-stream' response['Content-Disposition'] = 'attachment;filename="lang.txt"' return response