Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 35 additions & 48 deletions R/languageserver.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,18 @@ LanguageServer <- R6::R6Class("LanguageServer",
inputcon = NULL,
outputcon = NULL,
exit_flag = NULL,

documents = NULL,
workspace = NULL,

processId = NULL,
rootUri = NULL,
rootPath = NULL,
initializationOptions = NULL,
ClientCapabilities = NULL,
ServerCapabilities = NULL,

diagnostics_task_manager = NULL,
parse_task_manager = NULL,
resolve_task_manager = NULL,

pending_replies = NULL,

initialize = function(host, port) {
if (is.null(port)) {
logger$info("connection type: stdio")
Expand All @@ -58,34 +53,23 @@ LanguageServer <- R6::R6Class("LanguageServer",
self$inputcon <- inputcon
self$outputcon <- outputcon

cpus <- parallel::detectCores()
# Performance optimization: Allow more workers and scale with CPU count
# Old default: min(max(floor(cpus / 2), 1), 3) - capped at 3
# New default: min(max(cpus - 1, 2), 8) - scale up to 8 workers
default_pool_size <- min(max(cpus - 1, 2), 8)
pool_size <- as.integer(
Sys.getenv("R_LANGSVR_POOL_SIZE", default_pool_size))

# parse pool - increase size for better throughput
# Parse operations are CPU-bound and can benefit from parallelism
parse_pool <- if (pool_size > 0) SessionPool$new(pool_size, "parse") else NULL
# diagnostics is slower, so use a separate pool
# Diagnostics can use slightly fewer workers since they're I/O heavy
diagnostics_pool_size <- min(max(floor(pool_size * 0.75), 1), pool_size)
diagnostics_pool <- if (pool_size > 0) SessionPool$new(diagnostics_pool_size, "diagnostics") else NULL

self$parse_task_manager <- TaskManager$new("parse", parse_pool)
self$diagnostics_task_manager <- TaskManager$new("diagnostics", diagnostics_pool)
self$parse_task_manager <- TaskManager$new(
"parse",
use_session = TRUE, process_recent_first = TRUE
)
self$diagnostics_task_manager <- TaskManager$new(
"diagnostics",
use_session = TRUE, process_recent_first = TRUE
)

# no pool for resolve task
# resolve task require a new session for every task
self$resolve_task_manager <- TaskManager$new("resolve", NULL)
self$resolve_task_manager <- TaskManager$new("resolve")

self$pending_replies <- collections::dict()

super$initialize()
},

process_events = function() {
self$diagnostics_task_manager$run_tasks()
self$diagnostics_task_manager$check_tasks()
Expand All @@ -97,11 +81,8 @@ LanguageServer <- R6::R6Class("LanguageServer",
self$workspace$poll_namespace_file()
}
},

text_sync = function(
# TODO: move it to Workspace!?
uri, document, run_lintr = FALSE, parse = FALSE, delay = 0) {

text_sync = function( # TODO: move it to Workspace!?
uri, document, run_lintr = FALSE, parse = FALSE, delay = 0) {
if (!self$pending_replies$has(uri)) {
self$pending_replies$set(uri, list(
`textDocument/documentSymbol` = collections::queue(),
Expand Down Expand Up @@ -131,7 +112,6 @@ LanguageServer <- R6::R6Class("LanguageServer",
)
}
},

check_connection = function() {
if (!isOpen(self$inputcon)) {
self$exit_flag <- TRUE
Expand All @@ -142,13 +122,11 @@ LanguageServer <- R6::R6Class("LanguageServer",
self$exit_flag <- TRUE
}
},

write_text = function(text) {
# we have made effort to ensure that text is utf-8
# so text is printed as is
writeLines(text, self$outputcon, sep = "", useBytes = TRUE)
},

read_line = function() {
if (self$tcp) {
if (socketSelect(list(self$inputcon), timeout = 0)) {
Expand All @@ -160,7 +138,6 @@ LanguageServer <- R6::R6Class("LanguageServer",
stdin_read_line()
}
},

read_char = function(n) {
if (self$tcp) {
out <- readChar(self$inputcon, n, useBytes = TRUE)
Expand All @@ -170,24 +147,34 @@ LanguageServer <- R6::R6Class("LanguageServer",
stdin_read_char(n)
}
},

run = function() {
on.exit(
{
if (!is.null(self$parse_task_manager)) self$parse_task_manager$stop()
if (!is.null(self$diagnostics_task_manager)) self$diagnostics_task_manager$stop()
if (!is.null(self$resolve_task_manager)) self$resolve_task_manager$stop()
},
add = TRUE
)
while (TRUE) {
ret <- tryCatchStack({
if (isTRUE(self$exit_flag)) {
logger$info("exiting")
break
}
ret <- tryCatchStack(
{
if (isTRUE(self$exit_flag)) {
logger$info("exiting")
break
}

self$process_events()
self$process_events()

data <- self$fetch(blocking = FALSE)
if (is.null(data)) {
Sys.sleep(0.1)
next
}
self$handle_raw(data)
}, error = function(e) e)
data <- self$fetch(blocking = FALSE)
if (is.null(data)) {
Sys.sleep(0.1)
next
}
self$handle_raw(data)
},
error = function(e) e
)
if (inherits(ret, "error")) {
logger$error(ret)
logger$error("exiting")
Expand Down
Loading
Loading