123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- #!/usr/bin/python3
- # coding=utf-8
- # -*- coding: utf-8 -*-
- import time
- import json
- import threading
- import sys
- import random
- from rds_model.rds_live_commodity_detail_request_list import RdsLiveCommodityDetailRequestList
- from libs.live_commodity_detail_v1 import LiveCommodityDetailV1
- from libs.mysql_dy_live_ifsolt import MysqlDyLiveCommodity
- from log.print_log import PrintLog
- start_time = time.time()
# Values of sold_out_at that route the follow-up request to the "middle" list.
# NOTE(review): these look like "not sold out yet" placeholders (zero date and
# the Unix epoch rendered in UTC+8) -- confirm against the table definition.
_UNSET_SOLD_OUT_VALUES = ('0000-00-00 00:00:00', '1970-01-01 08:00:00')

# Seconds added to "now" for follow-up requests pushed to each list.
_MIDDLE_LIST_DELAY_SECONDS = 300
_LAST_LIST_DELAY_SECONDS = 1200


def _parse_request(request_data):
    """Split a queued request string into its five leading fields.

    The payload is '@'-separated:
    promotion_id@product_id@uid@room_id@exec_time.

    Returns:
        A tuple (promotion_id, product_id, uid, room_id, exec_time) with
        exec_time converted to int, or None when there are fewer than five
        fields or the fifth field is not an integer.
    """
    fields = request_data.split('@')
    if len(fields) < 5:
        return None
    try:
        exec_time = int(fields[4])
    except ValueError:
        return None
    return fields[0], fields[1], fields[2], fields[3], exec_time


def _detail_is_unusable(commodity_detail):
    """Return True when the fetched detail is None, '' or lacks 'real_sell_num'."""
    return (
        commodity_detail is None
        or commodity_detail == ''
        or 'real_sell_num' not in commodity_detail
    )


def scrape(heat):
    """Worker loop: process commodity-detail requests for one queue forever.

    Each iteration takes one request for ``heat`` from
    RdsLiveCommodityDetailRequestList, waits until the request's scheduled
    time, fetches the detail through LiveCommodityDetailV1.get_data and, on
    success, pushes a JSON document with the detail and request metadata.

    Failure handling, as visible in this function:
      * Malformed payloads are printed and dropped.
      * An unusable detail records score 0 and is usually re-queued
        (randint(0, 100) > 0 when heat == 0, randint(0, 9) > 0 when heat > 0).
      * Any exception during fetch/processing records score 0, re-queues the
        request and logs the error.

    For heat < 2 a follow-up request is scheduled on the "middle" or "last"
    list unless the stored row already has a revise_price.

    NOTE(review): the nesting of this function was reconstructed from a copy
    that had lost its indentation; confirm branch placement against the
    canonical file.

    Args:
        heat: Integer queue identifier passed through to the request list.

    This function never returns.
    """
    while True:
        # One client per iteration serves both the read and the write side.
        rds_list = RdsLiveCommodityDetailRequestList()
        request_data = rds_list.get_request_params(heat)
        if request_data is None:
            # Nothing queued; back off briefly instead of spinning.
            time.sleep(0.1)
            continue

        parsed = _parse_request(request_data)
        if parsed is None:
            # Covers both too-few fields and a non-integer timestamp, so a bad
            # payload cannot raise outside the try block and kill the thread.
            print('数据格式不对' + request_data)
            continue
        promotion_id, product_id, uid, room_id, exec_time = parsed

        # Read the clock once: recomputing the delay after the comparison
        # could yield a negative value, which time.sleep rejects.
        delay = exec_time - int(time.time())
        if delay > 0:
            time.sleep(delay)

        PrintLog.print(
            time.strftime("%H:%M:%S", time.localtime()) + ' '
            + time.strftime("%H:%M:%S", time.localtime(exec_time)) + ' '
            + str(heat) + ' ' + promotion_id
        )

        try:
            commodity_detail = LiveCommodityDetailV1.get_data(product_id)
            detail_missing = _detail_is_unusable(commodity_detail)

            if detail_missing:
                rds_list.record_v1_score(0)
                PrintLog.print(
                    time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                    + ' 数据为空 ' + promotion_id
                )
                if heat == 0:
                    if random.randint(0, 100) > 0:
                        rds_list.add_request_params(request_data, heat)
                        print('首次失败重新插入 ' + request_data)
                        continue
                elif heat > 0:
                    if random.randint(0, 9) > 0:
                        rds_list.add_request_params(request_data, heat)
                        print(str(heat) + '非首次失败重新插入 ' + request_data)
                        continue
                # Not re-queued: fall through so a follow-up may still be
                # scheduled below before this request is abandoned.

            if heat < 2:
                goods_db = MysqlDyLiveCommodity()
                # Last character of uid is the third lookup argument
                # (presumably a table-shard suffix -- TODO confirm).
                uid_tail = str(uid)[-1]
                goods_info = goods_db.get_goods_info(room_id, product_id, uid_tail)
                # Row layout: (id, sold_out_at, revise_price, created_at).
                # A missing row fails to unpack and is handled by the except
                # clause below, which re-queues the request.
                _, sold_out_at, revise_price, _ = goods_info
                if revise_price is not None:
                    continue

                if sold_out_at in _UNSET_SOLD_OUT_VALUES:
                    next_delay = _MIDDLE_LIST_DELAY_SECONDS
                    push_next = rds_list.push_middle_list
                else:
                    next_delay = _LAST_LIST_DELAY_SECONDS
                    push_next = rds_list.push_last_list
                next_time = int(time.time()) + next_delay
                push_next('@'.join(
                    (promotion_id, product_id, uid, room_id, str(next_time))
                ))

            if detail_missing:
                continue

            data = json.dumps({
                "data": commodity_detail,
                "extra": {
                    'room_id': room_id,
                    'promotion_id': promotion_id,
                    'product_id': product_id,
                    'uid': uid,
                    'heat': heat
                }
            })
            rds_list.record_v1_score(1)
            rds_list.push_v1_data_list(data)
        except Exception as e:
            # Worker boundary: record the failure, put the request back and
            # keep the thread alive.
            rds_list.record_v1_score(0)
            rds_list.add_request_params(request_data, heat)
            PrintLog.print(
                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                + '抛出异常!' + str(e) + '\n'
                + request_data
            )
if __name__ == "__main__":
    # Entry point: python <script> <heat> <thread_count>
    # Keeps <thread_count> scrape() workers alive for the given queue,
    # starting a replacement whenever a worker thread exits.
    import warnings

    warnings.filterwarnings("ignore")

    if len(sys.argv) < 3:
        # Fail with a readable message instead of a bare IndexError traceback.
        sys.exit('usage: ' + sys.argv[0] + ' <heat> <thread_count>')

    heat = int(sys.argv[1])
    threading_count = int(sys.argv[2])

    rds = RdsLiveCommodityDetailRequestList()
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + str(heat) + ' 开始执行,商品销量队列长度:' + str(rds.get_len(heat)))

    while True:
        sys.stdout.flush()
        # Exclude the main thread from the count.
        active_count = threading.active_count() - 1
        increment = threading_count - active_count
        if increment > 0:
            # One worker is started per pass; the short sleep below means the
            # pool reaches its target size within a few iterations.
            task = threading.Thread(target=scrape, args=(heat,))
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 启动线程' + str(increment))
            task.start()  # thread becomes runnable and waits to be scheduled
        time.sleep(0.01)
|