123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- #!/usr/bin/python3
- #coding=utf-8
- #-*- coding: UTF-8 -*
- import pymysql
- import json
class PersistentMysql:
    """Single-use writer that stores one batch of live-stream events in MySQL.

    The constructor opens a connection and a cursor. ``batch_insert`` writes
    the batch into the ``sc_kwai_barrage`` table and then closes the
    connection, so an instance can serve exactly one ``batch_insert`` call.

    ``batch_insert`` reads ``self.live_stream_id`` and ``self.created_at``.
    This class never assigns them: the caller must set both attributes on the
    instance before calling ``batch_insert``.
    """

    # NOTE(review): these credentials are committed to source control. They
    # are kept as defaults only so existing ``PersistentMysql()`` callers keep
    # working; rotate them and pass the settings in from configuration.
    _DEFAULT_HOST = 'rm-2ze8r493yk3636g3p.mysql.rds.aliyuncs.com'
    _DEFAULT_USER = 'api_online'
    _DEFAULT_PASSWORD = 'vetted8#hatbox'
    _DEFAULT_PORT = 3306
    _DEFAULT_DATABASE = 'kuaishou'

    # Values written to the ``type`` column.
    _TYPE_COMMENT = 1  # barrage (on-screen comment) received
    _TYPE_GIFT = 2     # gift received
    _TYPE_LIKE = 3     # like ("light up") received

    # ``rank`` is a reserved word from MySQL 8.0.2 on, so it is backtick-quoted;
    # the quoted form is accepted by older servers as well.
    _INSERT_SQL = (
        'insert into sc_kwai_barrage('
        'live_stream_id,type,principal_id,user_name,content,'
        'gift_id,batch_size,combo_count,`rank`,created_at'
        ') value(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'
    )

    def __init__(self, host=_DEFAULT_HOST, user=_DEFAULT_USER,
                 password=_DEFAULT_PASSWORD, port=_DEFAULT_PORT,
                 database=_DEFAULT_DATABASE):
        """Open the MySQL connection and create a cursor.

        Args:
            host: MySQL server host name.
            user: Login user.
            password: Login password.
            port: TCP port of the server.
            database: Schema to select after connecting.

        All arguments default to the values that used to be hard-coded, so
        calling ``PersistentMysql()`` with no arguments behaves as before.
        """
        self.db = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            port=port,
            charset='utf8mb4',
        )
        # Cursor used by batch_insert for the bulk insert.
        self.cursor = self.db.cursor()

    def batch_insert(self, batch_data) -> None:
        """Insert every event in ``batch_data`` and close the connection.

        Args:
            batch_data: Mapping with optional ``'user'`` (comments),
                ``'gift'`` and ``'like'`` keys, each holding a list of event
                dicts. Every event carries a ``'user'`` dict with
                ``principalId`` and ``userName``; comment events also carry
                ``content`` and gift events ``giftId``, ``batchSize``,
                ``comboCount`` and ``rank``. ``None`` or an empty mapping is
                treated as "nothing to insert".

        Raises:
            Whatever row building or the database driver raises. If the
            insert or the commit fails, the transaction is rolled back before
            the exception propagates.

        The connection is closed on every path, including errors, so the
        instance must not be reused afterwards.
        """
        try:
            rows = self._build_rows(batch_data or {})
            if not rows:
                return
            try:
                inserted = self.cursor.executemany(self._INSERT_SQL, rows)
                self.db.commit()
            except Exception:
                # Never commit a partially written batch.
                self.db.rollback()
                raise
            # Reported only after the commit succeeded.
            print('{}_捕获{}条数据,成功记录{}条数据'.format(
                self.live_stream_id, len(rows), inserted))
        finally:
            self.db.close()

    @staticmethod
    def _json_roundtrip(value):
        """Return ``value`` after a JSON encode/decode round trip.

        NOTE(review): for ``str`` and ``None`` this returns an equal value;
        it is kept only so other value types behave exactly as before.
        """
        return json.loads(json.dumps(value))

    def _build_rows(self, batch_data):
        """Turn the comment, gift and like events into insert tuples.

        Column order of every tuple: live_stream_id, type, principal_id,
        user_name, content, gift_id, batch_size, combo_count, rank,
        created_at. Rows are emitted in the order comments, gifts, likes.
        """
        rows = []

        # Comments: only ``content`` is filled among the optional columns.
        for event in batch_data.get('user') or ():
            sender = event.get('user')
            rows.append((
                self.live_stream_id,
                self._TYPE_COMMENT,
                sender.get('principalId'),
                self._json_roundtrip(sender.get('userName')),
                self._json_roundtrip(event.get('content')),
                None,
                None,
                None,
                None,
                self.created_at,
            ))

        # Gifts: the gift columns are filled and ``content`` stays NULL.
        for event in batch_data.get('gift') or ():
            sender = event.get('user')
            rows.append((
                self.live_stream_id,
                self._TYPE_GIFT,
                sender.get('principalId'),
                self._json_roundtrip(sender.get('userName')),
                None,
                event.get('giftId'),
                event.get('batchSize'),
                event.get('comboCount'),
                event.get('rank'),
                self.created_at,
            ))

        # Likes: only the sender is recorded.
        for event in batch_data.get('like') or ():
            sender = event.get('user')
            rows.append((
                self.live_stream_id,
                self._TYPE_LIKE,
                sender.get('principalId'),
                self._json_roundtrip(sender.get('userName')),
                None,
                None,
                None,
                None,
                None,
                self.created_at,
            ))

        return rows
|