dengbiao 3週間前
コミット
49968c5686
3個のファイルの変更35行の追加12行の削除
  1. +1
    -1
      consume/user_register_for_my_my_fans.go
  2. +1
    -1
      consume/user_register_for_my_my_recommender.go
  3. +33
    -10
      consume/user_register_for_official_consume.go

+ 1
- 1
consume/user_register_for_my_my_fans.go ファイルの表示

@@ -36,7 +36,7 @@ func UserRegisterConsumeForMyFans(queue md.MqQueue) {
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
ch.Qos(100)
delivery := ch.Consume(queue.Name, false)

var res amqp.Delivery


+ 1
- 1
consume/user_register_for_my_my_recommender.go ファイルの表示

@@ -144,7 +144,7 @@ func handleUserRegisterConsumeForMyRecommender(msgData []byte) error {
if err != nil {
return err
}
if len(*userGroups) <= 1000 {
if len(*userGroups) < 1000 {
fansGroup = group
}
}


+ 33
- 10
consume/user_register_for_official_consume.go ファイルの表示

@@ -35,16 +35,42 @@ func UserRegisterConsumeForOfficial(queue md.MqQueue) {
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(500)
ch.Qos(100) // 设置QoS为100,确保一次最多处理100个消息
delivery := ch.Consume(queue.Name, false)

var res amqp.Delivery
var ok bool
batchSize := 100
messageBuffer := make(chan amqp.Delivery, batchSize)

go func() {
for res := range delivery {
messageBuffer <- res
}
close(messageBuffer)
}()

for {
res, ok = <-delivery
if ok == true {
messages := make([]amqp.Delivery, 0, batchSize)
for i := 0; i < batchSize; i++ {
select {
case res, ok := <-messageBuffer:
if !ok {
fmt.Println("get msg done")
return
}
messages = append(messages, res)
default:
break
}
}

if len(messages) == 0 {
time.Sleep(1 * time.Second) // 如果没有消息,等待一段时间后再尝试
continue
}

for _, res := range messages {
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
err = handleUserRegisterConsumeForOfficial(res.Body)
err := handleUserRegisterConsumeForOfficial(res.Body)
if err != nil {
fmt.Println("!!!!!!!err!!!!!!!", err)
_ = res.Reject(false)
@@ -59,11 +85,8 @@ func UserRegisterConsumeForOfficial(queue md.MqQueue) {
} else {
_ = res.Ack(true)
}
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleUserRegisterConsumeForOfficial(msgData []byte) error {
@@ -142,7 +165,7 @@ func handleUserRegisterConsumeForOfficial(msgData []byte) error {
"key": "group_id",
"value": group.GroupId,
})
if len(*userGroups) <= 1000 {
if len(*userGroups) < 1000 {
officialGroup = group
}
}


読み込み中…
キャンセル
保存