暫無描述

Queue.php 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. <?php
  2. namespace AliyunMNS;
  3. use AliyunMNS\Http\HttpClient;
  4. use AliyunMNS\AsyncCallback;
  5. use AliyunMNS\Model\QueueAttributes;
  6. use AliyunMNS\Requests\SetQueueAttributeRequest;
  7. use AliyunMNS\Responses\SetQueueAttributeResponse;
  8. use AliyunMNS\Requests\GetQueueAttributeRequest;
  9. use AliyunMNS\Responses\GetQueueAttributeResponse;
  10. use AliyunMNS\Requests\SendMessageRequest;
  11. use AliyunMNS\Responses\SendMessageResponse;
  12. use AliyunMNS\Requests\PeekMessageRequest;
  13. use AliyunMNS\Responses\PeekMessageResponse;
  14. use AliyunMNS\Requests\ReceiveMessageRequest;
  15. use AliyunMNS\Responses\ReceiveMessageResponse;
  16. use AliyunMNS\Requests\DeleteMessageRequest;
  17. use AliyunMNS\Responses\DeleteMessageResponse;
  18. use AliyunMNS\Requests\ChangeMessageVisibilityRequest;
  19. use AliyunMNS\Responses\ChangeMessageVisibilityResponse;
  20. use AliyunMNS\Requests\BatchSendMessageRequest;
  21. use AliyunMNS\Responses\BatchSendMessageResponse;
  22. use AliyunMNS\Requests\BatchReceiveMessageRequest;
  23. use AliyunMNS\Responses\BatchReceiveMessageResponse;
  24. use AliyunMNS\Requests\BatchPeekMessageRequest;
  25. use AliyunMNS\Responses\BatchPeekMessageResponse;
  26. use AliyunMNS\Requests\BatchDeleteMessageRequest;
  27. use AliyunMNS\Responses\BatchDeleteMessageResponse;
  /**
   * Client-side handle for a single MNS queue.
   *
   * Every public operation builds the matching request/response pair and hands
   * it to the injected HttpClient, either synchronously (sendRequest) or
   * asynchronously (sendRequestAsync). The queue name given at construction
   * time is applied to every request.
   */
  class Queue
  {
      /** Name of the queue all requests are addressed to. */
      private $queueName;

      /** HttpClient used to dispatch every request. */
      private $client;

      /** Boolean: whether message bodies are encoded/decoded as base64. */
      private $base64;

      /**
       * @param HttpClient $client    transport used to send the requests
       * @param            $queueName name of the queue this object operates on
       * @param            $base64    whether message bodies are base64 encoded
       *                              (defaults to true)
       */
      public function __construct(HttpClient $client, $queueName, $base64 = true)
      {
          $this->client = $client;
          $this->queueName = $queueName;
          $this->base64 = $base64;
      }

      /**
       * Switch base64 handling of message bodies on or off.
       *
       * @param $base64 the new flag value
       */
      public function setBase64($base64)
      {
          $this->base64 = $base64;
      }

      /**
       * @return bool true when the stored flag is truthy
       */
      public function isBase64()
      {
          // A bool cast is equivalent to the loose `== true` comparison.
          return (bool) $this->base64;
      }

      /**
       * @return the queue name passed to the constructor
       */
      public function getQueueName()
      {
          return $this->queueName;
      }

      /**
       * Set the QueueAttributes, detailed API specs:
       * https://docs.aliyun.com/?spm=#/pub/mns/api_reference/api_spec&queue_operation
       *
       * @param QueueAttributes $attributes: the QueueAttributes to set
       *
       * @return SetQueueAttributeResponse: the response
       *
       * @throws QueueNotExistException if queue does not exist
       * @throws InvalidArgumentException if any argument value is invalid
       * @throws MnsException if any other exception happens
       */
      public function setAttribute(QueueAttributes $attributes)
      {
          return $this->client->sendRequest(
              new SetQueueAttributeRequest($this->queueName, $attributes),
              new SetQueueAttributeResponse()
          );
      }

      /**
       * Asynchronous variant of setAttribute().
       *
       * @param QueueAttributes $attributes: the QueueAttributes to set
       * @param AsyncCallback   $callback:   optional callback forwarded to the client
       *
       * @return the value returned by HttpClient::sendRequestAsync
       */
      public function setAttributeAsync(QueueAttributes $attributes,
          AsyncCallback $callback = NULL)
      {
          return $this->client->sendRequestAsync(
              new SetQueueAttributeRequest($this->queueName, $attributes),
              new SetQueueAttributeResponse(),
              $callback
          );
      }

      /**
       * Get the QueueAttributes, detailed API specs:
       * https://docs.aliyun.com/?spm=#/pub/mns/api_reference/api_spec&queue_operation
       *
       * @return GetQueueAttributeResponse: containing the attributes
       *
       * @throws QueueNotExistException if queue does not exist
       * @throws MnsException if any other exception happens
       */
      public function getAttribute()
      {
          return $this->client->sendRequest(
              new GetQueueAttributeRequest($this->queueName),
              new GetQueueAttributeResponse()
          );
      }

      /**
       * Asynchronous variant of getAttribute().
       *
       * @param AsyncCallback $callback: optional callback forwarded to the client
       *
       * @return the value returned by HttpClient::sendRequestAsync
       */
      public function getAttributeAsync(AsyncCallback $callback = NULL)
      {
          return $this->client->sendRequestAsync(
              new GetQueueAttributeRequest($this->queueName),
              new GetQueueAttributeResponse(),
              $callback
          );
      }

      /**
       * SendMessage, the messageBody will be automatically encoded in base64.
       * If you do not need the message body to be encoded in base64,
       * please specify $base64 = FALSE in Queue.
       *
       * Note: the passed request is modified in place (queue name and base64
       * flag are set on it) before it is sent.
       *
       * Detailed API specs:
       * https://docs.aliyun.com/?spm=#/pub/mns/api_reference/api_spec&message_operation
       *
       * @param SendMessageRequest $request: containing the message body and properties
       *
       * @return SendMessageResponse: containing the messageId and bodyMD5
       *
       * @throws QueueNotExistException if queue does not exist
       * @throws InvalidArgumentException if any argument value is invalid
       * @throws MalformedXMLException if any error in xml
       * @throws MnsException if any other exception happens
       */
      public function sendMessage(SendMessageRequest $request)
      {
          $request->setQueueName($this->queueName);
          $request->setBase64($this->base64);

          return $this->client->sendRequest($request, new SendMessageResponse());
      }

      /**
       * Asynchronous variant of sendMessage(); the request is modified in place
       * in the same way.
       *
       * @param SendMessageRequest $request:  containing the message body and properties
       * @param AsyncCallback      $callback: optional callback forwarded to the client
       *
       * @return the value returned by HttpClient::sendRequestAsync
       */
      public function sendMessageAsync(SendMessageRequest $request,
          AsyncCallback $callback = NULL)
      {
          $request->setQueueName($this->queueName);
          $request->setBase64($this->base64);

          return $this->client->sendRequestAsync($request, new SendMessageResponse(), $callback);
      }

      /**
       * PeekMessage, the messageBody will be automatically decoded as base64
       * if $base64 in Queue is TRUE.
       *
       * Detailed API specs:
       * https://docs.aliyun.com/?spm=#/pub/mns/api_reference/api_spec&message_operation
       *
       * @return PeekMessageResponse: containing the messageBody and properties
       *
       * @throws QueueNotExistException if queue does not exist
       * @throws MessageNotExistException if no message exists in the queue
       * @throws MnsException if any other exception happens
       */
      public function peekMessage()
      {
          return $this->client->sendRequest(
              new PeekMessageRequest($this->queueName),
              new PeekMessageResponse($this->base64)
          );
      }

      /**
       * Asynchronous variant of peekMessage().
       *
       * @param AsyncCallback $callback: optional callback forwarded to the client
       *
       * @return the value returned by HttpClient::sendRequestAsync
       */
      public function peekMessageAsync(AsyncCallback $callback = NULL)
      {
          return $this->client->sendRequestAsync(
              new PeekMessageRequest($this->queueName),
              new PeekMessageResponse($this->base64),
              $callback
          );
      }

      /**
       * ReceiveMessage, the messageBody will be automatically decoded as base64
       * if $base64 = TRUE in Queue.
       *
       * Detailed API specs:
       * https://docs.aliyun.com/?spm=#/pub/mns/api_reference/api_spec&message_operation
       *
       * @param $waitSeconds: the long polling wait seconds (optional)
       *
       * @return ReceiveMessageResponse: containing the messageBody and properties;
       *         the response is the same as PeekMessageResponse,
       *         except that the receiptHandle is also returned in receiveMessage
       *
       * @throws QueueNotExistException if queue does not exist
       * @throws MessageNotExistException if no message exists in the queue
       * @throws MnsException if any other exception happens
       */
      public function receiveMessage($waitSeconds = null)
      {
          return $this->client->sendRequest(
              new ReceiveMessageRequest($this->queueName, $waitSeconds),
              new ReceiveMessageResponse($this->base64)
          );
      }

      /**
       * Asynchronous variant of receiveMessage().
       *
       * $waitSeconds is a trailing optional parameter so that existing callers
       * passing only a callback keep working, while async callers can now use
       * long polling exactly like the synchronous method. With the default of
       * null the request is built the same way receiveMessage() builds it.
       *
       * @param AsyncCallback $callback:    optional callback forwarded to the client
       * @param               $waitSeconds: the long polling wait seconds (optional)
       *
       * @return the value returned by HttpClient::sendRequestAsync
       */
      public function receiveMessageAsync(AsyncCallback $callback = NULL, $waitSeconds = null)
      {
          return $this->client->sendRequestAsync(
              new ReceiveMessageRequest($this->queueName, $waitSeconds),
              new ReceiveMessageResponse($this->base64),
              $callback
          );
      }

      /**
       * DeleteMessage
       * Detailed API specs:
       * https://docs.aliyun.com/?spm=#/pub/mns/api_reference/api_spec&message_operation
       *
       * @param $receiptHandle: the receiptHandle returned from receiveMessage
       *
       * @return DeleteMessageResponse
       *
       * @throws QueueNotExistException if queue does not exist
       * @throws InvalidArgumentException if the argument is invalid
       * @throws ReceiptHandleErrorException if the $receiptHandle is invalid
       * @throws MnsException if any other exception happens
       */
      public function deleteMessage($receiptHandle)
      {
          return $this->client->sendRequest(
              new DeleteMessageRequest($this->queueName, $receiptHandle),
              new DeleteMessageResponse()
          );
      }

      /**
       * Asynchronous variant of deleteMessage().
       *
       * @param               $receiptHandle: the receiptHandle returned from receiveMessage
       * @param AsyncCallback $callback:      optional callback forwarded to the client
       *
       * @return the value returned by HttpClient::sendRequestAsync
       */
      public function deleteMessageAsync($receiptHandle,
          AsyncCallback $callback = NULL)
      {
          return $this->client->sendRequestAsync(
              new DeleteMessageRequest($this->queueName, $receiptHandle),
              new DeleteMessageResponse(),
              $callback
          );
      }

      /**
       * ChangeMessageVisibility, set the nextVisibleTime for the message.
       * Detailed API specs:
       * https://docs.aliyun.com/?spm=#/pub/mns/api_reference/api_spec&message_operation
       *
       * @param $receiptHandle:     the receiptHandle returned from receiveMessage
       * @param $visibilityTimeout: the new visibility timeout, forwarded as-is to
       *                            ChangeMessageVisibilityRequest
       *
       * @return ChangeMessageVisibilityResponse
       *
       * @throws QueueNotExistException if queue does not exist
       * @throws MessageNotExistException if the message does not exist
       * @throws InvalidArgumentException if the argument is invalid
       * @throws ReceiptHandleErrorException if the $receiptHandle is invalid
       * @throws MnsException if any other exception happens
       */
      public function changeMessageVisibility($receiptHandle, $visibilityTimeout)
      {
          return $this->client->sendRequest(
              new ChangeMessageVisibilityRequest($this->queueName, $receiptHandle, $visibilityTimeout),
              new ChangeMessageVisibilityResponse()
          );
      }

      /**
       * Asynchronous variant of changeMessageVisibility(), provided so that this
       * operation has an async counterpart like every other queue operation.
       *
       * @param               $receiptHandle:     the receiptHandle returned from receiveMessage
       * @param               $visibilityTimeout: the new visibility timeout
       * @param AsyncCallback $callback:          optional callback forwarded to the client
       *
       * @return the value returned by HttpClient::sendRequestAsync
       */
      public function changeMessageVisibilityAsync($receiptHandle, $visibilityTimeout,
          AsyncCallback $callback = NULL)
      {
          return $this->client->sendRequestAsync(
              new ChangeMessageVisibilityRequest($this->queueName, $receiptHandle, $visibilityTimeout),
              new ChangeMessageVisibilityResponse(),
              $callback
          );
      }

      /**
       * BatchSendMessage, message body will be automatically encoded in base64.
       * If you do not need the message body to be encoded in base64,
       * please specify $base64 = FALSE in Queue.
       *
       * Note: the passed request is modified in place (queue name and base64
       * flag are set on it) before it is sent.
       *
       * Detailed API specs:
       * https://docs.aliyun.com/?spm=#/pub/mns/api_reference/api_spec&message_operation
       *
       * @param BatchSendMessageRequest $request:
       *            the request containing an array of SendMessageRequestItems
       *
       * @return BatchSendMessageResponse
       *
       * @throws QueueNotExistException if queue does not exist
       * @throws MalformedXMLException if any error in the xml
       * @throws InvalidArgumentException if the argument is invalid
       * @throws BatchSendFailException if some messages are not sent
       * @throws MnsException if any other exception happens
       */
      public function batchSendMessage(BatchSendMessageRequest $request)
      {
          $request->setQueueName($this->queueName);
          $request->setBase64($this->base64);

          return $this->client->sendRequest($request, new BatchSendMessageResponse());
      }

      /**
       * Asynchronous variant of batchSendMessage(); the request is modified in
       * place in the same way.
       *
       * @param BatchSendMessageRequest $request:  the batch request to send
       * @param AsyncCallback           $callback: optional callback forwarded to the client
       *
       * @return the value returned by HttpClient::sendRequestAsync
       */
      public function batchSendMessageAsync(BatchSendMessageRequest $request,
          AsyncCallback $callback = NULL)
      {
          $request->setQueueName($this->queueName);
          $request->setBase64($this->base64);

          return $this->client->sendRequestAsync($request, new BatchSendMessageResponse(), $callback);
      }

      /**
       * BatchReceiveMessage, message body will be automatically decoded as base64
       * if $base64 = TRUE in Queue.
       *
       * Note: the queue name is set on the passed request before it is sent.
       *
       * Detailed API specs:
       * https://docs.aliyun.com/?spm=#/pub/mns/api_reference/api_spec&message_operation
       *
       * @param BatchReceiveMessageRequest $request:
       *            containing numOfMessages and waitSeconds
       *
       * @return BatchReceiveMessageResponse:
       *            the received messages
       *
       * @throws QueueNotExistException if queue does not exist
       * @throws MessageNotExistException if no message exists
       * @throws MnsException if any other exception happens
       */
      public function batchReceiveMessage(BatchReceiveMessageRequest $request)
      {
          $request->setQueueName($this->queueName);

          return $this->client->sendRequest(
              $request,
              new BatchReceiveMessageResponse($this->base64)
          );
      }

      /**
       * Asynchronous variant of batchReceiveMessage().
       *
       * @param BatchReceiveMessageRequest $request:  containing numOfMessages and waitSeconds
       * @param AsyncCallback              $callback: optional callback forwarded to the client
       *
       * @return the value returned by HttpClient::sendRequestAsync
       */
      public function batchReceiveMessageAsync(BatchReceiveMessageRequest $request, AsyncCallback $callback = NULL)
      {
          $request->setQueueName($this->queueName);

          return $this->client->sendRequestAsync(
              $request,
              new BatchReceiveMessageResponse($this->base64),
              $callback
          );
      }

      /**
       * BatchPeekMessage, message body will be automatically decoded as base64
       * if $base64 = TRUE in Queue.
       *
       * Detailed API specs:
       * https://docs.aliyun.com/?spm=#/pub/mns/api_reference/api_spec&message_operation
       *
       * @param $numOfMessages: forwarded to BatchPeekMessageRequest
       *            (presumably the maximum number of messages to peek)
       *
       * @return BatchPeekMessageResponse:
       *            the peeked messages
       *
       * @throws QueueNotExistException if queue does not exist
       * @throws MessageNotExistException if no message exists
       * @throws MnsException if any other exception happens
       */
      public function batchPeekMessage($numOfMessages)
      {
          return $this->client->sendRequest(
              new BatchPeekMessageRequest($this->queueName, $numOfMessages),
              new BatchPeekMessageResponse($this->base64)
          );
      }

      /**
       * Asynchronous variant of batchPeekMessage().
       *
       * @param               $numOfMessages: forwarded to BatchPeekMessageRequest
       * @param AsyncCallback $callback:      optional callback forwarded to the client
       *
       * @return the value returned by HttpClient::sendRequestAsync
       */
      public function batchPeekMessageAsync($numOfMessages, AsyncCallback $callback = NULL)
      {
          return $this->client->sendRequestAsync(
              new BatchPeekMessageRequest($this->queueName, $numOfMessages),
              new BatchPeekMessageResponse($this->base64),
              $callback
          );
      }

      /**
       * BatchDeleteMessage
       * Detailed API specs:
       * https://docs.aliyun.com/?spm=#/pub/mns/api_reference/api_spec&message_operation
       *
       * @param $receiptHandles:
       *            array of $receiptHandle, which is obtained from receiveMessage
       *
       * @return BatchDeleteMessageResponse
       *
       * @throws QueueNotExistException if queue does not exist
       * @throws ReceiptHandleErrorException if the receiptHandle is invalid
       * @throws InvalidArgumentException if the argument is invalid
       * @throws BatchDeleteFailException if any message is not deleted
       * @throws MnsException if any other exception happens
       */
      public function batchDeleteMessage($receiptHandles)
      {
          return $this->client->sendRequest(
              new BatchDeleteMessageRequest($this->queueName, $receiptHandles),
              new BatchDeleteMessageResponse()
          );
      }

      /**
       * Asynchronous variant of batchDeleteMessage().
       *
       * @param               $receiptHandles: array of receipt handles to delete
       * @param AsyncCallback $callback:       optional callback forwarded to the client
       *
       * @return the value returned by HttpClient::sendRequestAsync
       */
      public function batchDeleteMessageAsync($receiptHandles, AsyncCallback $callback = NULL)
      {
          return $this->client->sendRequestAsync(
              new BatchDeleteMessageRequest($this->queueName, $receiptHandles),
              new BatchDeleteMessageResponse(),
              $callback
          );
      }
  }
  348. ?>