From e5ae2efcef4fb9652f036557a3756db16088c651 Mon Sep 17 00:00:00 2001 From: loveuer Date: Wed, 4 Dec 2024 15:49:37 +0800 Subject: [PATCH] fix: update user nickname; feat: add database/kafka. --- .gitignore | 3 +- go.mod | 5 +- go.sum | 42 +++++- internal/controller/user.go | 26 ++-- internal/database/db/client.go | 19 ++- internal/database/db/db_test.go | 4 +- internal/database/db/{init.go => new.go} | 17 ++- internal/database/kafka/client.go | 74 +++++++++++ internal/database/kafka/client_test.go | 87 +++++++++++++ internal/database/kafka/consumer.go | 158 +++++++++++++++++++++++ internal/database/kafka/option.go | 42 ++++++ internal/database/kafka/writer.go | 82 ++++++++++++ internal/handler/user.go | 81 +++++++----- internal/interfaces/logger.go | 7 + internal/opt/var.go | 2 + 15 files changed, 585 insertions(+), 64 deletions(-) rename internal/database/db/{init.go => new.go} (73%) create mode 100644 internal/database/kafka/client.go create mode 100644 internal/database/kafka/client_test.go create mode 100644 internal/database/kafka/consumer.go create mode 100644 internal/database/kafka/option.go create mode 100644 internal/database/kafka/writer.go diff --git a/.gitignore b/.gitignore index b00fa04..30a4ace 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ *.sqlite *.sqlite3 xtest -*.sock \ No newline at end of file +*.sock +__debug* \ No newline at end of file diff --git a/go.mod b/go.mod index 670c874..c5372c0 100644 --- a/go.mod +++ b/go.mod @@ -15,13 +15,14 @@ require ( github.com/jedib0t/go-pretty/v6 v6.5.9 github.com/loveuer/esgo2dump v0.3.3 github.com/loveuer/go-sqlite3 v1.0.2 - github.com/loveuer/nf v0.2.8 + github.com/loveuer/nf v0.2.12 github.com/loveuer/ngorm/v2 v2.1.1 github.com/ncruces/go-sqlite3/gormlite v0.18.0 github.com/olivere/elastic/v7 v7.0.32 github.com/psanford/httpreadat v0.1.0 github.com/rabbitmq/amqp091-go v1.10.0 github.com/samber/lo v1.39.0 + github.com/segmentio/kafka-go v0.4.47 github.com/sirupsen/logrus v1.9.2 github.com/spf13/cast v1.6.0 github.com/spf13/cobra v1.8.1 @@ -52,12 +53,14 @@ require ( github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/josharian/intern v1.0.0 // indirect + github.com/klauspost/compress v1.15.9 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/ncruces/go-sqlite3 v0.18.3 // indirect github.com/ncruces/julianday v1.0.0 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index 84ef304..d05c12a 100644 --- a/go.sum +++ b/go.sum @@ -134,6 +134,8 @@ github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -153,8 +155,8 @@ github.com/loveuer/esgo2dump v0.3.3 h1:/AidoaFV7bDRyT1ycyBKs4XGmyVs2ShaUKrpEBiUW github.com/loveuer/esgo2dump v0.3.3/go.mod h1:thZvfsO0kd7Ck3TA0jc9rRc4CuIa4Iuiq6tF3tCqXEY= github.com/loveuer/go-sqlite3 v1.0.2 h1:kcENqm6mt0wPH/N9Sw+6UC74qtU8o+aMEO04I62pjDE= github.com/loveuer/go-sqlite3 v1.0.2/go.mod h1:8+45etSlBYCtYP/ThX/e1wLgG+x6G6oXck2FhjC57tA= -github.com/loveuer/nf v0.2.8 h1:Qo0M748TglS6E5geh1LG0IBkrjLm+5yUs3II9l50tEQ= -github.com/loveuer/nf v0.2.8/go.mod h1:M6reF17/kJBis30H4DxR5hrtgo/oJL4AV4cBe4HzJLw= +github.com/loveuer/nf v0.2.12 h1:1Og+ORHsOWKFmy9kKJhjvXDkdbaurH82HjIxuGA3nNM= +github.com/loveuer/nf v0.2.12/go.mod h1:M6reF17/kJBis30H4DxR5hrtgo/oJL4AV4cBe4HzJLw= github.com/loveuer/ngorm/v2 v2.1.1 h1:v+ut5BjeSBFU87o800pI8Q3fXEOUAkvk5+btMw2oOEc= github.com/loveuer/ngorm/v2 v2.1.1/go.mod h1:BVhFGQsRMdcf08MtmwwRihwCR/x7wDd0Fzy8Xj+edM0= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= @@ -185,6 +187,8 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -207,6 +211,8 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA= github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= @@ -244,7 +250,14 @@ github.com/tetratelabs/wazero v1.8.0 h1:iEKu0d4c2Pd+QSRieYbnQC9yiFlMS9D+Jr0LsRmc github.com/tetratelabs/wazero v1.8.0/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs= github.com/vesoft-inc/nebula-go/v3 v3.5.0 h1:2ZSkoBxtIfs15AXJXqrAPDPd0Z9HrzKR7YKXPqlJcR0= github.com/vesoft-inc/nebula-go/v3 v3.5.0/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -268,7 +281,9 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -282,6 +297,8 @@ golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPI golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -292,6 +309,10 @@ golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -299,6 +320,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -315,13 +338,22 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -329,6 +361,10 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -345,6 +381,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/controller/user.go b/internal/controller/user.go index c44fffe..60eaf0c 100644 --- a/internal/controller/user.go +++ b/internal/controller/user.go @@ -5,17 +5,19 @@ import ( "encoding/json" "errors" "fmt" - "github.com/loveuer/nf/nft/resp" - "github.com/spf13/cast" - "gorm.io/gorm" "strings" "time" + "ultone/internal/database/cache" "ultone/internal/database/db" "ultone/internal/log" "ultone/internal/model" "ultone/internal/opt" "ultone/internal/tool" + + "github.com/loveuer/nf/nft/resp" + "github.com/spf13/cast" + "gorm.io/gorm" ) type userController interface { @@ -25,7 +27,7 @@ type userController interface { CacheToken(ctx context.Context, token string, user *model.User) error RmToken(ctx context.Context, token string) error RmUserCache(ctx context.Context, id uint64) error - DeleteUser(ctx context.Context, id uint64) error + DeleteUser(ctx context.Context, target *model.User) error } type uc struct{} @@ -135,30 +137,26 @@ func (u uc) RmUserCache(ctx context.Context, id uint64) error { return cache.Client.Del(tool.Timeout(3), key) } -func (u uc) DeleteUser(ctx context.Context, id uint64) error { +func (u uc) DeleteUser(ctx context.Context, target *model.User) error { var ( err error now = time.Now() - username = "CONCAT(username, '@del')" + username = fmt.Sprintf("%s@%d", target.Username, now.UnixMilli()) ) - if opt.Cfg.DB.Type == "sqlite" { - username = "username || '@del'" - } - if err = db.Default.Session(tool.Timeout(5)). Model(&model.User{}). - Where("id = ?", id). + Where("id = ?", target.Id). Updates(map[string]any{ "deleted_at": now.UnixMilli(), - "username": gorm.Expr(username), + "username": username, }).Error; err != nil { return resp.NewError(500, "", err, nil) } if opt.EnableUserCache { - if err = u.RmUserCache(ctx, id); err != nil { - log.Warn(ctx, "controller.DeleteUser: rm user=%d cache err=%v", id, err) + if err = u.RmUserCache(ctx, target.Id); err != nil { + log.Warn(ctx, "controller.DeleteUser: rm user=%d cache err=%v", target.Id, err) } } diff --git a/internal/database/db/client.go b/internal/database/db/client.go index b00fd2c..780771f 100644 --- a/internal/database/db/client.go +++ b/internal/database/db/client.go @@ -3,25 +3,32 @@ package db import ( "context" "io" + "ultone/internal/opt" "ultone/internal/tool" "gorm.io/gorm" ) -var ( - Default *Client +var Default *Client + +type DBType string + +const ( + DBTypeSqlite = "sqlite" + DBTypeMysql = "mysql" + DBTypePostgres = "postgres" ) type Client struct { ctx context.Context cli *gorm.DB - ttype string + dbType DBType cfgSqlite *cfgSqlite } -func (c *Client) Type() string { - return c.ttype +func (c *Client) Type() DBType { + return c.dbType } func (c *Client) Session(ctxs ...context.Context) *gorm.DB { @@ -49,7 +56,7 @@ func (c *Client) Close() { // Dump // Only for sqlite with mem mode to dump data to bytes(io.Reader) func (c *Client) Dump() (reader io.ReadSeekCloser, ok bool) { - if c.ttype != "sqlite" { + if c.dbType != DBTypeSqlite { return nil, false } diff --git a/internal/database/db/db_test.go b/internal/database/db/db_test.go index 2fc6a57..0d28b38 100644 --- a/internal/database/db/db_test.go +++ b/internal/database/db/db_test.go @@ -8,7 +8,7 @@ import ( ) func TestOpen(t *testing.T) { - myClient, err := New(context.TODO(), "sqlite::", OptSqliteByMem()) + myClient, err := New(context.TODO(), "sqlite::", OptSqliteByMem(nil)) if err != nil { t.Fatalf("TestOpen: New err = %v", err) } @@ -37,7 +37,7 @@ func TestOpen(t *testing.T) { t.Fatalf("TestOpen: ReadAll err = %v", err) } - os.WriteFile("dump.db", bs, 0644) + os.WriteFile("dump.db", bs, 0o644) } myClient.Close() diff --git a/internal/database/db/init.go b/internal/database/db/new.go similarity index 73% rename from internal/database/db/init.go rename to internal/database/db/new.go index 01f3fb1..0be1ace 100644 --- a/internal/database/db/init.go +++ b/internal/database/db/new.go @@ -11,35 +11,38 @@ import ( ) func New(ctx context.Context, uri string, opts ...Option) (*Client, error) { - strs := strings.Split(uri, "::") + parts := strings.SplitN(uri, "::", 2) - if len(strs) != 2 { + if len(parts) != 2 { return nil, fmt.Errorf("db.Init: opt db uri invalid: %s", uri) } - c := &Client{ttype: strs[0], cfgSqlite: &cfgSqlite{fsType: "file"}} + c := &Client{cfgSqlite: &cfgSqlite{fsType: "file"}} for _, f := range opts { f(c) } var ( err error - dsn = strs[1] + dsn = parts[1] ) - switch strs[0] { + switch parts[0] { case "sqlite": + c.dbType = DBTypeSqlite err = openSqlite(c, dsn) case "mysql": + c.dbType = DBTypeMysql c.cli, err = gorm.Open(mysql.Open(dsn)) case "postgres": + c.dbType = DBTypePostgres c.cli, err = gorm.Open(postgres.Open(dsn)) default: - return nil, fmt.Errorf("db type only support: [sqlite, mysql, postgres], unsupported db type: %s", strs[0]) + return nil, fmt.Errorf("db type only support: [sqlite, mysql, postgres], unsupported db type: %s", parts[0]) } if err != nil { - return nil, fmt.Errorf("db.Init: open %s with dsn:%s, err: %w", strs[0], dsn, err) + return nil, fmt.Errorf("db.Init: open %s with dsn:%s, err: %w", parts[0], dsn, err) } return c, nil diff --git a/internal/database/kafka/client.go b/internal/database/kafka/client.go new file mode 100644 index 0000000..ca99066 --- /dev/null +++ b/internal/database/kafka/client.go @@ -0,0 +1,74 @@ +package kafka + +import ( + "context" + "fmt" + "sync" + "time" + + "ultone/internal/interfaces" + + "github.com/loveuer/nf/nft/log" + kfkgo "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl" +) + +var Client *client + +type client struct { + sync.Mutex + ctx context.Context + d *kfkgo.Dialer + topic string + partition int + reconnection bool + mechanism sasl.Mechanism + address string + logger interfaces.Logger + writer *kfkgo.Writer +} + +func Init(address string, opts ...OptionFn) error { + c, err := New(address, opts...) + if err != nil { + return err + } + + Client = c + + return nil +} + +func New(address string, opts ...OptionFn) (*client, error) { + c := &client{} + + if address == "" { + return nil, fmt.Errorf("address required") + } + + for _, fn := range opts { + fn(c) + } + + c.address = address + + if c.ctx == nil { + c.ctx = context.Background() + } + + if c.logger == nil { + c.logger = log.New() + } + + dia := &kfkgo.Dialer{ + Timeout: 30 * time.Second, + } + + if c.mechanism != nil { + dia.SASLMechanism = c.mechanism + } + + c.d = dia + + return c, nil +} diff --git a/internal/database/kafka/client_test.go b/internal/database/kafka/client_test.go new file mode 100644 index 0000000..594b326 --- /dev/null +++ b/internal/database/kafka/client_test.go @@ -0,0 +1,87 @@ +package kafka + +import ( + "context" + "os/signal" + "syscall" + "testing" + "time" + + "ultone/internal/tool" + + "github.com/loveuer/nf/nft/log" +) + +func TestKafka(t *testing.T) { + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + defer cancel() + + client, err := New("10.220.10.15:9092", + WithTopic("test_zyp"), + WithPlainAuth("admin", "Yhblsqt@!."), + WithReconnection(), + ) + if err != nil { + t.Fatal(1, err) + } + + ch, err := client.ReadMessage(ctx, ReadConfig{}) + if err != nil { + t.Fatal(2, err) + } + + for msg := range ch { + if msg.err != nil { + t.Logf("[Error] [TestKafka] msg.err = %v", msg.err) + continue + } + + t.Logf("[Info ] [TestKafka] [time = %s] [msg.topic = %s] [msg.key = %s] [msg.value = %s]", time.Now().Format("060102T150405"), msg.Topic, string(msg.Key), string(msg.Value)) + } +} + +func TestKafkaWrite(t *testing.T) { + log.SetLogLevel(log.LogLevelDebug) + client, err := New("10.220.10.15:9092", + WithTopic("test_zyp"), + WithPlainAuth("admin", "Yhblsqt@!."), + WithReconnection(), + ) + if err != nil { + t.Fatal(1, err) + } + + if err = client.WriteMessages(tool.Timeout(5), + &Payload{ + Key: []byte(time.Now().Format("2006/01/02 15:04:05")), + Value: []byte(tool.RandomString(16)), + }, + &Payload{ + Key: []byte(time.Now().Format("2006/01/02 15:04:05")), + Value: []byte(tool.RandomString(16)), + }, + &Payload{ + Key: []byte(time.Now().Format("2006/01/02 15:04:05")), + Value: []byte(tool.RandomString(16)), + }, + ); err != nil { + t.Log(2, err) + } + + if err = client.WriteMessages(context.Background(), + &Payload{ + Key: []byte(time.Now().Format("2006/01/02 15:04:05")), + Value: []byte(tool.RandomString(16)), + }, + &Payload{ + Key: []byte(time.Now().Format("2006/01/02 15:04:05")), + Value: []byte(tool.RandomString(16)), + }, + &Payload{ + Key: []byte(time.Now().Format("2006/01/02 15:04:05")), + Value: []byte(tool.RandomString(16)), + }, + ); err != nil { + t.Log(3, err) + } +} diff --git a/internal/database/kafka/consumer.go b/internal/database/kafka/consumer.go new file mode 100644 index 0000000..3861f9e --- /dev/null +++ b/internal/database/kafka/consumer.go @@ -0,0 +1,158 @@ +package kafka + +import ( + "context" + "errors" + "io" + "time" + + "ultone/internal/tool" + + kfkgo "github.com/segmentio/kafka-go" +) + +type Message struct { + kfkgo.Message + err error +} + +type ReadConfig struct { + // MaxBytes: read buffer max bytes + /* + - default 1MB + */ + MaxBytes int + + // FirstOffset + /* + - false: use last offset(-1) + - true: use first offset(-2) + - default: false + - more: [about group offset](https://github.com/segmentio/kafka-go/blob/main/reader.go#L16) + */ + FirstOffset bool + + Topic string + Group string + + // Timeout: every read max duration + /* + - default: 30 seconds (same with kafka-go default) + */ + Timeout int +} + +var defaultReadConfig = ReadConfig{ + // 1 MB + MaxBytes: 1e6, + Group: "default", + Timeout: 30, +} + +func (c *client) ReadMessage(ctx context.Context, configs ...ReadConfig) (<-chan *Message, error) { + var ( + err error + cfg = ReadConfig{} + ch = make(chan *Message, 1) + retry = 0 + ) + + if len(configs) > 0 { + cfg = configs[0] + } + + if cfg.Group == "" { + cfg.Group = defaultReadConfig.Group + } + + if cfg.MaxBytes <= 0 { + cfg.MaxBytes = defaultReadConfig.MaxBytes + } + + if cfg.Timeout <= 0 { + cfg.Timeout = defaultReadConfig.Timeout + } + + offset := kfkgo.LastOffset + if cfg.FirstOffset { + offset = kfkgo.FirstOffset + } + + rc := kfkgo.ReaderConfig{ + Brokers: []string{c.address}, + GroupID: cfg.Group, + Topic: c.topic, + Partition: c.partition, + Dialer: c.d, + MaxBytes: cfg.MaxBytes, + StartOffset: offset, + } + + if err = rc.Validate(); err != nil { + return nil, err + } + + r := kfkgo.NewReader(rc) + + go func() { + defer func() { + close(ch) + _ = r.Close() + }() + Loop: + for { + select { + case <-ctx.Done(): + close(ch) + _ = r.Close() + return + default: + msg, err := r.ReadMessage(tool.TimeoutCtx(ctx, cfg.Timeout)) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + continue Loop + } + + if errors.Is(err, context.Canceled) { + return + } + + c.logger.Debug("kafka.ReadMessage: err = %s", err.Error()) + + if errors.Is(err, io.EOF) { + return + } + + if errors.Is(err, io.ErrShortBuffer) { + ch <- &Message{ + Message: msg, + err: err, + } + continue Loop + } + + if c.reconnection { + retry++ + c.logger.Warn("kafka.ReadMessage: reconnection after 30 seconds, times = %d, err = %s", retry, err.Error()) + time.Sleep(30 * time.Second) + continue Loop + } + + ch <- &Message{ + Message: msg, + err: err, + } + + return + } + + ch <- &Message{ + Message: msg, + err: nil, + } + } + } + }() + + return ch, nil +} diff --git a/internal/database/kafka/option.go b/internal/database/kafka/option.go new file mode 100644 index 0000000..3e85430 --- /dev/null +++ b/internal/database/kafka/option.go @@ -0,0 +1,42 @@ +package kafka + +import ( + "ultone/internal/interfaces" + + "github.com/segmentio/kafka-go/sasl/plain" +) + +type OptionFn func(*client) + +func WithPlainAuth(username, password string) OptionFn { + return func(c *client) { + c.mechanism = plain.Mechanism{ + Username: username, + Password: password, + } + } +} + +func WithTopic(topic string) OptionFn { + return func(c *client) { + c.topic = topic + } +} + +func WithPartition(partition int) OptionFn { + return func(c *client) { + c.partition = partition + } +} + +func WithReconnection() OptionFn { + return func(c *client) { + c.reconnection = true + } +} + +func WithLogger(logger interfaces.Logger) OptionFn { + return func(c *client) { + c.logger = logger + } +} diff --git a/internal/database/kafka/writer.go b/internal/database/kafka/writer.go new file mode 100644 index 0000000..2e66f53 --- /dev/null +++ b/internal/database/kafka/writer.go @@ -0,0 +1,82 @@ +package kafka + +import ( + "context" + "crypto/tls" + "errors" + "time" + + kfkgo "github.com/segmentio/kafka-go" +) + +type Payload struct { + Key []byte + Value []byte + Headers []kfkgo.Header + WriterData any +} + +func (c *client) WriteMessages(ctx context.Context, payloads ...*Payload) error { + if len(payloads) == 0 { + return nil + } + + times := 0 +Retry: + if c.writer == nil { + c.Lock() + c.writer = &kfkgo.Writer{ + Addr: kfkgo.TCP(c.address), + Topic: c.topic, + Balancer: &kfkgo.Hash{}, + WriteTimeout: 0, + RequiredAcks: 0, + Async: false, + Transport: &kfkgo.Transport{ + DialTimeout: 30 * time.Second, + TLS: &tls.Config{InsecureSkipVerify: true}, // todo + SASL: c.mechanism, + Context: c.ctx, + }, + AllowAutoTopicCreation: true, + } + c.Unlock() + } + + now := time.Now() + + msgs := make([]kfkgo.Message, 0, len(payloads)) + + for _, item := range payloads { + msgs = append(msgs, kfkgo.Message{ + Key: item.Key, + Value: item.Value, + Headers: item.Headers, + WriterData: item.WriterData, + Time: now, + }) + } + + context.WithoutCancel(ctx) + if err := c.writer.WriteMessages(ctx, msgs...); err != nil { + if errors.Is(err, context.DeadlineExceeded) { + goto HandleError + } + + if c.reconnection { + times++ + c.logger.Warn("kafka.WriteMessage: reconnection after 30 seconds, times = %d, err = %s", times, err.Error()) + time.Sleep(30 * time.Second) + c.Lock() + c.writer = nil + c.Unlock() + goto Retry + } + + HandleError: + c.logger.Warn("kafka.WriteMessage: err = %s", err.Error()) + return err + } + + return nil +} diff --git a/internal/handler/user.go b/internal/handler/user.go index 4903d89..c52d106 100644 --- a/internal/handler/user.go +++ b/internal/handler/user.go @@ -3,12 +3,9 @@ package handler import ( "errors" "fmt" - "github.com/loveuer/nf" - "github.com/loveuer/nf/nft/resp" - "github.com/samber/lo" - "gorm.io/gorm" - "gorm.io/gorm/clause" + "net/http" "time" + "ultone/internal/controller" "ultone/internal/database/cache" "ultone/internal/database/db" @@ -17,6 +14,12 @@ import ( "ultone/internal/opt" "ultone/internal/sqlType" "ultone/internal/tool" + + "github.com/loveuer/nf" + "github.com/loveuer/nf/nft/resp" + "github.com/samber/lo" + "gorm.io/gorm" + "gorm.io/gorm/clause" ) func AuthLogin(c *nf.Ctx) error { @@ -140,6 +143,7 @@ func UserUpdate(c *nf.Ctx) error { type Req struct { OldPassword string `json:"old_password"` NewPassword string `json:"new_password"` + Nickname string `json:"nickname"` } type Model struct { @@ -147,11 +151,13 @@ func UserUpdate(c *nf.Ctx) error { } var ( - ok bool - err error - req = new(Req) - user *model.User - m = new(Model) + ok bool + err error + req = new(Req) + user *model.User + m = new(Model) + updates = make(map[string]any) + changes = make(map[string]any) ) if user, ok = c.Locals("user").(*model.User); !ok { @@ -162,42 +168,55 @@ func UserUpdate(c *nf.Ctx) error { return resp.Resp400(c, err) } - if req.OldPassword == "" || req.NewPassword == "" { - return resp.Resp400(c, req) + if err = c.BodyParser(&changes); err != nil { + return resp.Resp400(c, err) } - if err = tool.CheckPassword(req.NewPassword); err != nil { - return resp.Resp400(c, req, err.Error()) + if _, ok = changes["nickname"]; ok { + updates["nickname"] = req.Nickname } - if err = db.Default.Session(tool.Timeout(3)). - Select("password"). - Model(&model.User{}). - Where("username = ?", user.Username). - Where("deleted_at = 0"). - Take(m). - Error; err != nil { - return resp.Resp500(c, err.Error()) + if req.OldPassword != "" && req.NewPassword != "" { + if err = tool.CheckPassword(req.NewPassword); err != nil { + return resp.Resp400(c, req, err.Error()) + } + + if err = db.Default.Session(tool.Timeout(3)). + Select("password"). + Model(&model.User{}). + Where("username = ?", user.Username). + Where("deleted_at = 0"). + Take(m). + Error; err != nil { + return resp.Resp500(c, err.Error()) + } + + if !tool.ComparePassword(req.OldPassword, m.Password) { + return resp.Resp400(c, nil, "原密码错误") + } + + updates["password"] = tool.NewPassword(req.NewPassword) } - if !tool.ComparePassword(req.OldPassword, m.Password) { - return resp.Resp400(c, nil, "原密码错误") + if len(updates) == 0 { + return resp.Resp400(c, nf.Map{"req": req, "reason": "nothing to update"}, "没有需要更新的内容") } if err = db.Default.Session(tool.Timeout(5)). Model(&model.User{}). Where("id = ?", user.Id). - Update("password", tool.NewPassword(req.NewPassword)). + Updates(updates). Error; err != nil { return resp.Resp500(c, err.Error()) } - _ = controller.UserController.RmUserCache(c.Context(), user.Id) - // todo delete token + if _, ok = updates["password"]; ok { + _ = controller.UserController.RmUserCache(c.Context(), user.Id) + c.SetHeader("Set-Cookie", fmt.Sprintf("%s=;Path=/", opt.CookieName)) + return c.Redirect(opt.LoginURL, http.StatusFound) + } - c.SetHeader("Set-Cookie", fmt.Sprintf("%s=;Path=/", opt.CookieName)) - - return resp.Resp200(c, nil, "修改成功, 请重新登录") + return resp.Resp200(c, nil, "修改成功") } func ManageUserList(c *nf.Ctx) error { @@ -553,7 +572,7 @@ func ManageUserDelete(c *nf.Ctx) error { return resp.Resp403(c, nil) } - if err = controller.UserController.DeleteUser(c.Context(), target.Id); err != nil { + if err = controller.UserController.DeleteUser(c.Context(), target); err != nil { return resp.RespError(c, err) } diff --git a/internal/interfaces/logger.go b/internal/interfaces/logger.go index 8e75d58..6fc633e 100644 --- a/internal/interfaces/logger.go +++ b/internal/interfaces/logger.go @@ -5,3 +5,10 @@ type OpLogger interface { Render(content map[string]any) (string, error) Template() string } + +type Logger interface { + Debug(msg string, data ...any) + Info(msg string, data ...any) + Warn(msg string, data ...any) + Error(msg string, data ...any) +} diff --git a/internal/opt/var.go b/internal/opt/var.go index 1935749..43748da 100644 --- a/internal/opt/var.go +++ b/internal/opt/var.go @@ -41,6 +41,8 @@ const ( OpLogWriteDurationSecond = 5 LocalTraceKey = "X-Trace-Id" + + LoginURL = "/login" ) var (