|
- package uid
-
- import (
- "database/sql"
- "errors"
- "log"
- "time"
- )
-
- type logger interface {
- Error(error)
- }
-
- // Logger Log接口,如果设置了Logger,就使用Logger打印日志,如果没有设置,就使用内置库log打印日志
- var Logger logger
-
- // ErrTimeOut 获取uid超时错误
- var ErrTimeOut = errors.New("get uid timeout")
-
- type Uid struct {
- db *sql.DB // 数据库连接
- businessId string // 业务id
- ch chan int64 // id缓冲池
- min, max int64 // id段最小值,最大值
- }
-
- // NewUid 创建一个Uid;len:缓冲池大小()
- // db:数据库连接
- // businessId:业务id
- // len:缓冲池大小(长度可控制缓存中剩下多少id时,去DB中加载)
- func NewUid(db *sql.DB, businessId string, len int) (*Uid, error) {
- lid := Uid{
- db: db,
- businessId: businessId,
- ch: make(chan int64, len),
- }
- go lid.productId()
- return &lid, nil
- }
-
- // Get 获取自增id,当发生超时,返回错误,避免大量请求阻塞,服务器崩溃
- func (u *Uid) Get() (int64, error) {
- select {
- case <-time.After(1 * time.Second):
- return 0, ErrTimeOut
- case uid := <-u.ch:
- return uid, nil
- }
- }
-
- // productId 生产id,当ch达到最大容量时,这个方法会阻塞,直到ch中的id被消费
- func (u *Uid) productId() {
- _ = u.reLoad()
-
- for {
- if u.min >= u.max {
- _ = u.reLoad()
- }
-
- u.min++
- u.ch <- u.min
- }
- }
-
- // reLoad 在数据库获取id段,如果失败,会每隔一秒尝试一次
- func (u *Uid) reLoad() error {
- var err error
- for {
- err = u.getFromDB()
- if err == nil {
- return nil
- }
-
- // 数据库发生异常,等待一秒之后再次进行尝试
- if Logger != nil {
- Logger.Error(err)
- } else {
- log.Println(err)
- }
- time.Sleep(time.Second)
- }
- }
-
- // getFromDB 从数据库获取id段
- func (u *Uid) getFromDB() error {
- var (
- maxId int64
- step int64
- )
-
- tx, err := u.db.Begin()
- if err != nil {
- return err
- }
- defer func() { _ = tx.Rollback() }()
-
- row := tx.QueryRow("SELECT max_id,step FROM uid WHERE business_id = ? FOR UPDATE", u.businessId)
- err = row.Scan(&maxId, &step)
- if err != nil {
- return err
- }
-
- _, err = tx.Exec("UPDATE uid SET max_id = ? WHERE business_id = ?", maxId+step, u.businessId)
- if err != nil {
- return err
- }
- err = tx.Commit()
- if err != nil {
- return err
- }
-
- u.min = maxId
- u.max = maxId + step
- return nil
- }
|