店播爬取Python脚本

es.py (2.8 KB, 77 lines)
  1. #!/usr/bin/python3
  2. #coding=utf-8
  3. #-*- coding: UTF-8 -*
  4. from elasticsearch import Elasticsearch
  5. from elasticsearch.helpers import bulk
  6. class ES:
  7. def __init__(self):
  8. self.index = 'sc_kwai_barrages'
  9. self.type = 'sc_kwai_barrage'
  10. self.connection = Elasticsearch('es-cn-0pp19srqi00047s72.elasticsearch.aliyuncs.com:9200', http_auth=('elastic', 'Kuxuan666!'))
  11. def batch_insert(self, batch_data):
  12. actions = []
  13. # 收到弹幕
  14. if batch_data.get('user'):
  15. for data in batch_data.get('user'):
  16. action = {
  17. "_index": self.index,
  18. "_type": self.type,
  19. "_source": {
  20. "live_stream_id": batch_data.get('live_stream_id'),
  21. "type": 1,
  22. "principal_id": data.get('user').get('principalId'),
  23. "user_name": data.get('user').get('userName'),
  24. "content": data.get('content'),
  25. "created_at": batch_data.get('created_at')
  26. }
  27. }
  28. actions.append(action)
  29. # 收到礼物
  30. if batch_data.get('gift'):
  31. for data in batch_data.get('gift'):
  32. action = {
  33. "_index": self.index,
  34. "_type": self.type,
  35. "_source": {
  36. "live_stream_id": batch_data.get('live_stream_id'),
  37. "type": 2,
  38. "principal_id": data.get('user').get('principalId'),
  39. "user_name": data.get('user').get('userName'),
  40. "gift_id": data.get('giftId'),
  41. "batch_size": data.get('batchSize'),
  42. "combo_count": data.get('comboCount'),
  43. "rank": data.get('rank'),
  44. "created_at": batch_data.get('created_at')
  45. }
  46. }
  47. actions.append(action)
  48. # 收到点亮
  49. if batch_data.get('like'):
  50. for data in batch_data.get('like'):
  51. action = {
  52. "_index": self.index,
  53. "_type": self.type,
  54. "_source": {
  55. "live_stream_id": batch_data.get('live_stream_id'),
  56. "type": 3,
  57. "principal_id": data.get('user').get('principalId'),
  58. "user_name": data.get('user').get('userName'),
  59. "created_at": batch_data.get('created_at')
  60. }
  61. }
  62. actions.append(action)
  63. if len(actions) == 0:
  64. return
  65. res, _ = bulk(self.connection, actions, index = self.index, raise_on_error=True)