"""Minimal client for a handful of Douyin (aweme) HTTP endpoints.

The module offers helpers that fabricate random device identifiers and the
``DouYinApi`` class, which registers a device and issues signed GET requests.
"""

import hashlib
import json
import random
import time
import uuid
from urllib import parse

import requests

from libs.aesgzip import tt_encrypt
# NOTE: the star import stays *after* the explicit imports above so that any
# name xlog03 exports keeps taking precedence exactly as it did before.
# getXGon / xGorgon / strToByte used below are not defined in this file and
# are presumably provided by this import -- confirm against xlog03.
from xlog03 import *
from rds_model.db_redis import DbRedis


def get_mc():
    """Return a random MAC-like string: six upper-case hex pairs joined by ':'."""
    hex_digits = "1234567890ABCDEF"

    def random_pair():
        # Two independent draws from the hex alphabet form one octet.
        return ''.join(random.choice(hex_digits) for _ in range(2))

    return ':'.join(random_pair() for _ in range(6))


def get_random(i, random_type=1):
    """Return a random string of length ``i``.

    :param i: number of characters to generate
    :param random_type: ``1`` -> decimal number with exactly ``i`` digits
        (no leading zero); ``8`` -> upper-case ASCII letters; any other
        value -> characters drawn from ``"1234567890abcde"``
    :return: the generated string
    """
    if random_type == 1:
        return str(random.randint(10 ** (i - 1), 10 ** i - 1))

    if random_type == 8:
        alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    else:
        # NOTE(review): this alphabet stops at 'e'; if lower-case hex was
        # intended, 'f' is missing -- left untouched to keep output unchanged.
        alphabet = "1234567890abcde"
    return ''.join(random.choice(alphabet) for _ in range(i))


V1 = '6'
V2 = '8'
channel = 'huawei'
device_type = 'RKK-YZ00'
device_brand = 'HUAWEI'
print(channel, device_type)


class DouYinApi:
    """Stateful client holding device ids, cookies and proxy settings."""

    USER_AGENT = f'com.ss.android.ugc.aweme/{V1}{V2}0 (Linux; U; Android 5.1.1; zh_CN; {device_type}; Build/LMY47V; Cronet/58.0.2991.0)'

    # Query parameters appended to every request.
    COMMON_DEVICE_PARAMS = {
        'retry_type': 'no_retry',
        'ac': '4g',
        'channel': channel,
        'aid': '1128',
        'app_name': 'aweme',
        'version_code': f'{V1}{V2}0',
        'version_name': f'{V1}.{V2}.0',
        'device_platform': 'android',
        'ssmix': 'a',
        'device_type': device_type,
        'device_brand': device_brand,
        'language': 'zh',
        'os_api': '26',
        'os_version': '8.0.0',
        'manifest_version_code': f'{V1}{V2}0',
        'resolution': '720*1280',
        'dpi': '320',
        'update_version_code': f'{V1}{V2}02',
        'mcc_mnc': '46000'
    }

    PROXY = {}

    def __init__(self, sessionid, proxies):
        """Create a client with empty device identifiers and cookies.

        :param sessionid: accepted for interface compatibility; it is not
            read anywhere in this class
        :param proxies: proxy mapping handed to ``requests``; a falsy value
            disables proxying
        """
        self.proxies = proxies
        self.__cid = ''
        self.__device_id = ''
        self.__iid = ''
        self.__uuid = ''
        self.__openudid = ''
        self.__device_params = {}
        self.__cookie = {}

    def init_device_ids(self, device_id, iid, udid, openudid, cc=None):
        """Store device identifiers and rebuild the per-request parameters.

        :param device_id: device id; only sent when ``iid`` is also truthy
        :param iid: install id; only sent when ``device_id`` is also truthy
        :param udid: value sent as ``uuid``
        :param openudid: value sent as ``openudid``
        :param cc: optional cookie mapping merged into the client's cookies
        """
        self.__device_id = device_id
        self.__iid = iid
        self.__uuid = udid
        self.__openudid = openudid

        device_ids = {
            'uuid': udid,
            'openudid': openudid
        }
        if device_id and iid:
            device_ids.update({
                'device_id': device_id,
                'iid': iid,
            })

        # Copy first so the shared class-level defaults are never mutated.
        self.__device_params = self.COMMON_DEVICE_PARAMS.copy()
        self.__device_params.update(device_ids)

        if cc:
            self.__cookie.update(cc)

    def __store_cookies(self, resp):
        """Merge any cookies set by ``resp`` into the client's cookie dict."""
        fresh = resp.cookies.get_dict()
        if fresh:
            self.__cookie.update(fresh)

    def __get_encrypted_device_info(self, device_id, openudid, udid, clientudid, serial_number, mac, iid):
        """Build the device-registration JSON document and encrypt it.

        ``device_id`` and ``iid`` are added to the header only when truthy.

        :return: the result of ``tt_encrypt`` applied to the JSON text
        """
        common = self.COMMON_DEVICE_PARAMS
        register_info = {
            "magic_tag": "ss_app_log",
            "header": {
                "display_name": "抖音短视频",
                "update_version_code": int(common['update_version_code']),
                "manifest_version_code": int(common['manifest_version_code']),
                "aid": 1128,
                "channel": common['channel'],
                "package": "com.ss.android.ugc.aweme",
                "app_version": common['version_name'],
                "version_code": int(common['version_code']),
                "sdk_version": "2.7.5.8",
                "os": "Android",
                "os_version": common['os_version'],
                "os_api": common['os_api'],
                "device_model": common['device_type'],
                "device_brand": common['device_brand'],
                "device_manufacturer": common['device_brand'],
                "cpu_abi": "armeabi-v7a",
                "build_serial": serial_number,
                "release_build": "2132ca7_20190320",
                "density_dpi": common['dpi'],
                "display_density": "xhdpi",
                "resolution": "1280x720",
                "language": "zh",
                "mc": mac,
                "timezone": 8,
                "access": "4G",
                "not_request_sender": 0,
                "rom": "MIUI-9.11.7",
                "rom_version": "miui_V11_9.11.7",
                "openudid": str(openudid),
                "udid": str(udid),
                "clientudid": str(clientudid),
                "serial_number": str(serial_number),
                "sim_serial_number": [],
                "region": "CN",
                "tz_name": "Asia/Shanghai",
                "tz_offset": 28800
            },
            "_gen_time": str(round(time.time() * 1000))
        }

        header = register_info['header']
        if device_id:
            header['device_id'] = str(device_id)
        if iid:
            header['iid'] = str(iid)
        header['push_sdk'] = '[1, 2, 6, 7, 8, 9]'

        return tt_encrypt(json.dumps(register_info))

    def register_device(self):
        """Register a freshly randomised device with the remote service.

        Cookies set by the response are merged into the client's cookies.

        :return: dict with the ids from the response (``new_user``,
            ``device_id``, ``iid``), the generated identifiers, the
            url-encoded cookies and a ``times`` counter initialised to 0
        """
        udid = '8604' + get_random(11)
        serial_number = str(uuid.uuid4())[-12:]
        openudid = '3b22' + str(uuid.uuid4())[-12:]
        clientudid = str(uuid.uuid4())
        mc = get_mc()

        params = {
            'uuid': udid,
            'openudid': openudid,
            '_rticket': str(int(round(time.time() * 1000)))
        }
        params.update(self.COMMON_DEVICE_PARAMS)
        device_register_url = 'https://log.snssdk.com/service/2/device_register/?' + parse.urlencode(params)

        headers = {
            'User-Agent': DouYinApi.USER_AGENT
        }
        payload = self.__get_encrypted_device_info(None, openudid, udid, clientudid, serial_number, mc, iid=None)

        # ``proxies=None`` is requests' default, so a falsy ``self.proxies``
        # behaves exactly like omitting the argument.
        # SECURITY: verify=False disables TLS certificate validation.
        # NOTE(review): no timeout is set, so a dead proxy can block forever.
        resp = requests.post(device_register_url, data=payload, proxies=self.proxies or None,
                             headers=headers, verify=False)
        self.__store_cookies(resp)

        body = resp.json()
        return {
            'new_user': body['new_user'],
            'device_id': str(body['device_id']),
            'iid': str(body['install_id']),
            'uuid': udid,
            'openudid': openudid,
            'serial_number': serial_number,
            'clientudid': clientudid,
            'mc': mc,
            'cookie': parse.urlencode(self.__cookie),
            'times': 0
        }

    def __get_random(self, len):
        """Return a string of ``len`` random decimal digits."""
        # The parameter name shadows the builtin; kept for compatibility.
        return ''.join(str(random.choice(range(10))) for _ in range(len))

    def __get_msg(self, resp):
        """Return the ``msg`` field of a JSON response body."""
        return json.loads(resp.text)['msg']

    def __get_random_mac(self):
        """Return a random lower-case MAC address with the prefix 10:2a:b3."""
        mac = [0x10, 0x2a, 0xb3,
               random.randint(0x00, 0x7f),
               random.randint(0x00, 0xff),
               random.randint(0x00, 0xff)]
        return ':'.join("%02x" % octet for octet in mac)

    def __add_other_params(self, douyin_url, params=None):
        """Append device parameters, ``params`` and timestamps to a URL.

        :param douyin_url: base URL, with or without a query string
        :param params: optional extra query parameters
        :return: the URL followed by the device parameters, ``params``,
            ``_rticket`` (milliseconds) and ``ts`` (seconds)
        """
        if params is None:
            params = {}

        if '?' not in douyin_url:
            douyin_url = douyin_url + '?'

        common_params = parse.urlencode(self.__device_params)
        if douyin_url.endswith(('?', '&')):
            douyin_url = douyin_url + common_params
        else:
            douyin_url = douyin_url + '&' + common_params

        if len(params) > 0:
            douyin_url = douyin_url + '&' + parse.urlencode(params)

        douyin_url = douyin_url + "&_rticket=" + str(int(round(time.time() * 1000))) + "&ts=" + str(int(time.time()))
        return douyin_url

    def __http_get(self, url, query_params=None):
        """Issue a signed GET request and remember any cookies it sets.

        :param url: endpoint URL
        :param query_params: optional extra query parameters
        :return: the ``requests`` response object (not decoded)
        """
        if query_params is None:
            query_params = {}

        url = self.__add_other_params(url, query_params)
        print(url)
        sign = self.__get_sign(url)

        # ``proxies=None`` is requests' default, so a falsy ``self.proxies``
        # behaves exactly like omitting the argument.
        # SECURITY: verify=False disables TLS certificate validation.
        # NOTE(review): no timeout is set, so a dead proxy can block forever.
        resp = requests.get(url, headers=self.__get_headers(sign), cookies=self.__cookie,
                            proxies=self.proxies or None, verify=False)
        self.__store_cookies(resp)
        return resp

    def __get_sign(self, url, form_params=None):
        """Compute the signature headers for ``url``.

        :param url: full request URL; it must contain ``'?'``
        :param form_params: optional form fields; when given, the upper-cased
            MD5 hex digest of their url-encoding is sent as ``X-SS-STUB``
        :return: dict of signature headers
        :raises ValueError: if ``url`` contains no ``'?'``
        """
        stub = ''
        if form_params:
            encoded_form = parse.urlencode(form_params)
            stub = hashlib.md5(encoded_form.encode('utf-8')).hexdigest()

        ts = int(time.time())
        query = url[url.index('?') + 1:]
        # getXGon / strToByte / xGorgon are external helpers (see imports).
        s = getXGon(query, stub, parse.urlencode(self.__cookie))
        gorgon = xGorgon(ts, strToByte(s))

        sign = {
            'X-Khronos': str(ts),
            'X-Gorgon': gorgon,
            'X-Pods': ''
        }
        print(gorgon)

        if stub:
            sign.update({
                'X-SS-STUB': stub.upper()
            })
        return sign

    def __get_headers(self, sign=None):
        """Return the request headers, optionally merged with ``sign``."""
        if sign is None:
            sign = {}

        headers = {
            'User-Agent': DouYinApi.USER_AGENT,
            'X-SS-REQ-TICKET': str(round(time.time() * 1000)),
        }
        headers.update(sign)
        return headers

    def get_user_info(self, user_id):
        """Fetch a user's profile information.

        :param user_id: user ID
        :return: the ``requests`` response object
        """
        params = {
            'user_id': user_id
        }
        douyin_url = 'https://aweme-eagle.snssdk.com/aweme/v1/user/?'
        return self.__http_get(douyin_url, params)