🎉: init project

This commit is contained in:
loveuer
2024-07-11 16:37:26 +08:00
commit c46458c6f2
159 changed files with 19246 additions and 0 deletions

View File

@ -0,0 +1,89 @@
package mq
import (
"crypto/tls"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"net/url"
"sync"
)
// Init - init mq client:
// - @param.uri: "{scheme[amqp/amqps]}://{username}:{password}@{endpoint}/{virtual_host}"
// - @param.certs: with amqps, certs[0]=client crt bytes, certs[0]=client key bytes
type _client struct {
sync.Mutex
uri string
tlsCfg *tls.Config
conn *amqp.Connection
ch *amqp.Channel
consume <-chan amqp.Delivery
queue *queueOption
}
func (c *_client) open() error {
var (
err error
)
c.Lock()
defer c.Unlock()
if c.tlsCfg != nil {
c.conn, err = amqp.DialTLS(c.uri, c.tlsCfg)
} else {
c.conn, err = amqp.Dial(c.uri)
}
if err != nil {
return err
}
if c.ch, err = c.conn.Channel(); err != nil {
return err
}
if client.queue != nil && client.queue.name != "" {
if _, err = client.ch.QueueDeclare(
client.queue.name,
client.queue.durable,
client.queue.autoDelete,
client.queue.exclusive,
client.queue.noWait,
client.queue.args,
); err != nil {
return fmt.Errorf("declare queue: %s, err: %w", client.queue.name, err)
}
}
return nil
}
var (
client = &_client{
uri: "amqp://guest:guest@127.0.0.1:5672/",
tlsCfg: nil,
}
)
// Init - init mq client
func Init(opts ...OptionFn) error {
var (
err error
)
for _, fn := range opts {
fn(client)
}
if _, err = url.Parse(client.uri); err != nil {
return fmt.Errorf("url parse uri err: %w", err)
}
if err = client.open(); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,111 @@
package mq
import (
"context"
"crypto/tls"
"crypto/x509"
amqp "github.com/rabbitmq/amqp091-go"
"os"
"os/signal"
"strconv"
"syscall"
"testing"
"time"
)
func TestConsume(t *testing.T) {
clientCert, err := tls.LoadX509KeyPair(
"/Users/loveuer/codes/project/bifrost-pro/search_v3/internal/database/mq/tls/client.crt",
"/Users/loveuer/codes/project/bifrost-pro/search_v3/internal/database/mq/tls/client.key",
)
if err != nil {
t.Fatal(err.Error())
}
ca, err := os.ReadFile("/Users/loveuer/codes/project/bifrost-pro/search_v3/internal/database/mq/tls/ca.crt")
if err != nil {
t.Fatal(err.Error())
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(ca) {
t.Fatal("ca pool append ca crt err")
}
if err := Init(
WithURI("amqps://admin:password@mq.dev:5671/export"),
WithTLS(&tls.Config{
Certificates: []tls.Certificate{clientCert},
RootCAs: caCertPool,
InsecureSkipVerify: true,
}),
WithQueueDeclare("export", false, false, false, false, amqp.Table{"x-max-priority": 100}),
); err != nil {
t.Fatal(err)
}
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel()
ch, err := Consume(ctx, "export", &ConsumeOpt{MaxReconnection: -1})
if err != nil {
t.Fatal(err)
}
t.Log("[TEST] start consume msg")
for msg := range ch {
t.Logf("[TEST] [%s] [msg: %s]", time.Now().Format("060102T150405"), string(msg.Body))
_ = msg.Ack(false)
}
}
func TestPublish(t *testing.T) {
clientCert, err := tls.LoadX509KeyPair(
"/Users/loveuer/codes/project/bifrost-pro/search_v3/internal/database/mq/tls/client.crt",
"/Users/loveuer/codes/project/bifrost-pro/search_v3/internal/database/mq/tls/client.key",
)
if err != nil {
t.Fatal(err.Error())
}
ca, err := os.ReadFile("/Users/loveuer/codes/project/bifrost-pro/search_v3/internal/database/mq/tls/ca.crt")
if err != nil {
t.Fatal(err.Error())
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(ca) {
t.Fatal("ca pool append ca crt err")
}
if err := Init(
WithURI("amqps://admin:password@mq.dev:5671/export"),
WithTLS(&tls.Config{
Certificates: []tls.Certificate{clientCert},
RootCAs: caCertPool,
InsecureSkipVerify: true,
}),
WithQueueDeclare("export", false, false, false, false, amqp.Table{"x-max-priority": 100}),
); err != nil {
t.Fatal(err)
}
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel()
count := 1
t.Log("[TEST] start publish msg...")
for {
if err = Publish(ctx, "export", amqp.Publishing{
ContentType: "text/plain",
Body: []byte(time.Now().Format(time.RFC3339) + " => hello_world@" + strconv.Itoa(count)),
}); err != nil {
t.Log(err.Error())
}
time.Sleep(11 * time.Second)
count++
}
}

View File

@ -0,0 +1,97 @@
package mq
import (
"context"
"fmt"
"github.com/loveuer/esgo2dump/log"
amqp "github.com/rabbitmq/amqp091-go"
"os"
"time"
"ultone/internal/tool"
)
// ConsumeOpt
// - Name: consumer's name, default unamed_<timestamp>
// - MaxReconnection: when mq connection closed, max reconnection times, default 3, -1 for unlimited
type ConsumeOpt struct {
Name string // consumer's name, default unamed_<timestamp>
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
MaxReconnection int // when mq connection closed, max reconnection times, default 3, -1 for unlimited
Args amqp.Table
}
func Consume(ctx context.Context, queue string, opts ...*ConsumeOpt) (<-chan amqp.Delivery, error) {
var (
err error
res = make(chan amqp.Delivery, 1)
opt = &ConsumeOpt{
Name: os.Getenv("HOSTNAME"),
AutoAck: false,
Exclusive: false,
NoLocal: false,
NoWait: false,
Args: nil,
MaxReconnection: 3,
}
)
if len(opts) > 0 && opts[0] != nil {
opt = opts[0]
}
if opt.Name == "" {
opt.Name = fmt.Sprintf("unamed_%d", time.Now().UnixMilli())
}
client.Lock()
if client.consume, err = client.ch.Consume(queue, opt.Name, opt.AutoAck, opt.Exclusive, opt.NoLocal, opt.NoWait, opt.Args); err != nil {
client.Unlock()
return nil, err
}
client.Unlock()
go func() {
Run:
retry := 0
for {
select {
case <-ctx.Done():
close(res)
return
case m, ok := <-client.consume:
if !ok {
log.Warn("[mq] consume channel closed!!!")
goto Reconnect
}
res <- m
}
}
Reconnect:
if opt.MaxReconnection == -1 || opt.MaxReconnection > retry {
retry++
log.Warn("[mq] try reconnect[%d/%d] to mq server after %d seconds...err: %v", retry, opt.MaxReconnection, tool.Min(60, retry*5), err)
time.Sleep(time.Duration(tool.Min(60, retry*5)) * time.Second)
if err = client.open(); err != nil {
goto Reconnect
}
client.Lock()
if client.consume, err = client.ch.Consume(queue, opt.Name, opt.AutoAck, opt.Exclusive, opt.NoLocal, opt.NoWait, opt.Args); err != nil {
client.Unlock()
goto Reconnect
}
client.Unlock()
log.Info("[mq] reconnect success!!!")
goto Run
}
}()
return res, nil
}

View File

@ -0,0 +1,48 @@
package mq
import (
"crypto/tls"
amqp "github.com/rabbitmq/amqp091-go"
)
type OptionFn func(*_client)
// WithURI
// - amqp uri, e.g. amqp://guest:guest@127.0.0.1:5672/vhost
// - tips: with tls, scheme should be amqps, amqps://xx:xx@127.0.0.1:5672/vhost
func WithURI(uri string) OptionFn {
return func(c *_client) {
c.uri = uri
}
}
// WithTLS
// - amqps tls config
// - include client cert, client key, ca cert
func WithTLS(tlsCfg *tls.Config) OptionFn {
return func(c *_client) {
c.tlsCfg = tlsCfg
}
}
type queueOption struct {
name string
durable bool
autoDelete bool
exclusive bool
noWait bool
args amqp.Table
}
func WithQueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) OptionFn {
return func(c *_client) {
c.queue = &queueOption{
name: name,
durable: durable,
autoDelete: autoDelete,
exclusive: exclusive,
noWait: noWait,
args: args,
}
}
}

View File

@ -0,0 +1,62 @@
package mq
import (
"context"
"errors"
"github.com/loveuer/esgo2dump/log"
amqp "github.com/rabbitmq/amqp091-go"
"time"
"ultone/internal/tool"
)
// PublishOpt
// - MaxReconnect: publish msg auto retry with reconnect, should not be big, case memory leak
type PublishOpt struct {
Exchange string
Mandatory bool
Immediate bool
MaxReconnect uint8 // publish msg auto retry with reconnect, should not be big(default 1), case memory leak
}
func Publish(ctx context.Context, queue string, msg amqp.Publishing, opts ...*PublishOpt) error {
var (
err error
opt = &PublishOpt{
Exchange: amqp.DefaultExchange,
Mandatory: false,
Immediate: false,
MaxReconnect: 1,
}
retry = 0
)
if len(opts) > 0 && opts[0] != nil {
opt = opts[0]
}
for ; retry <= int(opt.MaxReconnect); retry++ {
if err = client.ch.PublishWithContext(ctx, opt.Exchange, queue, opt.Mandatory, opt.Immediate, msg); err == nil {
return nil
}
if errors.Is(err, amqp.ErrClosed) {
sleep := tool.Min(120, (retry+1)*30)
log.Warn("[mq] connection closed, reconnect[%d/%d] after %d seconds", retry+1, opt.MaxReconnect, sleep)
time.Sleep(time.Duration(sleep) * time.Second)
if oerr := client.open(); oerr != nil {
log.Error("[mq] reconnect[%d/%d] mq err: %v", oerr, retry+1, opt.MaxReconnect)
} else {
log.Info("[mq] reconnect mq success!!!")
}
continue
}
return err
}
return err
}