wip v1.0.0
This commit is contained in:
89
internal/health/checker.go
Normal file
89
internal/health/checker.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package health
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
// CheckResult is the outcome reported by a single health check run.
type CheckResult int

// Outcomes a Checker can report. CheckResultUnknown is the zero value.
const (
	// CheckResultUnknown means no definite outcome is available.
	CheckResultUnknown CheckResult = 0
	// CheckResultSuccess means the check passed.
	CheckResultSuccess CheckResult = 1
	// CheckResultFailure means the check did not pass.
	CheckResultFailure CheckResult = 2
)
|
||||
|
||||
func (r CheckResult) String() string {
|
||||
switch r {
|
||||
case CheckResultSuccess:
|
||||
return "SUCCESS"
|
||||
case CheckResultFailure:
|
||||
return "FAILURE"
|
||||
default:
|
||||
return "UNKNOWN"
|
||||
}
|
||||
}
|
||||
|
||||
type Checker interface {
|
||||
Check(ctx context.Context) CheckResult
|
||||
Name() string
|
||||
Type() string
|
||||
}
|
||||
|
||||
// CheckerConfig holds the scheduling and threshold settings for one
// monitored checker, plus the raw checker-specific options.
type CheckerConfig struct {
	// Name identifies the checker; Type is its kind as a string.
	Name, Type string
	// Interval is the period between checks; Timeout bounds a single check.
	Interval, Timeout time.Duration
	// Rise and Fall are the consecutive success / failure thresholds
	// handed to CheckerState.Update.
	Rise, Fall int
	// Config carries the checker-specific key/value options.
	Config map[string]interface{}
}
|
||||
|
||||
type CheckerState struct {
|
||||
Name string
|
||||
Healthy bool
|
||||
LastResult CheckResult
|
||||
LastCheckTime time.Time
|
||||
SuccessCount int
|
||||
FailureCount int
|
||||
TotalChecks int
|
||||
ConsecutiveOK int
|
||||
ConsecutiveFail int
|
||||
}
|
||||
|
||||
func (s *CheckerState) IsHealthy() bool {
|
||||
return s.Healthy
|
||||
}
|
||||
|
||||
func (s *CheckerState) Update(result CheckResult, rise, fall int) bool {
|
||||
s.LastResult = result
|
||||
s.LastCheckTime = time.Now()
|
||||
s.TotalChecks++
|
||||
|
||||
oldHealthy := s.Healthy
|
||||
|
||||
switch result {
|
||||
case CheckResultSuccess:
|
||||
s.SuccessCount++
|
||||
s.ConsecutiveOK++
|
||||
s.ConsecutiveFail = 0
|
||||
|
||||
if !s.Healthy && s.ConsecutiveOK >= rise {
|
||||
s.Healthy = true
|
||||
}
|
||||
|
||||
case CheckResultFailure:
|
||||
s.FailureCount++
|
||||
s.ConsecutiveFail++
|
||||
s.ConsecutiveOK = 0
|
||||
|
||||
if s.Healthy && s.ConsecutiveFail >= fall {
|
||||
s.Healthy = false
|
||||
}
|
||||
}
|
||||
|
||||
return s.Healthy != oldHealthy
|
||||
}
|
||||
|
||||
// StateChangeCallback is the signature of a listener that receives the
// checker name together with the previous and the new health verdict.
type StateChangeCallback func(name string, oldHealthy bool, newHealthy bool)
|
||||
56
internal/health/factory.go
Normal file
56
internal/health/factory.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package health
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/loveuer/go-alived/pkg/config"
|
||||
"github.com/loveuer/go-alived/pkg/logger"
|
||||
)
|
||||
|
||||
func CreateChecker(cfg *config.HealthChecker) (Checker, error) {
|
||||
configMap, ok := cfg.Config.(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid config for checker %s", cfg.Name)
|
||||
}
|
||||
|
||||
switch cfg.Type {
|
||||
case "tcp":
|
||||
return NewTCPChecker(cfg.Name, configMap)
|
||||
case "http", "https":
|
||||
return NewHTTPChecker(cfg.Name, configMap)
|
||||
case "ping", "icmp":
|
||||
return NewPingChecker(cfg.Name, configMap)
|
||||
case "script":
|
||||
return NewScriptChecker(cfg.Name, configMap)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported checker type: %s", cfg.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func LoadFromConfig(cfg *config.Config, log *logger.Logger) (*Manager, error) {
|
||||
manager := NewManager(log)
|
||||
|
||||
for _, healthCfg := range cfg.Health {
|
||||
checker, err := CreateChecker(&healthCfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create checker %s: %w", healthCfg.Name, err)
|
||||
}
|
||||
|
||||
monitorCfg := &CheckerConfig{
|
||||
Name: healthCfg.Name,
|
||||
Type: healthCfg.Type,
|
||||
Interval: healthCfg.Interval,
|
||||
Timeout: healthCfg.Timeout,
|
||||
Rise: healthCfg.Rise,
|
||||
Fall: healthCfg.Fall,
|
||||
Config: healthCfg.Config.(map[string]interface{}),
|
||||
}
|
||||
|
||||
monitor := NewMonitor(checker, monitorCfg, log)
|
||||
manager.AddMonitor(monitor)
|
||||
|
||||
log.Info("loaded health checker: %s (type=%s)", healthCfg.Name, healthCfg.Type)
|
||||
}
|
||||
|
||||
return manager, nil
|
||||
}
|
||||
90
internal/health/http.go
Normal file
90
internal/health/http.go
Normal file
@@ -0,0 +1,90 @@
|
||||
package health
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// HTTPChecker probes a URL and compares the response status code with an
// expected value.
type HTTPChecker struct {
	// name is the checker name; url and method describe the request.
	name, url, method string
	// expectedStatus is the status code that counts as success.
	expectedStatus int
	// client performs the request.
	client *http.Client
}
|
||||
|
||||
func NewHTTPChecker(name string, config map[string]interface{}) (*HTTPChecker, error) {
|
||||
url, ok := config["url"].(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("http checker: missing or invalid 'url' field")
|
||||
}
|
||||
|
||||
method := "GET"
|
||||
if m, ok := config["method"].(string); ok {
|
||||
method = m
|
||||
}
|
||||
|
||||
expectedStatus := 200
|
||||
if status, ok := config["expected_status"]; ok {
|
||||
switch v := status.(type) {
|
||||
case int:
|
||||
expectedStatus = v
|
||||
case float64:
|
||||
expectedStatus = int(v)
|
||||
}
|
||||
}
|
||||
|
||||
insecureSkipVerify := false
|
||||
if skip, ok := config["insecure_skip_verify"].(bool); ok {
|
||||
insecureSkipVerify = skip
|
||||
}
|
||||
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: insecureSkipVerify,
|
||||
},
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
Timeout: 30 * time.Second,
|
||||
}
|
||||
|
||||
return &HTTPChecker{
|
||||
name: name,
|
||||
url: url,
|
||||
method: method,
|
||||
expectedStatus: expectedStatus,
|
||||
client: client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *HTTPChecker) Name() string {
|
||||
return c.name
|
||||
}
|
||||
|
||||
func (c *HTTPChecker) Type() string {
|
||||
return "http"
|
||||
}
|
||||
|
||||
func (c *HTTPChecker) Check(ctx context.Context) CheckResult {
|
||||
req, err := http.NewRequestWithContext(ctx, c.method, c.url, nil)
|
||||
if err != nil {
|
||||
return CheckResultFailure
|
||||
}
|
||||
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return CheckResultFailure
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == c.expectedStatus {
|
||||
return CheckResultSuccess
|
||||
}
|
||||
|
||||
return CheckResultFailure
|
||||
}
|
||||
192
internal/health/monitor.go
Normal file
192
internal/health/monitor.go
Normal file
@@ -0,0 +1,192 @@
|
||||
package health
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/loveuer/go-alived/pkg/logger"
|
||||
)
|
||||
|
||||
type Monitor struct {
|
||||
checker Checker
|
||||
config *CheckerConfig
|
||||
state *CheckerState
|
||||
log *logger.Logger
|
||||
callbacks []StateChangeCallback
|
||||
|
||||
running bool
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewMonitor(checker Checker, config *CheckerConfig, log *logger.Logger) *Monitor {
|
||||
return &Monitor{
|
||||
checker: checker,
|
||||
config: config,
|
||||
state: &CheckerState{
|
||||
Name: config.Name,
|
||||
Healthy: false,
|
||||
},
|
||||
log: log,
|
||||
callbacks: make([]StateChangeCallback, 0),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Monitor) Start() {
|
||||
m.mu.Lock()
|
||||
if m.running {
|
||||
m.mu.Unlock()
|
||||
return
|
||||
}
|
||||
m.running = true
|
||||
m.mu.Unlock()
|
||||
|
||||
m.log.Info("[HealthCheck:%s] starting health check monitor (interval=%s, timeout=%s)",
|
||||
m.config.Name, m.config.Interval, m.config.Timeout)
|
||||
|
||||
m.wg.Add(1)
|
||||
go m.checkLoop()
|
||||
}
|
||||
|
||||
func (m *Monitor) Stop() {
|
||||
m.mu.Lock()
|
||||
if !m.running {
|
||||
m.mu.Unlock()
|
||||
return
|
||||
}
|
||||
m.running = false
|
||||
m.mu.Unlock()
|
||||
|
||||
m.log.Info("[HealthCheck:%s] stopping health check monitor", m.config.Name)
|
||||
close(m.stopCh)
|
||||
m.wg.Wait()
|
||||
}
|
||||
|
||||
func (m *Monitor) checkLoop() {
|
||||
defer m.wg.Done()
|
||||
|
||||
ticker := time.NewTicker(m.config.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
m.performCheck()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-m.stopCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
m.performCheck()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Monitor) performCheck() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), m.config.Timeout)
|
||||
defer cancel()
|
||||
|
||||
startTime := time.Now()
|
||||
result := m.checker.Check(ctx)
|
||||
duration := time.Since(startTime)
|
||||
|
||||
m.mu.Lock()
|
||||
oldHealthy := m.state.Healthy
|
||||
stateChanged := m.state.Update(result, m.config.Rise, m.config.Fall)
|
||||
newHealthy := m.state.Healthy
|
||||
callbacks := m.callbacks
|
||||
m.mu.Unlock()
|
||||
|
||||
m.log.Debug("[HealthCheck:%s] check completed: result=%s, duration=%s, healthy=%v",
|
||||
m.config.Name, result, duration, newHealthy)
|
||||
|
||||
if stateChanged {
|
||||
m.log.Info("[HealthCheck:%s] health state changed: %v -> %v (consecutive_ok=%d, consecutive_fail=%d)",
|
||||
m.config.Name, oldHealthy, newHealthy, m.state.ConsecutiveOK, m.state.ConsecutiveFail)
|
||||
|
||||
for _, callback := range callbacks {
|
||||
callback(m.config.Name, oldHealthy, newHealthy)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Monitor) OnStateChange(callback StateChangeCallback) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.callbacks = append(m.callbacks, callback)
|
||||
}
|
||||
|
||||
func (m *Monitor) GetState() *CheckerState {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
stateCopy := *m.state
|
||||
return &stateCopy
|
||||
}
|
||||
|
||||
func (m *Monitor) IsHealthy() bool {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.state.Healthy
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
monitors map[string]*Monitor
|
||||
mu sync.RWMutex
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func NewManager(log *logger.Logger) *Manager {
|
||||
return &Manager{
|
||||
monitors: make(map[string]*Monitor),
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) AddMonitor(monitor *Monitor) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.monitors[monitor.config.Name] = monitor
|
||||
}
|
||||
|
||||
func (m *Manager) GetMonitor(name string) (*Monitor, bool) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
monitor, ok := m.monitors[name]
|
||||
return monitor, ok
|
||||
}
|
||||
|
||||
func (m *Manager) StartAll() {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
for _, monitor := range m.monitors {
|
||||
monitor.Start()
|
||||
}
|
||||
|
||||
m.log.Info("started %d health check monitor(s)", len(m.monitors))
|
||||
}
|
||||
|
||||
func (m *Manager) StopAll() {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
for _, monitor := range m.monitors {
|
||||
monitor.Stop()
|
||||
}
|
||||
|
||||
m.log.Info("stopped all health check monitors")
|
||||
}
|
||||
|
||||
func (m *Manager) GetAllStates() map[string]*CheckerState {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
states := make(map[string]*CheckerState)
|
||||
for name, monitor := range m.monitors {
|
||||
states[name] = monitor.GetState()
|
||||
}
|
||||
|
||||
return states
|
||||
}
|
||||
129
internal/health/ping.go
Normal file
129
internal/health/ping.go
Normal file
@@ -0,0 +1,129 @@
|
||||
package health
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/icmp"
|
||||
"golang.org/x/net/ipv4"
|
||||
)
|
||||
|
||||
// PingChecker probes a host with ICMP echo requests.
type PingChecker struct {
	// name is the checker name; host is the target to resolve and ping.
	name, host string
	// count is the number of echo requests sent per check.
	count int
	// timeout is how long each request waits for a reply.
	timeout time.Duration
}
|
||||
|
||||
func NewPingChecker(name string, config map[string]interface{}) (*PingChecker, error) {
|
||||
host, ok := config["host"].(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("ping checker: missing or invalid 'host' field")
|
||||
}
|
||||
|
||||
count := 1
|
||||
if c, ok := config["count"]; ok {
|
||||
switch v := c.(type) {
|
||||
case int:
|
||||
count = v
|
||||
case float64:
|
||||
count = int(v)
|
||||
}
|
||||
}
|
||||
|
||||
timeout := 2 * time.Second
|
||||
if t, ok := config["timeout"].(string); ok {
|
||||
if d, err := time.ParseDuration(t); err == nil {
|
||||
timeout = d
|
||||
}
|
||||
}
|
||||
|
||||
return &PingChecker{
|
||||
name: name,
|
||||
host: host,
|
||||
count: count,
|
||||
timeout: timeout,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *PingChecker) Name() string {
|
||||
return c.name
|
||||
}
|
||||
|
||||
func (c *PingChecker) Type() string {
|
||||
return "ping"
|
||||
}
|
||||
|
||||
func (c *PingChecker) Check(ctx context.Context) CheckResult {
|
||||
addr, err := net.ResolveIPAddr("ip4", c.host)
|
||||
if err != nil {
|
||||
return CheckResultFailure
|
||||
}
|
||||
|
||||
conn, err := icmp.ListenPacket("ip4:icmp", "0.0.0.0")
|
||||
if err != nil {
|
||||
return CheckResultFailure
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
successCount := 0
|
||||
for i := 0; i < c.count; i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return CheckResultFailure
|
||||
default:
|
||||
}
|
||||
|
||||
if c.sendPing(conn, addr) {
|
||||
successCount++
|
||||
}
|
||||
}
|
||||
|
||||
if successCount > 0 {
|
||||
return CheckResultSuccess
|
||||
}
|
||||
|
||||
return CheckResultFailure
|
||||
}
|
||||
|
||||
func (c *PingChecker) sendPing(conn *icmp.PacketConn, addr *net.IPAddr) bool {
|
||||
msg := icmp.Message{
|
||||
Type: ipv4.ICMPTypeEcho,
|
||||
Code: 0,
|
||||
Body: &icmp.Echo{
|
||||
ID: 1234,
|
||||
Seq: 1,
|
||||
Data: []byte("go-alived-ping"),
|
||||
},
|
||||
}
|
||||
|
||||
msgBytes, err := msg.Marshal(nil)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if _, err := conn.WriteTo(msgBytes, addr); err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
conn.SetReadDeadline(time.Now().Add(c.timeout))
|
||||
|
||||
reply := make([]byte, 1500)
|
||||
n, _, err := conn.ReadFrom(reply)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
parsedMsg, err := icmp.ParseMessage(ipv4.ICMPTypeEchoReply.Protocol(), reply[:n])
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if parsedMsg.Type == ipv4.ICMPTypeEchoReply {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
73
internal/health/script.go
Normal file
73
internal/health/script.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package health
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ScriptChecker runs an external command and judges health by whether the
// command completes without error.
type ScriptChecker struct {
	// name is the checker name; script is the command to execute.
	name, script string
	// args are passed to the command.
	args []string
	// timeout bounds one execution of the command.
	timeout time.Duration
}
|
||||
|
||||
func NewScriptChecker(name string, config map[string]interface{}) (*ScriptChecker, error) {
|
||||
script, ok := config["script"].(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("script checker: missing or invalid 'script' field")
|
||||
}
|
||||
|
||||
var args []string
|
||||
if argsInterface, ok := config["args"].([]interface{}); ok {
|
||||
args = make([]string, len(argsInterface))
|
||||
for i, arg := range argsInterface {
|
||||
if argStr, ok := arg.(string); ok {
|
||||
args[i] = argStr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
timeout := 10 * time.Second
|
||||
if t, ok := config["timeout"].(string); ok {
|
||||
if d, err := time.ParseDuration(t); err == nil {
|
||||
timeout = d
|
||||
}
|
||||
}
|
||||
|
||||
return &ScriptChecker{
|
||||
name: name,
|
||||
script: script,
|
||||
args: args,
|
||||
timeout: timeout,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *ScriptChecker) Name() string {
|
||||
return c.name
|
||||
}
|
||||
|
||||
func (c *ScriptChecker) Type() string {
|
||||
return "script"
|
||||
}
|
||||
|
||||
func (c *ScriptChecker) Check(ctx context.Context) CheckResult {
|
||||
cmdCtx, cancel := context.WithTimeout(ctx, c.timeout)
|
||||
defer cancel()
|
||||
|
||||
cmd := exec.CommandContext(cmdCtx, c.script, c.args...)
|
||||
|
||||
err := cmd.Run()
|
||||
if err != nil {
|
||||
if exitErr, ok := err.(*exec.ExitError); ok {
|
||||
if exitErr.ExitCode() != 0 {
|
||||
return CheckResultFailure
|
||||
}
|
||||
}
|
||||
return CheckResultFailure
|
||||
}
|
||||
|
||||
return CheckResultSuccess
|
||||
}
|
||||
61
internal/health/tcp.go
Normal file
61
internal/health/tcp.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package health
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
)
|
||||
|
||||
// TCPChecker probes a host and port by opening a TCP connection.
type TCPChecker struct {
	// name is the checker name; host is the target to connect to.
	name, host string
	// port is the TCP port on the target.
	port int
}
|
||||
|
||||
func NewTCPChecker(name string, config map[string]interface{}) (*TCPChecker, error) {
|
||||
host, ok := config["host"].(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("tcp checker: missing or invalid 'host' field")
|
||||
}
|
||||
|
||||
var port int
|
||||
switch v := config["port"].(type) {
|
||||
case int:
|
||||
port = v
|
||||
case float64:
|
||||
port = int(v)
|
||||
default:
|
||||
return nil, fmt.Errorf("tcp checker: missing or invalid 'port' field")
|
||||
}
|
||||
|
||||
if port < 1 || port > 65535 {
|
||||
return nil, fmt.Errorf("tcp checker: invalid port number: %d", port)
|
||||
}
|
||||
|
||||
return &TCPChecker{
|
||||
name: name,
|
||||
host: host,
|
||||
port: port,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *TCPChecker) Name() string {
|
||||
return c.name
|
||||
}
|
||||
|
||||
func (c *TCPChecker) Type() string {
|
||||
return "tcp"
|
||||
}
|
||||
|
||||
func (c *TCPChecker) Check(ctx context.Context) CheckResult {
|
||||
addr := fmt.Sprintf("%s:%d", c.host, c.port)
|
||||
|
||||
var dialer net.Dialer
|
||||
conn, err := dialer.DialContext(ctx, "tcp", addr)
|
||||
if err != nil {
|
||||
return CheckResultFailure
|
||||
}
|
||||
|
||||
conn.Close()
|
||||
return CheckResultSuccess
|
||||
}
|
||||
Reference in New Issue
Block a user