wip: mapping,setting import

This commit is contained in:
loveuer 2024-03-25 20:22:18 +08:00
parent b0b0ec2b58
commit d3524d0f05
6 changed files with 150 additions and 15 deletions

2
.gitignore vendored
View File

@ -2,5 +2,7 @@
.vscode .vscode
.DS_Store .DS_Store
data.json data.json
mapping.json
setting.json
output.json output.json
*.txt *.txt

View File

@ -1,6 +1,7 @@
package cmd package cmd
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"esgo2dump/internal/interfaces" "esgo2dump/internal/interfaces"
@ -17,17 +18,21 @@ import (
func run(cmd *cobra.Command, args []string) error { func run(cmd *cobra.Command, args []string) error {
var ( var (
err error err error
ioi interfaces.DumpIO ioi interfaces.DumpIO
ioo interfaces.DumpIO ioo interfaces.DumpIO
lines []*interfaces.ESSource
succeed int
) )
if opt.Debug { if opt.Debug {
logrus.SetLevel(logrus.DebugLevel) logrus.SetLevel(logrus.DebugLevel)
} }
switch f_type {
case "data", "mapping", "setting":
default:
return fmt.Errorf("unknown type=%s", f_type)
}
if ioi, err = newIO(f_input, interfaces.IOInput); err != nil { if ioi, err = newIO(f_input, interfaces.IOInput); err != nil {
return err return err
} }
@ -36,13 +41,46 @@ func run(cmd *cobra.Command, args []string) error {
return err return err
} }
defer func() {
_ = ioi.Close()
_ = ioo.Close()
}()
switch f_type {
case "data":
return executeData(cmd.Context(), ioi, ioo)
case "mapping":
var mapping map[string]any
if mapping, err = ioi.ReadMapping(); err != nil {
return err
}
return ioo.WriteMapping(mapping)
case "setting":
var setting map[string]any
if setting, err = ioi.ReadSetting(); err != nil {
return err
}
return ioo.WriteSetting(setting)
default:
return fmt.Errorf("unknown type=%s", f_type)
}
}
func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
var (
err error
lines []*interfaces.ESSource
succeed int
)
for { for {
select { select {
case <-cmd.Context().Done(): case <-ctx.Done():
default: default:
succeed = 0
if lines, err = ioi.Read(f_limit); err != nil { if lines, err = input.ReadData(f_limit); err != nil {
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
return nil return nil
} }
@ -54,7 +92,7 @@ func run(cmd *cobra.Command, args []string) error {
return nil return nil
} }
if succeed, err = ioo.Write(lines); err != nil { if succeed, err = output.WriteData(lines); err != nil {
return err return err
} }

View File

@ -1,8 +1,12 @@
package interfaces package interfaces
type DumpIO interface { type DumpIO interface {
Write(docs []*ESSource) (int, error) ReadData(int) ([]*ESSource, error)
Read(int) ([]*ESSource, error) ReadMapping() (map[string]any, error)
ReadSetting() (map[string]any, error)
WriteData(docs []*ESSource) (int, error)
WriteMapping(map[string]any) error
WriteSetting(map[string]any) error
Close() error Close() error
IsInput() bool IsInput() bool

View File

@ -17,3 +17,11 @@ func (io IO) Code() string {
return "unknown" return "unknown"
} }
} }
type DataType int64
const (
DataTypeData DataType = iota
DataTypeMapping
DataTypeSetting
)

View File

@ -82,6 +82,32 @@ type client struct {
queryMap map[string]any queryMap map[string]any
} }
func (c *client) ReadSetting() (map[string]any, error) {
r, err := c.c.Indices.GetSettings(
c.c.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteSetting(m map[string]any) error {
//TODO implement me
panic("implement me")
}
func (c *client) IsInput() bool { func (c *client) IsInput() bool {
//TODO implement me //TODO implement me
panic("implement me") panic("implement me")
@ -95,7 +121,7 @@ func (c *client) Close() error {
return nil return nil
} }
func (c *client) Write(docs []*interfaces.ESSource) (int, error) { func (c *client) WriteData(docs []*interfaces.ESSource) (int, error) {
var ( var (
err error err error
indexer esutil.BulkIndexer indexer esutil.BulkIndexer
@ -141,7 +167,7 @@ func (c *client) Write(docs []*interfaces.ESSource) (int, error) {
return count, nil return count, nil
} }
func (c *client) Read(i int) ([]*interfaces.ESSource, error) { func (c *client) ReadData(i int) ([]*interfaces.ESSource, error) {
var ( var (
err error err error
resp *esapi.Response resp *esapi.Response
@ -194,3 +220,28 @@ func (c *client) Read(i int) ([]*interfaces.ESSource, error) {
return result.Hits.Hits, nil return result.Hits.Hits, nil
} }
func (c *client) ReadMapping() (map[string]any, error) {
r, err := c.c.Indices.GetMapping(
c.c.Indices.GetMapping.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteMapping(m map[string]any) error {
//TODO implement me
panic("implement me")
}

View File

@ -13,6 +13,38 @@ type client struct {
scanner *bufio.Scanner scanner *bufio.Scanner
} }
func (c *client) ReadMapping() (map[string]any, error) {
//TODO implement me
panic("implement me")
}
func (c *client) ReadSetting() (map[string]any, error) {
//TODO implement me
panic("implement me")
}
func (c *client) WriteMapping(m map[string]any) error {
bs, err := json.Marshal(m)
if err != nil {
return err
}
_, err = c.f.Write(bs)
return err
}
func (c *client) WriteSetting(m map[string]any) error {
bs, err := json.Marshal(m)
if err != nil {
return err
}
_, err = c.f.Write(bs)
return err
}
func (c *client) IsInput() bool { func (c *client) IsInput() bool {
//TODO implement me //TODO implement me
panic("implement me") panic("implement me")
@ -22,7 +54,7 @@ func (c *client) IsFile() bool {
return true return true
} }
func (c *client) Write(docs []*interfaces.ESSource) (int, error) { func (c *client) WriteData(docs []*interfaces.ESSource) (int, error) {
var ( var (
err error err error
bs []byte bs []byte
@ -46,7 +78,7 @@ func (c *client) Write(docs []*interfaces.ESSource) (int, error) {
return count, nil return count, nil
} }
func (c *client) Read(i int) ([]*interfaces.ESSource, error) { func (c *client) ReadData(i int) ([]*interfaces.ESSource, error) {
var ( var (
err error err error
count = 0 count = 0