Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions server/querier/app/distributed-tracing/router/tracemap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,36 @@
package router

import (
"github.com/deepflowio/deepflow/server/querier/app/distributed-tracing/service/tracemap"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"

"github.com/deepflowio/deepflow/server/querier/app/distributed-tracing/service/common"
"github.com/deepflowio/deepflow/server/querier/app/distributed-tracing/service/model"
"github.com/deepflowio/deepflow/server/querier/app/distributed-tracing/service/tracemap"
"github.com/deepflowio/deepflow/server/querier/config"
"github.com/deepflowio/deepflow/server/querier/router"
)

func TraceMapRouter(e *gin.Engine) {
e.GET("/trace_map", tracemap.TraceMap)
func TraceMapRouter(e *gin.Engine, cfg *config.QuerierConfig) {
e.GET("/trace_map", traceMap(cfg))
}

func traceMap(cfg *config.QuerierConfig) gin.HandlerFunc {
return gin.HandlerFunc(func(c *gin.Context) {
var traceMap model.TraceMap

// 参数校验
err := c.ShouldBindBodyWith(&traceMap, binding.JSON)
if err != nil {
router.BadRequestResponse(c, common.INVALID_POST_DATA, err.Error())
return
}
traceMap.Context = c.Request.Context()
traceMap.OrgID = c.Request.Header.Get(common.HEADER_KEY_X_ORG_ID)
result, debug, err := tracemap.TraceMap(traceMap, cfg)
if err == nil && !traceMap.Debug {
debug = nil
}
router.JsonResponse(c, result, debug, err)
})
}
19 changes: 19 additions & 0 deletions server/querier/app/distributed-tracing/service/common/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package common

const (
SUCCESS = "SUCCESS"
FAIL = "FAIL"
INVALID_PARAMETERS = "INVALID_PARAMETERS"
INVALID_POST_DATA = "INVALID_POST_DATA"
SERVER_ERROR = "SERVER_ERROR"
)

const (
DATABASE_FLOW_LOG = "flow_log"
TABLE_L7_FLOW_LOG = "l7_flow_log"
TAG_TRACE_ID = "trace_id"
)

const (
HEADER_KEY_X_ORG_ID = "X-Org-Id"
)
82 changes: 82 additions & 0 deletions server/querier/app/distributed-tracing/service/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package config

import (
"io/ioutil"
"os"
"reflect"
"regexp"

"strings"

"github.com/op/go-logging"
"gopkg.in/yaml.v2"
)

var log = logging.MustGetLogger("tracemap")
var Cfg *TraceMapConfig

type Config struct {
TraceMapConfig TraceMapConfig `yaml:"trace-map"`
}

type TraceMapConfig struct {
TotalTracesCountMax int `default:"100000" yaml:"total_traces_count_max"`
BatchTracesCountMax int `default:"1000" yaml:"batch_traces_count_max"`
Querier Querier `yaml:"querier"`
}

type Querier struct {
Host string `default:"127.0.0.1" yaml:"host"`
Port int `default:"20416" yaml:"port"`
}

func (c *Config) expendEnv() {
reConfig := reflect.ValueOf(&c.TraceMapConfig)
reConfig = reConfig.Elem()
for i := 0; i < reConfig.NumField(); i++ {
field := reConfig.Field(i)
switch field.Type().String() {
case "string":
fieldStr := field.String()
pattern := regexp.MustCompile(`\$\{(.+?)\}`)
p := pattern.FindAllSubmatch([]byte(fieldStr), -1)
for _, i := range p {
str := string(i[1])
fieldStr = strings.Replace(fieldStr, string(i[0]), os.Getenv(str), 1)
}
field.SetString(fieldStr)
}
}
}

func (c *Config) Validate() error {
return nil
}

func (c *Config) Load(path string) {
configBytes, err := ioutil.ReadFile(path)
if err != nil {
log.Error("Read config file error:", err, path)
os.Exit(1)
}

if err = yaml.Unmarshal(configBytes, c); err != nil {
log.Error("Unmarshal yaml error:", err)
os.Exit(1)
}

if err = c.Validate(); err != nil {
log.Error(err)
os.Exit(1)
}
c.expendEnv()
}

func DefaultConfig() *Config {
cfg := &Config{}
if err := SetDefault(cfg); err != nil {
log.Error(err)
os.Exit(1)
}
return cfg
}
196 changes: 196 additions & 0 deletions server/querier/app/distributed-tracing/service/config/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package config

import (
"encoding/json"
"errors"
"reflect"
"strconv"
"time"
)

var (
errInvalidType = errors.New("not a struct pointer")
)

const (
fieldName = "default"
)

// Set initializes members in a struct referenced by a pointer.
// Maps and slices are initialized by `make` and other primitive types are set with default values.
// `ptr` should be a struct pointer
func SetDefault(ptr interface{}) error {
if reflect.TypeOf(ptr).Kind() != reflect.Ptr {
return errInvalidType
}

v := reflect.ValueOf(ptr).Elem()
t := v.Type()

if t.Kind() != reflect.Struct {
return errInvalidType
}

for i := 0; i < t.NumField(); i++ {
if defaultVal := t.Field(i).Tag.Get(fieldName); defaultVal != "-" {
if err := setField(v.Field(i), defaultVal); err != nil {
return err
}
}
}
callSetter(ptr)
return nil
}

func setField(field reflect.Value, defaultVal string) error {
if !field.CanSet() {
return nil
}

if !shouldInitializeField(field, defaultVal) {
return nil
}

isInitial := isInitialValue(field)
if isInitial {
switch field.Kind() {
case reflect.Bool:
if val, err := strconv.ParseBool(defaultVal); err == nil {
field.Set(reflect.ValueOf(val).Convert(field.Type()))
}
case reflect.Int:
if val, err := strconv.ParseInt(defaultVal, 0, strconv.IntSize); err == nil {
field.Set(reflect.ValueOf(int(val)).Convert(field.Type()))
}
case reflect.Int8:
if val, err := strconv.ParseInt(defaultVal, 0, 8); err == nil {
field.Set(reflect.ValueOf(int8(val)).Convert(field.Type()))
}
case reflect.Int16:
if val, err := strconv.ParseInt(defaultVal, 0, 16); err == nil {
field.Set(reflect.ValueOf(int16(val)).Convert(field.Type()))
}
case reflect.Int32:
if val, err := strconv.ParseInt(defaultVal, 0, 32); err == nil {
field.Set(reflect.ValueOf(int32(val)).Convert(field.Type()))
}
case reflect.Int64:
if val, err := time.ParseDuration(defaultVal); err == nil {
field.Set(reflect.ValueOf(val).Convert(field.Type()))
} else if val, err := strconv.ParseInt(defaultVal, 0, 64); err == nil {
field.Set(reflect.ValueOf(val).Convert(field.Type()))
}
case reflect.Uint:
if val, err := strconv.ParseUint(defaultVal, 0, strconv.IntSize); err == nil {
field.Set(reflect.ValueOf(uint(val)).Convert(field.Type()))
}
case reflect.Uint8:
if val, err := strconv.ParseUint(defaultVal, 0, 8); err == nil {
field.Set(reflect.ValueOf(uint8(val)).Convert(field.Type()))
}
case reflect.Uint16:
if val, err := strconv.ParseUint(defaultVal, 0, 16); err == nil {
field.Set(reflect.ValueOf(uint16(val)).Convert(field.Type()))
}
case reflect.Uint32:
if val, err := strconv.ParseUint(defaultVal, 0, 32); err == nil {
field.Set(reflect.ValueOf(uint32(val)).Convert(field.Type()))
}
case reflect.Uint64:
if val, err := strconv.ParseUint(defaultVal, 0, 64); err == nil {
field.Set(reflect.ValueOf(val).Convert(field.Type()))
}
case reflect.Uintptr:
if val, err := strconv.ParseUint(defaultVal, 0, strconv.IntSize); err == nil {
field.Set(reflect.ValueOf(uintptr(val)).Convert(field.Type()))
}
case reflect.Float32:
if val, err := strconv.ParseFloat(defaultVal, 32); err == nil {
field.Set(reflect.ValueOf(float32(val)).Convert(field.Type()))
}
case reflect.Float64:
if val, err := strconv.ParseFloat(defaultVal, 64); err == nil {
field.Set(reflect.ValueOf(val).Convert(field.Type()))
}
case reflect.String:
field.Set(reflect.ValueOf(defaultVal).Convert(field.Type()))

case reflect.Slice:
ref := reflect.New(field.Type())
ref.Elem().Set(reflect.MakeSlice(field.Type(), 0, 0))
if defaultVal != "" && defaultVal != "[]" {
if err := json.Unmarshal([]byte(defaultVal), ref.Interface()); err != nil {
return err
}
}
field.Set(ref.Elem().Convert(field.Type()))
case reflect.Map:
ref := reflect.New(field.Type())
ref.Elem().Set(reflect.MakeMap(field.Type()))
if defaultVal != "" && defaultVal != "{}" {
if err := json.Unmarshal([]byte(defaultVal), ref.Interface()); err != nil {
return err
}
}
field.Set(ref.Elem().Convert(field.Type()))
case reflect.Struct:
if defaultVal != "" && defaultVal != "{}" {
if err := json.Unmarshal([]byte(defaultVal), field.Addr().Interface()); err != nil {
return err
}
}
case reflect.Ptr:
field.Set(reflect.New(field.Type().Elem()))
}
}

switch field.Kind() {
case reflect.Ptr:
if isInitial || field.Elem().Kind() == reflect.Struct {
setField(field.Elem(), defaultVal)
callSetter(field.Interface())
}
case reflect.Struct:
if err := SetDefault(field.Addr().Interface()); err != nil {
return err
}
case reflect.Slice:
for j := 0; j < field.Len(); j++ {
if err := setField(field.Index(j), defaultVal); err != nil {
return err
}
}
}

return nil
}

func isInitialValue(field reflect.Value) bool {
return reflect.DeepEqual(reflect.Zero(field.Type()).Interface(), field.Interface())
}

func shouldInitializeField(field reflect.Value, tag string) bool {
switch field.Kind() {
case reflect.Struct:
return true
case reflect.Ptr:
if !field.IsNil() && field.Elem().Kind() == reflect.Struct {
return true
}
case reflect.Slice:
return field.Len() > 0 || tag != ""
}

return tag != ""
}

// Setter is an interface for setting default values
type Setter interface {
SetDefaults()
}

func callSetter(v interface{}) {
if ds, ok := v.(Setter); ok {
ds.SetDefaults()
}
}
22 changes: 22 additions & 0 deletions server/querier/app/distributed-tracing/service/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package service

import (
"encoding/json"
)

type ServiceError struct {
Status string
Message string
}

func (e *ServiceError) Error() string {
err, _ := json.Marshal(e)
return string(err)
}

func NewError(status string, message string) error {
return &ServiceError{
Status: status,
Message: message,
}
}
29 changes: 29 additions & 0 deletions server/querier/app/distributed-tracing/service/model/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package model

import "context"

type TraceMap struct {
QueryCondition string `json:"query_condition"`
TimeStart int `json:"time_start" binding:"required"`
TimeEnd int `json:"time_end" binding:"required"`
Debug bool `json:"debug"`
Context context.Context
OrgID string
}

type Debug struct {
IP string `json:"ip"`
Sql string `json:"sql"`
SqlCH string `json:"sql_CH"`
QueryTime string `json:"query_time"`
QueryUUID string `json:"query_uuid"`
Error string `json:"error"`
}

type TraceMapDebug struct {
QuerierDebug []Debug `json:"querier_debug"`
FormatTime string `json:"format_time"`
}

type TraceMapTree struct {
}
Loading