80 lines
1.3 KiB
Go
80 lines
1.3 KiB
Go
package xfile
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"github.com/loveuer/nfflow/internal/interfaces"
|
|
"github.com/loveuer/nfflow/internal/sqlType"
|
|
"github.com/sirupsen/logrus"
|
|
"os"
|
|
)
|
|
|
|
type LocalFile struct {
|
|
writer *os.File
|
|
Path string `json:"path"`
|
|
MaxLine int `json:"max_line"`
|
|
}
|
|
|
|
func NewFileOutput(cfg sqlType.JSONB) (interfaces.Output, error) {
|
|
var (
|
|
err error
|
|
ins = &LocalFile{}
|
|
)
|
|
|
|
if err = cfg.Bind(ins); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if _, err = os.Stat(ins.Path); !errors.Is(err, os.ErrNotExist) {
|
|
return nil, errors.New("file already exist")
|
|
}
|
|
|
|
if ins.writer, err = os.OpenFile(ins.Path, os.O_CREATE|os.O_RDWR, 0644); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return ins, nil
|
|
}
|
|
|
|
func (lf *LocalFile) Start(ctx context.Context, rowCh <-chan interfaces.Row, errCh chan<- error) error {
|
|
var (
|
|
err error
|
|
bs []byte
|
|
ready = make(chan bool)
|
|
)
|
|
|
|
go func() {
|
|
ready <- true
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logrus.Warn("received quit signal...")
|
|
return
|
|
case row, ok := <-rowCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if bs, err = row.Bytes(); err != nil {
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
if _, err = lf.writer.Write(append(bs, '\n')); err != nil {
|
|
errCh <- err
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
<-ready
|
|
|
|
return nil
|
|
}
|
|
|
|
func (lf *LocalFile) Close() {
|
|
_ = lf.writer.Close()
|
|
}
|