You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
175 lines
3.1 KiB
175 lines
3.1 KiB
package queue |
|
|
|
import ( |
|
"errors" |
|
"sync" |
|
"time" |
|
) |
|
|
|
var ( |
|
RetryErr = errors.New("retry failed") |
|
MessageFormatErr = errors.New("Message format error") |
|
m sync.Mutex |
|
) |
|
|
|
// ConsumerRepo 消费者接口 |
|
type ConsumerRepo interface { |
|
|
|
//Consumer 消费者 |
|
Consumer(interface{}) error |
|
|
|
//Error 消费错误方法,消费异常会传递进来 |
|
Error(error) |
|
} |
|
|
|
type ProducerRepo interface { |
|
Producer(message interface{}) |
|
|
|
//run() |
|
//runtime() |
|
|
|
Error() error |
|
} |
|
|
|
// queueCh 队列信息体 |
|
type queueCh struct { |
|
retry int32 |
|
message interface{} |
|
//ctx context.Context |
|
} |
|
|
|
// Queue 队列 |
|
type Queue struct { |
|
// 队列 |
|
ch chan *queueCh |
|
|
|
// 错误体 |
|
err chan error |
|
|
|
// 执行对象 |
|
r ConsumerRepo |
|
|
|
// 日志 |
|
//log log.Logger |
|
|
|
//队列最大容量:默认:10 |
|
num int |
|
|
|
// 重试次数:默认:0 |
|
retry int32 |
|
|
|
// 最大容错等待时间:建议小于1s,默认是1毫秒 |
|
timeout time.Duration |
|
|
|
// 启动消费者数量:默认:1 |
|
coroutines int |
|
} |
|
|
|
type Option func(*Queue) |
|
|
|
// SetMaxNum 队列最大容量,超过需要等待 |
|
func SetMaxNum(num int) Option { |
|
return func(queue *Queue) { |
|
queue.num = num |
|
} |
|
} |
|
|
|
// SetRetry 每个请求重试次数 |
|
func SetRetry(num int32) Option { |
|
return func(queue *Queue) { |
|
queue.retry = num |
|
} |
|
} |
|
|
|
// SetTimeout 最大容错等待时间 |
|
func SetTimeout(duration time.Duration) Option { |
|
return func(queue *Queue) { |
|
queue.timeout = duration |
|
} |
|
} |
|
|
|
// SetCoroutines 设置消费队列数量 |
|
func SetCoroutines(num int) Option { |
|
return func(queue *Queue) { |
|
queue.coroutines = num |
|
} |
|
} |
|
|
|
// NewQueue 创建一个本地队列 |
|
// QueueRepo 队列承载体,队列操作内容 |
|
// SetMaxNum 队列最大容量,超过需要等待 |
|
// SetRetry 每个请求重试次数 |
|
// SetTimeout 最大容错等待时间 |
|
// SetCoroutines 设置消费队列数量 |
|
func NewQueue(r ConsumerRepo, options ...Option) ProducerRepo { |
|
q := &Queue{ |
|
//log: log, |
|
err: make(chan error, 10), |
|
r: r, |
|
timeout: time.Millisecond * 1, |
|
num: 10, |
|
retry: 0, |
|
coroutines: 1, |
|
} |
|
if len(options) > 0 { |
|
for _, option := range options { |
|
option(q) |
|
} |
|
} |
|
q.ch = make(chan *queueCh, q.num) |
|
q.run() |
|
return q |
|
} |
|
|
|
// Producer 生成者 |
|
func (q *Queue) Producer(message interface{}) { |
|
m.Lock() |
|
defer m.Unlock() |
|
q.ch <- &queueCh{ |
|
retry: q.retry, |
|
message: message, |
|
//ctx: ctx, |
|
} |
|
} |
|
|
|
// 启动队列等待 |
|
func (q *Queue) run() { |
|
for i := 0; i < q.coroutines; i++ { |
|
q.runtime() |
|
} |
|
} |
|
|
|
func (q *Queue) runtime() { |
|
go func() { |
|
for { |
|
body := <-q.ch |
|
err := q.r.Consumer(body.message) |
|
if err != nil { |
|
if body.retry != 0 { |
|
body.retry = q.retry - 1 |
|
select { |
|
case q.ch <- &queueCh{retry: body.retry, message: body.message}: |
|
case <-time.After(1 * time.Millisecond): |
|
//q.log.Error(RetryErr) |
|
err = RetryErr |
|
//q.r.Error(RetryErr) |
|
} |
|
} |
|
select { |
|
case q.err <- err: |
|
case <-time.After(time.Millisecond * 1): |
|
//q.log.Errorf("queue push err:%v", err) |
|
} |
|
q.r.Error(err) |
|
} |
|
} |
|
}() |
|
} |
|
|
|
// Error 返回错误信息,不严谨,谨慎使用 |
|
func (q *Queue) Error() error { |
|
for { |
|
err := <-q.err |
|
return err |
|
} |
|
}
|
|
|