|
- import json
- import logging
- import time
- import csv
- import requests
- from bulk_update.helper import bulk_update
- from django.db import transaction
- from django.db.models import Count
- from django.views import View
- from Model.models import SerialNumberModel, UserModel, UserSerialNumberModel, CompanySerialModel, \
- MacModel, LogModel, CompanyModel, SerialMac
- from Object.RedisObject import RedisObject
- from Object.ResponseObject import ResponseObject
- from Object.S3Email import S3Email
- from Object.TokenObject import TokenObject
- from Service.AlgorithmService import AlgorithmBaseOn35
- from Service.CommonService import CommonService
- class SerialNumberView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- request_dict = request.GET
- operation = kwargs.get('operation')
- return self.validate(request_dict, operation, request)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- request_dict = request.POST
- operation = kwargs.get('operation')
- return self.validate(request_dict, operation, request)
- def validate(self, request_dict, operation, request):
- response = ResponseObject()
- if operation == 'getSerial': # 获取序列号
- return self.get_serial(request_dict, response)
- elif operation == 'create': # 生成序列号
- return self.do_create(request_dict, response)
- elif operation == 'mac': # 生成mac
- return self.generate_mac(request_dict, response)
- elif operation == 'checkSerial': # 序列号库存数量少于5000,发送邮件通知
- return self.check_serial_number(response)
- elif operation == 'macAndSerial':
- return self.generate_mac_and_serial_numbers(request_dict, response)
- elif operation == 'checkSerialLog':
- return self.check_serial_log(request_dict, response)
- token = request_dict.get('token', None)
- token = TokenObject(token)
- if token.code != 0:
- return response.json(token.code)
- if operation == 'quantity': # 查询当前可用的UID的数量
- return self.do_quantity(token.userID, response)
- elif operation == 'allot': # 分配序列号
- return self.do_allot(request_dict, response)
- elif operation == 'createSerial': # 生成序列号
- return self.create_serial(request_dict, response, request, token.userID)
- elif operation == 'serialCompany': # 将序列号分匹配到指定企业
- return self.do_serial_company(request_dict, response, request, token.userID)
- elif operation == 'revise/state': # 修改序列号状态
- return self.revise_state(request_dict, response, request)
- else:
- return response.json(309)
- def revise_state(self, request_dict, response, request):
- """
- 修改序列号状态
- @param request_dict:请求参数
- @param response: 响应对象
- """
- use_status = request_dict.get('useStatus', None) # 序列号表的状态
- status = request_dict.get('status', None) # 关联企业序列号表的状态
- if not all([use_status, status]):
- return response.json(444)
- serial_number_qs = SerialNumberModel.objects.filter(use_status=use_status).values('serial_number')
- if not serial_number_qs.exists():
- return response.json(173)
- serial_list = [item[key] for item in serial_number_qs for key in item]
- try:
- country_serial_qs = CompanySerialModel.objects.filter(serial_number__in=serial_list).values('status')
- country_serial_qs.filter(status=status).update(status=2)
- return response.json(0)
- except Exception as e:
- print(e)
- return response.json(500)
- def do_serial_company(self, request_dict, response, request, user_id):
- """
- 将序列号分匹配到指定企业
- :param request_dict: 请求参数
- :param response: 响应对象
- :param request: 请求
- :param user_id: 用户id
- :return:
- """
- redis_obj = RedisObject()
- # redis加锁,防止同时进行其他操作
- redis_key = 'serial_generate_lock'
- serial_operate_lock = redis_obj.CONN.setnx(redis_key, 1)
- if not serial_operate_lock:
- return response.json(5)
- redis_obj.CONN.expire(redis_key, 60*5)
- id = request_dict.get('id', None)
- quantity = request_dict.get('quantity', None)
- if not all([id, quantity]):
- return response.json(444)
- company_qs = CompanyModel.objects.filter(id=id)
- if not company_qs.exists():
- return response.json(444)
- sum_Serial = SerialNumberModel.objects.filter().count()
- sum_Serial_company = CompanySerialModel.objects.filter().count()
- sum_bind = sum_Serial - sum_Serial_company # 剩余可绑定的序列号
- if int(quantity) > int(sum_bind):
- return response.json(10041)
- try:
- company = company_qs[0]
- start_1 = sum_Serial_company
- end_1 = int(start_1) + int(quantity)
- serial_qs = SerialNumberModel.objects.filter()[start_1:end_1]
- if not serial_qs.exists():
- return response.json(173)
- company_serial_bulk = []
- now_time = int(time.time())
- for item in serial_qs: # 更新状态为已分配但未使用
- company_serial_bulk.append(CompanySerialModel(
- status=3,
- add_time=now_time,
- update_time=now_time,
- company_id=company.id,
- serial_number=item.serial_number,
- ))
- SerialNumberModel.objects.filter(serial_number=item.serial_number).update(
- use_status=3,
- add_time=now_time
- )
- # 记录操作日志
- ip = CommonService.get_ip_address(request)
- content = json.loads(json.dumps(request_dict))
- log = {
- 'ip': ip,
- 'user_id': user_id,
- 'status': 200,
- 'time': now_time,
- 'url': 'serialNumber/serialCompany',
- 'content': json.dumps(content),
- 'operation': '{}生成{}个序列号{}: {}'.format(company.name, quantity, '成功', '同步更新成功'),
- }
- with transaction.atomic():
- CompanySerialModel.objects.bulk_create(company_serial_bulk)
- company.quantity = CompanySerialModel.objects.filter(company_id=id).count()
- company.save()
- Log = LogModel.objects.create(**log)
- # 同步更新业务服务器和uid管理系统的企业序列号表
- url1 = 'http://test.zositechc.cn/company/createSerial'
- url2 = 'https://www.zositechc.cn/company/createSerial'
- url3 = 'http://www.dvema.com/company/createSerial'
- url4 = 'http://api.zositeche.com/company/createSerial'
- requests_data = {'id': id, 'quantity': quantity}
- res1 = requests.post(url=url1, data=requests_data, timeout=2 * 60)
- if res1.status_code != 200:
- fail_reason = '请求测试服务器生成序列号响应状态码异常'
- return self.failResponse(company.name, quantity, fail_reason, Log, response)
- res1 = res1.json()
- if res1['result_code'] != 0:
- fail_reason = '测试服务器生成序列号发生异常'
- return self.failResponse(company.name, quantity, fail_reason, Log, response)
- res2 = requests.post(url=url2, data=requests_data, timeout=2 * 60)
- if res2.status_code != 200:
- fail_reason = '请求国内服务器生成序列号响应状态码异常'
- return self.failResponse(company.name, quantity, fail_reason, Log, response)
- res2 = res2.json()
- if res2['result_code'] != 0:
- fail_reason = '国内服务器生成序列号发生异常'
- return self.failResponse(company.name, quantity, fail_reason, Log, response)
- res3 = requests.post(url=url3, data=requests_data, timeout=2 * 60)
- if res3.status_code != 200:
- fail_reason = '请求美国服务器生成序列号响应状态码异常'
- return self.failResponse(company.name, quantity, fail_reason, Log, response)
- res3 = res3.json()
- if res3['result_code'] != 0:
- fail_reason = '美国服务器生成序列号发生异常'
- return self.failResponse(company.name, quantity, fail_reason, Log, response)
- res4 = requests.post(url=url4, data=requests_data, timeout=2 * 60)
- if res4.status_code != 200:
- fail_reason = '请求欧洲服务器生成序列号响应状态码异常'
- return self.failResponse(company.name, quantity, fail_reason, Log, response)
- res4 = res4.json()
- if res4['result_code'] != 0:
- fail_reason = '欧洲服务器生成序列号发生异常'
- return self.failResponse(company.name, quantity, fail_reason, Log, response)
- redis_obj.del_data(key=redis_key) # redis解锁
- return response.json(0)
- except Exception as e:
- redis_obj.del_data(key=redis_key) # redis解锁
- djangoLogger = logging.getLogger('django')
- djangoLogger.info(repr(e))
- return response.json(500)
- def failResponse(self, company_name, quantity, fail_reason, Log, response):
- operation = '{}生成{}个序列号{}: {}'.format(company_name, quantity, '失败', fail_reason)
- Log.operation = operation
- Log.save()
- return response.json(177)
- def create_serial(self, request_dict, response, request, user_id):
- """
- 生成序列号
- :param request_dict: 请求参数
- :param response: 响应对象
- :param request: 请求
- :param user_id: 用户id
- :return:
- """
- redis_obj = RedisObject()
- # redis加锁,防止同时进行其他操作
- redis_key = 'serial_generate_lock'
- serial_operate_lock = redis_obj.CONN.setnx(redis_key, 1)
- if not serial_operate_lock:
- return response.json(5)
- redis_obj.CONN.expire(redis_key, 60*5)
- quantity = int(request_dict.get('quantity', 0))
- if not quantity:
- return response.json(444)
- try:
- try:
- sum = SerialNumberModel.objects.last().id
- except:
- sum = 0
- serial_number_bulk = []
- now_time = int(time.time())
- algorithm = AlgorithmBaseOn35()
- for i in range(quantity):
- serial_number = algorithm.getLetter(sum)
- sum += 1 # sum每次递增1
- # 前面补0至六位
- serial_number = (6 - len(serial_number)) * '0' + serial_number
- serial_number_bulk.append(SerialNumberModel(serial_number=serial_number, add_time=now_time))
- # 记录操作日志
- ip = CommonService.get_ip_address(request)
- content = json.loads(json.dumps(request_dict))
- log = {
- 'ip': ip,
- 'user_id': user_id,
- 'status': 200,
- 'time': now_time,
- 'url': 'serialNumber/createSerial',
- 'content': json.dumps(content),
- 'operation': '生成{}个序列号{}: {}'.format(quantity, '成功', '同步更新成功'),
- }
- # 开启事务写入
- with transaction.atomic():
- SerialNumberModel.objects.bulk_create(serial_number_bulk)
- Log = LogModel.objects.create(**log)
- # 同步更新业务服务器和uid管理系统的序列号表
- url1 = 'http://test.zositechc.cn/serialNumber/create'
- url2 = 'https://www.zositechc.cn/serialNumber/create'
- url3 = 'http://www.dvema.com/serialNumber/create'
- url4 = 'http://api.zositeche.com/serialNumber/create'
- requests_data = {'quantity': quantity}
- res1 = requests.post(url=url1, data=requests_data, timeout=2 * 60)
- if res1.status_code != 200:
- fail_reason = '请求测试服务器生成序列号响应状态码异常'
- return self.generateFail(quantity, fail_reason, Log, response)
- res1 = res1.json()
- if res1['result_code'] != 0:
- fail_reason = '测试服务器生成序列号发生异常'
- return self.generateFail(quantity, fail_reason, Log, response)
- res2 = requests.post(url=url2, data=requests_data, timeout=2 * 60)
- if res2.status_code != 200:
- fail_reason = '请求国内服务器生成序列号响应状态码异常'
- return self.generateFail(quantity, fail_reason, Log, response)
- res2 = res2.json()
- if res2['result_code'] != 0:
- fail_reason = '国内服务器生成序列号发生异常'
- return self.generateFail(quantity, fail_reason, Log, response)
- res3 = requests.post(url=url3, data=requests_data, timeout=2 * 60)
- if res3.status_code != 200:
- fail_reason = '请求美国服务器生成序列号响应状态码异常'
- return self.generateFail(quantity, fail_reason, Log, response)
- res3 = res3.json()
- if res3['result_code'] != 0:
- fail_reason = '美国服务器生成序列号发生异常'
- return self.generateFail(quantity, fail_reason, Log, response)
- res4 = requests.post(url=url4, data=requests_data, timeout=2 * 60)
- if res4.status_code != 200:
- fail_reason = '请求欧洲服务器生成序列号响应状态码异常'
- return self.generateFail(quantity, fail_reason, Log, response)
- res4 = res4.json()
- if res4['result_code'] != 0:
- fail_reason = '欧洲服务器生成序列号发生异常'
- return self.generateFail(quantity, fail_reason, Log, response)
- redis_obj.del_data(key=redis_key) # redis解锁
- return response.json(0)
- except Exception as e:
- redis_obj.del_data(key=redis_key) # redis解锁
- djangoLogger = logging.getLogger('django')
- djangoLogger.info(repr(e))
- return response.json(500)
- def generateFail(self, quantity, fail_reason, Log, response):
- operation = '生成{}个序列号{}: {}'.format(quantity, '失败', fail_reason)
- Log.operation = operation
- Log.save()
- return response.json(177)
- def do_quantity(self, userID, response):
- """
- 查询当前可用的UID的数量
- :param userID: 响应对象
- :param response: 请求
- :return:
- """
- user_qs = UserModel.objects.filter(id=userID)
- if not user_qs.exists():
- return response.json(9)
- unused_serial_number_count = SerialNumberModel.objects.filter(use_status=0).count()
- remain_qs = CompanySerialModel.objects.filter(status=1).count()
- company_qs = CompanyModel.objects.values('id', 'name')
- try:
- company_serial_list = [] # 剩余已分配未使用的序列号数量
- company_not_used_list = [] # 已使用的序列号数量
- for company in company_qs:
- id = company['id']
- name = company['name']
- serial_list = []
- not_used_qs = CompanySerialModel.objects.filter(status=3, company_id=id).values('serial_number')
- res = {
- 'name': name,
- 'number': not_used_qs.count(),
- 'subMember': []
- }
- for serial in not_used_qs:
- serial_list.append(serial['serial_number'])
- user_serial_qs = UserSerialNumberModel.objects.filter(
- serial_number__serial_number__in=serial_list).values('user__username').annotate(
- count=Count('user__username'))
- for user in user_serial_qs:
- res['subMember'].append({'username': user['user__username'], 'count': user['count']})
- company_not_used_list.append(res)
- for company in company_qs:
- id = company['id']
- name = company['name']
- serial_list = []
- company_serial_qs = CompanySerialModel.objects.filter(status=1, company_id=id).order_by('id').values(
- 'serial_number')
- if company_serial_qs.exists():
- count = company_serial_qs.count()
- last_serial = company_serial_qs.last()['serial_number']
- for serial in company_serial_qs:
- serial_list.append(serial['serial_number'])
- else:
- count = 0
- last_serial = ''
- res = {
- 'name': name,
- 'numbers': count,
- 'lastSerial': last_serial,
- 'subMember': []
- }
- user_serial_qs = UserSerialNumberModel.objects.filter(
- serial_number__serial_number__in=serial_list).values('user__username').annotate(count=Count('user__username'))
- for user in user_serial_qs:
- res['subMember'].append({'username': user['user__username'], 'count': user['count']})
- company_serial_list.append(res)
- res_data = {'code': 0, 'companyRemainCount': company_not_used_list, 'companyRemain': company_serial_list,
- 'unused_serial_number_count': unused_serial_number_count,
- 'unused_all_count': remain_qs}
- return response.json(0, {'data': res_data})
- except Exception as e:
- print(e)
- return response.json(500)
- def do_create(self, request_dict, response):
- quantity = int(request_dict.get('quantity', 0))
- if not quantity:
- return response.json(444)
- try:
- try:
- sum = SerialNumberModel.objects.last().id
- except:
- sum = 0
- serial_number_bulk = []
- now_time = int(time.time())
- algorithm = AlgorithmBaseOn35()
- for i in range(quantity):
- serial_number = algorithm.getLetter(sum)
- sum += 1 # sum每次递增1
- # 前面补0至六位
- serial_number = (6 - len(serial_number)) * '0' + serial_number
- serial_number_bulk.append(SerialNumberModel(serial_number=serial_number, add_time=now_time))
- # 开启事务写入
- with transaction.atomic():
- SerialNumberModel.objects.bulk_create(serial_number_bulk)
- return response.json(0)
- except Exception as e:
- print(e)
- return response.json(500, repr(e))
- @transaction.atomic
- def do_allot(self, request_dict, response):
- username = request_dict.get('username', None)
- quantity = int(request_dict.get('quantity', None))
- token = request_dict.get('token', None)
- token = TokenObject(token)
- if token.code != 0:
- return response.json(token.code)
- user = UserModel.objects.get(id=token.userID)
- if not user or '0' not in user.permission:
- return response.json(404)
- # 要分配的对象
- allot_user_qs = UserModel.objects.filter(username=username)
- if not allot_user_qs.exists():
- return response.json(444, 'username')
- if allot_user_qs[0].permission not in ['1', '2']:
- return response.json(109)
- # 取出对应区域可用的UID分配给allot_user
- sn_qs = SerialNumberModel.objects.filter(use_status=3)[0:quantity]
- sns = []
- for sn in sn_qs:
- sns.append(sn.serial_number)
- cs_qs = CompanySerialModel.objects.filter(serial_number__in=sns, status=3)
- sns = []
- for cs in cs_qs:
- sns.append(cs.serial_number)
- sn_qs = SerialNumberModel.objects.filter(serial_number__in=sns)
- count = sn_qs.count()
- if count < quantity:
- return response.json(444, '序列号不足')
- try:
- updates = []
- datas = []
- if not sn_qs.exists():
- return response.json(444)
- sn_qs = sn_qs[0:quantity]
- now_time = int(time.time())
- for i in range(len(sn_qs)):
- item = sn_qs[i]
- serialNumberModel = SerialNumberModel(
- id=item.id,
- serial_number=item.serial_number,
- status=item.status,
- use_status=1,
- add_time=item.add_time
- )
- CompanySerialModel.objects.filter(serial_number=item.serial_number).update(status=1)
- user_serial_number = UserSerialNumberModel()
- user_serial_number.serial_number = serialNumberModel
- user_serial_number.user = allot_user_qs[0]
- user_serial_number.add_time = now_time
- user_serial_number.update_time = now_time
- datas.append(user_serial_number)
- updates.append(serialNumberModel)
- if len(updates) > 0:
- bulk_update(updates)
- UserSerialNumberModel.objects.bulk_create(datas)
- return response.json(0)
- except Exception as e:
- print(e)
- return response.json(500)
- @staticmethod
- def get_serial(request_dict, response):
- """
- 获取序列号
- @param request_dict: 请求数据
- @param response: 响应
- @return: response
- """
- token = request_dict.get('token', None)
- time_stamp = request_dict.get('time_stamp', None)
- company_secret = request_dict.get('company_id', None) # 企业id, Ansjer: MTEyMTNB, Loocam: VmXEWnBR
- if not all([token, time_stamp, company_secret]) or company_secret not in ['MTEyMTNB', 'VmXEWnBR']:
- return response.json(444)
- # 时间戳token校验
- if not CommonService.check_time_stamp_token(token, time_stamp):
- return response.json(13)
- redis_obj = RedisObject()
- # redis加锁,防止同时进行其他操作
- serial_operate_lock_key = 'serial_operate_lock'
- serial_operate_lock = redis_obj.CONN.setnx(serial_operate_lock_key, 1)
- if not serial_operate_lock:
- return response.json(5)
- redis_obj.CONN.expire(serial_operate_lock_key, 60)
- # 企业为Ansjer查询13800138005账号(user_id: 6)下未使用的序列号,Loocam查询13800138006账号(user_id: 7)
- if company_secret == 'MTEyMTNB':
- user_id = 6
- else:
- user_id = 7
- user_serial_number_qs = UserSerialNumberModel.objects.filter(
- user__id=user_id, serial_number__use_status=1).first()
- if user_serial_number_qs is None:
- redis_obj.del_data(key=serial_operate_lock_key) # redis解锁
- return response.json(14)
- serial_number = user_serial_number_qs.serial_number.serial_number
- # 查询序列号企业数据
- company_serial_qs = CompanySerialModel.objects.filter(company__secret=company_secret,
- serial_number=serial_number, status=1).first()
- if company_serial_qs is None:
- redis_obj.del_data(key=serial_operate_lock_key) # redis解锁
- return response.json(14)
- company_mark = company_serial_qs.company.mark
- # 防止重复获取序列号
- serial_number_lock = redis_obj.CONN.setnx(serial_number + 'serial_number_lock', 1)
- redis_obj.CONN.expire(serial_number + 'serial_number_lock', 60)
- if not serial_number_lock:
- return response.json(5)
- # 获取mac
- mac_qs = MacModel.objects.filter(is_active=True).values('value')
- if not mac_qs.exists():
- return response.json(175)
- mac = mac_qs[0]['value']
- # 绑定mac地址成功后更新mac表
- next_mac = CommonService.updateMac(mac)
- now_time = int(time.time())
- mac_data = {
- 'update_time': now_time
- }
- if next_mac:
- mac_data['value'] = next_mac
- else:
- mac_data['is_active'] = False
- # 操作日志数据
- operation = '获取序列号:{}'.format(serial_number)
- log = {
- 'user_id': user_id,
- 'time': now_time,
- 'operation': operation,
- 'url': 'serialNumber/getSerial',
- }
- try:
- with transaction.atomic():
- # 更新和创建数据
- mac_qs.update(**mac_data)
- SerialNumberModel.objects.filter(serial_number=serial_number).update(use_status=2, add_time=now_time)
- CompanySerialModel.objects.filter(serial_number=serial_number).update(status=2, update_time=now_time)
- LogModel.objects.create(**log)
- serial_number = serial_number + company_mark
- SerialMac.objects.create(serial_number=serial_number, mac=mac, add_time=now_time)
- redis_obj.del_data(key=serial_operate_lock_key) # redis解锁
- return response.json(0, {'serial_number': serial_number, 'mac': mac})
- except Exception as e:
- redis_obj.del_data(key=serial_operate_lock_key) # redis解锁
- return response.json(500, repr(e))
- @staticmethod
- def generate_mac(request_dict, response):
- """
- 生成mac到mac.txt文件
- :param request_dict: 请求
- :param response: 响应
- :return: response
- """
- quantity = int(request_dict.get('quantity', 0))
- if quantity == 0:
- return response.json(444)
- mac_qs = MacModel.objects.filter(is_active=True).values('value')
- if not mac_qs.exists():
- return response.json(175)
- mac = mac_qs[0]['value']
- now_time = int(time.time())
- with open('mac.txt', 'w') as f:
- f.write(mac + '\n')
- for i in range(quantity - 1):
- next_mac = CommonService.updateMac(mac)
- mac = next_mac
- f.write(next_mac + '\n')
- # 保存下个mac
- next_mac = CommonService.updateMac(mac)
- mac_qs.update(value=next_mac, update_time=now_time)
- return response.json(0)
- @staticmethod
- def check_serial_number(response):
- """
- 定时查询13800138005,13800138006账号未使用的序列号数量,不足5000发送邮件通知
- :param response: 响应
- :return: response
- """
- try:
- warning_count = 5000
- for user_id in [6, 7]:
- user_serial_number_count = UserSerialNumberModel.objects.filter(
- user__id=user_id, serial_number__use_status=1).count()
- if user_serial_number_count < warning_count:
- user_qs = UserModel.objects.filter(id=user_id).values('username')
- username = user_qs[0]['username']
- sys_msg_text = '{}账号序列号数量少于{}个,请及时处理'.format(username, warning_count)
- S3Email().faEmail(sys_msg_text, 'servers@ansjer.com')
- return response.json(0)
- except Exception as e:
- return response.json(500)
- @staticmethod
- def generate_mac_serial_number(request_dict, response):
- """
- 生成mac到mac.txt文件
- :param request_dict: 请求
- :param response: 响应
- :return: response
- """
- quantity = int(request_dict.get('quantity', 0))
- if quantity == 0:
- return response.json(444)
- mac_qs = MacModel.objects.filter(is_active=True).values('value')
- if not mac_qs.exists():
- return response.json(175)
- mac = mac_qs[0]['value']
- now_time = int(time.time())
- with open('mac.txt', 'w') as f:
- f.write(mac + '\n')
- for i in range(quantity - 1):
- next_mac = CommonService.updateMac(mac)
- mac = next_mac
- f.write(next_mac + '\n')
- # 保存下个mac
- next_mac = CommonService.updateMac(mac)
- mac_qs.update(value=next_mac, update_time=now_time)
- return response.json(0)
- @staticmethod
- def generate_mac_and_serial_numbers(request_dict, response):
- """
- 生成mac_serial.csv文件
- :param request_dict: 请求
- :request_dict quantity: 生成数量
- :request_dict user_id: 账号ID
- :param response: 响应
- :return: response
- """
- # 获取生成数量
- quantity = int(request_dict.get('quantity', 0))
- # 获取公司类型
- company_id = int(request_dict.get('company_id', 0))
- # 获取账号ID
- user_id = int(request_dict.get('user_id', 0))
- if not (quantity and company_id and user_id):
- return response.json(444)
- # 获取mac地址查询集
- mac_qs = MacModel.objects.filter(is_active=True).values('value')
- if not mac_qs.exists():
- return response.json(175)
- mac = mac_qs[0]['value']
- # 获取序列号地址查询集
- user_serial_numbers = UserSerialNumberModel.objects.filter(
- user__id=user_id,
- serial_number__status=1,
- serial_number__use_status=1
- ).values('serial_number__serial_number')[:quantity]
- # 判断序列号地址是否充足
- if len(user_serial_numbers) < quantity:
- return response.json(444, '序列号不足')
- now_time = int(time.time())
- # 需要更新的序列号表
- serial_numbers_list = []
- # 保存到CSV的数据
- csv_data = []
- mark = ''
- if company_id == 1:
- mark = '11A'
- elif company_id == 2:
- mark = '11L'
- for user_serial in user_serial_numbers:
- serial_number = user_serial['serial_number__serial_number']
- serial_numbers_list.append(serial_number)
- next_mac = CommonService.updateMac(mac)
- csv_serial_number = serial_number + mark
- csv_data.append([next_mac, csv_serial_number])
- mac = next_mac
- # 批量写入CSV文件
- with open('mac_serial.csv', 'w', newline='') as csvfile:
- writer = csv.writer(csvfile)
- writer.writerow(['MAC', 'SN'])
- writer.writerows(csv_data)
- # 更新SerialNumberModel,CompanySerialModel表
- SerialNumberModel.objects.filter(serial_number__in=serial_numbers_list) \
- .update(use_status=2)
- CompanySerialModel.objects.filter(serial_number__in=serial_numbers_list) \
- .update(status=2, update_time=now_time)
- next_mac = CommonService.updateMac(mac)
- mac_qs.update(value=next_mac, update_time=now_time)
- return response.json(0)
- @staticmethod
- def check_serial_log(request_dict, response):
- """
- 检查获取序列号日志
- :param request_dict: 请求
- :param response: 响应
- :return: response
- """
- serial = request_dict.get('serial', None)
- if not serial:
- return response.json(444)
- operation = '获取序列号:{}'.format(serial[:6])
- log_qs = LogModel.objects.filter(operation=operation).values('id')
- if not log_qs.exists():
- return response.json(173)
- return response.json(0)
|