package kafka

import (
	"context"
	"os/signal"
	"syscall"
	"testing"
	"time"

	"ultone/internal/tool"

	"github.com/loveuer/nf/nft/log"
)

// Connection settings shared by the integration tests in this file.
//
// NOTE(review): the broker address and plain-auth credentials are committed
// in source; consider loading them from the environment instead.
const (
	testKafkaAddr     = "10.220.10.15:9092"
	testKafkaTopic    = "test_zyp"
	testKafkaUser     = "admin"
	testKafkaPassword = "Yhblsqt@!."
)

// newTestPayload builds a Payload whose key is the current time formatted as
// "2006/01/02 15:04:05" and whose value is the result of tool.RandomString(16).
func newTestPayload() *Payload {
	return &Payload{
		Key:   []byte(time.Now().Format("2006/01/02 15:04:05")),
		Value: []byte(tool.RandomString(16)),
	}
}

// TestKafka connects to the test broker, starts reading from the test topic
// and logs every message (or per-message error) received on the channel
// returned by ReadMessage.
//
// The context passed to ReadMessage is cancelled on SIGHUP, SIGINT, SIGTERM
// or SIGQUIT. The loop only ends when the channel is closed — presumably
// ReadMessage closes it once ctx is cancelled; confirm against its
// implementation. Because the test needs a live broker and may block
// indefinitely, it is skipped in -short mode.
func TestKafka(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping Kafka integration test in short mode")
	}

	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	defer cancel()

	client, err := New(testKafkaAddr,
		WithTopic(testKafkaTopic),
		WithPlainAuth(testKafkaUser, testKafkaPassword),
		WithReconnection(),
	)
	if err != nil {
		t.Fatal(1, err)
	}

	ch, err := client.ReadMessage(ctx, ReadConfig{})
	if err != nil {
		t.Fatal(2, err)
	}

	for msg := range ch {
		if msg.err != nil {
			// A failed read is logged and skipped so later messages are still consumed.
			t.Logf("[Error] [TestKafka] msg.err = %v", msg.err)
			continue
		}

		t.Logf("[Info ] [TestKafka] [time = %s] [msg.topic = %s] [msg.key = %s] [msg.value = %s]", time.Now().Format("060102T150405"), msg.Topic, string(msg.Key), string(msg.Value))
	}
}

// TestKafkaWrite connects to the test broker and writes two batches of three
// payloads to the test topic: the first with the context returned by
// tool.Timeout(5), the second with context.Background().
//
// A failed write marks the test as failed (previously it was only logged, so
// the test passed even when nothing was written). t.Error is used rather than
// t.Fatal so the second batch is still attempted after a first failure.
// The test needs a live broker and is skipped in -short mode.
func TestKafkaWrite(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping Kafka integration test in short mode")
	}

	log.SetLogLevel(log.LogLevelDebug)

	client, err := New(testKafkaAddr,
		WithTopic(testKafkaTopic),
		WithPlainAuth(testKafkaUser, testKafkaPassword),
		WithReconnection(),
	)
	if err != nil {
		t.Fatal(1, err)
	}

	if err = client.WriteMessages(tool.Timeout(5),
		newTestPayload(),
		newTestPayload(),
		newTestPayload(),
	); err != nil {
		t.Error(2, err)
	}

	if err = client.WriteMessages(context.Background(),
		newTestPayload(),
		newTestPayload(),
		newTestPayload(),
	); err != nil {
		t.Error(3, err)
	}
}