123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- import time
- import requests
- import random
- import uuid
- import json
- from libs.aesgzip import tt_encrypt
- from xlog03 import *
- from rds_model.db_redis import DbRedis
def get_mc():
    """Return a random MAC-address-like string.

    The result consists of six colon-separated groups of two characters,
    each character drawn from the upper-case hexadecimal alphabet, for
    example ``"3F:A0:1C:77:B2:9E"``.

    :return: a 17-character string of the form ``XX:XX:XX:XX:XX:XX``.
    """
    hex_chars = "1234567890ABCDEF"
    # Characters are drawn strictly left to right, so a seeded ``random``
    # produces the same sequence as drawing them one at a time in a loop.
    groups = [
        random.choice(hex_chars) + random.choice(hex_chars)
        for _ in range(6)
    ]
    return ':'.join(groups)
def get_random(i, random_type=1):
    """Return a random string of length ``i``.

    :param i: number of characters to generate.
    :param random_type: selects the alphabet:

        * ``1`` (default) -- decimal digits; the first digit is never ``0``
          because the value is drawn as an integer with exactly ``i`` digits;
        * ``8`` -- upper-case ASCII letters;
        * any other value -- characters from ``"1234567890abcde"``.
    :return: the generated string. For the letter/character alphabets a
        non-positive ``i`` yields an empty string.
    :raises ValueError: if ``random_type`` is ``1`` and ``i`` is less than 1
        (there is no integer with fewer than one digit).
    """
    if random_type == 1:
        if i < 1:
            raise ValueError(
                "i must be at least 1 when random_type is 1, got %r" % (i,)
            )
        return str(random.randint(10 ** (i - 1), 10 ** i - 1))

    if random_type == 8:
        alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    else:
        alphabet = "1234567890abcde"
    # Use a throwaway loop variable so the length parameter is not shadowed.
    return ''.join(random.choice(alphabet) for _ in range(i))
# Major and minor parts of the app version; they are combined into the
# version_code / version_name / manifest_version_code request parameters and
# into the User-Agent string of DouYinApi.
V1 = '6'
V2 = '8'

# Device description sent as the `channel`, `device_type` and `device_brand`
# request parameters.
channel = 'huawei'
device_type = 'RKK-YZ00'
device_brand = 'HUAWEI'

# Import-time trace of the configured channel and device model.
print(channel, device_type)
class DouYinApi:
    """Small HTTP client for Douyin (aweme) app endpoints.

    It can register a freshly generated virtual device
    (:meth:`register_device`), remember a set of device identifiers
    (:meth:`init_device_ids`) and issue signed GET requests such as
    :meth:`get_user_info`.

    NOTE(review): ``parse``, ``hashlib``, ``getXGon``, ``xGorgon`` and
    ``strToByte`` are used below but are not imported by name in the visible
    import block; presumably they arrive through ``from xlog03 import *`` --
    confirm.
    """

    USER_AGENT = f'com.ss.android.ugc.aweme/{V1}{V2}0 (Linux; U; Android 5.1.1; zh_CN; {device_type}; Build/LMY47V; Cronet/58.0.2991.0)'

    # Query parameters attached to every request.
    COMMON_DEVICE_PARAMS = {
        'retry_type': 'no_retry',
        'ac': '4g',
        'channel': channel,
        'aid': '1128',
        'app_name': 'aweme',
        'version_code': f'{V1}{V2}0',
        'version_name': f'{V1}.{V2}.0',
        'device_platform': 'android',
        'ssmix': 'a',
        'device_type': device_type,
        'device_brand': device_brand,
        'language': 'zh',
        'os_api': '26',
        'os_version': '8.0.0',
        'manifest_version_code': f'{V1}{V2}0',
        'resolution': '720*1280',
        'dpi': '320',
        'update_version_code': f'{V1}{V2}02',
        'mcc_mnc': '46000'
    }

    # Not read anywhere in this class; kept for backward compatibility.
    PROXY = {}

    # Seconds ``requests`` waits for the connection and for each read before
    # raising ``requests.exceptions.Timeout``. Without a timeout a stalled
    # server would block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, sessionid, proxies):
        """Create a client with empty device identifiers and cookies.

        :param sessionid: accepted for interface compatibility; the code in
            this class does not use it.
        :param proxies: proxies mapping handed to ``requests``; a falsy value
            means no explicit proxy is passed.
        """
        self.proxies = proxies
        self.__cid = ''
        self.__device_id = ''
        self.__iid = ''
        self.__uuid = ''
        self.__openudid = ''
        self.__device_params = {}
        self.__cookie = {}

    def init_device_ids(self, device_id, iid, udid, openudid, cc=None):
        """Store device identifiers and rebuild the per-request parameters.

        :param device_id: device id; only added to the request parameters
            when both ``device_id`` and ``iid`` are truthy.
        :param iid: install id (see ``device_id``).
        :param udid: value sent as the ``uuid`` parameter.
        :param openudid: value sent as the ``openudid`` parameter.
        :param cc: optional mapping of cookies merged into the cookie store.
        """
        self.__device_id = device_id
        self.__iid = iid
        self.__uuid = udid
        self.__openudid = openudid

        device_ids = {
            'uuid': udid,
            'openudid': openudid
        }
        if device_id and iid:
            device_ids.update({
                'device_id': device_id,
                'iid': iid,
            })

        # Start from a copy so the shared class-level defaults stay untouched.
        self.__device_params = self.COMMON_DEVICE_PARAMS.copy()
        self.__device_params.update(device_ids)
        if cc:
            self.__cookie.update(cc)

    def __get_encrypted_device_info(self, device_id, openudid, udid, clientudid, serial_number, mac, iid):
        """Build the device-registration payload and pass it to ``tt_encrypt``.

        The payload is a JSON document describing the device; ``device_id``
        and ``iid`` are only included when truthy.

        :return: whatever ``tt_encrypt`` returns for the JSON text.
        """
        register_info = {
            "magic_tag": "ss_app_log",
            "header": {
                "display_name": "抖音短视频",
                "update_version_code": int(self.COMMON_DEVICE_PARAMS['update_version_code']),
                "manifest_version_code": int(self.COMMON_DEVICE_PARAMS['manifest_version_code']),
                "aid": 1128,
                "channel": self.COMMON_DEVICE_PARAMS['channel'],
                "package": "com.ss.android.ugc.aweme",
                "app_version": self.COMMON_DEVICE_PARAMS['version_name'],
                "version_code": int(self.COMMON_DEVICE_PARAMS['version_code']),
                "sdk_version": "2.7.5.8",
                "os": "Android",
                "os_version": self.COMMON_DEVICE_PARAMS['os_version'],
                "os_api": self.COMMON_DEVICE_PARAMS['os_api'],
                "device_model": self.COMMON_DEVICE_PARAMS['device_type'],
                "device_brand": self.COMMON_DEVICE_PARAMS['device_brand'],
                "device_manufacturer": self.COMMON_DEVICE_PARAMS['device_brand'],
                "cpu_abi": "armeabi-v7a",
                "build_serial": serial_number,
                "release_build": "2132ca7_20190320",
                "density_dpi": self.COMMON_DEVICE_PARAMS['dpi'],
                "display_density": "xhdpi",
                "resolution": "1280x720",
                "language": "zh",
                "mc": mac,
                "timezone": 8,
                "access": "4G",
                "not_request_sender": 0,
                "rom": "MIUI-9.11.7",
                "rom_version": "miui_V11_9.11.7",
                "openudid": str(openudid),
                "udid": str(udid),
                "clientudid": str(clientudid),
                "serial_number": str(serial_number),
                "sim_serial_number": [
                ],
                "region": "CN",
                "tz_name": "Asia/Shanghai",
                "tz_offset": 28800
            },
            "_gen_time": str(round(time.time() * 1000))
        }
        header = register_info['header']
        if device_id:
            header['device_id'] = str(device_id)
        if iid:
            header['iid'] = str(iid)
        header['push_sdk'] = '[1, 2, 6, 7, 8, 9]'
        return tt_encrypt((json.dumps(register_info)))

    def __send(self, request_func, url, **kwargs):
        """Issue one HTTP request and merge its response cookies.

        :param request_func: ``requests.get`` or ``requests.post``.
        :param url: fully built request URL.
        :param kwargs: extra keyword arguments forwarded to ``request_func``.
        :return: the ``requests`` response object.
        """
        # A falsy ``self.proxies`` is sent as ``None``, which is the same as
        # not passing the argument at all.
        # NOTE(review): TLS certificate verification is disabled here, as in
        # the original code -- confirm this is intentional.
        resp = request_func(url, proxies=self.proxies or None, verify=False,
                            timeout=self.REQUEST_TIMEOUT, **kwargs)
        # Updating with an empty dict is a no-op, so no emptiness check.
        self.__cookie.update(resp.cookies.get_dict())
        return resp

    def register_device(self):
        """Register a newly generated virtual device.

        Random identifiers are generated, posted to the device-register
        endpoint, and returned together with the ids assigned by the server.

        :return: dict with keys ``new_user``, ``device_id``, ``iid``,
            ``uuid``, ``openudid``, ``serial_number``, ``clientudid``,
            ``mc``, ``cookie`` (url-encoded cookie store) and ``times``.
        :raises KeyError: if the JSON response lacks ``new_user``,
            ``device_id`` or ``install_id``.
        """
        udid = '8604' + get_random(11)
        serial_number = str(uuid.uuid4())[-12:]
        openudid = '3b22' + str(uuid.uuid4())[-12:]
        clientudid = str(uuid.uuid4())
        mc = get_mc()

        params = {
            'uuid': udid,
            'openudid': openudid,
            '_rticket': str(int(round(time.time() * 1000)))
        }
        params.update(self.COMMON_DEVICE_PARAMS)
        device_register_url = 'https://log.snssdk.com/service/2/device_register/?' + parse.urlencode(params)
        headers = {
            'User-Agent': DouYinApi.USER_AGENT
        }
        payload = self.__get_encrypted_device_info(None, openudid, udid, clientudid, serial_number, mc, iid=None)

        resp = self.__send(requests.post, device_register_url, data=payload, headers=headers)
        body = resp.json()
        return {
            'new_user': body['new_user'],
            'device_id': str(body['device_id']),
            'iid': str(body['install_id']),
            'uuid': udid,
            'openudid': openudid,
            'serial_number': serial_number,
            'clientudid': clientudid,
            'mc': mc,
            'cookie': parse.urlencode(self.__cookie),
            'times': 0
        }

    def __get_random(self, len):
        """Return a string of ``len`` random decimal digits.

        The parameter name shadows the ``len`` builtin; it is kept unchanged
        so existing keyword callers continue to work.
        """
        return ''.join(str(random.choice(range(10))) for _ in range(len))

    def __get_msg(self, resp):
        """Return the ``msg`` field of a JSON response body."""
        return json.loads(resp.text)['msg']

    def __get_random_mac(self):
        """Return a random lower-case MAC string with the prefix ``10:2a:b3``."""
        mac = [0x10, 0x2a, 0xb3,
               random.randint(0x00, 0x7f),
               random.randint(0x00, 0xff),
               random.randint(0x00, 0xff)]
        return ':'.join("%02x" % octet for octet in mac)

    def __add_other_params(self, douyin_url, params=None):
        """Append device parameters, ``params`` and timestamps to a URL.

        :param douyin_url: base URL, with or without a query string.
        :param params: optional extra query parameters.
        :return: the URL followed by the device parameters, ``params``,
            ``_rticket`` (milliseconds) and ``ts`` (seconds).
        """
        if params is None:
            params = {}
        if '?' not in douyin_url:
            douyin_url += '?'

        # Only insert a separator when the URL already carries parameters.
        if not douyin_url.endswith(('?', '&')):
            douyin_url += '&'
        douyin_url += parse.urlencode(self.__device_params)

        if len(params) > 0:
            douyin_url += '&' + parse.urlencode(params)

        # Read the clock once so both timestamps describe the same instant.
        now = time.time()
        douyin_url += "&_rticket=" + str(int(round(now * 1000))) + "&ts=" + str(int(now))
        return douyin_url

    def __http_get(self, url, query_params=None):
        """Send a signed GET request.

        :param url: endpoint URL.
        :param query_params: optional extra query parameters.
        :return: the ``requests`` response object (not decoded).
        """
        if query_params is None:
            query_params = {}
        url = self.__add_other_params(url, query_params)
        print(url)  # debug trace of the final request URL
        sign = self.__get_sign(url)
        return self.__send(requests.get, url, headers=self.__get_headers(sign), cookies=self.__cookie)

    def __get_sign(self, url, form_params=None):
        """Compute the signature headers for a request.

        :param url: full request URL; it must contain ``?``, otherwise
            ``str.index`` raises ``ValueError``.
        :param form_params: optional form fields; when given, the MD5 hex
            digest of their url-encoded form takes part in the signature and
            is sent upper-cased as ``X-SS-STUB``.
        :return: dict with ``X-Khronos``, ``X-Gorgon``, ``X-Pods`` and,
            when applicable, ``X-SS-STUB``.
        """
        stub = ''
        if form_params:
            encoded_form = parse.urlencode(form_params)
            stub = hashlib.md5(encoded_form.encode('utf-8')).hexdigest()

        ts = int(time.time())
        query = url[url.index('?') + 1:]
        s = getXGon(query, stub, parse.urlencode(self.__cookie))
        gorgon = xGorgon(ts, strToByte(s))

        sign = {
            'X-Khronos': str(ts),
            'X-Gorgon': gorgon,
            'X-Pods': ''
        }
        print(gorgon)  # debug trace of the computed signature
        if stub:
            sign['X-SS-STUB'] = stub.upper()
        return sign

    def __get_headers(self, sign=None):
        """Return the request headers, merged with ``sign`` when given."""
        if sign is None:
            sign = {}
        headers = {
            'User-Agent': DouYinApi.USER_AGENT,
            'X-SS-REQ-TICKET': str(round(time.time() * 1000)),
        }
        headers.update(sign)
        return headers

    def get_user_info(self, user_id):
        """Fetch a user's profile.

        :param user_id: user ID.
        :return: the ``requests`` response object of the user endpoint.
        """
        params = {
            'user_id': user_id
        }
        douyin_url = 'https://aweme-eagle.snssdk.com/aweme/v1/user/?'
        return self.__http_get(douyin_url, params)
|