feat: add oliver elastic support; update db(sqlite)

This commit is contained in:
zhaoyupeng
2024-09-20 15:14:49 +08:00
parent f9d59b99a0
commit c117d363a0
14 changed files with 399 additions and 85 deletions

View File

@ -2,25 +2,29 @@ package db
import (
"context"
"gorm.io/gorm"
"io"
"ultone/internal/opt"
"ultone/internal/tool"
"gorm.io/gorm"
)
var (
cli = &client{}
Default *Client
)
type client struct {
cli *gorm.DB
ttype string
type Client struct {
ctx context.Context
cli *gorm.DB
ttype string
cfgSqlite *cfgSqlite
}
func Type() string {
return cli.ttype
func (c *Client) Type() string {
return c.ttype
}
func New(ctxs ...context.Context) *gorm.DB {
func (c *Client) Session(ctxs ...context.Context) *gorm.DB {
var ctx context.Context
if len(ctxs) > 0 && ctxs[0] != nil {
ctx = ctxs[0]
@ -28,7 +32,7 @@ func New(ctxs ...context.Context) *gorm.DB {
ctx = tool.Timeout(30)
}
session := cli.cli.Session(&gorm.Session{Context: ctx})
session := c.cli.Session(&gorm.Session{Context: ctx})
if opt.Debug {
session = session.Debug()
@ -36,3 +40,22 @@ func New(ctxs ...context.Context) *gorm.DB {
return session
}
func (c *Client) Close() {
d, _ := c.cli.DB()
d.Close()
}
// Dump
// Only for sqlite with mem mode to dump data to bytes(io.Reader)
func (c *Client) Dump() (reader io.ReadSeekCloser, ok bool) {
if c.ttype != "sqlite" {
return nil, false
}
if c.cfgSqlite.fsType != "mem" {
return nil, false
}
return c.cfgSqlite.memDump.Dump(), true
}

View File

@ -0,0 +1,44 @@
package db
import (
"context"
"io"
"os"
"testing"
)
func TestOpen(t *testing.T) {
myClient, err := New(context.TODO(), "sqlite::", OptSqliteByMem())
if err != nil {
t.Fatalf("TestOpen: New err = %v", err)
}
type Start struct {
Id int `json:"id" gorm:"column:id;primaryKey"`
Name string `json:"name" gorm:"column:name"`
Dis float64 `json:"dis" gorm:"column:dis"`
}
if err = myClient.Session().AutoMigrate(&Start{}); err != nil {
t.Fatalf("TestOpen: AutoMigrate err = %v", err)
}
if err = myClient.Session().Create(&Start{Name: "sun", Dis: 6631.76}).Error; err != nil {
t.Fatalf("TestOpen: Create err = %v", err)
}
if err = myClient.Session().Create(&Start{Name: "mar", Dis: 786.35}).Error; err != nil {
t.Fatalf("TestOpen: Create err = %v", err)
}
if reader, ok := myClient.Dump(); ok {
bs, err := io.ReadAll(reader)
if err != nil {
t.Fatalf("TestOpen: ReadAll err = %v", err)
}
os.WriteFile("dump.db", bs, 0644)
}
myClient.Close()
}

View File

@ -1,23 +1,26 @@
package db
import (
"context"
"fmt"
"github.com/glebarez/sqlite"
"strings"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"strings"
"ultone/internal/opt"
)
func Init() error {
strs := strings.Split(opt.Cfg.DB.Uri, "::")
func New(ctx context.Context, uri string, opts ...Option) (*Client, error) {
strs := strings.Split(uri, "::")
if len(strs) != 2 {
return fmt.Errorf("db.Init: opt db uri invalid: %s", opt.Cfg.DB.Uri)
return nil, fmt.Errorf("db.Init: opt db uri invalid: %s", uri)
}
cli.ttype = strs[0]
c := &Client{ttype: strs[0], cfgSqlite: &cfgSqlite{fsType: "file"}}
for _, f := range opts {
f(c)
}
var (
err error
@ -26,20 +29,25 @@ func Init() error {
switch strs[0] {
case "sqlite":
opt.Cfg.DB.Type = "sqlite"
cli.cli, err = gorm.Open(sqlite.Open(dsn))
err = openSqlite(c, dsn)
case "mysql":
opt.Cfg.DB.Type = "mysql"
cli.cli, err = gorm.Open(mysql.Open(dsn))
c.cli, err = gorm.Open(mysql.Open(dsn))
case "postgres":
opt.Cfg.DB.Type = "postgres"
cli.cli, err = gorm.Open(postgres.Open(dsn))
c.cli, err = gorm.Open(postgres.Open(dsn))
default:
return fmt.Errorf("db type only support: [sqlite, mysql, postgres], unsupported db type: %s", strs[0])
return nil, fmt.Errorf("db type only support: [sqlite, mysql, postgres], unsupported db type: %s", strs[0])
}
if err != nil {
return fmt.Errorf("db.Init: open %s with dsn:%s, err: %w", strs[0], dsn, err)
return nil, fmt.Errorf("db.Init: open %s with dsn:%s, err: %w", strs[0], dsn, err)
}
return c, nil
}
func Init(ctx context.Context, uri string, opts ...Option) (err error) {
if Default, err = New(ctx, uri, opts...); err != nil {
return err
}
return nil

View File

@ -0,0 +1,27 @@
package db
import (
_ "github.com/loveuer/go-sqlite3/embed"
"io"
)
type Option func(c *Client)
func OptSqliteByUrl(address string) Option {
return func(c *Client) {
c.cfgSqlite.fsType = "url"
}
}
type SqliteMemDumper interface {
Dump() io.ReadSeekCloser
}
// 如果传 nil 则表示新生成一个 mem 的 sqlite
// 如果传了一个合法的 reader 则会从这个 reader 初始化 database
func OptSqliteByMem(reader io.ReadCloser) Option {
return func(c *Client) {
c.cfgSqlite.memReader = reader
c.cfgSqlite.fsType = "mem"
}
}

View File

@ -0,0 +1,63 @@
package db
import (
"fmt"
"io"
"time"
_ "github.com/loveuer/go-sqlite3/embed"
"github.com/loveuer/go-sqlite3/vfs/memdb"
"github.com/loveuer/go-sqlite3/vfs/readervfs"
"github.com/ncruces/go-sqlite3/gormlite"
"github.com/psanford/httpreadat"
"gorm.io/gorm"
)
type cfgSqlite struct {
fsType string // file, mem(bytes), url
memDump *memdb.MemDB
memReader io.ReadCloser
}
func openSqlite(c *Client, dsn string) error {
var (
db gorm.Dialector
err error
)
switch c.cfgSqlite.fsType {
case "file":
db = gormlite.Open("file:" + dsn)
case "url":
name := fmt.Sprintf("%d.db", time.Now().UnixNano())
readervfs.Create(name, httpreadat.New(dsn))
uri := fmt.Sprintf("file:%s?vfs=reader", name)
db = gormlite.Open(uri)
case "mem":
var (
bs []byte
name = fmt.Sprintf("%d.db", time.Now().UnixNano())
)
if c.cfgSqlite.memReader == nil {
bs = make([]byte, 0)
} else {
if bs, err = io.ReadAll(c.cfgSqlite.memReader); err != nil {
return err
}
}
memDump := memdb.Create(name, bs)
c.cfgSqlite.memDump = memDump
uri := fmt.Sprintf("file:/%s?vfs=memdb", name)
db = gormlite.Open(uri)
default:
return fmt.Errorf("unsupported sqlite fs type: %s", c.cfgSqlite.fsType)
}
if c.cli, err = gorm.Open(db); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,119 @@
package elastic
import (
"context"
"crypto/tls"
"fmt"
"github.com/loveuer/nf/nft/log"
es "github.com/olivere/elastic/v7"
"github.com/spf13/cast"
"net/http"
"net/url"
"strings"
"time"
)
var (
Client *es.Client
)
const (
ExampleUri = "https://username:password@ip1:9200,ip2:9200?disable_sniff=false&health=false"
)
func New(ctx context.Context, uri string) (*es.Client, error) {
var (
err error
ins *url.URL
client *es.Client
version string
)
if ins, err = url.Parse(uri); err != nil {
return nil, fmt.Errorf("invalid uri: %v\nexample uri: %s", err, ExampleUri)
}
if !(ins.Scheme == "http" || ins.Scheme == "https") {
return nil, fmt.Errorf("invalid uri scheme: %v\nexample uri: %s", err, ExampleUri)
}
hosts := strings.Split(ins.Host, ",")
if len(hosts) == 0 {
return nil, fmt.Errorf("invalid uri hosts: %v\nexample uri: %s", err, ExampleUri)
}
opts := make([]es.ClientOptionFunc, 0)
shouldHosts := make([]string, 0)
oks := make([]string, 0)
bads := make([]string, 0)
for idx := range hosts {
if len(hosts[idx]) == 0 {
continue
}
opts = append(opts, es.SetURL(hosts[idx]))
shouldHosts = append(shouldHosts, hosts[idx])
}
if len(opts) == 0 {
return nil, fmt.Errorf("invalid uri hosts: %v\nexample uri: %s", err, ExampleUri)
}
if ins.Scheme == "https" {
opts = append(opts, es.SetScheme("https"))
opts = append(opts, es.SetHttpClient(
&http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
},
))
}
if ins.User != nil {
username := ins.User.Username()
password, _ := ins.User.Password()
opts = append(opts, es.SetBasicAuth(username, password))
}
if cast.ToBool(ins.Query().Get("disable_sniff")) {
opts = append(opts, es.SetSniff(false))
}
if cast.ToBool(ins.Query().Get("health")) {
opts = append(opts, es.SetHealthcheckInterval(time.Minute))
opts = append(opts, es.SetHealthcheckTimeout(5*time.Second))
} else {
opts = append(opts, es.SetHealthcheck(false))
}
client, err = es.NewClient(opts...)
for idx := range shouldHosts {
if version, err = client.ElasticsearchVersion(shouldHosts[idx]); err != nil {
bads = append(bads, shouldHosts[idx])
continue
}
oks = append(oks, shouldHosts[idx])
}
switch len(oks) {
case 0:
return nil, fmt.Errorf("all nodes: %+v unavailable", shouldHosts)
case len(shouldHosts):
log.Info("connect to elastic[version: %s] success, all nodes: %+v available", version, shouldHosts)
default:
log.Warn("connect to elastic all nodes: %+v, err nodes: %v", shouldHosts, bads)
}
return client, err
}
func Init(ctx context.Context, uri string) (err error) {
Client, err = New(ctx, uri)
return err
}

View File

@ -1,6 +1,7 @@
package es
import (
"context"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/loveuer/esgo2dump/xes/es7"
"github.com/loveuer/nf/nft/log"
@ -13,17 +14,27 @@ var (
Client *elastic.Client
)
func Init() error {
ins, err := url.Parse(opt.Cfg.ES.Uri)
if err != nil {
return err
func New(ctx context.Context, uri string) (*elastic.Client, error) {
var (
err error
client *elastic.Client
ins *url.URL
)
if ins, err = url.Parse(uri); err != nil {
return nil, err
}
log.Debug("es.InitClient url parse uri: %s, result: %+v", opt.Cfg.ES.Uri, ins)
if Client, err = es7.NewClient(tool.Timeout(10), ins); err != nil {
return err
if client, err = es7.NewClient(tool.Timeout(10), ins); err != nil {
return nil, err
}
return nil
return client, nil
}
func Init(ctx context.Context, uri string) (err error) {
Client, err = New(ctx, uri)
return err
}