本地队列
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

175 lines
3.1 KiB

package queue
import (
"errors"
"sync"
"time"
)
var (
RetryErr = errors.New("retry failed")
MessageFormatErr = errors.New("Message format error")
m sync.Mutex
)
// ConsumerRepo 消费者接口
type ConsumerRepo interface {
//Consumer 消费者
Consumer(interface{}) error
//Error 消费错误方法,消费异常会传递进来
Error(error)
}
type ProducerRepo interface {
Producer(message interface{})
//run()
//runtime()
Error() error
}
// queueCh 队列信息体
type queueCh struct {
retry int32
message interface{}
//ctx context.Context
}
// Queue 队列
type Queue struct {
// 队列
ch chan *queueCh
// 错误体
err chan error
// 执行对象
r ConsumerRepo
// 日志
//log log.Logger
//队列最大容量:默认:10
num int
// 重试次数:默认:0
retry int32
// 最大容错等待时间:建议小于1s,默认是1毫秒
timeout time.Duration
// 启动消费者数量:默认:1
coroutines int
}
type Option func(*Queue)
// SetMaxNum 队列最大容量,超过需要等待
func SetMaxNum(num int) Option {
return func(queue *Queue) {
queue.num = num
}
}
// SetRetry 每个请求重试次数
func SetRetry(num int32) Option {
return func(queue *Queue) {
queue.retry = num
}
}
// SetTimeout 最大容错等待时间
func SetTimeout(duration time.Duration) Option {
return func(queue *Queue) {
queue.timeout = duration
}
}
// SetCoroutines 设置消费队列数量
func SetCoroutines(num int) Option {
return func(queue *Queue) {
queue.coroutines = num
}
}
// NewQueue 创建一个本地队列
// QueueRepo 队列承载体,队列操作内容
// SetMaxNum 队列最大容量,超过需要等待
// SetRetry 每个请求重试次数
// SetTimeout 最大容错等待时间
// SetCoroutines 设置消费队列数量
func NewQueue(r ConsumerRepo, options ...Option) ProducerRepo {
q := &Queue{
//log: log,
err: make(chan error, 10),
r: r,
timeout: time.Millisecond * 1,
num: 10,
retry: 0,
coroutines: 1,
}
if len(options) > 0 {
for _, option := range options {
option(q)
}
}
q.ch = make(chan *queueCh, q.num)
q.run()
return q
}
// Producer 生成者
func (q *Queue) Producer(message interface{}) {
m.Lock()
defer m.Unlock()
q.ch <- &queueCh{
retry: q.retry,
message: message,
//ctx: ctx,
}
}
// 启动队列等待
func (q *Queue) run() {
for i := 0; i < q.coroutines; i++ {
q.runtime()
}
}
func (q *Queue) runtime() {
go func() {
for {
body := <-q.ch
err := q.r.Consumer(body.message)
if err != nil {
if body.retry != 0 {
body.retry = q.retry - 1
select {
case q.ch <- &queueCh{retry: body.retry, message: body.message}:
case <-time.After(1 * time.Millisecond):
//q.log.Error(RetryErr)
err = RetryErr
//q.r.Error(RetryErr)
}
}
select {
case q.err <- err:
case <-time.After(time.Millisecond * 1):
//q.log.Errorf("queue push err:%v", err)
}
q.r.Error(err)
}
}
}()
}
// Error 返回错误信息,不严谨,谨慎使用
func (q *Queue) Error() error {
for {
err := <-q.err
return err
}
}