店播爬取Python脚本

unknown_fields_test.py 20KB

#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc. All rights reserved.
# https://developers.google.com/protocol-buffers/
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  33. """Test for preservation of unknown fields in the pure Python implementation."""
  34. __author__ = 'bohdank@google.com (Bohdan Koval)'
  35. try:
  36. import unittest2 as unittest #PY26
  37. except ImportError:
  38. import unittest
  39. import sys
  40. from google.protobuf import map_unittest_pb2
  41. from google.protobuf import unittest_mset_pb2
  42. from google.protobuf import unittest_pb2
  43. from google.protobuf import unittest_proto3_arena_pb2
  44. from google.protobuf.internal import api_implementation
  45. from google.protobuf.internal import encoder
  46. from google.protobuf.internal import message_set_extensions_pb2
  47. from google.protobuf.internal import missing_enum_values_pb2
  48. from google.protobuf.internal import test_util
  49. from google.protobuf.internal import testing_refleaks
  50. from google.protobuf.internal import type_checkers
  51. from google.protobuf.internal import wire_format
  52. from google.protobuf import descriptor
  53. try:
  54. import tracemalloc # pylint: disable=g-import-not-at-top
  55. except ImportError:
  56. # Requires python 3.4+
  57. pass
  58. @testing_refleaks.TestCase
  59. class UnknownFieldsTest(unittest.TestCase):
  60. def setUp(self):
  61. self.descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR
  62. self.all_fields = unittest_pb2.TestAllTypes()
  63. test_util.SetAllFields(self.all_fields)
  64. self.all_fields_data = self.all_fields.SerializeToString()
  65. self.empty_message = unittest_pb2.TestEmptyMessage()
  66. self.empty_message.ParseFromString(self.all_fields_data)
  67. def testSerialize(self):
  68. data = self.empty_message.SerializeToString()
  69. # Don't use assertEqual because we don't want to dump raw binary data to
  70. # stdout.
  71. self.assertTrue(data == self.all_fields_data)
  72. def testSerializeProto3(self):
  73. # Verify proto3 unknown fields behavior.
  74. message = unittest_proto3_arena_pb2.TestEmptyMessage()
  75. message.ParseFromString(self.all_fields_data)
  76. self.assertEqual(self.all_fields_data, message.SerializeToString())
  77. def testByteSize(self):
  78. self.assertEqual(self.all_fields.ByteSize(), self.empty_message.ByteSize())
  79. def testListFields(self):
  80. # Make sure ListFields doesn't return unknown fields.
  81. self.assertEqual(0, len(self.empty_message.ListFields()))
  82. def testSerializeMessageSetWireFormatUnknownExtension(self):
  83. # Create a message using the message set wire format with an unknown
  84. # message.
  85. raw = unittest_mset_pb2.RawMessageSet()
  86. # Add an unknown extension.
  87. item = raw.item.add()
  88. item.type_id = 98218603
  89. message1 = message_set_extensions_pb2.TestMessageSetExtension1()
  90. message1.i = 12345
  91. item.message = message1.SerializeToString()
  92. serialized = raw.SerializeToString()
  93. # Parse message using the message set wire format.
  94. proto = message_set_extensions_pb2.TestMessageSet()
  95. proto.MergeFromString(serialized)
  96. unknown_fields = proto.UnknownFields()
  97. self.assertEqual(len(unknown_fields), 1)
  98. # Unknown field should have wire format data which can be parsed back to
  99. # original message.
  100. self.assertEqual(unknown_fields[0].field_number, item.type_id)
  101. self.assertEqual(unknown_fields[0].wire_type,
  102. wire_format.WIRETYPE_LENGTH_DELIMITED)
  103. d = unknown_fields[0].data
  104. message_new = message_set_extensions_pb2.TestMessageSetExtension1()
  105. message_new.ParseFromString(d)
  106. self.assertEqual(message1, message_new)
  107. # Verify that the unknown extension is serialized unchanged
  108. reserialized = proto.SerializeToString()
  109. new_raw = unittest_mset_pb2.RawMessageSet()
  110. new_raw.MergeFromString(reserialized)
  111. self.assertEqual(raw, new_raw)
  112. def testEquals(self):
  113. message = unittest_pb2.TestEmptyMessage()
  114. message.ParseFromString(self.all_fields_data)
  115. self.assertEqual(self.empty_message, message)
  116. self.all_fields.ClearField('optional_string')
  117. message.ParseFromString(self.all_fields.SerializeToString())
  118. self.assertNotEqual(self.empty_message, message)
  119. def testDiscardUnknownFields(self):
  120. self.empty_message.DiscardUnknownFields()
  121. self.assertEqual(b'', self.empty_message.SerializeToString())
  122. # Test message field and repeated message field.
  123. message = unittest_pb2.TestAllTypes()
  124. other_message = unittest_pb2.TestAllTypes()
  125. other_message.optional_string = 'discard'
  126. message.optional_nested_message.ParseFromString(
  127. other_message.SerializeToString())
  128. message.repeated_nested_message.add().ParseFromString(
  129. other_message.SerializeToString())
  130. self.assertNotEqual(
  131. b'', message.optional_nested_message.SerializeToString())
  132. self.assertNotEqual(
  133. b'', message.repeated_nested_message[0].SerializeToString())
  134. message.DiscardUnknownFields()
  135. self.assertEqual(b'', message.optional_nested_message.SerializeToString())
  136. self.assertEqual(
  137. b'', message.repeated_nested_message[0].SerializeToString())
  138. msg = map_unittest_pb2.TestMap()
  139. msg.map_int32_all_types[1].optional_nested_message.ParseFromString(
  140. other_message.SerializeToString())
  141. msg.map_string_string['1'] = 'test'
  142. self.assertNotEqual(
  143. b'',
  144. msg.map_int32_all_types[1].optional_nested_message.SerializeToString())
  145. msg.DiscardUnknownFields()
  146. self.assertEqual(
  147. b'',
  148. msg.map_int32_all_types[1].optional_nested_message.SerializeToString())
  149. @testing_refleaks.TestCase
  150. class UnknownFieldsAccessorsTest(unittest.TestCase):
  151. def setUp(self):
  152. self.descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR
  153. self.all_fields = unittest_pb2.TestAllTypes()
  154. test_util.SetAllFields(self.all_fields)
  155. self.all_fields_data = self.all_fields.SerializeToString()
  156. self.empty_message = unittest_pb2.TestEmptyMessage()
  157. self.empty_message.ParseFromString(self.all_fields_data)
  158. # InternalCheckUnknownField() is an additional Pure Python check which checks
  159. # a detail of unknown fields. It cannot be used by the C++
  160. # implementation because some protect members are called.
  161. # The test is added for historical reasons. It is not necessary as
  162. # serialized string is checked.
  163. # TODO(jieluo): Remove message._unknown_fields.
  164. def InternalCheckUnknownField(self, name, expected_value):
  165. if api_implementation.Type() == 'cpp':
  166. return
  167. field_descriptor = self.descriptor.fields_by_name[name]
  168. wire_type = type_checkers.FIELD_TYPE_TO_WIRE_TYPE[field_descriptor.type]
  169. field_tag = encoder.TagBytes(field_descriptor.number, wire_type)
  170. result_dict = {}
  171. for tag_bytes, value in self.empty_message._unknown_fields:
  172. if tag_bytes == field_tag:
  173. decoder = unittest_pb2.TestAllTypes._decoders_by_tag[tag_bytes][0]
  174. decoder(memoryview(value), 0, len(value), self.all_fields, result_dict)
  175. self.assertEqual(expected_value, result_dict[field_descriptor])
  176. def CheckUnknownField(self, name, unknown_fields, expected_value):
  177. field_descriptor = self.descriptor.fields_by_name[name]
  178. expected_type = type_checkers.FIELD_TYPE_TO_WIRE_TYPE[
  179. field_descriptor.type]
  180. for unknown_field in unknown_fields:
  181. if unknown_field.field_number == field_descriptor.number:
  182. self.assertEqual(expected_type, unknown_field.wire_type)
  183. if expected_type == 3:
  184. # Check group
  185. self.assertEqual(expected_value[0],
  186. unknown_field.data[0].field_number)
  187. self.assertEqual(expected_value[1], unknown_field.data[0].wire_type)
  188. self.assertEqual(expected_value[2], unknown_field.data[0].data)
  189. continue
  190. if expected_type == wire_format.WIRETYPE_LENGTH_DELIMITED:
  191. self.assertIn(type(unknown_field.data), (str, bytes))
  192. if field_descriptor.label == descriptor.FieldDescriptor.LABEL_REPEATED:
  193. self.assertIn(unknown_field.data, expected_value)
  194. else:
  195. self.assertEqual(expected_value, unknown_field.data)
  196. def testCheckUnknownFieldValue(self):
  197. unknown_fields = self.empty_message.UnknownFields()
  198. # Test enum.
  199. self.CheckUnknownField('optional_nested_enum',
  200. unknown_fields,
  201. self.all_fields.optional_nested_enum)
  202. self.InternalCheckUnknownField('optional_nested_enum',
  203. self.all_fields.optional_nested_enum)
  204. # Test repeated enum.
  205. self.CheckUnknownField('repeated_nested_enum',
  206. unknown_fields,
  207. self.all_fields.repeated_nested_enum)
  208. self.InternalCheckUnknownField('repeated_nested_enum',
  209. self.all_fields.repeated_nested_enum)
  210. # Test varint.
  211. self.CheckUnknownField('optional_int32',
  212. unknown_fields,
  213. self.all_fields.optional_int32)
  214. self.InternalCheckUnknownField('optional_int32',
  215. self.all_fields.optional_int32)
  216. # Test fixed32.
  217. self.CheckUnknownField('optional_fixed32',
  218. unknown_fields,
  219. self.all_fields.optional_fixed32)
  220. self.InternalCheckUnknownField('optional_fixed32',
  221. self.all_fields.optional_fixed32)
  222. # Test fixed64.
  223. self.CheckUnknownField('optional_fixed64',
  224. unknown_fields,
  225. self.all_fields.optional_fixed64)
  226. self.InternalCheckUnknownField('optional_fixed64',
  227. self.all_fields.optional_fixed64)
  228. # Test length delimited.
  229. self.CheckUnknownField('optional_string',
  230. unknown_fields,
  231. self.all_fields.optional_string.encode('utf-8'))
  232. self.InternalCheckUnknownField('optional_string',
  233. self.all_fields.optional_string)
  234. # Test group.
  235. self.CheckUnknownField('optionalgroup',
  236. unknown_fields,
  237. (17, 0, 117))
  238. self.InternalCheckUnknownField('optionalgroup',
  239. self.all_fields.optionalgroup)
  240. self.assertEqual(97, len(unknown_fields))
  241. def testCopyFrom(self):
  242. message = unittest_pb2.TestEmptyMessage()
  243. message.CopyFrom(self.empty_message)
  244. self.assertEqual(message.SerializeToString(), self.all_fields_data)
  245. def testMergeFrom(self):
  246. message = unittest_pb2.TestAllTypes()
  247. message.optional_int32 = 1
  248. message.optional_uint32 = 2
  249. source = unittest_pb2.TestEmptyMessage()
  250. source.ParseFromString(message.SerializeToString())
  251. message.ClearField('optional_int32')
  252. message.optional_int64 = 3
  253. message.optional_uint32 = 4
  254. destination = unittest_pb2.TestEmptyMessage()
  255. unknown_fields = destination.UnknownFields()
  256. self.assertEqual(0, len(unknown_fields))
  257. destination.ParseFromString(message.SerializeToString())
  258. # ParseFromString clears the message thus unknown fields is invalid.
  259. with self.assertRaises(ValueError) as context:
  260. len(unknown_fields)
  261. self.assertIn('UnknownFields does not exist.',
  262. str(context.exception))
  263. unknown_fields = destination.UnknownFields()
  264. self.assertEqual(2, len(unknown_fields))
  265. destination.MergeFrom(source)
  266. self.assertEqual(4, len(unknown_fields))
  267. # Check that the fields where correctly merged, even stored in the unknown
  268. # fields set.
  269. message.ParseFromString(destination.SerializeToString())
  270. self.assertEqual(message.optional_int32, 1)
  271. self.assertEqual(message.optional_uint32, 2)
  272. self.assertEqual(message.optional_int64, 3)
  273. def testClear(self):
  274. unknown_fields = self.empty_message.UnknownFields()
  275. self.empty_message.Clear()
  276. # All cleared, even unknown fields.
  277. self.assertEqual(self.empty_message.SerializeToString(), b'')
  278. with self.assertRaises(ValueError) as context:
  279. len(unknown_fields)
  280. self.assertIn('UnknownFields does not exist.',
  281. str(context.exception))
  282. @unittest.skipIf((sys.version_info.major, sys.version_info.minor) < (3, 4),
  283. 'tracemalloc requires python 3.4+')
  284. def testUnknownFieldsNoMemoryLeak(self):
  285. # Call to UnknownFields must not leak memory
  286. nb_leaks = 1234
  287. def leaking_function():
  288. for _ in range(nb_leaks):
  289. self.empty_message.UnknownFields()
  290. tracemalloc.start()
  291. snapshot1 = tracemalloc.take_snapshot()
  292. leaking_function()
  293. snapshot2 = tracemalloc.take_snapshot()
  294. top_stats = snapshot2.compare_to(snapshot1, 'lineno')
  295. tracemalloc.stop()
  296. # There's no easy way to look for a precise leak source.
  297. # Rely on a "marker" count value while checking allocated memory.
  298. self.assertEqual([], [x for x in top_stats if x.count_diff == nb_leaks])
  299. def testSubUnknownFields(self):
  300. message = unittest_pb2.TestAllTypes()
  301. message.optionalgroup.a = 123
  302. destination = unittest_pb2.TestEmptyMessage()
  303. destination.ParseFromString(message.SerializeToString())
  304. sub_unknown_fields = destination.UnknownFields()[0].data
  305. self.assertEqual(1, len(sub_unknown_fields))
  306. self.assertEqual(sub_unknown_fields[0].data, 123)
  307. destination.Clear()
  308. with self.assertRaises(ValueError) as context:
  309. len(sub_unknown_fields)
  310. self.assertIn('UnknownFields does not exist.',
  311. str(context.exception))
  312. with self.assertRaises(ValueError) as context:
  313. # pylint: disable=pointless-statement
  314. sub_unknown_fields[0]
  315. self.assertIn('UnknownFields does not exist.',
  316. str(context.exception))
  317. message.Clear()
  318. message.optional_uint32 = 456
  319. nested_message = unittest_pb2.NestedTestAllTypes()
  320. nested_message.payload.optional_nested_message.ParseFromString(
  321. message.SerializeToString())
  322. unknown_fields = (
  323. nested_message.payload.optional_nested_message.UnknownFields())
  324. self.assertEqual(unknown_fields[0].data, 456)
  325. nested_message.ClearField('payload')
  326. self.assertEqual(unknown_fields[0].data, 456)
  327. unknown_fields = (
  328. nested_message.payload.optional_nested_message.UnknownFields())
  329. self.assertEqual(0, len(unknown_fields))
  330. def testUnknownField(self):
  331. message = unittest_pb2.TestAllTypes()
  332. message.optional_int32 = 123
  333. destination = unittest_pb2.TestEmptyMessage()
  334. destination.ParseFromString(message.SerializeToString())
  335. unknown_field = destination.UnknownFields()[0]
  336. destination.Clear()
  337. with self.assertRaises(ValueError) as context:
  338. unknown_field.data # pylint: disable=pointless-statement
  339. self.assertIn('The parent message might be cleared.',
  340. str(context.exception))
  341. def testUnknownExtensions(self):
  342. message = unittest_pb2.TestEmptyMessageWithExtensions()
  343. message.ParseFromString(self.all_fields_data)
  344. self.assertEqual(len(message.UnknownFields()), 97)
  345. self.assertEqual(message.SerializeToString(), self.all_fields_data)
  346. @testing_refleaks.TestCase
  347. class UnknownEnumValuesTest(unittest.TestCase):
  348. def setUp(self):
  349. self.descriptor = missing_enum_values_pb2.TestEnumValues.DESCRIPTOR
  350. self.message = missing_enum_values_pb2.TestEnumValues()
  351. # TestEnumValues.ZERO = 0, but does not exist in the other NestedEnum.
  352. self.message.optional_nested_enum = (
  353. missing_enum_values_pb2.TestEnumValues.ZERO)
  354. self.message.repeated_nested_enum.extend([
  355. missing_enum_values_pb2.TestEnumValues.ZERO,
  356. missing_enum_values_pb2.TestEnumValues.ONE,
  357. ])
  358. self.message.packed_nested_enum.extend([
  359. missing_enum_values_pb2.TestEnumValues.ZERO,
  360. missing_enum_values_pb2.TestEnumValues.ONE,
  361. ])
  362. self.message_data = self.message.SerializeToString()
  363. self.missing_message = missing_enum_values_pb2.TestMissingEnumValues()
  364. self.missing_message.ParseFromString(self.message_data)
  365. # CheckUnknownField() is an additional Pure Python check which checks
  366. # a detail of unknown fields. It cannot be used by the C++
  367. # implementation because some protect members are called.
  368. # The test is added for historical reasons. It is not necessary as
  369. # serialized string is checked.
  370. def CheckUnknownField(self, name, expected_value):
  371. field_descriptor = self.descriptor.fields_by_name[name]
  372. unknown_fields = self.missing_message.UnknownFields()
  373. count = 0
  374. for field in unknown_fields:
  375. if field.field_number == field_descriptor.number:
  376. count += 1
  377. if field_descriptor.label == descriptor.FieldDescriptor.LABEL_REPEATED:
  378. self.assertIn(field.data, expected_value)
  379. else:
  380. self.assertEqual(expected_value, field.data)
  381. if field_descriptor.label == descriptor.FieldDescriptor.LABEL_REPEATED:
  382. self.assertEqual(count, len(expected_value))
  383. else:
  384. self.assertEqual(count, 1)
  385. def testUnknownParseMismatchEnumValue(self):
  386. just_string = missing_enum_values_pb2.JustString()
  387. just_string.dummy = 'blah'
  388. missing = missing_enum_values_pb2.TestEnumValues()
  389. # The parse is invalid, storing the string proto into the set of
  390. # unknown fields.
  391. missing.ParseFromString(just_string.SerializeToString())
  392. # Fetching the enum field shouldn't crash, instead returning the
  393. # default value.
  394. self.assertEqual(missing.optional_nested_enum, 0)
  395. def testUnknownEnumValue(self):
  396. self.assertFalse(self.missing_message.HasField('optional_nested_enum'))
  397. self.assertEqual(self.missing_message.optional_nested_enum, 2)
  398. # Clear does not do anything.
  399. serialized = self.missing_message.SerializeToString()
  400. self.missing_message.ClearField('optional_nested_enum')
  401. self.assertEqual(self.missing_message.SerializeToString(), serialized)
  402. def testUnknownRepeatedEnumValue(self):
  403. self.assertEqual([], self.missing_message.repeated_nested_enum)
  404. def testUnknownPackedEnumValue(self):
  405. self.assertEqual([], self.missing_message.packed_nested_enum)
  406. def testCheckUnknownFieldValueForEnum(self):
  407. unknown_fields = self.missing_message.UnknownFields()
  408. self.assertEqual(len(unknown_fields), 5)
  409. self.CheckUnknownField('optional_nested_enum',
  410. self.message.optional_nested_enum)
  411. self.CheckUnknownField('repeated_nested_enum',
  412. self.message.repeated_nested_enum)
  413. self.CheckUnknownField('packed_nested_enum',
  414. self.message.packed_nested_enum)
  415. def testRoundTrip(self):
  416. new_message = missing_enum_values_pb2.TestEnumValues()
  417. new_message.ParseFromString(self.missing_message.SerializeToString())
  418. self.assertEqual(self.message, new_message)
  419. if __name__ == '__main__':
  420. unittest.main()