1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/orionis-coyotes

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Это зеркальный репозиторий, синхронизируется ежедневно с исходного репозитория.
Клонировать/Скачать
queue.go 2.6 КБ
Копировать Редактировать Исходные данные Просмотреть построчно История
管宜尧 Отправлено 8 лет назад 94811e7
package redis
import (
"fmt"
"time"
"github.com/mylxsw/coyotes/config"
"github.com/mylxsw/coyotes/console"
"github.com/mylxsw/coyotes/log"
redis "gopkg.in/redis.v5"
)
// Queue is the queue object for redis broker
type Queue struct {
Runtime *config.Runtime
Client *redis.Client
}
// Create a redis queue
func Create() *Queue {
client := createRedisClient()
return &Queue{
Client: client,
Runtime: config.GetRuntime(),
}
}
// Close task queue
func (queue *Queue) Close() {
queue.Client.Close()
}
// Listen to the redis queue
func (queue *Queue) Listen(channel *config.Channel) {
// 非任务模式不启用队列监听
if !queue.Runtime.Config.TaskMode {
return
}
log.Debug("queue listener %s started.", channel.Name)
defer log.Debug("queue listener %s stopped.", channel.Name)
for {
select {
case <-channel.StopChan:
close(channel.Command)
return
default:
res, err := queue.Client.BRPop(2*time.Second, TaskQueueKey(channel.Name)).Result()
if err != nil {
continue
}
queue.Client.SAdd(TaskQueueExecKey(channel.Name), res[1])
channel.Command <- res[1]
}
}
}
// Work function consuming the queue
func (queue *Queue) Work(i int, channel *config.Channel, callback func(command string, processID string)) {
processID := fmt.Sprintf("%s %d", channel.Name, i)
log.Debug("task customer [%s] started.", console.ColorfulText(console.TextRed, processID))
defer log.Debug("task customer [%s] stopped.", console.ColorfulText(console.TextRed, processID))
for {
select {
case res, ok := <-channel.Command:
if !ok {
return
}
func(res string) {
startTime := time.Now()
// 删除用于去重的缓存key
defer func() {
distinctKey := TaskQueueDistinctKey(channel.Name, res)
execKey := TaskQueueExecKey(channel.Name)
log.Debug(
"[%s] clean %s %s ...",
console.ColorfulText(console.TextRed, processID),
distinctKey,
execKey,
)
err := queue.Client.Del(distinctKey).Err()
if err != nil {
log.Error(
"[%s] delete key %s failed: %v",
console.ColorfulText(console.TextRed, processID),
distinctKey,
err,
)
}
err = queue.Client.SRem(execKey, res).Err()
if err != nil {
log.Error(
"[%s] remove key %s from %s: %v",
console.ColorfulText(console.TextRed, processID),
res,
execKey,
err,
)
}
log.Info(
"[%s] task [%s] time-consuming %v",
console.ColorfulText(console.TextRed, processID),
console.ColorfulText(console.TextGreen, res),
time.Since(startTime),
)
}()
callback(res, processID)
}(res)
}
}
}

Комментарий ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://gitlife.ru/oschina-mirror/orionis-coyotes.git
git@gitlife.ru:oschina-mirror/orionis-coyotes.git
oschina-mirror
orionis-coyotes
orionis-coyotes
v1.0-beta