From 227404d3ab313adee84fa74178c18e095612291e Mon Sep 17 00:00:00 2001 From: jiaoboxiang Date: Thu, 10 Nov 2022 16:26:38 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/cfg/cfg_app.go | 11 +++ app/cfg/init_cfg.go | 9 ++ app/cfg/init_rabbitmq.go | 28 ++++++ go.mod | 4 +- main.go | 88 ++++++++++--------- processor/mq_processor.go | 77 +++++++++++++++++ test/aces_test.go | 175 -------------------------------------- test/toke_test.go | 35 -------- test/work_queue_test.go | 33 +++++++ test/zhimeng_api.go | 156 --------------------------------- 10 files changed, 207 insertions(+), 409 deletions(-) create mode 100644 app/cfg/init_rabbitmq.go create mode 100644 processor/mq_processor.go delete mode 100644 test/aces_test.go delete mode 100644 test/toke_test.go create mode 100644 test/work_queue_test.go delete mode 100644 test/zhimeng_api.go diff --git a/app/cfg/cfg_app.go b/app/cfg/cfg_app.go index 9470942..292a20c 100644 --- a/app/cfg/cfg_app.go +++ b/app/cfg/cfg_app.go @@ -10,7 +10,11 @@ type Config struct { CurlDebug bool `yaml:"curldebug"` SrvAddr string `yaml:"srv_addr"` RedisAddr string `yaml:"redis_addr"` + MqWorkQueueName string `yaml:"mq_work_queue_name"` + RabbitMqAddr string `yaml:"rabbitMq_addr"` + RabbitMqAddrTest string `yaml:"rabbitMq_addr_test"` DB DBCfg `yaml:"db"` + MQ MQCfg `yaml:"mq"` Log LogCfg `yaml:"log"` ArkID ArkIDCfg `yaml:"arkid"` Admin AdminCfg `yaml:"admin"` @@ -62,6 +66,13 @@ type DBCfg struct { Path string `yaml:"path"` //日志文件存放路径 } +type MQCfg struct { + Host string `yaml:"host"` + Port string `yaml:"port"` + User string `yaml:"user"` + Pwd string `yaml:"pwd"` +} + //日志配置结构体 type LogCfg struct { AppName string `yaml:"app_name" ` diff --git a/app/cfg/init_cfg.go b/app/cfg/init_cfg.go index d12e74f..ad21ff4 100644 --- a/app/cfg/init_cfg.go +++ b/app/cfg/init_cfg.go @@ -15,6 +15,7 @@ var ( SrvAddr string RedisAddr string DB *DBCfg + MQ *MQCfg Log *LogCfg ArkID *ArkIDCfg Admin *AdminCfg @@ -22,6 +23,8 @@ var ( WxappletFilepath *WxappletFilepathCfg Local bool AppComm *AppCommCfg + MqWorkQueueName string + RabbitMqAddr string ) //初始化配置文件,将cfg.yml读入到内存 @@ -57,4 +60,10 @@ func InitCfg() { Official = &conf.Official WxappletFilepath = &conf.WxappletFilepath AppComm = &conf.AppComm + MqWorkQueueName = conf.MqWorkQueueName + RabbitMqAddr = conf.RabbitMqAddrTest + if conf.Prd { + RabbitMqAddr = conf.RabbitMqAddr + } + MQ = &conf.MQ } diff --git a/app/cfg/init_rabbitmq.go b/app/cfg/init_rabbitmq.go new file mode 100644 index 0000000..f04cd5e --- /dev/null +++ b/app/cfg/init_rabbitmq.go @@ -0,0 +1,28 @@ +package cfg + +import ( + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "os" + "strings" + "time" +) + +func InitMq() { + data, _ := json.Marshal(MQ) + filePutContents("init_rabbit_mq", string(data)) + err := rabbit.Init(MQ.Host, MQ.Port, MQ.User, MQ.Pwd) + if err != nil { + filePutContents("init_rabbit_mq", err.Error()) + return + } +} + +func filePutContents(fileName string, content string) { + fd, _ := os.OpenFile("./tmp/"+fileName+".log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) + fd_time := time.Now().Format("2006-01-02 15:04:05") + fd_content := strings.Join([]string{"[", fd_time, "] ", content, "\n"}, "") + buf := []byte(fd_content) + fd.Write(buf) + fd.Close() +} diff --git a/go.mod b/go.mod index b294c07..533bac1 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module applet go 1.15 require ( + code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.3 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 github.com/boombuler/barcode v1.0.1 @@ -25,7 +26,7 @@ require ( github.com/gorilla/sessions v1.2.1 // indirect github.com/iGoogle-ink/gopay v1.5.36 github.com/iGoogle-ink/gotil v1.0.20 - github.com/json-iterator/go v1.1.10 // indirect + github.com/json-iterator/go v1.1.10 github.com/leodido/go-urn v1.2.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/makiuchi-d/gozxing v0.0.0-20210324052758-57132e828831 @@ -33,6 +34,7 @@ require ( github.com/qiniu/api.v7/v7 v7.8.2 github.com/robfig/cron/v3 v3.0.1 github.com/sony/sonyflake v1.0.0 + github.com/streadway/amqp v1.0.0 github.com/swaggo/swag v1.7.0 github.com/syyongx/php2go v0.9.4 github.com/tidwall/gjson v1.7.4 diff --git a/main.go b/main.go index 0974be1..2be26bd 100644 --- a/main.go +++ b/main.go @@ -1,19 +1,17 @@ package main import ( - "context" + "applet/app/cfg" + "applet/app/db" + "applet/app/utils" + "applet/app/utils/logx" + "applet/processor" + "errors" "fmt" - "log" - "net/http" "os" "os/signal" + "strings" "syscall" - "time" - - "applet/app/cfg" - "applet/app/db" - "applet/app/router" - "applet/app/utils" ) //系统初始化 @@ -21,6 +19,7 @@ func init() { cfg.InitCfg() //配置初始化 cfg.InitLog() //日志初始化 cfg.InitCache() //缓存初始化 + cfg.InitMq() //队列初始化 if cfg.Debug { //判断是否是debug if err := db.InitDB(cfg.DB); err != nil { //主数据库初始化 panic(err) @@ -44,45 +43,50 @@ func init() { // @name MasterID // @BasePath / func main() { - // 启动获取所有品牌 - //go taoke.GetAllBrand() - r := router.Init() //创建路由 - // arkid.Init() - srv := &http.Server{ //设置http服务参数 - Addr: cfg.SrvAddr, //指定ip和端口 - Handler: r, //指定路由 - } - // 读取默认站长的推广位 并写进redis - // master, err := db.UserProfileFindByID(,"1") - - // if err != nil { - // panic(err) - // } if cfg.CurlDebug { utils.CurlDebug = true } - // - // if has := cache.SetJson(svc.SysCfgGet(nil, "app_name")+"_default_pid_user", master, 0); has != true { - // panic(errors.New("设置默认pid缓存失败")) - // } - // Initializing the server in a goroutine so that it won't block the graceful shutdown handling below - go func() { //协程启动监听http服务 - fmt.Println("Listening and serving HTTP on " + cfg.SrvAddr) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.Fatalf("listen: %s\n", err) - } - }() - // graceful shutdown + //r := router.Init() //创建路由 + //srv := &http.Server{ //设置http服务参数 + // Addr: cfg.SrvAddr, //指定ip和端口 + // Handler: r, //指定路由 + //} + //go func() { //协程启动监听队列 + // fmt.Println("Listening and serving HTTP on " + cfg.SrvAddr) + // if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + // log.Fatalf("listen: %s\n", err) + // } + //}() //退出go守护进程 + //quit := make(chan os.Signal) + //signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + //<-quit + //log.Println("Shutting down server...") + //ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + //defer cancel() + //if err := srv.Shutdown(ctx); err != nil { + // log.Fatal("Server forced to shutdown:", err) + //} + //log.Println("Server exiting") + + //获取需要监听多少个队列 + if cfg.RabbitMqAddr == "" { + panic(errors.New("请填写MQ参数")) + } + forever := make(chan bool) + for _, workQueueName := range strings.Split(cfg.MqWorkQueueName, ",") { + if workQueueName == "" { + continue + } + fmt.Printf("监听%s队列", workQueueName) + fmt.Println() + go processor.WorkReceive(workQueueName) + } + //processor.TestWorkSend() + <-forever quit := make(chan os.Signal) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit - log.Println("Shutting down server...") - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - if err := srv.Shutdown(ctx); err != nil { - log.Fatal("Server forced to shutdown:", err) - } - log.Println("Server exiting") + _ = logx.Info("Server exiting...") } diff --git a/processor/mq_processor.go b/processor/mq_processor.go new file mode 100644 index 0000000..e477440 --- /dev/null +++ b/processor/mq_processor.go @@ -0,0 +1,77 @@ +package processor + +import ( + "applet/app/utils" + "applet/app/utils/logx" + "bytes" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + jsoniter "github.com/json-iterator/go" + "github.com/streadway/amqp" + "log" + "strings" +) + +var Json = jsoniter.ConfigCompatibleWithStandardLibrary + +type Message struct { + MessageType string `json:"message_type"` + Data int `json:"data"` +} + +func WorkReceive(name string) { + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //接收消息时,指定 + msgs := ch.Consume(name) + for msg := range msgs { + var message2 Message + jsonStr := string(msg.Body) + jsonStr = strings.Trim(jsonStr, "\"") + jsonStr = strings.ReplaceAll(jsonStr, "\\", "") + utils.Unserialize([]byte(jsonStr), &message2) + switch message2.MessageType { + case "test": + go func(msg *amqp.Delivery) { + log.Printf("recevie1 Received a message: %s", msg.Body) + msg.Ack(true) + }(&msg) + } + + } +} + +func TestWorkSend() { + // 推入rabbitMq + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + } + defer ch.Release() + var message struct { + MessageType string `json:"message_type"` + Data int `json:"data"` + } + message.MessageType = "test" + message.Data = 1 + for message.Data < 2 { + ch.Publish("test_work_queue_processor", utils.SerializeStr(message), "") + message.Data += 1 + //time.Sleep(time.Second * 5) + } +} + +// 去除json中的转义字符 +func disableEscapeHtml(data interface{}) (string, error) { + bf := bytes.NewBuffer([]byte{}) + jsonEncoder := json.NewEncoder(bf) + jsonEncoder.SetEscapeHTML(true) + if err := jsonEncoder.Encode(data); err != nil { + return "", err + } + return bf.String(), nil +} diff --git a/test/aces_test.go b/test/aces_test.go deleted file mode 100644 index bce9cd1..0000000 --- a/test/aces_test.go +++ /dev/null @@ -1,175 +0,0 @@ -package test - -import ( - "applet/app/db" - "applet/app/md" - "applet/app/svc" - "applet/app/utils" - "encoding/base64" - "encoding/json" - "fmt" - "testing" -) - -func TestAesCrypt_Encrypt(t *testing.T) { - var aesCrypt = utils.AesCrypt{ - Key: []byte("e{&[^Ft(.~g]1eR-]VO"), - Iv: []byte("ZV`7<5X]/2brS@sz"), - } - - var text = `{"uid":"82","applyOrder":"821607392542143106","db":{"db_host":"119.23.182.117","db_port":"3306","db_name":"fnuoos_template","db_username":"root","db_password":"Fnuo123com@"}}` - result, err := aesCrypt.Encrypt([]byte(text)) - if err != nil { - fmt.Println(err) - return - } - - pass64 := base64.StdEncoding.EncodeToString(result) - fmt.Println(pass64) -} - -func TestAesCrypt_Decrypt(t *testing.T) { - var aesCrypt = utils.AesCrypt{ - Key: []byte("e{&[^Ft(.~g]1eR-]VO"), - Iv: []byte("ZV`7<5X]/2brS@sz"), - } - - pass64 := "JD0RXX1YbZPWKeNiVKsq0jQ1Bfnbln3fIMcmJkovU5gUCf329y9ZdqECWe4OKpoOk25/hPNaBH9VwellhIQhpw==" - bytesPass, err := base64.StdEncoding.DecodeString(pass64) - if err != nil { - fmt.Println(err) - return - } - - plainText, err := aesCrypt.Decrypt(bytesPass) - if err != nil { - fmt.Println(err) - return - } - - fmt.Println(string(plainText)) -} - -func Test_Vi(t *testing.T) { - fmt.Println("123") - fmt.Println([]byte("ZV`7<5X]/2brS@sz")) -} - -func Test_CombineData(t *testing.T) { - data := - `{ - "Uid": 21699, - "Lv": 0, - "NewLv": 0, - "LevelWeight": -1, - "Profit": 7.13, - "SubsidyFee": 0, - "ProfitList": [ - { - "cid": "0", - "val": 7.13 - }, - { - "cid": "1", - "val": 10 - }, - { - "cid": "19", - "val": 120 - }, - { - "cid": "20", - "val": 0 - }, - { - "cid": "21", - "val": 0 - } - ], - "SubsidyFeeList": null, - "OwnbuyReturnType": 0, - "Diff": 0, - "ParentUser": { - "Uid": 553, - "Lv": 8, - "NewLv": 0, - "LevelWeight": 2, - "Profit": 0, - "SubsidyFee": 0, - "ProfitList": [ - { - "cid": "0", - "val": 0 - } - ], - "SubsidyFeeList": null, - "OwnbuyReturnType": 0, - "Diff": 1, - "ParentUser": { - "Uid": 21699, - "Lv": 0, - "NewLv": 0, - "LevelWeight": -1, - "Profit": 7.13, - "SubsidyFee": 0, - "ProfitList": [ - { - "cid": "0", - "val": 7.13 - }, - { - "cid": "1", - "val": 10 - }, - { - "cid": "19", - "val": 120 - }, - { - "cid": "20", - "val": 0 - }, - { - "cid": "21", - "val": 0 - } - ], - "SubsidyFeeList": null, - "OwnbuyReturnType": 0, - "Diff": 0, - "ParentUser": { - "Uid": 553, - "Lv": 8, - "NewLv": 0, - "LevelWeight": 2, - "Profit": 0, - "SubsidyFee": 0, - "ProfitList": [ - { - "cid": "0", - "val": 0 - } - ], - "SubsidyFeeList": null, - "OwnbuyReturnType": 0, - "Diff": 1, - "ParentUser": null - } - } - } - }` - bytes := []byte(data) - var lvUser md.LvUser - if err := json.Unmarshal(bytes, &lvUser); err != nil { - fmt.Println(err) - return - } - dataSlice := svc.CombineVirtualCoinRelateData(&lvUser, 249534162504132595, "mall_goods", 0) - for _, item := range dataSlice { - fmt.Printf("%#v\n", item) - } - err := db.DbInsertBatch(db.DBs["123456"], dataSlice) - if err != nil { - fmt.Println(err) - } -} diff --git a/test/toke_test.go b/test/toke_test.go deleted file mode 100644 index f37d853..0000000 --- a/test/toke_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package test - -import ( - "applet/app/db" - "applet/app/lib/taoke" - "applet/app/svc" - "fmt" - "testing" -) - -func TestBrandList(t *testing.T) { - tkBrandStruct, err := taoke.BrandList("1", "20") - if err != nil { - t.Error(err) - } - for _, brandStruct := range *tkBrandStruct { - fmt.Println(brandStruct) - } - -} - -func TestSuperCategory(t *testing.T) { - category, err := taoke.SuperCategory() - if err != nil { - t.Error(err) - } - for _, brandStruct := range *category { - fmt.Println(brandStruct) - } -} - -func TestAddBrandDB(t *testing.T) { - engine := db.DBs["123456"] - svc.AddBrandDB(engine) -} diff --git a/test/work_queue_test.go b/test/work_queue_test.go new file mode 100644 index 0000000..9c5faf7 --- /dev/null +++ b/test/work_queue_test.go @@ -0,0 +1,33 @@ +package test + +import ( + "applet/app/utils" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "testing" + "time" +) + +const WorkQueueName = "test_work_queue" + +/** +TODO:: 公平分发模式下的发送端和接收端(一个发动端,多个接受端) +*/ +func TestWorkSend(t *testing.T) { + // 推入rabbitMq + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + t.Error(err) + } + defer ch.Release() + var message struct { + MessageType string `json:"message_type"` + Data int `json:"data"` + } + message.MessageType = "test" + message.Data = 1 + for message.Data < 10 { + ch.Publish(WorkQueueName+"_processor", utils.SerializeStr(message), "") + message.Data += 1 + time.Sleep(time.Second * 5) + } +} diff --git a/test/zhimeng_api.go b/test/zhimeng_api.go deleted file mode 100644 index 6ef1670..0000000 --- a/test/zhimeng_api.go +++ /dev/null @@ -1,156 +0,0 @@ -package test - -import ( - "fmt" - "github.com/gocolly/colly" - "github.com/gocolly/colly/extensions" - "github.com/tidwall/gjson" - "net/http" - "regexp" - "strings" -) - -/* -目前可用接口 -[商品查询]https://www.showdoc.com.cn/59349170678610?page_id=339616554551473 -[商品详情]https://www.showdoc.com.cn/59349170678610?page_id=339687047645094 - -*/ - -// Response is SDK Response -type Response struct { - Msg string `json:"msg"` - Success int `json:"success"` - Data interface{} `json:"data"` -} - -func main() { - // // JD - // postData := map[string]string{"keyword": "联想", "p": "1", "size": "10"} - // fmt.Println(postData["time"]) - // res, _ := zhimeng.Send("jd", "getgoods", postData) - // fmt.Println(string(res)) - // p := Response{} - // json.Unmarshal(res, &p) - // fmt.Println(p) - // // VIP - // postData = map[string]string{"keyword": "联想", "p": "1", "size": "10", "order": "0"} - // fmt.Println(postData["time"]) - // res, _ = zhimeng.Send("wph", "seach_goods", postData) - // fmt.Println(string(res)) - // p = Response{} - // json.Unmarshal(res, &p) - // fmt.Println(p) - // // PDD - // postData = map[string]string{"keyword": "联想", "p": "1", "size": "10", "sort": "goods_price asc"} - // res, _ = zhimeng.Send("pdd", "getgoods", postData) - // fmt.Println(string(res)) - // p = Response{} - // json.Unmarshal(res, &p) - // fmt.Println(p) - for i := 0; i < 1000; i++ { - fmt.Println(i) - scrapPDD() - } -} - -func scrapJD() { - c := colly.NewCollector(func(collector *colly.Collector) { - extensions.RandomUserAgent(collector) - }) - c.OnResponse(func(r *colly.Response) { - re, _ := regexp.Compile(`[(]//[^\s]*[)]`) - body := r.Body - fmt.Println(string(body)) - urls := re.FindAllString(string(body), -1) - fmt.Println(urls) - for _, url := range urls { - url = strip(url, "()") - url = "https:" + url - fmt.Println(url) - } - }) - c.Visit("https://wqsitem.jd.com/detail/100008309360_d100008309360_normal.html") -} - -func scrapPDD() { - var cookies = []*http.Cookie{} - var mapcookies = make(map[string]string) - url := fmt.Sprintf("https://mobile.yangkeduo.com/goods.html?goods_id=%s", "156632692649") - cs := "api_uid=CiHUKl9DZKpL6QBVK4qWAg==; _nano_fp=Xpdbl0PyX5Pxn0TynT_DTGXbst0kz5cjzGAQDnBR; ua=Mozilla%2F5.0%20(Windows%20NT%2010.0%3B%20Win64%3B%20x64)%20AppleWebKit%2F537.36%20(KHTML%2C%20like%20Gecko)%20Chrome%2F84.0.4147.135%20Safari%2F537.36; webp=1; quick_entrance_click_record=20200824%2C1; PDDAccessToken=XRC6FNX7FRBL6AJRMRBRN4CDG2PZXO3YJZYHFUA4O2PLDAWVYXHA1125821; pdd_user_id=9622705741400; pdd_user_uin=F27EAZ4V5S7EGEVMCJI2P7RFLE_GEXDA; chat_config={'host_whitelist':['.yangkeduo.com','.pinduoduo.com','.10010.com/queen/tencent/pinduoduo-fill.html','.ha.10086.cn/pay/card-sale!toforward.action','wap.ha.10086.cn','m.10010.com']}; pdd_vds=gaLMNqmfGfyYEpyYiZGWopaCicNHbXGWtDNcOZnWLqiDNfLHOXnZaqtCLDiX" - csList := strings.Split(cs, ";") - for _, c := range csList { - s := strings.Trim(c, " ") - sList := strings.SplitN(s, "=", 2) - - mapcookies[sList[len(sList)-len(sList)]] = sList[(len(sList) - len(sList) + 1)] - - } - fmt.Println(mapcookies) - for key, value := range mapcookies { - if key == "ua" { - continue - } - cookies = append(cookies, &http.Cookie{Name: key, Value: value}) - } - c := colly.NewCollector( - colly.UserAgent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36"), - ) - - c.OnResponse(func(r *colly.Response) { - re, _ := regexp.Compile(`window.rawData=.*}`) - body := r.Body - fmt.Println(string(body)) - result := re.FindString(string(body)) - // fmt.Println(result) - result = strings.SplitN(result, "=", 2)[1] - // fmt.Println(result) - value := gjson.Get(result, "store.initDataObj.goods.detailGallery") - // fmt.Println(value) - list := value.Array() - imageList := []string{} - for _, v := range list { - nv := gjson.Get(v.String(), "url") - imageList = append(imageList, nv.String()) - } - fmt.Println(imageList) - ck := c.Cookies("https://mobile.yangkeduo.com") - fmt.Println(ck) - cookies = ck - }) - - c.SetCookies("https://mobile.yangkeduo.com", cookies) - - c.Visit(url) -} - -func strip(ss string, charss string) string { - s, chars := []rune(ss), []rune(charss) - length := len(s) - max := len(s) - 1 - l, r := true, true //标记当左端或者右端找到正常字符后就停止继续寻找 - start, end := 0, max - tmpEnd := 0 - charset := make(map[rune]bool) //创建字符集,也就是唯一的字符,方便后面判断是否存在 - for i := 0; i < len(chars); i++ { - charset[chars[i]] = true - } - for i := 0; i < length; i++ { - if _, exist := charset[s[i]]; l && !exist { - start = i - l = false - } - tmpEnd = max - i - if _, exist := charset[s[tmpEnd]]; r && !exist { - end = tmpEnd - r = false - } - if !l && !r { - break - } - } - if l && r { // 如果左端和右端都没找到正常字符,那么表示该字符串没有正常字符 - return "" - } - return string(s[start : end+1]) -}