Skip to content

Commit 97c6178

Browse files
authored
Merge b21f1eb into a8fb87e
2 parents a8fb87e + b21f1eb commit 97c6178

5 files changed

Lines changed: 247 additions & 0 deletions

File tree

ydb/tests/stability/ydb/test_stability.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class TestSetupForStability(object):
4646
yatest_common.binary_path('ydb/tests/tools/nemesis/driver/nemesis'),
4747
yatest_common.binary_path('ydb/tools/simple_queue/simple_queue'),
4848
yatest_common.binary_path('ydb/tools/olap_workload/olap_workload'),
49+
yatest_common.binary_path('ydb/tools/statistics_workload/statistics_workload'),
4950
)
5051

5152
@classmethod
@@ -138,6 +139,23 @@ def test_olap_workload(self):
138139

139140
self._stop_nemesis()
140141

142+
def test_statistics_workload(self):
    # Stability scenario: run the statistics_workload binary on one cluster
    # node for 90 minutes while nemesis injects faults.
    self._start_nemesis()

    # Paths on the remote node (deployed there by the test setup).
    log_file = "/Berkanavt/nemesis/log/statistics_workload.log"
    test_path = "/Berkanavt/nemesis/bin/statistics_workload"
    # Any node will do; take the first one from the cluster map.
    node = list(self.kikimr_cluster.nodes.values())[0]
    # Launch in a detached screen session; the shell loop restarts the
    # binary every time it exits, so the workload runs continuously.
    node.ssh_command(
        f'screen -d -m bash -c "while true; do sudo {test_path} --database /Root/db1 --log_file {log_file} ; done"',
        raise_on_error=True
    )
    sleep_time_min = 90

    # Let the workload and nemesis run together for the full window.
    logger.info('Sleeping for {} minute(s)'.format(sleep_time_min))
    time.sleep(sleep_time_min*60)

    self._stop_nemesis()
158+
141159
@classmethod
142160
def _start_nemesis(cls):
143161
for node in cls.kikimr_cluster.nodes.values():

ydb/tests/stability/ydb/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ DATA(
1515
DEPENDS(
1616
ydb/tools/simple_queue
1717
ydb/tools/olap_workload
18+
ydb/tools/statistics_workload
1819
ydb/tools/cfg/bin
1920
ydb/tests/tools/nemesis/driver
2021
)
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
# -*- coding: utf-8 -*-
2+
import argparse
3+
import ydb
4+
import logging
5+
import time
6+
import os
7+
import random
8+
import string
9+
10+
11+
ydb.interceptor.monkey_patch_event_handler()
12+
13+
14+
logger = logging.getLogger("StatisticsWorkload")
15+
16+
17+
def table_name_with_prefix(table_prefix):
    """Return a table name made of *table_prefix*, an underscore and a
    random 5-character suffix of uppercase letters and digits.

    The original wrapped the result in ``os.path.join`` with a single
    argument, which is a no-op; the call is dropped.
    """
    table_suffix = ''.join(random.choices(string.ascii_uppercase + string.digits, k=5))
    return f"{table_prefix}_{table_suffix}"
20+
21+
22+
def random_string(length):
    """Return *length* random lowercase ASCII letters as UTF-8 bytes."""
    alphabet = string.ascii_lowercase
    chars = [random.choice(alphabet) for _ in range(length)]
    return ''.join(chars).encode('utf8')
25+
26+
27+
def random_type():
    """Pick one of the column types this workload exercises."""
    candidates = (ydb.PrimitiveType.Int64, ydb.PrimitiveType.String)
    return random.choice(candidates)
29+
30+
31+
def random_value(type):
    """Generate a random value matching the given YDB column type.

    Optional wrappers are stripped first; unknown types yield ``None``.
    """
    inner = type
    while isinstance(inner, ydb.OptionalType):
        inner = inner.item
    if inner == ydb.PrimitiveType.Int64:
        return random.randint(0, 1 << 31)
    if inner == ydb.PrimitiveType.String:
        return random_string(random.randint(1, 32))
    return None
38+
39+
40+
class Workload(object):
    """Statistics stability workload.

    Each round creates a column table with a random name, bulk-inserts
    data into it, runs ANALYZE and checks that rows appear in the
    internal ``.metadata/_statistics`` table. Rounds repeat until
    *duration* seconds have elapsed.
    """

    def __init__(self, endpoint, database, duration, batch_size, batch_count):
        self.database = database
        self.driver = ydb.Driver(ydb.DriverConfig(endpoint, database))
        self.pool = ydb.SessionPool(self.driver, size=200)
        self.duration = duration
        self.batch_size = batch_size
        self.batch_count = batch_count

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Shut down pool before driver: sessions belong to the driver.
        self.pool.stop()
        self.driver.stop()

    def run_query_ignore_errors(self, callee):
        """Run *callee* with retries; log and swallow any failure.

        The workload is best-effort by design — a failed statement must
        not abort the round (nemesis is killing nodes underneath us).
        """
        try:
            self.pool.retry_operation_sync(callee)
        except Exception as e:
            logger.error(f'{type(e)}, {e}')

    def generate_batch(self, schema):
        """Build ``batch_size`` rows of random data matching *schema*."""
        data = []
        for _ in range(self.batch_size):
            data.append({c.name: random_value(c.type) for c in schema})
        return data

    def get_tables(self):
        """List the names of the database root's direct children."""
        db = self.driver.scheme_client.list_directory(self.database)
        return [t.name for t in db.children]

    def create_table(self, table_name):
        """Create a column-store table with an Int64 key and value."""
        logger.info(f"create table '{table_name}'")

        def callee(session):
            session.execute_scheme(f"""
                CREATE TABLE `{table_name}` (
                    id Int64 NOT NULL,
                    value Int64,
                    PRIMARY KEY(id)
                )
                PARTITION BY HASH(id)
                WITH (
                    STORE = COLUMN
                )
            """)
        self.run_query_ignore_errors(callee)

    def enable_statistics(self, table_name):
        """Attach COUNT_MIN_SKETCH statistics indexes to both columns.

        NOTE: the braces around the FEATURES payload must be doubled —
        a single-braced ``{'column_names': ['id']}`` is parsed by the
        f-string as a replacement field with an invalid format spec and
        raises ValueError at runtime.
        """
        def callee(session):
            session.execute_scheme(f"""
                ALTER OBJECT `{table_name}` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_key, TYPE=COUNT_MIN_SKETCH, FEATURES=`{{'column_names': ['id']}}`);
            """)
            session.execute_scheme(f"""
                ALTER OBJECT `{table_name}` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_value, TYPE=COUNT_MIN_SKETCH, FEATURES=`{{'column_names': ['value']}}`);
            """)
        self.run_query_ignore_errors(callee)

    def drop_table(self, table_name):
        logger.info(f'drop table {table_name}')

        def callee(session):
            session.drop_table(table_name)
        self.run_query_ignore_errors(callee)

    def drop_all_tables_with_prefix(self, prefix):
        """Drop leftovers from previous rounds (names share *prefix*)."""
        for t in self.get_tables():
            if t.startswith(prefix):
                self.drop_table(self.database + "/" + t)

    def list_columns(self, table_name):
        """Return the column descriptions of *table_name*."""
        def callee(session):
            return session.describe_table(self.database + "/" + table_name).columns
        return self.pool.retry_operation_sync(callee)

    def add_batch(self, table_name, schema):
        """Bulk-upsert one batch of random rows into *table_name*."""
        column_types = ydb.BulkUpsertColumns()
        for c in schema:
            column_types.add_column(c.name, c.type)
        batch = self.generate_batch(schema)
        logger.info(f"batch size: {len(batch)}")
        self.driver.table_client.bulk_upsert(self.database + "/" + table_name, batch, column_types)

    def add_data(self, table_name):
        """Insert ``batch_count`` batches into *table_name*."""
        schema = self.list_columns(table_name)
        for i in range(self.batch_count):
            logger.info(f"add batch #{i}")
            self.add_batch(table_name, schema)

    def delete_from_table(self, table_name):
        logger.info(f"delete from table '{table_name}'")

        def callee(session):
            session.transaction().execute(f"DELETE FROM `{table_name}`", commit_tx=True)
        self.run_query_ignore_errors(callee)

    def rows_count(self, table_name):
        """Count rows via a scan query (works on column tables)."""
        return self.driver.table_client.scan_query(f"SELECT count(*) FROM `{table_name}`").next().result_set.rows[0][0]

    def analyze(self, table_name):
        """Run ANALYZE so statistics for *table_name* get (re)built."""
        table_path = self.database + "/" + table_name
        logger.info(f"analyze '{table_name}'")

        def callee(session):
            session.execute_scheme(f"ANALYZE `{table_path}`")
        self.run_query_ignore_errors(callee)

    def execute(self):
        """Run one full round; any exception is logged, never raised."""
        table_prefix = "test_table"
        table_name = table_name_with_prefix(table_prefix)
        table_statistics = ".metadata/_statistics"

        try:
            logger.info("start new round")

            # Warm up / check connectivity, then give the session back:
            # the original kept the acquired session forever, leaking one
            # pool slot (of 200) per round.
            self.pool.release(self.pool.acquire())

            self.delete_from_table(table_statistics)
            if self.rows_count(table_statistics) > 0:
                logger.error(f"table '{table_statistics}' is not empty")
                return

            self.drop_all_tables_with_prefix(table_prefix)
            self.create_table(table_name)

            self.add_data(table_name)
            count = self.rows_count(table_name)
            logger.info(f"number of rows in table '{table_name}' {count}")
            if count == 0:
                logger.error(f"table {table_name} is empty")
                return

            # Statistics aggregation only starts after scheme shard learns
            # about the table; presumably 5 minutes is enough — TODO confirm.
            logger.info("waiting to receive information about the table from scheme shard")
            time.sleep(300)

            self.analyze(table_name)

            count = self.rows_count(table_statistics)
            logger.info(f"number of rows in table '{table_statistics}' {count}")
            if count == 0:
                logger.error(f"table '{table_statistics}' is empty")
        except Exception as e:
            logger.error(f"{type(e)}, {e}")

    def run(self):
        """Repeat rounds until ``duration`` seconds have passed."""
        started_at = time.time()

        while time.time() - started_at < self.duration:
            self.execute()
190+
191+
192+
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="statistics stability workload", formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument('--endpoint', default='localhost:2135', help="An endpoint to be used")
    parser.add_argument('--database', default=None, required=True, help='A database to connect')
    # type=int on all numeric options: argparse delivers CLI values as
    # strings, and the original passed the raw string for --batch_size /
    # --batch_count, making range(self.batch_size) raise TypeError.
    parser.add_argument('--duration', default=120, type=int, help='A duration of workload in seconds')
    parser.add_argument('--batch_size', default=1000, type=int, help='Batch size for bulk insert')
    parser.add_argument('--batch_count', default=3, type=int, help='The number of batches to be inserted')
    parser.add_argument('--log_file', default=None, help='Append log into specified file')

    args = parser.parse_args()

    # Only configure file logging when a path is given; otherwise keep
    # the default (stderr) handlers.
    if args.log_file:
        logging.basicConfig(
            filename=args.log_file,
            filemode='a',
            format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
            datefmt='%H:%M:%S',
            level=logging.INFO
        )

    with Workload(args.endpoint, args.database, args.duration, args.batch_size, args.batch_count) as workload:
        workload.run()
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
PY3_PROGRAM(statistics_workload)
2+
3+
PY_SRCS(
4+
__main__.py
5+
)
6+
7+
PEERDIR(
8+
ydb/public/sdk/python
9+
library/python/monlib
10+
)
11+
12+
END()

ydb/tools/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ RECURSE(
55
query_replay_yt
66
simple_queue
77
olap_workload
8+
statistics_workload
89
tsserver
910
tstool
1011
ydbd_slice

0 commit comments

Comments
 (0)