-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathkeycopy_daemon.py
More file actions
297 lines (240 loc) · 9.27 KB
/
keycopy_daemon.py
File metadata and controls
297 lines (240 loc) · 9.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
#!/usr/bin/env python3
"""Global key-history to clipboard daemon.
Behavior:
- Tracks a rolling buffer of typed characters from global key events.
- Clears the buffer after an idle timeout.
- Copies the current buffer when a hotkey is pressed.
Notes:
- This reconstructs text from key events and is approximate by design.
- On Linux Wayland, global key listeners can be restricted by compositor policy.
"""
from __future__ import annotations
import argparse
import os
import signal
import shutil
import subprocess
import sys
import threading
import time
from dataclasses import dataclass
from typing import Iterable, Optional
from pynput import keyboard
@dataclass
class Config:
hotkey: str
idle_seconds: float
max_buffer: int
poll_seconds: float
verbose: bool
class ClipboardError(RuntimeError):
pass
def _first_available_cmd(candidates: Iterable[list[str]]) -> Optional[list[str]]:
for cmd in candidates:
if _which(cmd[0]):
return cmd
return None
def _which(binary: str) -> Optional[str]:
resolved = shutil.which(binary)
if resolved:
return resolved
# Common locations for user-managed installs (e.g., Homebrew on Linux).
fallback_dirs = (
"/home/linuxbrew/.linuxbrew/bin",
"/usr/local/bin",
"/usr/bin",
"/bin",
)
for path in fallback_dirs:
candidate = os.path.join(path, binary)
if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
return candidate
return None
def copy_to_clipboard(text: str) -> None:
# Use native clipboard tooling per platform when possible.
if sys.platform == "darwin":
cmd = _first_available_cmd([["pbcopy"]])
elif os.name == "nt":
cmd = _first_available_cmd([["clip"]])
else:
# Prefer wl-copy on Wayland, fallback to xclip/xsel.
cmd = _first_available_cmd(
[
["wl-copy", "--type", "text/plain"],
["xclip", "-selection", "clipboard"],
["xsel", "--clipboard", "--input"],
]
)
if not cmd:
if sys.platform == "darwin":
raise ClipboardError("No clipboard tool found. Expected: pbcopy.")
if os.name == "nt":
raise ClipboardError("No clipboard tool found. Expected: clip.exe.")
raise ClipboardError("No clipboard tool found. Install wl-clipboard, xclip, or xsel.")
proc = subprocess.run(cmd, input=text.encode("utf-8"), capture_output=True, check=False)
if proc.returncode != 0:
stderr = proc.stderr.decode("utf-8", errors="replace").strip()
raise ClipboardError(f"Clipboard command failed ({' '.join(cmd)}): {stderr}")
class KeyHistoryDaemon:
def __init__(self, config: Config) -> None:
self.config = config
self._buffer: list[str] = []
self._lock = threading.Lock()
self._last_input_ts: float = 0.0
self._stop_event = threading.Event()
self._ctrl_pressed = False
self._alt_pressed = False
self._meta_pressed = False
self._listener: Optional[keyboard.Listener] = None
self._hotkey: Optional[keyboard.HotKey] = None
def run(self) -> int:
self._log("Starting daemon")
self._log(
f"Hotkey={self.config.hotkey} idle_seconds={self.config.idle_seconds} max_buffer={self.config.max_buffer}"
)
self._hotkey = keyboard.HotKey(
keyboard.HotKey.parse(self.config.hotkey), self._on_hotkey_activated
)
# Keep the listener object around so canonical() is available in callbacks.
self._listener = keyboard.Listener(on_press=self._on_press, on_release=self._on_release)
idle_thread = threading.Thread(target=self._idle_loop, daemon=True)
idle_thread.start()
self._listener.start()
self._listener.join()
self._stop_event.set()
idle_thread.join(timeout=self.config.poll_seconds * 2)
self._log("Stopped daemon")
return 0
def stop(self) -> None:
self._stop_event.set()
if self._listener:
self._listener.stop()
def _idle_loop(self) -> None:
while not self._stop_event.is_set():
time.sleep(self.config.poll_seconds)
with self._lock:
if not self._buffer:
continue
if self._last_input_ts <= 0:
continue
if time.time() - self._last_input_ts >= self.config.idle_seconds:
self._buffer.clear()
self._last_input_ts = 0.0
self._log("Idle timeout reached; buffer cleared")
def _on_hotkey_activated(self) -> None:
with self._lock:
text = "".join(self._buffer)
if not text:
self._log("Hotkey pressed; buffer is empty")
return
try:
copy_to_clipboard(text)
self._log(f"Copied {len(text)} characters to clipboard")
except ClipboardError as exc:
self._log(str(exc), is_error=True)
def _on_press(self, key: keyboard.Key | keyboard.KeyCode) -> None:
if self._listener is None or self._hotkey is None:
return
canonical = self._listener.canonical(key)
self._hotkey.press(canonical)
self._update_modifier_state(key, pressed=True)
# Ignore shortcut chords (Ctrl/Alt/Meta held) for buffer reconstruction.
if self._ctrl_pressed or self._alt_pressed or self._meta_pressed:
return
char_to_add: Optional[str] = None
remove_last = False
if isinstance(key, keyboard.KeyCode) and key.char:
char_to_add = key.char
elif key == keyboard.Key.space:
char_to_add = " "
elif key == keyboard.Key.enter:
char_to_add = "\n"
elif key == keyboard.Key.tab:
char_to_add = "\t"
elif key == keyboard.Key.backspace:
remove_last = True
if char_to_add is None and not remove_last:
return
with self._lock:
if remove_last:
if self._buffer:
self._buffer.pop()
self._last_input_ts = time.time()
return
self._buffer.append(char_to_add)
if len(self._buffer) > self.config.max_buffer:
overflow = len(self._buffer) - self.config.max_buffer
del self._buffer[:overflow]
self._last_input_ts = time.time()
def _on_release(self, key: keyboard.Key | keyboard.KeyCode) -> None:
if self._listener is None or self._hotkey is None:
return
canonical = self._listener.canonical(key)
self._hotkey.release(canonical)
self._update_modifier_state(key, pressed=False)
def _update_modifier_state(self, key: keyboard.Key | keyboard.KeyCode, *, pressed: bool) -> None:
if key in (keyboard.Key.ctrl, keyboard.Key.ctrl_l, keyboard.Key.ctrl_r):
self._ctrl_pressed = pressed
elif key in (keyboard.Key.alt, keyboard.Key.alt_l, keyboard.Key.alt_r):
self._alt_pressed = pressed
elif key in (keyboard.Key.cmd, keyboard.Key.cmd_l, keyboard.Key.cmd_r):
self._meta_pressed = pressed
def _log(self, message: str, *, is_error: bool = False) -> None:
if not self.config.verbose and not is_error:
return
stream = sys.stderr if is_error else sys.stdout
print(f"[keycopy] {message}", file=stream, flush=True)
def parse_args() -> Config:
parser = argparse.ArgumentParser(description="Global typed-text buffer copier")
parser.add_argument(
"--hotkey",
default=os.environ.get("KEYCOPY_HOTKEY", "<ctrl>+<shift>+g"),
help='pynput hotkey format (default: "<ctrl>+<shift>+g")',
)
parser.add_argument(
"--idle-seconds",
type=float,
default=float(os.environ.get("KEYCOPY_IDLE_SECONDS", "3")),
help="Clear buffer after this many idle seconds (default: 3)",
)
parser.add_argument(
"--max-buffer",
type=int,
default=int(os.environ.get("KEYCOPY_MAX_BUFFER", "4096")),
help="Max tracked characters before trimming oldest (default: 4096)",
)
parser.add_argument(
"--poll-seconds",
type=float,
default=float(os.environ.get("KEYCOPY_POLL_SECONDS", "0.1")),
help="Idle timer polling interval (default: 0.1)",
)
parser.add_argument(
"--quiet",
action="store_true",
help="Disable informational logs",
)
args = parser.parse_args()
if args.idle_seconds <= 0:
parser.error("--idle-seconds must be > 0")
if args.max_buffer <= 0:
parser.error("--max-buffer must be > 0")
if args.poll_seconds <= 0:
parser.error("--poll-seconds must be > 0")
return Config(
hotkey=args.hotkey,
idle_seconds=args.idle_seconds,
max_buffer=args.max_buffer,
poll_seconds=args.poll_seconds,
verbose=not args.quiet,
)
def main() -> int:
config = parse_args()
daemon = KeyHistoryDaemon(config)
def _handle_signal(_signum: int, _frame: object) -> None:
daemon.stop()
signal.signal(signal.SIGINT, _handle_signal)
signal.signal(signal.SIGTERM, _handle_signal)
return daemon.run()
if __name__ == "__main__":
raise SystemExit(main())