package redis import ( "fmt" "time" "github.com/mylxsw/coyotes/config" "github.com/mylxsw/coyotes/console" "github.com/mylxsw/coyotes/log" redis "gopkg.in/redis.v5" ) // Queue is the queue object for redis broker type Queue struct { Runtime *config.Runtime Client *redis.Client } // Create a redis queue func Create() *Queue { client := createRedisClient() return &Queue{ Client: client, Runtime: config.GetRuntime(), } } // Close task queue func (queue *Queue) Close() { queue.Client.Close() } // Listen to the redis queue func (queue *Queue) Listen(channel *config.Channel) { // 非任务模式不启用队列监听 if !queue.Runtime.Config.TaskMode { return } log.Debug("queue listener %s started.", channel.Name) defer log.Debug("queue listener %s stopped.", channel.Name) for { select { case <-channel.StopChan: close(channel.Command) return default: res, err := queue.Client.BRPop(2*time.Second, TaskQueueKey(channel.Name)).Result() if err != nil { continue } queue.Client.SAdd(TaskQueueExecKey(channel.Name), res[1]) channel.Command <- res[1] } } } // Work function consuming the queue func (queue *Queue) Work(i int, channel *config.Channel, callback func(command string, processID string)) { processID := fmt.Sprintf("%s %d", channel.Name, i) log.Debug("task customer [%s] started.", console.ColorfulText(console.TextRed, processID)) defer log.Debug("task customer [%s] stopped.", console.ColorfulText(console.TextRed, processID)) for { select { case res, ok := <-channel.Command: if !ok { return } func(res string) { startTime := time.Now() // 删除用于去重的缓存key defer func() { distinctKey := TaskQueueDistinctKey(channel.Name, res) execKey := TaskQueueExecKey(channel.Name) log.Debug( "[%s] clean %s %s ...", console.ColorfulText(console.TextRed, processID), distinctKey, execKey, ) err := queue.Client.Del(distinctKey).Err() if err != nil { log.Error( "[%s] delete key %s failed: %v", console.ColorfulText(console.TextRed, processID), distinctKey, err, ) } err = queue.Client.SRem(execKey, res).Err() if err != nil { log.Error( "[%s] remove key %s from %s: %v", console.ColorfulText(console.TextRed, processID), res, execKey, err, ) } log.Info( "[%s] task [%s] time-consuming %v", console.ColorfulText(console.TextRed, processID), console.ColorfulText(console.TextGreen, res), time.Since(startTime), ) }() callback(res, processID) }(res) } } }