diff --git a/example.config.toml b/example.config.toml index 28158736d..c0ca32c6f 100644 --- a/example.config.toml +++ b/example.config.toml @@ -213,42 +213,20 @@ idle = "1m" # does not really matter), while websites pump heavy content in HTTP2 streams # # It means that statistically there is a different between traffic shape: -# TLS packet sizes are different, delays between packets are also different. +# delays between packets are also different. # In order to avoid censorship detection based on these patterns, there is a # mtg subsystem called "Doppelganger" that aims to mimic website statistics # as close as it could. # -# It does that by 2 ideas: -# 1. Delays between TLS packets are not constant. There are many factors -# that come in play. Application should generate some response, it could -# send some headers first and stream content with chunked encoding. So -# some first packets could come as soon as possible, with some delays -# after first ones. Such phenomenon is described by different statistic -# distribution. There are 2 distribution that describe it: lognormal -# distribution and Weibul distribution. Lognormal is all about steady streams -# of heavy content like a video. Weibul is great about short bursts like -# user who requested a static page an a couple of images. -# -# mtg tries to adapt Weibul distribution. It comes with some sensible -# defaults that were taken from ok.ru. But when you use domain fronting, -# it always make sense to take statistics from that website. You can specify -# some urls here. mtg will crawl them from time to time, accumulate time -# series and approximates parameters for Weibul. -# 2. TLS record sizes are not random. -# https://blog.cloudflare.com/optimizing-tls-over-tcp-to-reduce-latency/ -# https://aws.github.io/s2n-tls/usage-guide/ch08-record-sizes.html -# -# The idea is that huge TLS records could negatively affect performance. 
-# You cannot simply decrypt a part of the packet, you need to wait it -# whole, and huge packets could involve several RTTs if you do not use -# any specific software that treat TLS in a very special way. So -# servers start with small packets, usually around MTU, and ramp up -# later. This optimizes a time-to-first byte so web browsers start to -# render early. -# -# mtg uses the same technique as was introduced by Cloudflare in their -# patches to nginx 10 years ago: -# https://github.com/cloudflare/sslconfig/blob/master/patches/nginx__dynamic_tls_records.patch +# Delays between TLS packets are not constant. There are many factors +# that come into play. An application has to generate a response, and it could +# send some headers first and stream content with chunked encoding. So +# the first packets could come as soon as possible, with some delays +# after the first ones. Such a phenomenon is described by a different statistical +# distribution. There are 2 distributions that describe it: lognormal +# distribution and Weibull distribution. Lognormal is all about steady streams +# of heavy content like a video. Weibull is great for short bursts like +# a user who requested a static page and a couple of images. [defense.doppelganger] # This is a list of URLs that would be crawled by mtg to approximate delay # statistics. They MUST be HTTPS urls. @@ -266,6 +244,27 @@ repeats-per-raid = 10 # do not change a lot, so do not expect different results if you request # each 10 minutes. raid-each = "6h" +# This enables dynamic TLS record sizing. +# +# Some modern stacks and platforms have started to use a technique called +# DRS. They start with small TLS packets and ramp up eventually. First packets +# are usually about MTU size, after that we get 4k and eventually max size. +# This is done with a good intention: to minimize the time to the first byte, +# so the application can start doing something with the data right after the first +# RTT.
+# +# Apparently, about 90% of applications do not employ this technique; they always use +# max size: nginx, apache, java stuff. But Golang tools, angie and +# some specific patches activate this technique. +# +# In order to mimic a real website we need to know something about the software +# it uses. Usually nobody cares: openssl does 16384, Python does it, nginx +# does it. So this setting is disabled by default. +# +# https://blog.cloudflare.com/optimizing-tls-over-tcp-to-reduce-latency/ +# https://aws.github.io/s2n-tls/usage-guide/ch08-record-sizes.html +# https://github.com/cloudflare/sslconfig/blob/master/patches/nginx__dynamic_tls_records.patch +drs = false # Some countries do active probing on Telegram connections. This technique # allows to protect from such effort. diff --git a/internal/cli/run_proxy.go b/internal/cli/run_proxy.go index 1ecb2703d..9453edaa3 100644 --- a/internal/cli/run_proxy.go +++ b/internal/cli/run_proxy.go @@ -265,6 +265,7 @@ func runProxy(conf *config.Config, version string) error { //nolint: funlen DoppelGangerURLs: doppelGangerURLs, DoppelGangerPerRaid: conf.Defense.Doppelganger.Repeats.Get(mtglib.DoppelGangerPerRaid), DoppelGangerEach: conf.Defense.Doppelganger.UpdateEach.Get(mtglib.DoppelGangerEach), + DoppelGangerDRS: conf.Defense.Doppelganger.DRS.Get(false), } proxy, err := mtglib.NewProxy(opts) diff --git a/internal/config/config.go b/internal/config/config.go index 91c93b0c7..9ced83223 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -53,6 +53,7 @@ type Config struct { URLs []TypeHttpsURL `json:"urls"` Repeats TypeConcurrency `json:"repeats_per_raid"` UpdateEach TypeDuration `json:"raid_each"` + DRS TypeBool `json:"drs"` } `json:"doppelganger"` } `json:"defense"` Network struct { diff --git a/internal/config/parse.go b/internal/config/parse.go index 06183c750..81a50f312 100644 --- a/internal/config/parse.go +++ b/internal/config/parse.go @@ -48,6 +48,7 @@ type tomlConfig struct { URLs []string
`toml:"urls" json:"urls,omitempty"` Repeats uint `toml:"repeats-per-raid" json:"repeats_per_raid,omitempty"` UpdateEach string `toml:"raid-each" json:"raid_each,omitempty"` + DRS bool `toml:"drs" json:"drs,omitempty"` } `toml:"doppelganger" json:"doppelganger,omitempty"` } `toml:"defense" json:"defense,omitempty"` Network struct { diff --git a/mtglib/internal/doppel/conn.go b/mtglib/internal/doppel/conn.go index a2543cd33..7e8ed30e3 100644 --- a/mtglib/internal/doppel/conn.go +++ b/mtglib/internal/doppel/conn.go @@ -16,18 +16,48 @@ type Conn struct { } type connPayload struct { - ctx context.Context - ctxCancel context.CancelCauseFunc - clock Clock - wg sync.WaitGroup - writeLock sync.Mutex - writeStream bytes.Buffer + ctx context.Context + ctxCancel context.CancelCauseFunc + clock Clock + wg sync.WaitGroup + syncWriteLock sync.RWMutex + writeStream bytes.Buffer + writeCond *sync.Cond } func (c Conn) Write(p []byte) (int, error) { - c.p.writeLock.Lock() + c.p.syncWriteLock.RLock() + defer c.p.syncWriteLock.RUnlock() + + c.p.writeCond.L.Lock() + c.p.writeStream.Write(p) + c.p.writeCond.L.Unlock() + + return len(p), context.Cause(c.p.ctx) +} + +func (c Conn) SyncWrite(p []byte) (int, error) { + c.p.syncWriteLock.Lock() + defer c.p.syncWriteLock.Unlock() + + c.p.writeCond.L.Lock() + // wait until buffer is exhausted + for c.p.writeStream.Len() != 0 && context.Cause(c.p.ctx) == nil { + c.p.writeCond.Wait() + } c.p.writeStream.Write(p) - c.p.writeLock.Unlock() + c.p.writeCond.L.Unlock() + + if err := context.Cause(c.p.ctx); err != nil { + return len(p), err + } + + c.p.writeCond.L.Lock() + // wait until data will be sent + for c.p.writeStream.Len() != 0 && context.Cause(c.p.ctx) == nil { + c.p.writeCond.Wait() + } + c.p.writeCond.L.Unlock() return len(p), context.Cause(c.p.ctx) } @@ -39,6 +69,8 @@ func (c Conn) Start() { } func (c Conn) start() { + defer c.p.writeCond.Broadcast() + buf := [tls.MaxRecordSize]byte{} for { @@ -48,9 +80,9 @@ func (c Conn) start() { case 
<-c.p.clock.tick: } - c.p.writeLock.Lock() + c.p.writeCond.L.Lock() n, err := c.p.writeStream.Read(buf[:c.p.clock.stats.Size()]) - c.p.writeLock.Unlock() + c.p.writeCond.L.Unlock() if n == 0 || err != nil { continue @@ -60,6 +92,8 @@ func (c Conn) start() { c.p.ctxCancel(err) return } + + c.p.writeCond.Signal() } } @@ -75,6 +109,7 @@ func NewConn(ctx context.Context, conn essentials.Conn, stats *Stats) Conn { p: &connPayload{ ctx: ctx, ctxCancel: cancel, + writeCond: sync.NewCond(&sync.Mutex{}), clock: Clock{ stats: stats, tick: make(chan struct{}), diff --git a/mtglib/internal/doppel/conn_test.go b/mtglib/internal/doppel/conn_test.go index e774bcedd..eec1f6fb9 100644 --- a/mtglib/internal/doppel/conn_test.go +++ b/mtglib/internal/doppel/conn_test.go @@ -157,6 +157,136 @@ func (suite *ConnTestSuite) TestStopOnUnderlyingWriteError() { }, 2*time.Second, time.Millisecond) } +func (suite *ConnTestSuite) TestSyncWriteDataSent() { + suite.connMock. + On("Write", mock.AnythingOfType("[]uint8")). + Return(0, nil). + Maybe() + + c := suite.makeConn() + defer c.Stop() + + payload := []byte("sync hello") + n, err := c.SyncWrite(payload) + suite.NoError(err) + suite.Equal(len(payload), n) + + // SyncWrite returns only after data is flushed to the wire. + assembled := &bytes.Buffer{} + reader := bytes.NewReader(suite.connMock.Written()) + + for { + header := make([]byte, tls.SizeHeader) + if _, err := io.ReadFull(reader, header); err != nil { + break + } + + suite.Equal(byte(tls.TypeApplicationData), header[0]) + + length := binary.BigEndian.Uint16(header[tls.SizeRecordType+tls.SizeVersion:]) + rec := make([]byte, length) + _, err := io.ReadFull(reader, rec) + suite.NoError(err) + + assembled.Write(rec) + } + + suite.Equal(payload, assembled.Bytes()) +} + +func (suite *ConnTestSuite) TestSyncWriteDrainsBufferFirst() { + suite.connMock. + On("Write", mock.AnythingOfType("[]uint8")). + Return(0, nil). 
+ Maybe() + + c := suite.makeConn() + defer c.Stop() + + // Buffer some data via async Write. + _, err := c.Write([]byte("first")) + suite.NoError(err) + + // SyncWrite must drain "first" before sending "second". + n, err := c.SyncWrite([]byte("second")) + suite.NoError(err) + suite.Equal(6, n) + + // All data should be on the wire now. + assembled := &bytes.Buffer{} + reader := bytes.NewReader(suite.connMock.Written()) + + for { + header := make([]byte, tls.SizeHeader) + if _, err := io.ReadFull(reader, header); err != nil { + break + } + + length := binary.BigEndian.Uint16(header[tls.SizeRecordType+tls.SizeVersion:]) + rec := make([]byte, length) + _, err := io.ReadFull(reader, rec) + suite.NoError(err) + + assembled.Write(rec) + } + + suite.Equal([]byte("firstsecond"), assembled.Bytes()) +} + +func (suite *ConnTestSuite) TestSyncWriteBlocksAsyncWrite() { + suite.connMock. + On("Write", mock.AnythingOfType("[]uint8")). + Return(0, nil). + Maybe() + + c := suite.makeConn() + defer c.Stop() + + // Start SyncWrite — it holds exclusive lock. + syncDone := make(chan struct{}) + + go func() { + defer close(syncDone) + c.SyncWrite([]byte("exclusive")) //nolint: errcheck + }() + + // Give SyncWrite time to acquire the lock. + time.Sleep(10 * time.Millisecond) + + // Async Write should block until SyncWrite completes. + writeDone := make(chan struct{}) + + go func() { + defer close(writeDone) + c.Write([]byte("blocked")) //nolint: errcheck + }() + + // SyncWrite should finish first. + <-syncDone + + select { + case <-writeDone: + // Write completed after SyncWrite — correct. + case <-time.After(2 * time.Second): + suite.Fail("async Write did not unblock after SyncWrite completed") + } +} + +func (suite *ConnTestSuite) TestSyncWriteReturnsErrorAfterStop() { + suite.connMock. + On("Write", mock.AnythingOfType("[]uint8")). + Return(0, nil). 
+ Maybe() + + c := suite.makeConn() + c.Stop() + + time.Sleep(10 * time.Millisecond) + + _, err := c.SyncWrite([]byte("too late")) + suite.Error(err) +} + func TestConn(t *testing.T) { t.Parallel() suite.Run(t, &ConnTestSuite{}) diff --git a/mtglib/internal/doppel/ganger.go b/mtglib/internal/doppel/ganger.go index ee4a3ebb0..6621761f1 100644 --- a/mtglib/internal/doppel/ganger.go +++ b/mtglib/internal/doppel/ganger.go @@ -29,6 +29,8 @@ type Ganger struct { scoutRaidEach time.Duration scoutRaidRepeats int + drs bool + stats *Stats durations []time.Duration @@ -107,7 +109,7 @@ func (g *Ganger) run() { g.wg.Go(func() { select { case <-g.ctx.Done(): - case updatedStatsChan <- NewStats(durations): + case updatedStatsChan <- NewStats(durations, g.drs): } }) case stats := <-updatedStatsChan: @@ -152,6 +154,7 @@ func NewGanger( scoutEach time.Duration, scoutRepeats int, urls []string, + drs bool, ) *Ganger { ctx, cancel := context.WithCancel(ctx) @@ -169,9 +172,11 @@ func NewGanger( logger: logger, scoutRaidEach: scoutEach, scoutRaidRepeats: scoutRepeats, + drs: drs, stats: &Stats{ k: StatsDefaultK, lambda: StatsDefaultLambda, + drs: drs, }, scout: NewScout(network, urls), connRequests: make(chan gangerConnRequest), diff --git a/mtglib/internal/doppel/ganger_test.go b/mtglib/internal/doppel/ganger_test.go index 0343eb4a9..412eb8aee 100644 --- a/mtglib/internal/doppel/ganger_test.go +++ b/mtglib/internal/doppel/ganger_test.go @@ -29,7 +29,7 @@ func (suite *GangerTestSuite) SetupTest() { On("WarningError", mock.AnythingOfType("string"), mock.Anything). 
Maybe() - suite.g = NewGanger(suite.ctx, suite.network, suite.log, time.Hour, 1, suite.urls) + suite.g = NewGanger(suite.ctx, suite.network, suite.log, time.Hour, 1, suite.urls, true) suite.g.Run() } diff --git a/mtglib/internal/doppel/init.go b/mtglib/internal/doppel/init.go index 15bcfd8db..eb74a882a 100644 --- a/mtglib/internal/doppel/init.go +++ b/mtglib/internal/doppel/init.go @@ -14,8 +14,8 @@ const ( // Please see Stats description // https://blog.cloudflare.com/optimizing-tls-over-tcp-to-reduce-latency/ // https://github.com/cloudflare/sslconfig/blob/master/patches/nginx__dynamic_tls_records.patch - TLSRecordSizeStart = 1369 - TLSRecordSizeAccel = 4229 + TLSRecordSizeStart = 1450 + TLSRecordSizeAccel = 4096 TLSRecordSizeMax = 16384 - tls.SizeHeader TLSCounterAccelAfter = 40 diff --git a/mtglib/internal/doppel/stats.go b/mtglib/internal/doppel/stats.go index 8fb0c8d02..eb73b6177 100644 --- a/mtglib/internal/doppel/stats.go +++ b/mtglib/internal/doppel/stats.go @@ -17,6 +17,9 @@ const ( // these values are taken from ok.ru. measured from moscow site. 
StatsDefaultK = 0.37846373895785335 StatsDefaultLambda = 1.73177086015485 + + // how many bytes should we drift + DRSNoise = 100 ) // Stats is responsible for generating values that are distributed according @@ -66,6 +69,9 @@ type Stats struct { k float64 // https://en.wikipedia.org/wiki/Scale_parameter lambda float64 + + // Dynamic Record Sizing + drs bool } func (d *Stats) Delay() time.Duration { @@ -84,20 +90,24 @@ func (d *Stats) Size() int { d.sizeCounter = 0 } + if !d.drs { + return TLSRecordSizeMax + } + d.sizeLastRequested = time.Now() d.sizeCounter++ switch { case d.sizeCounter <= TLSCounterAccelAfter: - return TLSRecordSizeStart + return TLSRecordSizeStart - rand.IntN(DRSNoise) case d.sizeCounter <= TLSCounterMaxAfter: - return TLSRecordSizeAccel + return TLSRecordSizeAccel - rand.IntN(DRSNoise) } return TLSRecordSizeMax } -func NewStats(durations []time.Duration) *Stats { +func NewStats(durations []time.Duration, drs bool) *Stats { n := float64(len(durations)) // in milliseconds @@ -150,5 +160,6 @@ func NewStats(durations []time.Duration) *Stats { return &Stats{ k: k, lambda: lambda, + drs: drs, } } diff --git a/mtglib/internal/doppel/stats_test.go b/mtglib/internal/doppel/stats_test.go index 1e45a1f75..94240d166 100644 --- a/mtglib/internal/doppel/stats_test.go +++ b/mtglib/internal/doppel/stats_test.go @@ -38,7 +38,7 @@ func (suite *StatsTestSuite) TestNewStatsRecoverParameters() { knownLambda := 100.0 samples := suite.GenWeibull(knownK, knownLambda, 5000, 42) - stats := NewStats(samples) + stats := NewStats(samples, true) suite.InDelta(knownK, stats.k, 0.1) suite.InDelta(knownLambda, stats.lambda, 5.0) @@ -50,7 +50,7 @@ func (suite *StatsTestSuite) TestNewStatsExponentialCase() { knownLambda := 50.0 samples := suite.GenWeibull(knownK, knownLambda, 5000, 123) - stats := NewStats(samples) + stats := NewStats(samples, true) suite.InDelta(knownK, stats.k, 0.1) suite.InDelta(knownLambda, stats.lambda, 5.0) @@ -64,7 +64,7 @@ func (suite *StatsTestSuite) 
TestNewStatsSmallK() { knownLambda := 100.0 samples := suite.GenWeibull(knownK, knownLambda, 10000, 99) - stats := NewStats(samples) + stats := NewStats(samples, true) suite.InDelta(knownK, stats.k, 0.05) suite.InDelta(knownLambda, stats.lambda, 5.0) @@ -76,7 +76,7 @@ func (suite *StatsTestSuite) TestNewStatsLargeK() { knownLambda := 200.0 samples := suite.GenWeibull(knownK, knownLambda, 5000, 77) - stats := NewStats(samples) + stats := NewStats(samples, true) suite.InDelta(knownK, stats.k, 0.3) suite.InDelta(knownLambda, stats.lambda, 5.0) @@ -121,7 +121,7 @@ func (suite *StatsTestSuite) TestNewStatsRoundTrip() { knownLambda := 80.0 samples := suite.GenWeibull(knownK, knownLambda, 5000, 555) - stats := NewStats(samples) + stats := NewStats(samples, true) n := 50000 sum := 0.0 @@ -138,16 +138,17 @@ func (suite *StatsTestSuite) TestNewStatsRoundTrip() { } func (suite *StatsTestSuite) TestSizeStartPhase() { - stats := &Stats{k: 1.0, lambda: 1.0} + stats := &Stats{k: 1.0, lambda: 1.0, drs: true} for range TLSCounterAccelAfter { size := stats.Size() - suite.Equal(TLSRecordSizeStart, size) + suite.GreaterOrEqual(size, TLSRecordSizeStart-DRSNoise) + suite.LessOrEqual(size, TLSRecordSizeStart) } } func (suite *StatsTestSuite) TestSizeAccelPhase() { - stats := &Stats{k: 1.0, lambda: 1.0} + stats := &Stats{k: 1.0, lambda: 1.0, drs: true} for range TLSCounterAccelAfter { stats.Size() @@ -155,12 +156,13 @@ func (suite *StatsTestSuite) TestSizeAccelPhase() { for range TLSCounterMaxAfter - TLSCounterAccelAfter { size := stats.Size() - suite.Equal(TLSRecordSizeAccel, size) + suite.GreaterOrEqual(size, TLSRecordSizeAccel-DRSNoise) + suite.LessOrEqual(size, TLSRecordSizeAccel) } } func (suite *StatsTestSuite) TestSizeMaxPhase() { - stats := &Stats{k: 1.0, lambda: 1.0} + stats := &Stats{k: 1.0, lambda: 1.0, drs: true} for range TLSCounterMaxAfter { stats.Size() @@ -173,7 +175,7 @@ func (suite *StatsTestSuite) TestSizeMaxPhase() { } func (suite *StatsTestSuite) 
TestSizeResetsAfterInactivity() { - stats := &Stats{k: 1.0, lambda: 1.0} + stats := &Stats{k: 1.0, lambda: 1.0, drs: true} // Advance past start phase. for range TLSCounterMaxAfter { @@ -185,7 +187,30 @@ func (suite *StatsTestSuite) TestSizeResetsAfterInactivity() { // Simulate inactivity by backdating sizeLastRequested. stats.sizeLastRequested = time.Now().Add(-TLSRecordSizeResetAfter - time.Millisecond) - suite.Equal(TLSRecordSizeStart, stats.Size()) + size := stats.Size() + suite.GreaterOrEqual(size, TLSRecordSizeStart-DRSNoise) + suite.LessOrEqual(size, TLSRecordSizeStart) +} + +func (suite *StatsTestSuite) TestSizeNoDRSAlwaysMax() { + stats := &Stats{k: 1.0, lambda: 1.0, drs: false} + + for range TLSCounterMaxAfter + 20 { + suite.Equal(TLSRecordSizeMax, stats.Size()) + } +} + +func (suite *StatsTestSuite) TestSizeNoDRSIgnoresCounter() { + stats := &Stats{k: 1.0, lambda: 1.0, drs: false} + + // Even after many calls, always returns max. + for range 200 { + suite.Equal(TLSRecordSizeMax, stats.Size()) + } + + // Inactivity has no effect either. 
+ stats.sizeLastRequested = time.Now().Add(-TLSRecordSizeResetAfter - time.Millisecond) + suite.Equal(TLSRecordSizeMax, stats.Size()) } func TestStats(t *testing.T) { diff --git a/mtglib/proxy.go b/mtglib/proxy.go index aac79de1b..4afcdd89d 100644 --- a/mtglib/proxy.go +++ b/mtglib/proxy.go @@ -90,7 +90,7 @@ func (p *Proxy) ServeConn(conn essentials.Conn) { } defer clientConn.Stop() - if _, err := clientConn.Write(noise); err != nil { + if _, err := clientConn.SyncWrite(noise); err != nil { ctx.logger.InfoError("cannot send the first packet", err) return } @@ -345,6 +345,7 @@ func NewProxy(opts ProxyOpts) (*Proxy, error) { opts.DoppelGangerEach, int(opts.DoppelGangerPerRaid), opts.DoppelGangerURLs, + opts.DoppelGangerDRS, ), configUpdater: dc.NewPublicConfigUpdater( tg, diff --git a/mtglib/proxy_opts.go b/mtglib/proxy_opts.go index 0f8600e80..cea9cad8c 100644 --- a/mtglib/proxy_opts.go +++ b/mtglib/proxy_opts.go @@ -157,6 +157,9 @@ type ProxyOpts struct { // DoppelGangerEach defines a time period between each raid. We recommend // to use hours here. DoppelGangerEach time.Duration + + // DoppelGangerDRS defines if TLS Dynamic Record Sizing is active. + DoppelGangerDRS bool } func (p ProxyOpts) valid() error {