diff --git a/.gitignore b/.gitignore index f0bd72e..f21e67f 100644 --- a/.gitignore +++ b/.gitignore @@ -2,5 +2,7 @@ .vscode .DS_Store data.json +mapping.json +setting.json output.json *.txt \ No newline at end of file diff --git a/internal/cmd/run.go b/internal/cmd/run.go index ab492d3..4ecd795 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -1,6 +1,7 @@ package cmd import ( + "context" "encoding/json" "errors" "esgo2dump/internal/interfaces" @@ -17,17 +18,21 @@ import ( func run(cmd *cobra.Command, args []string) error { var ( - err error - ioi interfaces.DumpIO - ioo interfaces.DumpIO - lines []*interfaces.ESSource - succeed int + err error + ioi interfaces.DumpIO + ioo interfaces.DumpIO ) if opt.Debug { logrus.SetLevel(logrus.DebugLevel) } + switch f_type { + case "data", "mapping", "setting": + default: + return fmt.Errorf("unknown type=%s", f_type) + } + if ioi, err = newIO(f_input, interfaces.IOInput); err != nil { return err } @@ -36,13 +41,46 @@ func run(cmd *cobra.Command, args []string) error { return err } + defer func() { + _ = ioi.Close() + _ = ioo.Close() + }() + + switch f_type { + case "data": + return executeData(cmd.Context(), ioi, ioo) + case "mapping": + var mapping map[string]any + if mapping, err = ioi.ReadMapping(); err != nil { + return err + } + + return ioo.WriteMapping(mapping) + case "setting": + var setting map[string]any + if setting, err = ioi.ReadSetting(); err != nil { + return err + } + + return ioo.WriteSetting(setting) + default: + return fmt.Errorf("unknown type=%s", f_type) + } +} + +func executeData(ctx context.Context, input, output interfaces.DumpIO) error { + var ( + err error + lines []*interfaces.ESSource + succeed int + ) + for { select { - case <-cmd.Context().Done(): + case <-ctx.Done(): default: - succeed = 0 - if lines, err = ioi.Read(f_limit); err != nil { + if lines, err = input.ReadData(f_limit); err != nil { if errors.Is(err, io.EOF) { return nil } @@ -54,7 +92,7 @@ func run(cmd *cobra.Command, args []string) error { return nil } - if succeed, err = ioo.Write(lines); err != nil { + if succeed, err = output.WriteData(lines); err != nil { return err } diff --git a/internal/interfaces/dumpio.go b/internal/interfaces/dumpio.go index a3d77e2..203351c 100644 --- a/internal/interfaces/dumpio.go +++ b/internal/interfaces/dumpio.go @@ -1,8 +1,12 @@ package interfaces type DumpIO interface { - Write(docs []*ESSource) (int, error) - Read(int) ([]*ESSource, error) + ReadData(int) ([]*ESSource, error) + ReadMapping() (map[string]any, error) + ReadSetting() (map[string]any, error) + WriteData(docs []*ESSource) (int, error) + WriteMapping(map[string]any) error + WriteSetting(map[string]any) error Close() error IsInput() bool diff --git a/internal/interfaces/enum.go b/internal/interfaces/enum.go index d05d369..5012b1b 100644 --- a/internal/interfaces/enum.go +++ b/internal/interfaces/enum.go @@ -17,3 +17,11 @@ func (io IO) Code() string { return "unknown" } } + +type DataType int64 + +const ( + DataTypeData DataType = iota + DataTypeMapping + DataTypeSetting +) diff --git a/internal/xes/xes.go b/internal/xes/xes.go index 844abdf..e165f4c 100644 --- a/internal/xes/xes.go +++ b/internal/xes/xes.go @@ -82,6 +82,32 @@ type client struct { queryMap map[string]any } +func (c *client) ReadSetting() (map[string]any, error) { + r, err := c.c.Indices.GetSettings( + c.c.Indices.GetSettings.WithIndex(c.index), + ) + if err != nil { + return nil, err + } + + if r.StatusCode != 200 { + return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) + } + + m := make(map[string]any) + decoder := json.NewDecoder(r.Body) + if err = decoder.Decode(&m); err != nil { + return nil, err + } + + return m, nil +} + +func (c *client) WriteSetting(m map[string]any) error { + //TODO implement me + panic("implement me") +} + func (c *client) IsInput() bool { //TODO implement me panic("implement me") @@ -95,7 +121,7 @@ func (c *client) Close() error { return nil } -func (c *client) Write(docs []*interfaces.ESSource) (int, error) { +func (c *client) WriteData(docs []*interfaces.ESSource) (int, error) { var ( err error indexer esutil.BulkIndexer @@ -141,7 +167,7 @@ func (c *client) Write(docs []*interfaces.ESSource) (int, error) { return count, nil } -func (c *client) Read(i int) ([]*interfaces.ESSource, error) { +func (c *client) ReadData(i int) ([]*interfaces.ESSource, error) { var ( err error resp *esapi.Response @@ -194,3 +220,28 @@ func (c *client) Read(i int) ([]*interfaces.ESSource, error) { return result.Hits.Hits, nil } + +func (c *client) ReadMapping() (map[string]any, error) { + r, err := c.c.Indices.GetMapping( + c.c.Indices.GetMapping.WithIndex(c.index), + ) + if err != nil { + return nil, err + } + + if r.StatusCode != 200 { + return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) + } + + m := make(map[string]any) + decoder := json.NewDecoder(r.Body) + if err = decoder.Decode(&m); err != nil { + return nil, err + } + + return m, nil +} +func (c *client) WriteMapping(m map[string]any) error { + //TODO implement me + panic("implement me") +} diff --git a/internal/xfile/xfile.go b/internal/xfile/xfile.go index 91b964d..c1c4b20 100644 --- a/internal/xfile/xfile.go +++ b/internal/xfile/xfile.go @@ -13,6 +13,38 @@ type client struct { scanner *bufio.Scanner } +func (c *client) ReadMapping() (map[string]any, error) { + //TODO implement me + panic("implement me") +} + +func (c *client) ReadSetting() (map[string]any, error) { + //TODO implement me + panic("implement me") +} + +func (c *client) WriteMapping(m map[string]any) error { + bs, err := json.Marshal(m) + if err != nil { + return err + } + + _, err = c.f.Write(bs) + + return err +} + +func (c *client) WriteSetting(m map[string]any) error { + bs, err := json.Marshal(m) + if err != nil { + return err + } + + _, err = c.f.Write(bs) + + return err +} + func (c *client) IsInput() bool { //TODO implement me panic("implement me") @@ -22,7 +54,7 @@ func (c *client) IsFile() bool { return true } -func (c *client) Write(docs []*interfaces.ESSource) (int, error) { +func (c *client) WriteData(docs []*interfaces.ESSource) (int, error) { var ( err error bs []byte @@ -46,7 +78,7 @@ func (c *client) Write(docs []*interfaces.ESSource) (int, error) { return count, nil } -func (c *client) Read(i int) ([]*interfaces.ESSource, error) { +func (c *client) ReadData(i int) ([]*interfaces.ESSource, error) { var ( err error count = 0