From 580d189bad11fc1f9f5a1de3f1635bc40cd50c2a Mon Sep 17 00:00:00 2001 From: cfc4n Date: Sun, 11 Jun 2023 15:02:08 +0800 Subject: [PATCH 1/3] parse message using bytes.buffer. --- pkg/event_processor/http_request.go | 58 +++++++------- pkg/event_processor/http_response.go | 81 ++++++++++---------- pkg/event_processor/iworker.go | 108 +++++++++++++-------------- 3 files changed, 122 insertions(+), 125 deletions(-) diff --git a/pkg/event_processor/http_request.go b/pkg/event_processor/http_request.go index 7e38f1dff..55f5ca465 100644 --- a/pkg/event_processor/http_request.go +++ b/pkg/event_processor/http_request.go @@ -31,81 +31,81 @@ type HTTPRequest struct { bufReader *bufio.Reader } -func (this *HTTPRequest) Init() { - this.reader = bytes.NewBuffer(nil) - this.bufReader = bufio.NewReader(this.reader) +func (hr *HTTPRequest) Init() { + hr.reader = bytes.NewBuffer(nil) + hr.bufReader = bufio.NewReader(hr.reader) } -func (this *HTTPRequest) Name() string { +func (hr *HTTPRequest) Name() string { return "HTTPRequest" } -func (this *HTTPRequest) PacketType() PacketType { - return this.packerType +func (hr *HTTPRequest) PacketType() PacketType { + return hr.packerType } -func (this *HTTPRequest) ParserType() ParserType { +func (hr *HTTPRequest) ParserType() ParserType { return ParserTypeHttpRequest } -func (this *HTTPRequest) Write(b []byte) (int, error) { +func (hr *HTTPRequest) Write(b []byte) (int, error) { // 如果未初始化 - if !this.isInit { - n, e := this.reader.Write(b) + if !hr.isInit { + n, e := hr.reader.Write(b) if e != nil { return n, e } - req, err := http.ReadRequest(this.bufReader) + req, err := http.ReadRequest(hr.bufReader) if err != nil { return 0, err } - this.request = req - this.isInit = true + hr.request = req + hr.isInit = true return n, nil } // 如果已初始化 - l, e := this.reader.Write(b) + l, e := hr.reader.Write(b) if e != nil { return 0, e } // TODO 检测是否接收完整个包 if false { - this.isDone = true + hr.isDone = true } return l, nil } -func (this *HTTPRequest) detect(payload []byte) error { - //this.Init() +func (hr *HTTPRequest) detect(payload []byte) error { + //hr.Init() rd := bytes.NewReader(payload) buf := bufio.NewReader(rd) req, err := http.ReadRequest(buf) if err != nil { return err } - this.request = req + hr.request = req return nil } -func (this *HTTPRequest) IsDone() bool { - return this.isDone +func (hr *HTTPRequest) IsDone() bool { + return hr.isDone } -func (this *HTTPRequest) Reset() { - this.isDone = false - this.isInit = false - this.reader.Reset() - this.bufReader.Reset(this.reader) +func (hr *HTTPRequest) Reset() { + hr.isDone = false + hr.isInit = false + hr.reader.Reset() + hr.bufReader.Reset(hr.reader) } -func (this *HTTPRequest) Display() []byte { - if this.request.Proto == "HTTP/2.0" { - return this.reader.Bytes() +func (hr *HTTPRequest) Display() []byte { + if hr.request.Proto == "HTTP/2.0" { + return hr.reader.Bytes() } - b, e := httputil.DumpRequest(this.request, true) + b, e := httputil.DumpRequest(hr.request, true) if e != nil { log.Println("DumpRequest error:", e) return nil diff --git a/pkg/event_processor/http_response.go b/pkg/event_processor/http_response.go index 50e5d8a57..ffd8acf2d 100644 --- a/pkg/event_processor/http_response.go +++ b/pkg/event_processor/http_response.go @@ -39,118 +39,117 @@ type HTTPResponse struct { bufReader *bufio.Reader } -func (this *HTTPResponse) Init() { - this.reader = bytes.NewBuffer(nil) - this.bufReader = bufio.NewReader(this.reader) - this.receivedLen = 0 - this.headerLength = 0 +func (hr *HTTPResponse) Init() { + hr.reader = bytes.NewBuffer(nil) + hr.bufReader = bufio.NewReader(hr.reader) + hr.receivedLen = 0 + hr.headerLength = 0 } -func (this *HTTPResponse) Name() string { +func (hr *HTTPResponse) Name() string { return "HTTPResponse" } -func (this *HTTPResponse) PacketType() PacketType { - return this.packerType +func (hr *HTTPResponse) PacketType() PacketType { + return hr.packerType } -func (this *HTTPResponse) ParserType() ParserType { +func (hr *HTTPResponse) ParserType() ParserType { return ParserTypeHttpResponse } -func (this *HTTPResponse) Write(b []byte) (int, error) { +func (hr *HTTPResponse) Write(b []byte) (int, error) { var l int var e error var req *http.Response // 如果未初始化 - if !this.isInit { - l, e = this.reader.Write(b) + if !hr.isInit { + l, e = hr.reader.Write(b) if e != nil { return l, e } - req, e = http.ReadResponse(this.bufReader, nil) + req, e = http.ReadResponse(hr.bufReader, nil) if e != nil { return 0, e } - this.response = req - this.isInit = true + hr.response = req + hr.isInit = true } else { // 如果已初始化 - l, e = this.reader.Write(b) + l, e = hr.reader.Write(b) if e != nil { return 0, e } } - this.receivedLen += int64(l) + hr.receivedLen += int64(l) // 检测是否接收完整个包 - //if this.response.ContentLength >= this.receivedLen { + //if hr.response.ContentLength >= hr.receivedLen { if false { - this.isDone = true + hr.isDone = true } return l, nil } -func (this *HTTPResponse) detect(payload []byte) error { +func (hr *HTTPResponse) detect(payload []byte) error { rd := bytes.NewReader(payload) buf := bufio.NewReader(rd) res, err := http.ReadResponse(buf, nil) if err != nil { return err } - this.response = res + hr.response = res return nil } -func (this *HTTPResponse) IsDone() bool { - return this.isDone +func (hr *HTTPResponse) IsDone() bool { + return hr.isDone } -func (this *HTTPResponse) Reset() { - this.isDone = false - this.isInit = false - this.reader.Reset() - this.bufReader.Reset(this.reader) +func (hr *HTTPResponse) Reset() { + hr.isDone = false + hr.isInit = false + hr.reader.Reset() + hr.bufReader.Reset(hr.reader) } -func (this *HTTPResponse) Display() []byte { +func (hr *HTTPResponse) Display() []byte { var reader io.ReadCloser var err error - switch this.response.Header.Get("Content-Encoding") { + switch hr.response.Header.Get("Content-Encoding") { case "gzip": - reader, err = gzip.NewReader(this.response.Body) + reader, err = gzip.NewReader(hr.response.Body) if err != nil { log.Println(err) break } // gzip uncompressed success - this.response.Body = reader - this.packerType = PacketTypeGzip + hr.response.Body = reader + hr.packerType = PacketTypeGzip defer reader.Close() default: - //reader = this.response.Body - this.packerType = PacketTypeNull - //log.Println("not gzip content") + //reader = hr.response.Body + hr.packerType = PacketTypeNull //TODO for debug //return []byte("") } headerMap := bytes.NewBufferString("") - for k, v := range this.response.Header { + for k, v := range hr.response.Header { headerMap.WriteString(fmt.Sprintf("\t%s\t=>\t%s\n", k, v)) } - log.Printf("HTTPS Headers \n\t%s", headerMap.String()) + //log.Printf("HTTPS Headers \n\t%s", headerMap.String()) var b []byte var e error - if this.response.ContentLength == 0 { - b, e = httputil.DumpResponse(this.response, false) + if hr.response.ContentLength == 0 { + b, e = httputil.DumpResponse(hr.response, false) } else { - b, e = httputil.DumpResponse(this.response, true) + b, e = httputil.DumpResponse(hr.response, true) } if e != nil { log.Println("DumpResponse error:", e) diff --git a/pkg/event_processor/iworker.go b/pkg/event_processor/iworker.go index 093b35acf..a5e6252a1 100644 --- a/pkg/event_processor/iworker.go +++ b/pkg/event_processor/iworker.go @@ -15,6 +15,7 @@ package event_processor import ( + "bytes" "ecapture/user/event" "encoding/hex" "time" @@ -47,6 +48,7 @@ type eventWorker struct { UUID string processor *EventProcessor parser IParser + payload *bytes.Buffer } func NewEventWorker(uuid string, processor *EventProcessor) IWorker { @@ -58,99 +60,95 @@ func NewEventWorker(uuid string, processor *EventProcessor) IWorker { return eWorker } -func (this *eventWorker) init(uuid string, processor *EventProcessor) { - this.ticker = time.NewTicker(time.Millisecond * 100) - this.incoming = make(chan event.IEventStruct, MaxChanLen) - this.status = ProcessStateInit - this.UUID = uuid - this.processor = processor +func (ew *eventWorker) init(uuid string, processor *EventProcessor) { + ew.ticker = time.NewTicker(time.Millisecond * 100) + ew.incoming = make(chan event.IEventStruct, MaxChanLen) + ew.status = ProcessStateInit + ew.UUID = uuid + ew.processor = processor + ew.payload = bytes.NewBuffer(nil) + ew.payload.Reset() } -func (this *eventWorker) GetUUID() string { - return this.UUID +func (ew *eventWorker) GetUUID() string { + return ew.UUID } -func (this *eventWorker) Write(e event.IEventStruct) error { - this.incoming <- e +func (ew *eventWorker) Write(e event.IEventStruct) error { + ew.incoming <- e return nil } // 输出包内容 -func (this *eventWorker) Display() { - // 解析器类型检测 - if this.parser.ParserType() != ParserTypeHttpResponse { - //临时调试开关 - //return - } +func (ew *eventWorker) Display() { // 输出包内容 - b := this.parser.Display() + b := ew.parserEvents() if len(b) <= 0 { return } - if this.processor.isHex { + if ew.processor.isHex { b = []byte(hex.Dump(b)) } // TODO 格式化的终端输出 // 重置状态 - this.processor.GetLogger().Printf("UUID:%s, Name:%s, Type:%d, Length:%d", this.UUID, this.parser.Name(), this.parser.ParserType(), len(b)) - this.processor.GetLogger().Println("\n" + string(b)) - this.parser.Reset() + ew.processor.GetLogger().Printf("UUID:%s, Name:%s, Type:%d, Length:%d", ew.UUID, ew.parser.Name(), ew.parser.ParserType(), len(b)) + ew.processor.GetLogger().Println("\n" + string(b)) + ew.parser.Reset() // 设定状态、重置包类型 - this.status = ProcessStateDone - this.packetType = PacketTypeNull -} - -// 解析类型,输出 -func (this *eventWorker) parserEvent(e event.IEventStruct) { - if this.status == ProcessStateInit { - // 识别包类型,只检测,不把payload设置到parser的属性中,需要重新调用parser.Write()写入 - parser := NewParser(e.Payload()) - this.parser = parser - } - // 设定当前worker的状态为正在解析 - this.status = ProcessStateProcessing + ew.packetType = PacketTypeNull +} - // 写入payload到parser - _, err := this.parser.Write(e.Payload()[:e.PayloadLen()]) - if err != nil { - this.processor.GetLogger().Fatalf("eventWorker: detect packet type error, UUID:%s, error:%v", this.UUID, err) +func (ew *eventWorker) writeEvent(e event.IEventStruct) { + if ew.status != ProcessStateInit { + ew.processor.GetLogger().Printf("write events failed, unknow eventWorker status") + return } + ew.payload.Write(e.Payload()) +} - // 是否接收完成,能否输出 - if this.parser.IsDone() { - this.Display() +// 解析类型,输出 +func (ew *eventWorker) parserEvents() []byte { + ew.status = ProcessStateProcessing + parser := NewParser(ew.payload.Bytes()) + ew.parser = parser + n, e := ew.parser.Write(ew.payload.Bytes()) + if e != nil { + ew.processor.GetLogger().Printf("ew.parser write payload %n bytes, error:%v", n, e) } + ew.status = ProcessStateDone + ew.parser.Reset() + return ew.parser.Display() } -func (this *eventWorker) Run() { +func (ew *eventWorker) Run() { for { select { - case _ = <-this.ticker.C: + case _ = <-ew.ticker.C: // 输出包 - if this.tickerCount > MaxTickerCount { - this.processor.GetLogger().Printf("eventWorker TickerCount > %d, event closed.", MaxTickerCount) - this.Close() + if ew.tickerCount > MaxTickerCount { + //ew.processor.GetLogger().Printf("eventWorker TickerCount > %d, event closed.", MaxTickerCount) + ew.Close() return } - this.tickerCount++ - case e := <-this.incoming: + ew.tickerCount++ + case e := <-ew.incoming: // reset tickerCount - this.tickerCount = 0 - this.parserEvent(e) + ew.tickerCount = 0 + ew.writeEvent(e) } } } -func (this *eventWorker) Close() { +func (ew *eventWorker) Close() { // 即将关闭, 必须输出结果 - this.ticker.Stop() - this.Display() - this.tickerCount = 0 - this.processor.delWorkerByUUID(this) + ew.ticker.Stop() + ew.Display() + ew.tickerCount = 0 + ew.processor.delWorkerByUUID(ew) } From 63c2d44e07374e799fe0cf80d357ff4b19bfe801 Mon Sep 17 00:00:00 2001 From: cfc4n Date: Sun, 11 Jun 2023 15:20:00 +0800 Subject: [PATCH 2/3] tidy up parser code. --- pkg/event_processor/http_request.go | 3 +-- pkg/event_processor/http_response.go | 4 ++-- pkg/event_processor/iworker.go | 7 +++---- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pkg/event_processor/http_request.go b/pkg/event_processor/http_request.go index 55f5ca465..bdc8a83a6 100644 --- a/pkg/event_processor/http_request.go +++ b/pkg/event_processor/http_request.go @@ -69,7 +69,6 @@ func (hr *HTTPRequest) Write(b []byte) (int, error) { if e != nil { return 0, e } - // TODO 检测是否接收完整个包 if false { hr.isDone = true @@ -108,7 +107,7 @@ func (hr *HTTPRequest) Display() []byte { b, e := httputil.DumpRequest(hr.request, true) if e != nil { log.Println("DumpRequest error:", e) - return nil + return hr.reader.Bytes() } return b } diff --git a/pkg/event_processor/http_response.go b/pkg/event_processor/http_response.go index ffd8acf2d..dc7f321cf 100644 --- a/pkg/event_processor/http_response.go +++ b/pkg/event_processor/http_response.go @@ -152,8 +152,8 @@ func (hr *HTTPResponse) Display() []byte { b, e = httputil.DumpResponse(hr.response, true) } if e != nil { - log.Println("DumpResponse error:", e) - return []byte("") + log.Println("[http response] DumpResponse error:", e) + return hr.reader.Bytes() } return b } diff --git a/pkg/event_processor/iworker.go b/pkg/event_processor/iworker.go index a5e6252a1..15977f6e8 100644 --- a/pkg/event_processor/iworker.go +++ b/pkg/event_processor/iworker.go @@ -84,7 +84,7 @@ func (ew *eventWorker) Display() { // 输出包内容 b := ew.parserEvents() - + defer ew.parser.Reset() if len(b) <= 0 { return } @@ -97,9 +97,9 @@ func (ew *eventWorker) Display() { // 重置状态 ew.processor.GetLogger().Printf("UUID:%s, Name:%s, Type:%d, Length:%d", ew.UUID, ew.parser.Name(), ew.parser.ParserType(), len(b)) ew.processor.GetLogger().Println("\n" + string(b)) - ew.parser.Reset() + //ew.parser.Reset() // 设定状态、重置包类型 - + ew.status = ProcessStateInit ew.packetType = PacketTypeNull } @@ -121,7 +121,6 @@ func (ew *eventWorker) parserEvents() []byte { ew.processor.GetLogger().Printf("ew.parser write payload %n bytes, error:%v", n, e) } ew.status = ProcessStateDone - ew.parser.Reset() return ew.parser.Display() } From 1860340552e4b5cfd052b6fe654ce182f1be0d6d Mon Sep 17 00:00:00 2001 From: cfc4n Date: Sun, 11 Jun 2023 18:22:53 +0800 Subject: [PATCH 3/3] fix format type. --- pkg/event_processor/iworker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/event_processor/iworker.go b/pkg/event_processor/iworker.go index 15977f6e8..61f2d919c 100644 --- a/pkg/event_processor/iworker.go +++ b/pkg/event_processor/iworker.go @@ -118,7 +118,7 @@ func (ew *eventWorker) parserEvents() []byte { ew.parser = parser n, e := ew.parser.Write(ew.payload.Bytes()) if e != nil { - ew.processor.GetLogger().Printf("ew.parser write payload %n bytes, error:%v", n, e) + ew.processor.GetLogger().Printf("ew.parser write payload %d bytes, error:%v", n, e) } ew.status = ProcessStateDone return ew.parser.Display()