26 Star 67 Fork 27

狂奔的蜗牛. / wmq-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
api.go 14.92 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
package main
import (
"bytes"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
"os"
"github.com/Jeffail/gabs"
"github.com/buaazp/fasthttprouter"
"github.com/nu7hatch/gouuid"
logger "github.com/snail007/mini-logger"
"github.com/valyala/fasthttp"
)
// apiToken is the shared secret that checkRequest compares against the
// "api-token" query argument. serveAPI assigns it before serving.
var apiToken string

// apiTimeout is the deadline timeoutFactory hands to fasthttp.TimeoutHandler
// for every wrapped handler.
var apiTimeout = 30 * time.Second
// apiMessageAdd handles GET /message/add. It builds a new message from the
// query arguments, registers it with addMessage and, on success, persists the
// message list with writeMessagesToFile.
//
// Query arguments:
//   - Name, Durable, IsNeedToken: required; the two flags are true only when
//     their value is exactly "1".
//   - Mode: must be "fanout", "topic" or "direct".
//   - Token: required when IsNeedToken is "1".
//   - Comment: optional.
//
// The result is written through response. A failed api-token check is
// answered through tokenError.
func apiMessageAdd(ctx *fasthttp.RequestCtx) {
	if !checkRequest(ctx) {
		tokenError(ctx)
		return
	}
	args := ctx.QueryArgs()
	Name := string(args.Peek("Name"))
	Comment := string(args.Peek("Comment"))
	DurableS := string(args.Peek("Durable"))
	IsNeedTokenS := string(args.Peek("IsNeedToken"))
	Mode := string(args.Peek("Mode"))
	Token := string(args.Peek("Token"))
	if Name == "" || DurableS == "" || IsNeedTokenS == "" {
		response(ctx, "", errors.New("args required.10001"))
		return
	}
	// A successful lookup means the name is already taken.
	if _, _, err := getMessage(Name); err == nil {
		response(ctx, "", errors.New("message exists"))
		return
	}
	Durable := DurableS == "1"
	IsNeedToken := IsNeedTokenS == "1"
	if IsNeedToken && Token == "" {
		// A token-protected message without a token is a missing argument,
		// not a duplicate message, so report it as such.
		response(ctx, "", errors.New("args required.10014"))
		return
	}
	if Mode != "fanout" && Mode != "topic" && Mode != "direct" {
		response(ctx, "", errors.New("args required.10002"))
		return
	}
	m := message{
		Name:        Name,
		Durable:     Durable,
		Mode:        Mode,
		IsNeedToken: IsNeedToken,
		Token:       Token,
		Comment:     Comment,
		Consumers:   []consumer{},
	}
	err := addMessage(m)
	if err == nil {
		err = writeMessagesToFile(messages, cfg.GetString("consume.DataFile"))
	}
	// On success err is nil, so the reply carries a null data field.
	response(ctx, err, err)
}
// apiMessageUpdate handles GET /message/update. It validates the query
// arguments, replaces the settings of an existing message via updateMessage
// and, when that succeeds, persists the message list.
//
// Name, Durable and IsNeedToken are required. Token is required when
// IsNeedToken is "1". Mode must be "fanout", "topic" or "direct".
func apiMessageUpdate(ctx *fasthttp.RequestCtx) {
	if !checkRequest(ctx) {
		tokenError(ctx)
		return
	}

	query := ctx.QueryArgs()
	arg := func(key string) string { return string(query.Peek(key)) }
	reject := func(reason string) { response(ctx, "", errors.New(reason)) }

	name := arg("Name")
	comment := arg("Comment")
	durableArg := arg("Durable")
	needTokenArg := arg("IsNeedToken")
	mode := arg("Mode")
	token := arg("Token")

	if name == "" || durableArg == "" || needTokenArg == "" {
		reject("args required.10003")
		return
	}
	if _, _, lookupErr := getMessage(name); lookupErr != nil {
		reject("message not found")
		return
	}

	needToken := needTokenArg == "1"
	if needToken && token == "" {
		reject("args required.10013")
		return
	}

	switch mode {
	case "fanout", "topic", "direct":
		// supported exchange mode
	default:
		reject("args required.10004")
		return
	}

	updated := message{
		Name:        name,
		Durable:     durableArg == "1",
		Mode:        mode,
		IsNeedToken: needToken,
		Token:       token,
		Comment:     comment,
	}
	err := updateMessage(updated)
	if err == nil {
		err = writeMessagesToFile(messages, cfg.GetString("consume.DataFile"))
	}
	response(ctx, err, err)
}
// apiMessageDelete handles GET /message/delete. It looks up the message named
// by the "Name" query argument, removes it with deleteMessage and persists the
// remaining messages. The first failing step is reported through response.
func apiMessageDelete(ctx *fasthttp.RequestCtx) {
	if !checkRequest(ctx) {
		tokenError(ctx)
		return
	}

	name := string(ctx.QueryArgs().Peek("Name"))
	target, _, err := getMessage(name)
	if err != nil {
		response(ctx, "", errors.New("message not found"))
		return
	}

	// Persist only after the in-memory delete succeeded.
	if err = deleteMessage(*target); err == nil {
		err = writeMessagesToFile(messages, cfg.GetString("consume.DataFile"))
	}
	response(ctx, "", err)
}
// apiMessageStatus handles GET /message/status. It asks statusMessage for the
// message named by the "Name" query argument and embeds the returned string
// verbatim as the "data" member of a code-1 reply.
func apiMessageStatus(ctx *fasthttp.RequestCtx) {
	if !checkRequest(ctx) {
		tokenError(ctx)
		return
	}

	name := string(ctx.QueryArgs().Peek("Name"))
	status, err := statusMessage(name)
	if err != nil {
		response(ctx, "", err)
		return
	}

	// status is spliced in unquoted, so it is expected to be JSON already.
	ctx.WriteString(`{"code":1,"data":` + status + `}`)
}
// apiConsumerAdd handles GET /consumer/add. It validates the query arguments,
// creates a consumer with a freshly generated UUID, attaches it to the message
// named by "Name" via addConsumer and, on success, persists the message list.
//
// Query arguments:
//   - Name, Code, CheckCode, Timeout, URL: required.
//   - Code: two or three decimal digits, no leading zero.
//   - Timeout: a positive decimal integer, no leading zero.
//   - CheckCode: true only when the value is exactly "1".
//   - Comment, RouteKey: optional.
func apiConsumerAdd(ctx *fasthttp.RequestCtx) {
	if !checkRequest(ctx) {
		tokenError(ctx)
		return
	}
	args := ctx.QueryArgs()
	exchangeName := string(args.Peek("Name"))
	Comment := string(args.Peek("Comment"))
	CodeS := string(args.Peek("Code"))
	CheckCodeS := string(args.Peek("CheckCode"))
	RouteKey := string(args.Peek("RouteKey"))
	TimeoutS := string(args.Peek("Timeout"))
	URL := string(args.Peek("URL"))
	if exchangeName == "" || CodeS == "" || CheckCodeS == "" || TimeoutS == "" || URL == "" {
		response(ctx, "", errors.New("args required.10005"))
		return
	}
	msg, _, err := getMessage(exchangeName)
	if err != nil {
		response(ctx, "", err)
		return
	}

	// The patterns are anchored so the whole argument must be numeric; an
	// unanchored match would accept input such as "abc123".
	if ok, err := regexp.MatchString(`^[1-9]\d{1,2}$`, CodeS); !ok || err != nil {
		response(ctx, "", errors.New("args required.10006"))
		return
	}
	if ok, err := regexp.MatchString(`^[1-9]\d*$`, TimeoutS); !ok || err != nil {
		response(ctx, "", errors.New("args required.10007"))
		return
	}
	codeI, err := strconv.Atoi(CodeS)
	if err != nil {
		response(ctx, "", errors.New("args required.10006"))
		return
	}
	// Atoi can still fail here when the digit string overflows int.
	TimeoutI, err := strconv.Atoi(TimeoutS)
	if err != nil {
		response(ctx, "", errors.New("args required.10007"))
		return
	}

	// Honour the caller's CheckCode flag instead of forcing it to true.
	CheckCode := CheckCodeS == "1"

	IDUUID, err := uuid.NewV4()
	if err != nil {
		// Without this check a failed generation would dereference a nil UUID.
		response(ctx, "", err)
		return
	}

	c := consumer{
		ID:        IDUUID.String(),
		Comment:   Comment,
		CheckCode: CheckCode,
		Code:      float64(codeI),
		Timeout:   float64(TimeoutI),
		URL:       URL,
		RouteKey:  RouteKey,
	}
	err = addConsumer(*msg, c)
	if err == nil {
		err = writeMessagesToFile(messages, cfg.GetString("consume.DataFile"))
	}
	response(ctx, err, err)
}
// apiConsumerUpdate handles GET /consumer/update. It validates the query
// arguments, rebuilds the consumer identified by "Name" and "ID" with the new
// settings, applies it via updateConsumer and, on success, persists the
// message list.
//
// Query arguments:
//   - Name, Code, CheckCode, Timeout, URL: required.
//   - ID: identifies the consumer; an unknown ID yields "consumer not found".
//   - Code: two or three decimal digits, no leading zero.
//   - Timeout: a positive decimal integer, no leading zero.
//   - CheckCode: true only when the value is exactly "1".
//   - Comment, RouteKey: optional.
func apiConsumerUpdate(ctx *fasthttp.RequestCtx) {
	if !checkRequest(ctx) {
		tokenError(ctx)
		return
	}
	args := ctx.QueryArgs()
	exchangeName := string(args.Peek("Name"))
	ID := string(args.Peek("ID"))
	Comment := string(args.Peek("Comment"))
	CodeS := string(args.Peek("Code"))
	CheckCodeS := string(args.Peek("CheckCode"))
	RouteKey := string(args.Peek("RouteKey"))
	TimeoutS := string(args.Peek("Timeout"))
	URL := string(args.Peek("URL"))
	if exchangeName == "" || CodeS == "" || CheckCodeS == "" || TimeoutS == "" || URL == "" {
		response(ctx, "", errors.New("args required.10008"))
		return
	}
	msg, _, err := getMessage(exchangeName)
	if err != nil {
		response(ctx, "", errors.New("message not found"))
		return
	}
	c, _, _, err := getConsumer(exchangeName, ID)
	if err != nil {
		response(ctx, "", errors.New("consumer not found"))
		return
	}

	// The patterns are anchored so the whole argument must be numeric; an
	// unanchored match would accept input such as "abc123".
	if ok, err := regexp.MatchString(`^[1-9]\d{1,2}$`, CodeS); !ok || err != nil {
		response(ctx, "", errors.New("args required.10009"))
		return
	}
	if ok, err := regexp.MatchString(`^[1-9]\d*$`, TimeoutS); !ok || err != nil {
		response(ctx, "", errors.New("args required.10010"))
		return
	}
	codeI, err := strconv.Atoi(CodeS)
	if err != nil {
		response(ctx, "", errors.New("args required.10009"))
		return
	}
	// Atoi can still fail here when the digit string overflows int.
	TimeoutI, err := strconv.Atoi(TimeoutS)
	if err != nil {
		response(ctx, "", errors.New("args required.10010"))
		return
	}

	// Honour the caller's CheckCode flag instead of forcing it to true.
	CheckCode := CheckCodeS == "1"

	c0 := consumer{
		ID:        c.ID,
		Comment:   Comment,
		CheckCode: CheckCode,
		Code:      float64(codeI),
		Timeout:   float64(TimeoutI),
		URL:       URL,
		RouteKey:  RouteKey,
	}
	err = updateConsumer(*msg, c0)
	if err == nil {
		err = writeMessagesToFile(messages, cfg.GetString("consume.DataFile"))
	}
	response(ctx, err, err)
}
// apiConsumerDelete handles GET /consumer/delete. It resolves the message
// ("Name") and the consumer ("ID"), removes the consumer with deleteConsumer
// and persists the message list. The first failing step is reported through
// response.
func apiConsumerDelete(ctx *fasthttp.RequestCtx) {
	if !checkRequest(ctx) {
		tokenError(ctx)
		return
	}

	query := ctx.QueryArgs()
	name := string(query.Peek("Name"))
	id := string(query.Peek("ID"))

	owner, _, err := getMessage(name)
	if err != nil {
		response(ctx, "", errors.New("message not found"))
		return
	}
	target, _, _, err := getConsumer(name, id)
	if err != nil {
		response(ctx, "", errors.New("consumer not found"))
		return
	}

	// Persist only after the in-memory delete succeeded.
	if err = deleteConsumer(*owner, *target); err == nil {
		err = writeMessagesToFile(messages, cfg.GetString("consume.DataFile"))
	}
	response(ctx, "", err)
}
// apiConsumerStatus handles GET /consumer/status. It asks statusConsumer for
// the consumer identified by the "Name" and "ID" query arguments and embeds
// the String() form of the result as the "data" member of a code-1 reply.
func apiConsumerStatus(ctx *fasthttp.RequestCtx) {
	if !checkRequest(ctx) {
		tokenError(ctx)
		return
	}

	query := ctx.QueryArgs()
	name := string(query.Peek("Name"))
	id := string(query.Peek("ID"))

	status, err := statusConsumer(name, id)
	if err != nil {
		response(ctx, err, err)
		return
	}
	ctx.WriteString(`{"code":1,"data":` + status.String() + `}`)
}
// apiPublish handles GET and POST /:name on the publish service. It wraps the
// incoming request (headers, remote IP, base64-encoded body, method and raw
// query string) in a JSON document and passes it to publish for the message
// named by the ":name" route parameter.
//
// Request headers:
//   - Token: must equal the message's token when the message requires one.
//   - RouteKey: forwarded to publish as the routing key.
//
// Headers listed in the "publish.IgnoreHeaders" setting are left out of the
// forwarded header map; names are compared trimmed and case-insensitively.
// Success is answered with 204. Any failure is answered with 500 and the
// error text as the body.
func apiPublish(ctx *fasthttp.RequestCtx) {
	rawQuery := string(ctx.QueryArgs().QueryString())
	name := ctx.UserValue("name").(string)

	fail := func(reason string) {
		ctx.Response.SetStatusCode(fasthttp.StatusInternalServerError)
		ctx.WriteString(reason)
	}

	target, _, err := getMessage(name)
	if err != nil {
		fail(err.Error())
		return
	}

	token := string(ctx.Request.Header.Peek("Token"))
	if target.IsNeedToken && token != target.Token {
		fail("token error")
		return
	}

	routeKey := string(ctx.Request.Header.Peek("RouteKey"))
	method := strings.ToLower(string(ctx.Request.Header.Method()))

	ignored := cfg.GetStringSlice("publish.IgnoreHeaders")
	isIgnored := func(key string) bool {
		lowered := strings.ToLower(key)
		for _, candidate := range ignored {
			if lowered == strings.ToLower(strings.TrimSpace(candidate)) {
				return true
			}
		}
		return false
	}

	forwarded := make(map[string]string)
	ctx.Request.Header.VisitAll(func(k, v []byte) {
		key := strings.TrimSpace(string(k))
		if !isIgnored(key) {
			forwarded[key] = string(v)
		}
	})

	body := base64.StdEncoding.EncodeToString(ctx.Request.Body())
	headerJSON, _ := json.Marshal(forwarded)

	// The header map is stored as a JSON-encoded string, not a nested object.
	envelope := gabs.New()
	envelope.Set(string(headerJSON), "header")
	envelope.Set(ctx.RemoteIP(), "ip")
	envelope.Set(body, "body")
	envelope.Set(method, "method")
	envelope.Set(rawQuery, "args")

	if err = publish(envelope.String(), name, routeKey, token); err != nil {
		fail(err.Error())
		return
	}
	ctx.Response.SetStatusCode(fasthttp.StatusNoContent)
}
// apiReload handles GET /reload. After the api-token check it calls reload
// and replies with an empty code-1 envelope.
func apiReload(ctx *fasthttp.RequestCtx) {
	if checkRequest(ctx) {
		reload()
		response(ctx, "", nil)
		return
	}
	tokenError(ctx)
}
// apiRestart handles GET /restart. After the api-token check it calls restart
// and reports the returned error, if any, through response.
func apiRestart(ctx *fasthttp.RequestCtx) {
	if checkRequest(ctx) {
		restartErr := restart()
		response(ctx, "", restartErr)
		return
	}
	tokenError(ctx)
}
// apiConfig handles GET /config. After the api-token check it returns the
// package-level messages value as the "data" member of a code-1 reply.
func apiConfig(ctx *fasthttp.RequestCtx) {
	if checkRequest(ctx) {
		// An earlier variant obtained the payload from config(); the
		// messages value is now sent directly.
		response(ctx, messages, nil)
		return
	}
	tokenError(ctx)
}
// apiLogList handles GET /log/list. It replies with the base names of all
// entries found directly inside the directory configured as "log.dir".
//
// Failures to resolve the directory or to expand the glob pattern are
// reported through response rather than silently producing an empty listing.
func apiLogList(ctx *fasthttp.RequestCtx) {
	if !checkRequest(ctx) {
		tokenError(ctx)
		return
	}
	dir, err := filepath.Abs(cfg.GetString("log.dir"))
	if err != nil {
		response(ctx, "", err)
		return
	}
	paths, err := filepath.Glob(filepath.Join(dir, "*"))
	if err != nil {
		response(ctx, "", err)
		return
	}
	// Left nil on purpose: an empty directory keeps encoding as JSON null,
	// as it always has.
	var list []string
	for _, p := range paths {
		list = append(list, filepath.Base(p))
	}
	response(ctx, list, nil)
}
// apiLogFile handles GET /log/file. It sends the file named by the "file"
// query argument from the directory configured as "log.dir" as an attachment.
//
// The name must be a plain file name: empty values, "." and "..", and names
// containing a path separator are rejected with "args required.10011".
// Missing entries and directories are answered with "file not found".
func apiLogFile(ctx *fasthttp.RequestCtx) {
	if !checkRequest(ctx) {
		tokenError(ctx)
		return
	}
	filename := string(ctx.QueryArgs().Peek("file"))
	// "." and ".." contain no separator but would resolve to the log
	// directory itself or to its parent, so they are refused explicitly.
	if filename == "" || filename == "." || filename == ".." || strings.ContainsAny(filename, "/\\") {
		response(ctx, "", errors.New("args required.10011"))
		return
	}
	dir, err := filepath.Abs(cfg.GetString("log.dir"))
	if err != nil {
		response(ctx, "", err)
		return
	}
	file := filepath.Join(dir, filename)
	info, err := os.Stat(file)
	if err != nil {
		if os.IsNotExist(err) {
			response(ctx, "", errors.New("file not found"))
		} else {
			response(ctx, "", err)
		}
		return
	}
	if info.IsDir() {
		response(ctx, "", errors.New("file not found"))
		return
	}
	ctx.Response.Header.Set("Content-Type", "application/force-download")
	ctx.Response.Header.Set("Content-Disposition", "attachment; filename=\""+filename+"\"")
	ctx.Response.Header.Set("Content-Transfer-Encoding", "binary")
	ctx.SendFile(file)
}
// apiLog handles GET /log. It replies with the last 100 lines of
// "<log.dir>/<type>.log". When the "keyword" query argument is set, only
// lines matched by grep for that keyword are considered.
//
// Query arguments:
//   - type: required log name without the ".log" suffix. Values that are
//     empty, "." or "..", or contain a path separator are rejected with
//     "args required.10012".
//   - keyword: optional grep pattern.
//
// Neither argument is ever interpolated into a shell command line. The
// keyword and the file path reach grep as positional parameters, so shell
// metacharacters in the request cannot execute commands.
func apiLog(ctx *fasthttp.RequestCtx) {
	if !checkRequest(ctx) {
		tokenError(ctx)
		return
	}
	keyword := string(ctx.QueryArgs().Peek("keyword"))
	logType := string(ctx.QueryArgs().Peek("type"))
	if logType == "" || logType == "." || logType == ".." || strings.ContainsAny(logType, "/\\") {
		response(ctx, "", errors.New("args required.10012"))
		return
	}
	file, err := filepath.Abs(filepath.Join(cfg.GetString("log.dir"), logType) + ".log")
	if err != nil {
		response(ctx, "", err)
		return
	}

	var cmd *exec.Cmd
	if keyword == "" {
		cmd = exec.Command("tail", "-n", "100", file)
	} else {
		// The script text is constant. $1 and $2 are filled from the
		// arguments after the "bash" placeholder for $0, and "--" stops
		// grep from reading a keyword that starts with "-" as an option.
		cmd = exec.Command("bash", "-c", `grep -- "$1" "$2" | tail -n 100`, "bash", keyword, file)
	}

	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	err = cmd.Run()
	if err != nil && stderr.Len() > 0 {
		// Surface the tool's own diagnostics instead of a bare exit status.
		err = fmt.Errorf("%v: %s", err, strings.TrimSpace(stderr.String()))
	}
	response(ctx, stdout.String(), err)
}
// tokenError writes the reply for a request that failed the api-token check.
//
// It goes through response so the body is a valid JSON envelope with code 0
// and the text "token error", and so a "callback" query argument is honoured
// like in every other error reply. The HTTP status code is set by
// checkRequest, not here.
func tokenError(ctx *fasthttp.RequestCtx) {
	response(ctx, "", errors.New("token error"))
}
// checkRequest reports whether the request carries an "api-token" query
// argument equal to apiToken. On a missing or different token it sets the
// response status to 500 and returns false; writing the body is left to the
// caller.
func checkRequest(ctx *fasthttp.RequestCtx) (ok bool) {
	supplied := ctx.QueryArgs().Peek("api-token")
	if supplied != nil && string(supplied) == apiToken {
		return true
	}
	ctx.SetStatusCode(fasthttp.StatusInternalServerError)
	return false
}
// timeoutFactory wraps h with fasthttp.TimeoutHandler, using apiTimeout as
// the deadline and "timeout" as the message passed to it.
func timeoutFactory(h fasthttp.RequestHandler) fasthttp.RequestHandler {
	const timeoutMessage = "timeout"
	guarded := fasthttp.TimeoutHandler(h, apiTimeout, timeoutMessage)
	return guarded
}
// response writes the JSON envelope used by the API handlers.
//
// When err is nil the envelope is {"code":1,"data":<data>}. Otherwise it is
// {"code":0,"data":<err.Error()>} and data is ignored. If the request has a
// non-empty "callback" query argument, the envelope is wrapped as
// callback(<envelope>) for JSONP callers.
//
// NOTE(review): the callback name is echoed back unvalidated. Confirm this
// endpoint is never reachable from untrusted browsers.
func response(ctx *fasthttp.RequestCtx, data interface{}, err error) {
	ja := gabs.New()
	if err == nil {
		ja.Set(1, "code")
		ja.Set(data, "data")
	} else {
		ja.Set(0, "code")
		ja.Set(err.Error(), "data")
	}

	body := ja.String()
	if callback := ctx.QueryArgs().Peek("callback"); len(callback) > 0 {
		body = string(callback) + "(" + body + ")"
	}
	// Write the payload verbatim. Passing it to Fprintf as the format string
	// would mangle any '%' it contains, which log lines and URLs often do.
	ctx.WriteString(body)
}
// serveAPI stores token in apiToken, registers the management routes and
// serves them on listen. Every request is recorded by access after its
// handler returns.
//
// fasthttp.ListenAndServe only returns when serving stops, so this call
// blocks. A non-nil error from it is logged through Fatalf and returned.
func serveAPI(listen, token string) (err error) {
	ctx := log.With(logger.Fields{"func": "serveAPI"})
	apiToken = token
	router := fasthttprouter.New()
	router.GET("/message/add", timeoutFactory(apiMessageAdd))
	router.GET("/message/update", timeoutFactory(apiMessageUpdate))
	router.GET("/message/delete", timeoutFactory(apiMessageDelete))
	router.GET("/message/status", timeoutFactory(apiMessageStatus))
	router.GET("/consumer/add", timeoutFactory(apiConsumerAdd))
	router.GET("/consumer/update", timeoutFactory(apiConsumerUpdate))
	router.GET("/consumer/delete", timeoutFactory(apiConsumerDelete))
	router.GET("/consumer/status", timeoutFactory(apiConsumerStatus))
	router.GET("/reload", timeoutFactory(apiReload))
	router.GET("/restart", timeoutFactory(apiRestart))
	router.GET("/config", timeoutFactory(apiConfig))
	router.GET("/log", timeoutFactory(apiLog))
	// NOTE(review): /log/file is the only route without timeoutFactory,
	// presumably so large downloads are not cut off at apiTimeout. Confirm.
	router.GET("/log/file", apiLogFile)
	router.GET("/log/list", timeoutFactory(apiLogList))
	ctx.Infof("Api service started")
	h := func(reqCtx *fasthttp.RequestCtx) {
		defer access(reqCtx)
		router.Handler(reqCtx)
	}
	// Capture the error and log on failure. Comparing the result with nil
	// and never assigning err hid every start-up failure.
	if err = fasthttp.ListenAndServe(listen, h); err != nil {
		ctx.Safe().Fatalf("start api fail:%s", err)
	}
	return err
}
// servePublish registers apiPublish for GET and POST on "/:name" and serves
// it on listen. Every request is recorded by access after its handler
// returns.
//
// fasthttp.ListenAndServe only returns when serving stops, so this call
// blocks. A non-nil error from it is logged through Fatalf and returned.
func servePublish(listen string) (err error) {
	ctx := log.With(logger.Fields{"func": "servePublish"})
	router := fasthttprouter.New()
	router.POST("/:name", timeoutFactory(apiPublish))
	router.GET("/:name", timeoutFactory(apiPublish))
	ctx.Infof("Publish service started")
	h := func(reqCtx *fasthttp.RequestCtx) {
		defer access(reqCtx)
		router.Handler(reqCtx)
	}
	// Capture the error and log on failure. Comparing the result with nil
	// and never assigning err hid every start-up failure.
	if err = fasthttp.ListenAndServe(listen, h); err != nil {
		ctx.Safe().Fatalf("start publish fail:%s", err)
	}
	return err
}
// access writes one access-log entry for a finished request through
// accessLog. The entry records the status code, URI, remote IP, method, host,
// referer, user agent and response body. The request body is included only
// when the "log.post" setting is true.
func access(ctx *fasthttp.RequestCtx) {
	post := ""
	if cfg.GetBool("log.post") {
		post = string(ctx.Request.Body())
	}
	fields := logger.Fields{
		"code": strconv.Itoa(ctx.Response.StatusCode()),
		"uri":  string(ctx.RequestURI()),
		// RemoteIP yields the peer address without the port. Splitting the
		// "host:port" string on ":" truncated IPv6 addresses such as "[::1]:80".
		"remoteAddr": ctx.RemoteIP().String(),
		"method":     string(ctx.Method()),
		"host":       string(ctx.Request.Host()),
		"referer":    string(ctx.Request.Header.Referer()),
		"userAgent":  string(ctx.Request.Header.UserAgent()),
		"response":   string(ctx.Response.Body()),
		"post":       post,
	}
	accessLog.With(fields).Info("")
}
Go
1
https://gitee.com/snail/wmq-go.git
git@gitee.com:snail/wmq-go.git
snail
wmq-go
wmq-go
master

搜索帮助