// Package xfile provides an output that writes rows to a local file,
// one row per line.
package xfile

import (
	"context"
	"errors"
	"os"

	"github.com/loveuer/nfflow/internal/interfaces"
	"github.com/loveuer/nfflow/internal/sqlType"
	"github.com/sirupsen/logrus"
)

// LocalFile is an output that appends each received row, followed by a
// newline, to a file on the local filesystem.
type LocalFile struct {
	// writer is the open destination file; it is set by NewFileOutput.
	writer *os.File

	// Path is the destination file path, bound from the "path" config key.
	Path string `json:"path"`
	// MaxLine is bound from the "max_line" config key.
	// NOTE(review): nothing in this file reads MaxLine; presumably it is
	// meant to cap the number of lines written — confirm before relying on it.
	MaxLine int `json:"max_line"`
}

// NewFileOutput binds cfg onto a new LocalFile and creates the file at its
// Path.
//
// The file must not already exist: it is created with O_CREATE|O_EXCL so the
// existence check and the creation happen in one atomic step. If the path
// already exists the returned error has the message "file already exist";
// any other failure (binding the config, permissions, missing directory, ...)
// is returned unchanged.
func NewFileOutput(cfg sqlType.JSONB) (interfaces.Output, error) {
	ins := &LocalFile{}
	if err := cfg.Bind(ins); err != nil {
		return nil, err
	}

	// O_EXCL replaces a separate os.Stat pre-check: it closes the window in
	// which another process could create the file between the check and the
	// open, and it keeps unrelated errors from being reported as "exists".
	f, err := os.OpenFile(ins.Path, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0644)
	if err != nil {
		if errors.Is(err, os.ErrExist) {
			return nil, errors.New("file already exist")
		}
		return nil, err
	}

	ins.writer = f
	return ins, nil
}

// Start launches a goroutine that reads rows from rowCh and writes each
// row's bytes plus a trailing '\n' to the file. It returns nil once that
// goroutine is running.
//
// The goroutine exits when ctx is done, when rowCh is closed, or after the
// first failure of row.Bytes or of the file write. A failure is sent on
// errCh; if ctx is cancelled before anyone receives it, the error is dropped
// so the goroutine cannot block forever on an abandoned channel.
func (lf *LocalFile) Start(ctx context.Context, rowCh <-chan interfaces.Row, errCh chan<- error) error {
	ready := make(chan struct{})

	// report delivers err to errCh unless ctx is cancelled first.
	report := func(err error) {
		select {
		case errCh <- err:
		case <-ctx.Done():
		}
	}

	go func() {
		// Signal the caller that the worker goroutine has started.
		close(ready)

		for {
			select {
			case <-ctx.Done():
				logrus.Warn("received quit signal...")
				return
			case row, ok := <-rowCh:
				if !ok {
					// Producer closed the channel: nothing more to write.
					return
				}

				bs, err := row.Bytes()
				if err != nil {
					report(err)
					return
				}

				if _, err = lf.writer.Write(append(bs, '\n')); err != nil {
					report(err)
					return
				}
			}
		}
	}()

	<-ready
	return nil
}

// Close closes the underlying file. It is a no-op when no file was opened.
// A failure to close is logged as a warning rather than silently discarded;
// closing an already-closed file is not reported.
func (lf *LocalFile) Close() {
	if lf.writer == nil {
		return
	}

	if err := lf.writer.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
		logrus.Warnf("close file %q: %v", lf.Path, err)
	}
}