Слияние кода завершено, страница обновится автоматически
package redis
import (
"fmt"
"time"
"github.com/mylxsw/coyotes/config"
"github.com/mylxsw/coyotes/console"
"github.com/mylxsw/coyotes/log"
redis "gopkg.in/redis.v5"
)
// Queue is the queue object for redis broker
type Queue struct {
Runtime *config.Runtime
Client *redis.Client
}
// Create a redis queue
func Create() *Queue {
client := createRedisClient()
return &Queue{
Client: client,
Runtime: config.GetRuntime(),
}
}
// Close task queue
func (queue *Queue) Close() {
queue.Client.Close()
}
// Listen to the redis queue
func (queue *Queue) Listen(channel *config.Channel) {
// 非任务模式不启用队列监听
if !queue.Runtime.Config.TaskMode {
return
}
log.Debug("queue listener %s started.", channel.Name)
defer log.Debug("queue listener %s stopped.", channel.Name)
for {
select {
case <-channel.StopChan:
close(channel.Command)
return
default:
res, err := queue.Client.BRPop(2*time.Second, TaskQueueKey(channel.Name)).Result()
if err != nil {
continue
}
queue.Client.SAdd(TaskQueueExecKey(channel.Name), res[1])
channel.Command <- res[1]
}
}
}
// Work function consuming the queue
func (queue *Queue) Work(i int, channel *config.Channel, callback func(command string, processID string)) {
processID := fmt.Sprintf("%s %d", channel.Name, i)
log.Debug("task customer [%s] started.", console.ColorfulText(console.TextRed, processID))
defer log.Debug("task customer [%s] stopped.", console.ColorfulText(console.TextRed, processID))
for {
select {
case res, ok := <-channel.Command:
if !ok {
return
}
func(res string) {
startTime := time.Now()
// 删除用于去重的缓存key
defer func() {
distinctKey := TaskQueueDistinctKey(channel.Name, res)
execKey := TaskQueueExecKey(channel.Name)
log.Debug(
"[%s] clean %s %s ...",
console.ColorfulText(console.TextRed, processID),
distinctKey,
execKey,
)
err := queue.Client.Del(distinctKey).Err()
if err != nil {
log.Error(
"[%s] delete key %s failed: %v",
console.ColorfulText(console.TextRed, processID),
distinctKey,
err,
)
}
err = queue.Client.SRem(execKey, res).Err()
if err != nil {
log.Error(
"[%s] remove key %s from %s: %v",
console.ColorfulText(console.TextRed, processID),
res,
execKey,
err,
)
}
log.Info(
"[%s] task [%s] time-consuming %v",
console.ColorfulText(console.TextRed, processID),
console.ColorfulText(console.TextGreen, res),
time.Since(startTime),
)
}()
callback(res, processID)
}(res)
}
}
}
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарий ( 0 )