-
-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Dispatcher implementations #1017
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
b72b7d9
Add StrictRateLimitedDispatcher
GarthSnyder c0915d7
Add ConcurrencyLimitedDispatcher
GarthSnyder 3f118b9
StrictRateLimitedDispatcher now schedules only on execution
GarthSnyder b68b53a
Refactor rate limited dispatchers onto common base
GarthSnyder d1cf69b
Limiter dispatchers complete and tested
GarthSnyder fbcfd08
Add CoreDataDispatcher
GarthSnyder 7f0dcdc
Grooming, doc comments, NSLock -> DispatchQueue
GarthSnyder 1e9e9c7
Break Dispatcher implementation tests into separate classes to allow …
GarthSnyder fdab7f3
Move time measurement outside of serialization for rate limit test
GarthSnyder abf0fa9
Update testing for Linux and slow Travis environments
GarthSnyder d9690a7
Build all v7 branches
GarthSnyder File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,7 @@ branches: | |
| only: | ||
| - master | ||
| - v4 | ||
| - v7 | ||
| - /^v7/ | ||
| - legacy-1.x | ||
| - /^\d+\.\d+\.\d+$/ | ||
|
|
||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| import Foundation | ||
|
|
||
| /// A PromiseKit Dispatcher that allows no more than X simultaneous | ||
| /// executions at once. | ||
|
|
||
| class ConcurrencyLimitedDispatcher: Dispatcher { | ||
|
|
||
| let queue: Dispatcher | ||
| let serializer: DispatchQueue = DispatchQueue(label: "CLD serializer") | ||
|
|
||
| private let semaphore: DispatchSemaphore | ||
|
|
||
| /// A `PromiseKit` `Dispatcher` that allows no more than X simultaneous | ||
| /// executions at once. | ||
| /// | ||
| /// - Parameters: | ||
| /// - limit: The number of executions that may run at once. | ||
| /// - queue: The DispatchQueue or Dispatcher on which to perform executions. | ||
| /// Should be some form of concurrent queue. | ||
|
|
||
| public init(limit: Int, queue: Dispatcher = DispatchQueue.global(qos: .background)) { | ||
| self.queue = queue | ||
| semaphore = DispatchSemaphore(value: limit) | ||
| } | ||
|
|
||
| public func dispatch(_ body: @escaping () -> Void) { | ||
| serializer.async { | ||
| self.semaphore.wait() | ||
| self.queue.dispatch { | ||
| body() | ||
| self.semaphore.signal() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| } | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| #if !os(Linux) | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can instead add this to the exclude list in Package.swift for this target. |
||
|
|
||
| import Foundation | ||
| import CoreData | ||
|
|
||
| public extension NSManagedObjectContext { | ||
| var dispatcher: CoreDataDispatcher { | ||
| return CoreDataDispatcher(self) | ||
| } | ||
| } | ||
|
|
||
| /// A `Dispatcher` that dispatches onto the threads associated with | ||
| /// `NSManagedObjectContext`s, allowing Core Data operations to be | ||
| /// handled using promises. | ||
|
|
||
| public struct CoreDataDispatcher: Dispatcher { | ||
|
|
||
| let context: NSManagedObjectContext | ||
|
|
||
| public init(_ context: NSManagedObjectContext) { | ||
| self.context = context | ||
| } | ||
|
|
||
| public func dispatch(_ body: @escaping () -> Void) { | ||
| context.perform(body) | ||
| } | ||
|
|
||
| } | ||
|
|
||
| #endif | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| import Foundation | ||
|
|
||
| // Simple queue implementation with storage recovery | ||
|
|
||
| fileprivate let arraySizeWorthCompacting = 100 | ||
| fileprivate let minUtilization = 0.6 | ||
|
|
||
| struct Queue<T> { | ||
|
|
||
| var elements: [T?] = [] | ||
| var head = 0 | ||
| let maxDepth: Int? | ||
|
|
||
| init(maxDepth: Int? = nil) { | ||
| self.maxDepth = maxDepth | ||
| } | ||
|
|
||
| var isEmpty: Bool { | ||
| return head >= elements.count | ||
| } | ||
|
|
||
| var count: Int { | ||
| return elements.count - head | ||
| } | ||
|
|
||
| mutating func enqueue(_ item: T) { | ||
| elements.append(item) | ||
| if let maxDepth = maxDepth, count > maxDepth { | ||
| _ = dequeue() | ||
| } | ||
| } | ||
|
|
||
| mutating func dequeue() -> T { | ||
| assert(!isEmpty, "Dequeue attempt on an empty Queue") | ||
| defer { | ||
| elements[head] = nil | ||
| head += 1 | ||
| maybeCompactStorage() | ||
| } | ||
| return elements[head]! | ||
| } | ||
|
|
||
| private mutating func maybeCompactStorage() { | ||
| let n = elements.count | ||
| if n > arraySizeWorthCompacting && head > Int(Double(n) * (1 - minUtilization)) { | ||
| compactStorage() | ||
| } | ||
| } | ||
|
|
||
| mutating func compactStorage() { | ||
| if isEmpty { | ||
| elements.removeAll(keepingCapacity: false) | ||
| } else { | ||
| elements.removeFirst(head) | ||
| } | ||
| head = 0 | ||
| } | ||
|
|
||
| mutating func purge() { | ||
| elements.removeAll(keepingCapacity: false) | ||
| head = 0 | ||
| } | ||
|
|
||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| import Foundation | ||
|
|
||
| /// A `PromiseKit` `Dispatcher` that dispatches X closures every Y seconds, | ||
| /// on average. | ||
| /// | ||
| /// This implementation is O(1) in both space and time, but it uses approximate | ||
| /// time accounting. Over the long term, the rate converges to a rate of X/Y, | ||
| /// but the transient burst rate will be up to 2X/Y in some situations. | ||
| /// | ||
| /// For a completely accurate rate limiter that dispatches as rapidly as | ||
| /// possible, see `StrictRateLimitedDispatcher`. That implementation requires | ||
| /// additional storage. | ||
| /// | ||
| /// Executions are paced by start time, not by completion, so it's possible to | ||
| /// end up with more than X closures running concurrently in some circumstances. | ||
| /// | ||
| /// There is no guarantee that you will reach a given dispatch rate. There are not | ||
| /// an infinite number of threads available, and GCD scheduling has limited accuracy. | ||
| /// | ||
| /// 100% thread safe. | ||
|
|
||
| public class RateLimitedDispatcher: RateLimitedDispatcherBase { | ||
|
|
||
| private var tokensInBucket: Double = 0 | ||
| private var latestAccrual: DispatchTime = DispatchTime.now() | ||
| private var retryWorkItem: DispatchWorkItem? { willSet { retryWorkItem?.cancel() }} | ||
|
|
||
| private var tokensPerSecond: Double { return Double(maxDispatches) / interval } | ||
|
|
||
| /// A `PromiseKit` `Dispatcher` that dispatches X executions every Y | ||
| /// seconds, on average. | ||
| /// | ||
| /// This version is O(1) in space and time but uses an approximate algorithm with | ||
| /// burst rates up to 2X per Y seconds. For a more accurate implementation, use | ||
| /// `StrictRateLimitedDispatcher`. | ||
| /// | ||
| /// - Parameters: | ||
| /// - maxDispatches: The number of executions that may be dispatched within a given interval. | ||
| /// - perInterval: The length of the reference interval, in seconds. | ||
| /// - queue: The DispatchQueue or Dispatcher on which to perform executions. May be serial or concurrent. | ||
|
|
||
| override init(maxDispatches: Int, perInterval interval: TimeInterval, queue: Dispatcher = DispatchQueue.global()) { | ||
| latestAccrual = DispatchTime.now() | ||
| super.init(maxDispatches: maxDispatches, perInterval: interval, queue: queue) | ||
| tokensInBucket = Double(maxDispatches) | ||
| } | ||
|
|
||
| override func dispatchFromQueue() { | ||
|
|
||
| guard undispatched.count > 0 else { return } | ||
| cleanupNonce += 1 | ||
|
|
||
| let now = DispatchTime.now() | ||
| let tokensToAdd = (now - latestAccrual) * tokensPerSecond | ||
| tokensInBucket = min(Double(maxDispatches - nDispatched), tokensInBucket + tokensToAdd) | ||
| latestAccrual = now | ||
|
|
||
| // print("runqueue \(now.rawValue), nDispatched = \(nDispatched), tokens = \(tokensInBucket), undispatched = \(undispatched.count)") | ||
|
|
||
| var didDispatch = false | ||
| while tokensInBucket >= 1.0 && !undispatched.isEmpty && nDispatched < maxDispatches { | ||
| didDispatch = true | ||
| tokensInBucket -= 1.0 | ||
| nDispatched += 1 | ||
| let body = undispatched.dequeue() | ||
| queue.dispatch { | ||
| self.serializer.async { | ||
| self.recordActualStart() | ||
| } | ||
| body() | ||
| } | ||
| } | ||
|
|
||
| if !didDispatch { | ||
| scheduleRetry() | ||
| } | ||
|
|
||
| } | ||
|
|
||
| private func scheduleRetry() { | ||
| guard retryWorkItem == nil && !undispatched.isEmpty && nDispatched < maxDispatches else { return } | ||
| let tokenDeficit = 1 - tokensInBucket | ||
| let secondsToGo = tokenDeficit / tokensPerSecond | ||
| let deadline = latestAccrual + secondsToGo + 1.0E-6 | ||
| retryWorkItem = DispatchWorkItem { [weak self] in | ||
| self?.retryWorkItem = nil | ||
| self?.dispatchFromQueue() | ||
| } | ||
| serializer.asyncAfter(deadline: deadline, execute: retryWorkItem!) | ||
| } | ||
|
|
||
| override func cleanup(_ nonce: Int64) { | ||
| super.cleanup(nonce) | ||
| guard nonce == cleanupNonce else { return } | ||
| tokensInBucket = Double(maxDispatches) // Avoid accumulating roundoff errors | ||
| } | ||
|
|
||
| } | ||
|
|
||
| internal extension DispatchTime { | ||
| static func -(a: DispatchTime, b: DispatchTime) -> TimeInterval { | ||
| let delta = a.uptimeNanoseconds - b.uptimeNanoseconds | ||
| return TimeInterval(delta) / 1_000_000_000 | ||
| } | ||
| } | ||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| import Foundation | ||
|
|
||
| public class RateLimitedDispatcherBase: Dispatcher { | ||
|
|
||
| let maxDispatches: Int | ||
| let interval: TimeInterval | ||
| let queue: Dispatcher | ||
|
|
||
| internal let serializer = DispatchQueue(label: "RLD serializer") | ||
|
|
||
| internal var nDispatched = 0 | ||
| internal var undispatched = Queue<() -> Void>() | ||
|
|
||
| internal var cleanupNonce: Int64 = 0 | ||
| internal var cleanupWorkItem: DispatchWorkItem? { willSet { cleanupWorkItem?.cancel() }} | ||
|
|
||
| public init(maxDispatches: Int, perInterval interval: TimeInterval, queue: Dispatcher = DispatchQueue.global()) { | ||
| self.maxDispatches = maxDispatches | ||
| self.interval = interval | ||
| self.queue = queue | ||
| } | ||
|
|
||
| public func dispatch(_ body: @escaping () -> Void) { | ||
| serializer.async { | ||
| self.undispatched.enqueue(body) | ||
| self.dispatchFromQueue() | ||
| } | ||
| } | ||
|
|
||
| internal func dispatchFromQueue() { | ||
| fatalError("Subclass responsibility") | ||
| } | ||
|
|
||
| internal func recordActualStart() { | ||
| nDispatched -= 1 | ||
| dispatchFromQueue() | ||
| if nDispatched == 0 && undispatched.isEmpty { | ||
| scheduleCleanup() | ||
| } | ||
| } | ||
|
|
||
| internal func scheduleCleanup() { | ||
| cleanupWorkItem = DispatchWorkItem { [ weak self, nonce = self.cleanupNonce ] in | ||
| self?.cleanup(nonce) | ||
| } | ||
| serializer.asyncAfter(deadline: DispatchTime.now() + interval, execute: cleanupWorkItem!) | ||
| } | ||
|
|
||
| internal func cleanup(_ nonce: Int64) { | ||
| // Calls to cleanup() have to go through the serializer queue, so by by the time | ||
| // we get here, more activity may have occurred. Ergo, verify nonce. | ||
| guard nonce == cleanupNonce else { return } | ||
| undispatched.compactStorage() | ||
| cleanupWorkItem = nil | ||
| } | ||
|
|
||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, we can remove the
when()that does this manually. I figure. Haven't analyzed them both.