63 changes: 31 additions & 32 deletions example.config.toml
@@ -213,42 +213,20 @@ idle = "1m"
# does not really matter), while websites pump heavy content in HTTP2 streams
#
# It means that statistically there is a difference in traffic shape:
# TLS packet sizes are different, delays between packets are also different.
# delays between packets are also different.
# In order to avoid censorship detection based on these patterns, there is an
# mtg subsystem called "Doppelganger" that aims to mimic website statistics
# as closely as it can.
#
# It does that with 2 ideas:
# 1. Delays between TLS packets are not constant. Many factors come into
# play. The application has to generate a response; it could send some
# headers first and then stream content with chunked encoding. So the
# first packets could arrive as soon as possible, with some delays after
# them. Such a phenomenon is described by statistical distributions.
# Two distributions describe it: the lognormal distribution and the
# Weibull distribution. Lognormal fits steady streams of heavy content
# like video. Weibull fits short bursts, like a user who requested a
# static page and a couple of images.
#
# mtg tries to adopt the Weibull distribution. It comes with some sensible
# defaults that were taken from ok.ru. But when you use domain fronting,
# it always makes sense to take statistics from that website. You can specify
# some URLs here. mtg will crawl them from time to time, accumulate time
# series and approximate parameters for Weibull.
# 2. TLS record sizes are not random.
# https://blog.cloudflare.com/optimizing-tls-over-tcp-to-reduce-latency/
# https://aws.github.io/s2n-tls/usage-guide/ch08-record-sizes.html
#
# The idea is that huge TLS records could negatively affect performance.
# You cannot decrypt just a part of a record; you need to wait for the
# whole of it, and huge records could take several RTTs unless you use
# specific software that treats TLS in a very special way. So
# servers start with small records, usually around MTU size, and ramp up
# later. This optimizes the time to first byte so web browsers start to
# render early.
#
# mtg uses the same technique that was introduced by Cloudflare in their
# patches to nginx 10 years ago:
# https://github.com/cloudflare/sslconfig/blob/master/patches/nginx__dynamic_tls_records.patch
# Delays between TLS packets are not constant. Many factors come into
# play. The application has to generate a response; it could send some
# headers first and then stream content with chunked encoding. So the
# first packets could arrive as soon as possible, with some delays after
# them. Such a phenomenon is described by statistical distributions.
# Two distributions describe it: the lognormal distribution and the
# Weibull distribution. Lognormal fits steady streams of heavy content
# like video. Weibull fits short bursts, like a user who requested a
# static page and a couple of images.
[defense.doppelganger]
# This is a list of URLs that would be crawled by mtg to approximate delay
# statistics. They MUST be HTTPS urls.
@@ -266,6 +244,27 @@ repeats-per-raid = 10
# do not change a lot, so do not expect different results if you request
# every 10 minutes.
raid-each = "6h"
# This enables dynamic TLS record sizing (DRS).
#
# Some modern stacks and platforms use a technique called DRS: they start
# with small TLS records and ramp up over time. The first records are
# usually about MTU size, after that come 4k records and eventually the
# maximum size. This is done with a good intention: to minimize the time to
# first byte, so the application can start doing something with the data
# right after the first RTT.
#
# Apparently, about 90% of applications do not employ this technique and
# always use the max size: nginx, apache, java stuff. But Golang tools, angie
# and some specific patches activate this technique.
#
# In order to mimic a real website we need to know something about the
# software it uses. Usually nobody cares: openssl does 16384, Python does it,
# nginx does it. So this setting is disabled by default.
#
# https://blog.cloudflare.com/optimizing-tls-over-tcp-to-reduce-latency/
# https://aws.github.io/s2n-tls/usage-guide/ch08-record-sizes.html
# https://github.com/cloudflare/sslconfig/blob/master/patches/nginx__dynamic_tls_records.patch
drs = false

# Some countries do active probing on Telegram connections. This technique
# protects against such probing.
1 change: 1 addition & 0 deletions internal/cli/run_proxy.go
@@ -265,6 +265,7 @@ func runProxy(conf *config.Config, version string) error { //nolint: funlen
DoppelGangerURLs: doppelGangerURLs,
DoppelGangerPerRaid: conf.Defense.Doppelganger.Repeats.Get(mtglib.DoppelGangerPerRaid),
DoppelGangerEach: conf.Defense.Doppelganger.UpdateEach.Get(mtglib.DoppelGangerEach),
DoppelGangerDRS: conf.Defense.Doppelganger.DRS.Get(false),
}

proxy, err := mtglib.NewProxy(opts)
1 change: 1 addition & 0 deletions internal/config/config.go
@@ -53,6 +53,7 @@ type Config struct {
URLs []TypeHttpsURL `json:"urls"`
Repeats TypeConcurrency `json:"repeats_per_raid"`
UpdateEach TypeDuration `json:"raid_each"`
DRS TypeBool `json:"drs"`
} `json:"doppelganger"`
} `json:"defense"`
Network struct {
1 change: 1 addition & 0 deletions internal/config/parse.go
@@ -48,6 +48,7 @@ type tomlConfig struct {
URLs []string `toml:"urls" json:"urls,omitempty"`
Repeats uint `toml:"repeats-per-raid" json:"repeats_per_raid,omitempty"`
UpdateEach string `toml:"raid-each" json:"raid_each,omitempty"`
DRS bool `toml:"drs" json:"drs,omitempty"`
} `toml:"doppelganger" json:"doppelganger,omitempty"`
} `toml:"defense" json:"defense,omitempty"`
Network struct {
55 changes: 45 additions & 10 deletions mtglib/internal/doppel/conn.go
@@ -16,18 +16,48 @@ type Conn struct {
}

type connPayload struct {
ctx context.Context
ctxCancel context.CancelCauseFunc
clock Clock
wg sync.WaitGroup
writeLock sync.Mutex
writeStream bytes.Buffer
ctx context.Context
ctxCancel context.CancelCauseFunc
clock Clock
wg sync.WaitGroup
syncWriteLock sync.RWMutex
writeStream bytes.Buffer
writeCond *sync.Cond
}

func (c Conn) Write(p []byte) (int, error) {
c.p.writeLock.Lock()
c.p.syncWriteLock.RLock()
defer c.p.syncWriteLock.RUnlock()

c.p.writeCond.L.Lock()
c.p.writeStream.Write(p)
c.p.writeCond.L.Unlock()

return len(p), context.Cause(c.p.ctx)
}

func (c Conn) SyncWrite(p []byte) (int, error) {
c.p.syncWriteLock.Lock()
defer c.p.syncWriteLock.Unlock()

c.p.writeCond.L.Lock()
// wait until buffer is exhausted
for c.p.writeStream.Len() != 0 && context.Cause(c.p.ctx) == nil {
c.p.writeCond.Wait()
}
c.p.writeStream.Write(p)
c.p.writeLock.Unlock()
c.p.writeCond.L.Unlock()

if err := context.Cause(c.p.ctx); err != nil {
return len(p), err
}

c.p.writeCond.L.Lock()
// wait until the data has been sent
for c.p.writeStream.Len() != 0 && context.Cause(c.p.ctx) == nil {
c.p.writeCond.Wait()
}
c.p.writeCond.L.Unlock()

return len(p), context.Cause(c.p.ctx)
}
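
Note on the `SyncWrite` hunk above: the core idea (append to a shared buffer, then block on a condition variable until the sending goroutine has drained it) can be shown in isolation. The sketch below is a simplified stand-in, not the code from this PR: the `drainBuffer` type and its methods are hypothetical, the consumer is woken by the condition variable rather than by a clock tick, and the extra `RWMutex` that keeps async writers out during a sync write is left out.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// drainBuffer is a hypothetical buffer shared by a writer and one consumer.
// cond.L protects buf, sent and closed.
type drainBuffer struct {
	cond   *sync.Cond
	buf    bytes.Buffer // data waiting to be sent
	sent   bytes.Buffer // data the consumer has already "sent"
	closed bool
}

func newDrainBuffer() *drainBuffer {
	return &drainBuffer{cond: sync.NewCond(&sync.Mutex{})}
}

// syncWrite appends p and returns only after the consumer has taken
// every buffered byte.
func (d *drainBuffer) syncWrite(p []byte) {
	d.cond.L.Lock()
	defer d.cond.L.Unlock()

	d.buf.Write(p)
	d.cond.Broadcast() // wake the consumer if it is waiting for data

	for d.buf.Len() != 0 {
		d.cond.Wait()
	}
}

// consume moves data from buf to sent in pieces of at most chunk bytes
// until close is called and the buffer is empty.
func (d *drainBuffer) consume(chunk int) {
	tmp := make([]byte, chunk)

	for {
		d.cond.L.Lock()
		for d.buf.Len() == 0 && !d.closed {
			d.cond.Wait()
		}

		if d.buf.Len() == 0 { // closed and nothing left to send
			d.cond.L.Unlock()
			return
		}

		n, _ := d.buf.Read(tmp) // buffer is non-empty, so no io.EOF here
		d.sent.Write(tmp[:n])
		d.cond.L.Unlock()

		d.cond.Broadcast() // let a blocked syncWrite re-check the buffer
	}
}

// close asks the consumer to stop once the buffer is empty.
func (d *drainBuffer) close() {
	d.cond.L.Lock()
	d.closed = true
	d.cond.L.Unlock()
	d.cond.Broadcast()
}

func main() {
	d := newDrainBuffer()
	done := make(chan struct{})

	go func() {
		defer close(done)
		d.consume(4)
	}()

	d.syncWrite([]byte("sync hello"))
	d.close()
	<-done

	fmt.Println(d.sent.String())
}
```

Every waiter re-checks its condition in a loop after `Wait` returns, which is why a single condition variable can serve both directions here.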
@@ -39,6 +69,8 @@ func (c Conn) Start() {
}

func (c Conn) start() {
defer c.p.writeCond.Broadcast()

buf := [tls.MaxRecordSize]byte{}

for {
@@ -48,9 +80,9 @@ func (c Conn) start() {
case <-c.p.clock.tick:
}

c.p.writeLock.Lock()
c.p.writeCond.L.Lock()
n, err := c.p.writeStream.Read(buf[:c.p.clock.stats.Size()])
c.p.writeLock.Unlock()
c.p.writeCond.L.Unlock()

if n == 0 || err != nil {
continue
@@ -60,6 +92,8 @@ func (c Conn) start() {
c.p.ctxCancel(err)
return
}

c.p.writeCond.Signal()
}
}

@@ -75,6 +109,7 @@ func NewConn(ctx context.Context, conn essentials.Conn, stats *Stats) Conn {
p: &connPayload{
ctx: ctx,
ctxCancel: cancel,
writeCond: sync.NewCond(&sync.Mutex{}),
clock: Clock{
stats: stats,
tick: make(chan struct{}),
130 changes: 130 additions & 0 deletions mtglib/internal/doppel/conn_test.go
@@ -157,6 +157,136 @@ func (suite *ConnTestSuite) TestStopOnUnderlyingWriteError() {
}, 2*time.Second, time.Millisecond)
}

func (suite *ConnTestSuite) TestSyncWriteDataSent() {
suite.connMock.
On("Write", mock.AnythingOfType("[]uint8")).
Return(0, nil).
Maybe()

c := suite.makeConn()
defer c.Stop()

payload := []byte("sync hello")
n, err := c.SyncWrite(payload)
suite.NoError(err)
suite.Equal(len(payload), n)

// SyncWrite returns only after data is flushed to the wire.
assembled := &bytes.Buffer{}
reader := bytes.NewReader(suite.connMock.Written())

for {
header := make([]byte, tls.SizeHeader)
if _, err := io.ReadFull(reader, header); err != nil {
break
}

suite.Equal(byte(tls.TypeApplicationData), header[0])

length := binary.BigEndian.Uint16(header[tls.SizeRecordType+tls.SizeVersion:])
rec := make([]byte, length)
_, err := io.ReadFull(reader, rec)
suite.NoError(err)

assembled.Write(rec)
}

suite.Equal(payload, assembled.Bytes())
}

func (suite *ConnTestSuite) TestSyncWriteDrainsBufferFirst() {
suite.connMock.
On("Write", mock.AnythingOfType("[]uint8")).
Return(0, nil).
Maybe()

c := suite.makeConn()
defer c.Stop()

// Buffer some data via async Write.
_, err := c.Write([]byte("first"))
suite.NoError(err)

// SyncWrite must drain "first" before sending "second".
n, err := c.SyncWrite([]byte("second"))
suite.NoError(err)
suite.Equal(6, n)

// All data should be on the wire now.
assembled := &bytes.Buffer{}
reader := bytes.NewReader(suite.connMock.Written())

for {
header := make([]byte, tls.SizeHeader)
if _, err := io.ReadFull(reader, header); err != nil {
break
}

length := binary.BigEndian.Uint16(header[tls.SizeRecordType+tls.SizeVersion:])
rec := make([]byte, length)
_, err := io.ReadFull(reader, rec)
suite.NoError(err)

assembled.Write(rec)
}

suite.Equal([]byte("firstsecond"), assembled.Bytes())
}

func (suite *ConnTestSuite) TestSyncWriteBlocksAsyncWrite() {
suite.connMock.
On("Write", mock.AnythingOfType("[]uint8")).
Return(0, nil).
Maybe()

c := suite.makeConn()
defer c.Stop()

// Start SyncWrite — it holds exclusive lock.
syncDone := make(chan struct{})

go func() {
defer close(syncDone)
c.SyncWrite([]byte("exclusive")) //nolint: errcheck
}()

// Give SyncWrite time to acquire the lock.
time.Sleep(10 * time.Millisecond)

// Async Write should block until SyncWrite completes.
writeDone := make(chan struct{})

go func() {
defer close(writeDone)
c.Write([]byte("blocked")) //nolint: errcheck
}()

// SyncWrite should finish first.
<-syncDone

select {
case <-writeDone:
// Write completed after SyncWrite — correct.
case <-time.After(2 * time.Second):
suite.Fail("async Write did not unblock after SyncWrite completed")
}
}

func (suite *ConnTestSuite) TestSyncWriteReturnsErrorAfterStop() {
suite.connMock.
On("Write", mock.AnythingOfType("[]uint8")).
Return(0, nil).
Maybe()

c := suite.makeConn()
c.Stop()

time.Sleep(10 * time.Millisecond)

_, err := c.SyncWrite([]byte("too late"))
suite.Error(err)
}

func TestConn(t *testing.T) {
t.Parallel()
suite.Run(t, &ConnTestSuite{})
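
Note on the tests above: they reassemble the payload by walking TLS record headers (1 byte type, 2 bytes version, 2 bytes big-endian length). A self-contained sketch of that framing and its inverse follows. The helper names `frame` and `reassemble` and the local constants are hypothetical and do not come from mtg's `tls` package; the header layout and the `0x17` application data type are standard TLS.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

const (
	typeApplicationData = 0x17  // TLS ContentType application_data
	sizeHeader          = 5     // 1 byte type + 2 bytes version + 2 bytes length
	maxRecordPayload    = 16384 // 2^14, TLS plaintext limit per record
)

// frame splits payload into application data records that each carry at
// most maxPayload bytes. Out-of-range limits fall back to maxRecordPayload.
func frame(payload []byte, maxPayload int) []byte {
	if maxPayload <= 0 || maxPayload > maxRecordPayload {
		maxPayload = maxRecordPayload
	}

	var out bytes.Buffer

	for len(payload) > 0 {
		n := len(payload)
		if n > maxPayload {
			n = maxPayload
		}

		// 0x03 0x03 is the legacy record version that TLS 1.2 and 1.3 put on the wire.
		header := [sizeHeader]byte{typeApplicationData, 0x03, 0x03}
		binary.BigEndian.PutUint16(header[3:], uint16(n))

		out.Write(header[:])
		out.Write(payload[:n])
		payload = payload[n:]
	}

	return out.Bytes()
}

// reassemble walks the records in wire and concatenates their payloads.
func reassemble(wire []byte) ([]byte, error) {
	var out bytes.Buffer

	reader := bytes.NewReader(wire)
	header := make([]byte, sizeHeader)

	for {
		if _, err := io.ReadFull(reader, header); err != nil {
			if err == io.EOF { // clean end: no partial header left
				return out.Bytes(), nil
			}

			return nil, err
		}

		if header[0] != typeApplicationData {
			return nil, fmt.Errorf("unexpected record type %#x", header[0])
		}

		length := binary.BigEndian.Uint16(header[3:])
		if _, err := io.CopyN(&out, reader, int64(length)); err != nil {
			return nil, err
		}
	}
}

func main() {
	wire := frame([]byte("firstsecond"), 4)
	payload, err := reassemble(wire)
	fmt.Println(len(wire), string(payload), err)
}
```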
7 changes: 6 additions & 1 deletion mtglib/internal/doppel/ganger.go
@@ -29,6 +29,8 @@ type Ganger struct {
scoutRaidEach time.Duration
scoutRaidRepeats int

drs bool

stats *Stats
durations []time.Duration

@@ -107,7 +109,7 @@ func (g *Ganger) run() {
g.wg.Go(func() {
select {
case <-g.ctx.Done():
case updatedStatsChan <- NewStats(durations):
case updatedStatsChan <- NewStats(durations, g.drs):
}
})
case stats := <-updatedStatsChan:
@@ -152,6 +154,7 @@ func NewGanger(
scoutEach time.Duration,
scoutRepeats int,
urls []string,
drs bool,
) *Ganger {
ctx, cancel := context.WithCancel(ctx)

@@ -169,9 +172,11 @@ func NewGanger(
logger: logger,
scoutRaidEach: scoutEach,
scoutRaidRepeats: scoutRepeats,
drs: drs,
stats: &Stats{
k: StatsDefaultK,
lambda: StatsDefaultLambda,
drs: drs,
},
scout: NewScout(network, urls),
connRequests: make(chan gangerConnRequest),
2 changes: 1 addition & 1 deletion mtglib/internal/doppel/ganger_test.go
@@ -29,7 +29,7 @@ func (suite *GangerTestSuite) SetupTest() {
On("WarningError", mock.AnythingOfType("string"), mock.Anything).
Maybe()

suite.g = NewGanger(suite.ctx, suite.network, suite.log, time.Hour, 1, suite.urls)
suite.g = NewGanger(suite.ctx, suite.network, suite.log, time.Hour, 1, suite.urls, true)
suite.g.Run()
}

4 changes: 2 additions & 2 deletions mtglib/internal/doppel/init.go
@@ -14,8 +14,8 @@ const (
// Please see Stats description
// https://blog.cloudflare.com/optimizing-tls-over-tcp-to-reduce-latency/
// https://github.com/cloudflare/sslconfig/blob/master/patches/nginx__dynamic_tls_records.patch
TLSRecordSizeStart = 1369
TLSRecordSizeAccel = 4229
TLSRecordSizeStart = 1450
TLSRecordSizeAccel = 4096
TLSRecordSizeMax = 16384 - tls.SizeHeader

TLSCounterAccelAfter = 40
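
Note on the constants above: a record size ramp built from them might look like the sketch below. This is an illustration under assumptions, not the `Stats.Size()` implementation from this PR. The start, accel and max sizes and the accel threshold of 40 come from the diff; the `maxAfter` threshold of 80 is hypothetical because the corresponding constant is not visible in this hunk, and the function name `recordSize` is made up for the example.

```go
package main

import "fmt"

const (
	sizeHeader = 5 // TLS record header length in bytes

	recordSizeStart = 1450               // first records, roughly one MTU
	recordSizeAccel = 4096               // intermediate step
	recordSizeMax   = 16384 - sizeHeader // largest record used by the ramp

	accelAfter = 40 // records sent before switching to recordSizeAccel
	maxAfter   = 80 // hypothetical: records sent before switching to recordSizeMax
)

// recordSize returns the payload size to use for the next record, given
// how many records were already sent on the connection.
func recordSize(sent int) int {
	switch {
	case sent < accelAfter:
		return recordSizeStart
	case sent < maxAfter:
		return recordSizeAccel
	default:
		return recordSizeMax
	}
}

func main() {
	for _, n := range []int{0, 39, 40, 79, 80} {
		fmt.Printf("after %d records: %d bytes\n", n, recordSize(n))
	}
}
```

With `drs = false`, the equivalent behaviour would be to return the maximum size regardless of the counter.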