otto 2 months ago
parent
commit
62b20e46d9
  1. 13
      queue.go

13
queue.go

@ -1,7 +1,6 @@
package queue
import (
"context"
"errors"
"sync"
"time"
@ -17,14 +16,14 @@ var (
type ConsumerRepo interface {
//Consumer 消费者
Consumer(context.Context, interface{}) error
Consumer(interface{}) error
//Error 消费错误方法,消费异常会传递进来
Error(error)
}
type ProducerRepo interface {
Producer(ctx context.Context, message interface{})
Producer(message interface{})
//run()
//runtime()
@ -36,7 +35,7 @@ type ProducerRepo interface {
type queueCh struct {
retry int32
message interface{}
ctx context.Context
//ctx context.Context
}
// Queue 队列
@ -123,13 +122,13 @@ func NewQueue(r ConsumerRepo, options ...Option) ProducerRepo {
}
// Producer 生成者
func (q *Queue) Producer(ctx context.Context, message interface{}) {
func (q *Queue) Producer(message interface{}) {
m.Lock()
defer m.Unlock()
q.ch <- &queueCh{
retry: q.retry,
message: message,
ctx: ctx,
//ctx: ctx,
}
}
@ -144,7 +143,7 @@ func (q *Queue) runtime() {
go func() {
for {
body := <-q.ch
err := q.r.Consumer(body.ctx, body.message)
err := q.r.Consumer(body.message)
if err != nil {
if body.retry != 0 {
body.retry = q.retry - 1

Loading…
Cancel
Save