112 lines
2.8 KiB
Go
112 lines
2.8 KiB
Go
package mq
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"os"
|
|
"os/signal"
|
|
"strconv"
|
|
"syscall"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestConsume(t *testing.T) {
|
|
clientCert, err := tls.LoadX509KeyPair(
|
|
"/Users/loveuer/codes/project/bifrost-pro/search_v3/internal/database/mq/tls/client.crt",
|
|
"/Users/loveuer/codes/project/bifrost-pro/search_v3/internal/database/mq/tls/client.key",
|
|
)
|
|
if err != nil {
|
|
t.Fatal(err.Error())
|
|
}
|
|
|
|
ca, err := os.ReadFile("/Users/loveuer/codes/project/bifrost-pro/search_v3/internal/database/mq/tls/ca.crt")
|
|
if err != nil {
|
|
t.Fatal(err.Error())
|
|
}
|
|
|
|
caCertPool := x509.NewCertPool()
|
|
if !caCertPool.AppendCertsFromPEM(ca) {
|
|
t.Fatal("ca pool append ca crt err")
|
|
}
|
|
|
|
if err := Init(
|
|
WithURI("amqps://admin:password@mq.dev:5671/export"),
|
|
WithTLS(&tls.Config{
|
|
Certificates: []tls.Certificate{clientCert},
|
|
RootCAs: caCertPool,
|
|
InsecureSkipVerify: true,
|
|
}),
|
|
WithQueueDeclare("export", false, false, false, false, amqp.Table{"x-max-priority": 100}),
|
|
); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
|
defer cancel()
|
|
|
|
ch, err := Consume(ctx, "export", &ConsumeOpt{MaxReconnection: -1})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
t.Log("[TEST] start consume msg")
|
|
for msg := range ch {
|
|
t.Logf("[TEST] [%s] [msg: %s]", time.Now().Format("060102T150405"), string(msg.Body))
|
|
_ = msg.Ack(false)
|
|
}
|
|
}
|
|
|
|
func TestPublish(t *testing.T) {
|
|
clientCert, err := tls.LoadX509KeyPair(
|
|
"/Users/loveuer/codes/project/bifrost-pro/search_v3/internal/database/mq/tls/client.crt",
|
|
"/Users/loveuer/codes/project/bifrost-pro/search_v3/internal/database/mq/tls/client.key",
|
|
)
|
|
if err != nil {
|
|
t.Fatal(err.Error())
|
|
}
|
|
|
|
ca, err := os.ReadFile("/Users/loveuer/codes/project/bifrost-pro/search_v3/internal/database/mq/tls/ca.crt")
|
|
if err != nil {
|
|
t.Fatal(err.Error())
|
|
}
|
|
|
|
caCertPool := x509.NewCertPool()
|
|
if !caCertPool.AppendCertsFromPEM(ca) {
|
|
t.Fatal("ca pool append ca crt err")
|
|
}
|
|
|
|
if err := Init(
|
|
WithURI("amqps://admin:password@mq.dev:5671/export"),
|
|
WithTLS(&tls.Config{
|
|
Certificates: []tls.Certificate{clientCert},
|
|
RootCAs: caCertPool,
|
|
InsecureSkipVerify: true,
|
|
}),
|
|
WithQueueDeclare("export", false, false, false, false, amqp.Table{"x-max-priority": 100}),
|
|
); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
|
defer cancel()
|
|
|
|
count := 1
|
|
|
|
t.Log("[TEST] start publish msg...")
|
|
|
|
for {
|
|
if err = Publish(ctx, "export", amqp.Publishing{
|
|
ContentType: "text/plain",
|
|
Body: []byte(time.Now().Format(time.RFC3339) + " => hello_world@" + strconv.Itoa(count)),
|
|
}); err != nil {
|
|
t.Log(err.Error())
|
|
}
|
|
|
|
time.Sleep(11 * time.Second)
|
|
count++
|
|
}
|
|
}
|