diff --git a/pkg/event_processor/http_request.go b/pkg/event_processor/http_request.go index 7e38f1dff..bdc8a83a6 100644 --- a/pkg/event_processor/http_request.go +++ b/pkg/event_processor/http_request.go @@ -31,84 +31,83 @@ type HTTPRequest struct { bufReader *bufio.Reader } -func (this *HTTPRequest) Init() { - this.reader = bytes.NewBuffer(nil) - this.bufReader = bufio.NewReader(this.reader) +func (hr *HTTPRequest) Init() { + hr.reader = bytes.NewBuffer(nil) + hr.bufReader = bufio.NewReader(hr.reader) } -func (this *HTTPRequest) Name() string { +func (hr *HTTPRequest) Name() string { return "HTTPRequest" } -func (this *HTTPRequest) PacketType() PacketType { - return this.packerType +func (hr *HTTPRequest) PacketType() PacketType { + return hr.packerType } -func (this *HTTPRequest) ParserType() ParserType { +func (hr *HTTPRequest) ParserType() ParserType { return ParserTypeHttpRequest } -func (this *HTTPRequest) Write(b []byte) (int, error) { +func (hr *HTTPRequest) Write(b []byte) (int, error) { // 如果未初始化 - if !this.isInit { - n, e := this.reader.Write(b) + if !hr.isInit { + n, e := hr.reader.Write(b) if e != nil { return n, e } - req, err := http.ReadRequest(this.bufReader) + req, err := http.ReadRequest(hr.bufReader) if err != nil { return 0, err } - this.request = req - this.isInit = true + hr.request = req + hr.isInit = true return n, nil } // 如果已初始化 - l, e := this.reader.Write(b) + l, e := hr.reader.Write(b) if e != nil { return 0, e } - // TODO 检测是否接收完整个包 if false { - this.isDone = true + hr.isDone = true } return l, nil } -func (this *HTTPRequest) detect(payload []byte) error { - //this.Init() +func (hr *HTTPRequest) detect(payload []byte) error { + //hr.Init() rd := bytes.NewReader(payload) buf := bufio.NewReader(rd) req, err := http.ReadRequest(buf) if err != nil { return err } - this.request = req + hr.request = req return nil } -func (this *HTTPRequest) IsDone() bool { - return this.isDone +func (hr *HTTPRequest) IsDone() bool { + return hr.isDone } -func (this *HTTPRequest) Reset() { - this.isDone = false - this.isInit = false - this.reader.Reset() - this.bufReader.Reset(this.reader) +func (hr *HTTPRequest) Reset() { + hr.isDone = false + hr.isInit = false + hr.reader.Reset() + hr.bufReader.Reset(hr.reader) } -func (this *HTTPRequest) Display() []byte { - if this.request.Proto == "HTTP/2.0" { - return this.reader.Bytes() +func (hr *HTTPRequest) Display() []byte { + if hr.request.Proto == "HTTP/2.0" { + return hr.reader.Bytes() } - b, e := httputil.DumpRequest(this.request, true) + b, e := httputil.DumpRequest(hr.request, true) if e != nil { log.Println("DumpRequest error:", e) - return nil + return hr.reader.Bytes() } return b } diff --git a/pkg/event_processor/http_response.go b/pkg/event_processor/http_response.go index 50e5d8a57..dc7f321cf 100644 --- a/pkg/event_processor/http_response.go +++ b/pkg/event_processor/http_response.go @@ -39,122 +39,121 @@ type HTTPResponse struct { bufReader *bufio.Reader } -func (this *HTTPResponse) Init() { - this.reader = bytes.NewBuffer(nil) - this.bufReader = bufio.NewReader(this.reader) - this.receivedLen = 0 - this.headerLength = 0 +func (hr *HTTPResponse) Init() { + hr.reader = bytes.NewBuffer(nil) + hr.bufReader = bufio.NewReader(hr.reader) + hr.receivedLen = 0 + hr.headerLength = 0 } -func (this *HTTPResponse) Name() string { +func (hr *HTTPResponse) Name() string { return "HTTPResponse" } -func (this *HTTPResponse) PacketType() PacketType { - return this.packerType +func (hr *HTTPResponse) PacketType() PacketType { + return hr.packerType } -func (this *HTTPResponse) ParserType() ParserType { +func (hr *HTTPResponse) ParserType() ParserType { return ParserTypeHttpResponse } -func (this *HTTPResponse) Write(b []byte) (int, error) { +func (hr *HTTPResponse) Write(b []byte) (int, error) { var l int var e error var req *http.Response // 如果未初始化 - if !this.isInit { - l, e = this.reader.Write(b) + if !hr.isInit { + l, e = hr.reader.Write(b) if e != nil { return l, e } - req, e = http.ReadResponse(this.bufReader, nil) + req, e = http.ReadResponse(hr.bufReader, nil) if e != nil { return 0, e } - this.response = req - this.isInit = true + hr.response = req + hr.isInit = true } else { // 如果已初始化 - l, e = this.reader.Write(b) + l, e = hr.reader.Write(b) if e != nil { return 0, e } } - this.receivedLen += int64(l) + hr.receivedLen += int64(l) // 检测是否接收完整个包 - //if this.response.ContentLength >= this.receivedLen { + //if hr.response.ContentLength >= hr.receivedLen { if false { - this.isDone = true + hr.isDone = true } return l, nil } -func (this *HTTPResponse) detect(payload []byte) error { +func (hr *HTTPResponse) detect(payload []byte) error { rd := bytes.NewReader(payload) buf := bufio.NewReader(rd) res, err := http.ReadResponse(buf, nil) if err != nil { return err } - this.response = res + hr.response = res return nil } -func (this *HTTPResponse) IsDone() bool { - return this.isDone +func (hr *HTTPResponse) IsDone() bool { + return hr.isDone } -func (this *HTTPResponse) Reset() { - this.isDone = false - this.isInit = false - this.reader.Reset() - this.bufReader.Reset(this.reader) +func (hr *HTTPResponse) Reset() { + hr.isDone = false + hr.isInit = false + hr.reader.Reset() + hr.bufReader.Reset(hr.reader) } -func (this *HTTPResponse) Display() []byte { +func (hr *HTTPResponse) Display() []byte { var reader io.ReadCloser var err error - switch this.response.Header.Get("Content-Encoding") { + switch hr.response.Header.Get("Content-Encoding") { case "gzip": - reader, err = gzip.NewReader(this.response.Body) + reader, err = gzip.NewReader(hr.response.Body) if err != nil { log.Println(err) break } // gzip uncompressed success - this.response.Body = reader - this.packerType = PacketTypeGzip + hr.response.Body = reader + hr.packerType = PacketTypeGzip defer reader.Close() default: - //reader = this.response.Body - this.packerType = PacketTypeNull - //log.Println("not gzip content") + //reader = hr.response.Body + hr.packerType = PacketTypeNull //TODO for debug //return []byte("") } headerMap := bytes.NewBufferString("") - for k, v := range this.response.Header { + for k, v := range hr.response.Header { headerMap.WriteString(fmt.Sprintf("\t%s\t=>\t%s\n", k, v)) } - log.Printf("HTTPS Headers \n\t%s", headerMap.String()) + //log.Printf("HTTPS Headers \n\t%s", headerMap.String()) var b []byte var e error - if this.response.ContentLength == 0 { - b, e = httputil.DumpResponse(this.response, false) + if hr.response.ContentLength == 0 { + b, e = httputil.DumpResponse(hr.response, false) } else { - b, e = httputil.DumpResponse(this.response, true) + b, e = httputil.DumpResponse(hr.response, true) } if e != nil { - log.Println("DumpResponse error:", e) - return []byte("") + log.Println("[http response] DumpResponse error:", e) + return hr.reader.Bytes() } return b } diff --git a/pkg/event_processor/iworker.go b/pkg/event_processor/iworker.go index 093b35acf..61f2d919c 100644 --- a/pkg/event_processor/iworker.go +++ b/pkg/event_processor/iworker.go @@ -15,6 +15,7 @@ package event_processor import ( + "bytes" "ecapture/user/event" "encoding/hex" "time" @@ -47,6 +48,7 @@ type eventWorker struct { UUID string processor *EventProcessor parser IParser + payload *bytes.Buffer } func NewEventWorker(uuid string, processor *EventProcessor) IWorker { @@ -58,99 +60,94 @@ func NewEventWorker(uuid string, processor *EventProcessor) IWorker { return eWorker } -func (this *eventWorker) init(uuid string, processor *EventProcessor) { - this.ticker = time.NewTicker(time.Millisecond * 100) - this.incoming = make(chan event.IEventStruct, MaxChanLen) - this.status = ProcessStateInit - this.UUID = uuid - this.processor = processor +func (ew *eventWorker) init(uuid string, processor *EventProcessor) { + ew.ticker = time.NewTicker(time.Millisecond * 100) + ew.incoming = make(chan event.IEventStruct, MaxChanLen) + ew.status = ProcessStateInit + ew.UUID = uuid + ew.processor = processor + ew.payload = bytes.NewBuffer(nil) + ew.payload.Reset() } -func (this *eventWorker) GetUUID() string { - return this.UUID +func (ew *eventWorker) GetUUID() string { + return ew.UUID } -func (this *eventWorker) Write(e event.IEventStruct) error { - this.incoming <- e +func (ew *eventWorker) Write(e event.IEventStruct) error { + ew.incoming <- e return nil } // 输出包内容 -func (this *eventWorker) Display() { - // 解析器类型检测 - if this.parser.ParserType() != ParserTypeHttpResponse { - //临时调试开关 - //return - } +func (ew *eventWorker) Display() { // 输出包内容 - b := this.parser.Display() - + b := ew.parserEvents() + defer ew.parser.Reset() if len(b) <= 0 { return } - if this.processor.isHex { + if ew.processor.isHex { b = []byte(hex.Dump(b)) } // TODO 格式化的终端输出 // 重置状态 - this.processor.GetLogger().Printf("UUID:%s, Name:%s, Type:%d, Length:%d", this.UUID, this.parser.Name(), this.parser.ParserType(), len(b)) - this.processor.GetLogger().Println("\n" + string(b)) - this.parser.Reset() + ew.processor.GetLogger().Printf("UUID:%s, Name:%s, Type:%d, Length:%d", ew.UUID, ew.parser.Name(), ew.parser.ParserType(), len(b)) + ew.processor.GetLogger().Println("\n" + string(b)) + //ew.parser.Reset() // 设定状态、重置包类型 - this.status = ProcessStateDone - this.packetType = PacketTypeNull + ew.status = ProcessStateInit + ew.packetType = PacketTypeNull } -// 解析类型,输出 -func (this *eventWorker) parserEvent(e event.IEventStruct) { - if this.status == ProcessStateInit { - // 识别包类型,只检测,不把payload设置到parser的属性中,需要重新调用parser.Write()写入 - parser := NewParser(e.Payload()) - this.parser = parser - } - - // 设定当前worker的状态为正在解析 - this.status = ProcessStateProcessing - - // 写入payload到parser - _, err := this.parser.Write(e.Payload()[:e.PayloadLen()]) - if err != nil { - this.processor.GetLogger().Fatalf("eventWorker: detect packet type error, UUID:%s, error:%v", this.UUID, err) +func (ew *eventWorker) writeEvent(e event.IEventStruct) { + if ew.status != ProcessStateInit { + ew.processor.GetLogger().Printf("write events failed, unknow eventWorker status") + return } + ew.payload.Write(e.Payload()) +} - // 是否接收完成,能否输出 - if this.parser.IsDone() { - this.Display() +// 解析类型,输出 +func (ew *eventWorker) parserEvents() []byte { + ew.status = ProcessStateProcessing + parser := NewParser(ew.payload.Bytes()) + ew.parser = parser + n, e := ew.parser.Write(ew.payload.Bytes()) + if e != nil { + ew.processor.GetLogger().Printf("ew.parser write payload %d bytes, error:%v", n, e) } + ew.status = ProcessStateDone + return ew.parser.Display() } -func (this *eventWorker) Run() { +func (ew *eventWorker) Run() { for { select { - case _ = <-this.ticker.C: + case _ = <-ew.ticker.C: // 输出包 - if this.tickerCount > MaxTickerCount { - this.processor.GetLogger().Printf("eventWorker TickerCount > %d, event closed.", MaxTickerCount) - this.Close() + if ew.tickerCount > MaxTickerCount { + //ew.processor.GetLogger().Printf("eventWorker TickerCount > %d, event closed.", MaxTickerCount) + ew.Close() return } - this.tickerCount++ - case e := <-this.incoming: + ew.tickerCount++ + case e := <-ew.incoming: // reset tickerCount - this.tickerCount = 0 - this.parserEvent(e) + ew.tickerCount = 0 + ew.writeEvent(e) } } } -func (this *eventWorker) Close() { +func (ew *eventWorker) Close() { // 即将关闭, 必须输出结果 - this.ticker.Stop() - this.Display() - this.tickerCount = 0 - this.processor.delWorkerByUUID(this) + ew.ticker.Stop() + ew.Display() + ew.tickerCount = 0 + ew.processor.delWorkerByUUID(ew) }