rabbitmq 操作库
Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.

73 righe
2.2 KiB

  1. package test
  2. import (
  3. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbitmq"
  4. "log"
  5. "strconv"
  6. "strings"
  7. "testing"
  8. "time"
  9. )
  10. const FanoutExchangeName = "test_fanout_exchange"
  11. func TestFanoutExchangeSend(t *testing.T) {
  12. ch := rabbitmq.Connect("amqp://user:password@ip:port/")
  13. rabbitmq.NewExchange("amqp://user:password@ip:port/", FanoutExchangeName, "fanout")
  14. i := 0
  15. for {
  16. time.Sleep(1)
  17. greetings := []string{"Helloworld!", strconv.Itoa(i)}
  18. ch.Publish(FanoutExchangeName, strings.Join(greetings, " "), "")
  19. i = i + 1
  20. }
  21. }
  22. func TestFanoutExchangeReceive1(t *testing.T) {
  23. // 1.接收者,首先创建自己队列
  24. // 2.创建交换机
  25. // 3.将自己绑定到交换机上
  26. // 4.接收交换机上发过来的消息
  27. //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
  28. receiveMq := rabbitmq.New("amqp://user:password@ip:port/", "test_fanout_exchange_receive_queue_1")
  29. //第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
  30. rabbitmq.NewExchange("amqp://user:password@ip:port/", FanoutExchangeName, "fanout")
  31. // 队列绑定到exchange
  32. receiveMq.Bind(FanoutExchangeName, "")
  33. //4
  34. for {
  35. //接收消息时,指定
  36. msgs := receiveMq.Consume()
  37. go func() {
  38. for d := range msgs {
  39. log.Printf("recevie1 Received a message: %s", d.Body)
  40. }
  41. }()
  42. }
  43. }
  44. func TestFanoutExchangeReceive2(t *testing.T) {
  45. // 1.接收者,首先创建自己队列
  46. // 2.创建交换机
  47. // 3.将自己绑定到交换机上
  48. // 4.接收交换机上发过来的消息
  49. //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
  50. receiveMq := rabbitmq.New("amqp://user:password@ip:port/", "test_fanout_exchange_receive_queue_2")
  51. //第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
  52. rabbitmq.NewExchange("amqp://user:password@ip:port/", FanoutExchangeName, "fanout")
  53. // 队列绑定到exchange
  54. receiveMq.Bind(FanoutExchangeName, "")
  55. //4
  56. for {
  57. //接收消息时,指定
  58. msgs := receiveMq.Consume()
  59. go func() {
  60. for d := range msgs {
  61. log.Printf("recevie1 Received a message: %s", d.Body)
  62. }
  63. }()
  64. }
  65. }