Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ branches:
only:
- master
- v4
- v7
- /^v7/
- legacy-1.x
- /^\d+\.\d+\.\d+$/

Expand Down
37 changes: 37 additions & 0 deletions Sources/ConcurrencyLimitedDispatcher.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import Foundation

/// A PromiseKit `Dispatcher` that allows no more than a fixed number of
/// simultaneous executions at once.
public class ConcurrencyLimitedDispatcher: Dispatcher {

    // The underlying Dispatcher that actually runs the closures.
    let queue: Dispatcher
    // Serial queue on which slot acquisition happens, guaranteeing FIFO
    // admission of pending closures.
    let serializer: DispatchQueue = DispatchQueue(label: "CLD serializer")

    // Counts available execution slots; created with `limit` permits.
    private let semaphore: DispatchSemaphore

    /// A `PromiseKit` `Dispatcher` that allows no more than X simultaneous
    /// executions at once.
    ///
    /// - Parameters:
    ///   - limit: The number of executions that may run at once.
    ///   - queue: The DispatchQueue or Dispatcher on which to perform executions.
    ///     Should be some form of concurrent queue.
    public init(limit: Int, queue: Dispatcher = DispatchQueue.global(qos: .background)) {
        self.queue = queue
        semaphore = DispatchSemaphore(value: limit)
    }

    public func dispatch(_ body: @escaping () -> Void) {
        serializer.async {
            // NOTE: this parks the serializer's thread until a slot frees up;
            // that is how both FIFO ordering and the concurrency cap are
            // enforced. Only this dispatcher's own serial queue is blocked.
            self.semaphore.wait()
            self.queue.dispatch {
                body()
                self.semaphore.signal()
            }
        }
    }

}

31 changes: 31 additions & 0 deletions Sources/CoreDataDispatcher.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#if !os(Linux)

import Foundation
import CoreData

public extension NSManagedObjectContext {
    /// A `Dispatcher` that dispatches onto this context's associated queue.
    /// Note: a new `CoreDataDispatcher` value is returned on each access;
    /// this is cheap because the dispatcher is a one-field struct.
    var dispatcher: CoreDataDispatcher {
        return CoreDataDispatcher(self)
    }
}

/// A `Dispatcher` that dispatches onto the threads associated with
/// `NSManagedObjectContext`s, allowing Core Data operations to be
/// handled using promises.
public struct CoreDataDispatcher: Dispatcher {

    // The managed object context whose queue receives dispatched closures.
    let context: NSManagedObjectContext

    /// - Parameter context: The context on whose queue closures will run.
    public init(_ context: NSManagedObjectContext) {
        self.context = context
    }

    /// Executes `body` via `NSManagedObjectContext.perform(_:)`, i.e.
    /// asynchronously on the context's own queue.
    public func dispatch(_ body: @escaping () -> Void) {
        context.perform(body)
    }

}

#endif

64 changes: 64 additions & 0 deletions Sources/Queue.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import Foundation

// A simple FIFO queue backed by an array, with periodic storage compaction
// so a long-lived queue does not retain an unbounded dead prefix.

fileprivate let arraySizeWorthCompacting = 100
fileprivate let minUtilization = 0.6

struct Queue<T> {

    // Backing store; slots before `head` are dead and nil-ed out so that
    // dequeued payloads can be released promptly.
    var elements: [T?] = []
    // Index of the logical front of the queue within `elements`.
    var head = 0
    // Optional cap; when exceeded, the oldest element is silently dropped.
    let maxDepth: Int?

    init(maxDepth: Int? = nil) {
        self.maxDepth = maxDepth
    }

    /// True when no live elements remain.
    var isEmpty: Bool {
        return elements.count <= head
    }

    /// Number of live (not yet dequeued) elements.
    var count: Int {
        return elements.count - head
    }

    /// Appends `item`; if a `maxDepth` is set and is now exceeded, the
    /// oldest element is dropped to stay within bounds.
    mutating func enqueue(_ item: T) {
        elements.append(item)
        guard let depth = maxDepth, count > depth else { return }
        _ = dequeue()
    }

    /// Removes and returns the front element. Must not be called on an
    /// empty queue (debug builds assert; release builds trap on the
    /// force-unwrap/out-of-bounds access).
    mutating func dequeue() -> T {
        assert(!isEmpty, "Dequeue attempt on an empty Queue")
        let item = elements[head]!
        elements[head] = nil
        head += 1
        maybeCompactStorage()
        return item
    }

    // Compact only when the array has grown large and utilization has
    // dropped below `minUtilization`, amortizing the cost of removal.
    private mutating func maybeCompactStorage() {
        let total = elements.count
        let deadPrefixLimit = Int(Double(total) * (1 - minUtilization))
        if total > arraySizeWorthCompacting && head > deadPrefixLimit {
            compactStorage()
        }
    }

    /// Discards the dead prefix (or all storage if empty) and resets `head`.
    mutating func compactStorage() {
        if isEmpty {
            elements.removeAll(keepingCapacity: false)
        } else {
            elements.removeFirst(head)
        }
        head = 0
    }

    /// Removes everything, releasing all storage.
    mutating func purge() {
        elements.removeAll(keepingCapacity: false)
        head = 0
    }

}
106 changes: 106 additions & 0 deletions Sources/RateLimitedDispatcher.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import Foundation

/// A `PromiseKit` `Dispatcher` that dispatches X closures every Y seconds,
/// on average.
///
/// This implementation is O(1) in both space and time, but it uses approximate
/// time accounting. Over the long term, the rate converges to a rate of X/Y,
/// but the transient burst rate will be up to 2X/Y in some situations.
///
/// For a completely accurate rate limiter that dispatches as rapidly as
/// possible, see `StrictRateLimitedDispatcher`. That implementation requires
/// additional storage.
///
/// Executions are paced by start time, not by completion, so it's possible to
/// end up with more than X closures running concurrently in some circumstances.
///
/// There is no guarantee that you will reach a given dispatch rate. There are not
/// an infinite number of threads available, and GCD scheduling has limited accuracy.
///
/// 100% thread safe.
public class RateLimitedDispatcher: RateLimitedDispatcherBase {

    // Token-bucket state: fractional tokens accrue at `tokensPerSecond` and
    // one is spent per dispatch; capped in dispatchFromQueue() so an idle
    // period cannot bank unbounded burst capacity.
    private var tokensInBucket: Double = 0
    // Time of the most recent token accrual; the delta to "now" yields tokens.
    private var latestAccrual: DispatchTime = DispatchTime.now()
    // Deferred wake-up for when the bucket is dry; assigning a replacement
    // cancels any work item already scheduled (see willSet).
    private var retryWorkItem: DispatchWorkItem? { willSet { retryWorkItem?.cancel() }}

    // Accrual rate: maxDispatches tokens per `interval` seconds.
    private var tokensPerSecond: Double { return Double(maxDispatches) / interval }

    /// A `PromiseKit` `Dispatcher` that dispatches X executions every Y
    /// seconds, on average.
    ///
    /// This version is O(1) in space and time but uses an approximate algorithm with
    /// burst rates up to 2X per Y seconds. For a more accurate implementation, use
    /// `StrictRateLimitedDispatcher`.
    ///
    /// - Parameters:
    ///   - maxDispatches: The number of executions that may be dispatched within a given interval.
    ///   - perInterval: The length of the reference interval, in seconds.
    ///   - queue: The DispatchQueue or Dispatcher on which to perform executions. May be serial or concurrent.
    override init(maxDispatches: Int, perInterval interval: TimeInterval, queue: Dispatcher = DispatchQueue.global()) {
        latestAccrual = DispatchTime.now()
        super.init(maxDispatches: maxDispatches, perInterval: interval, queue: queue)
        tokensInBucket = Double(maxDispatches) // Start with a full bucket
    }

    // Pacing policy. Runs on the base class's serializer queue: accrues
    // tokens, dispatches as many pending closures as the bucket and the
    // concurrency budget allow, and schedules a retry if nothing started.
    override func dispatchFromQueue() {

        guard undispatched.count > 0 else { return }
        cleanupNonce += 1 // New activity invalidates any pending idle cleanup

        let now = DispatchTime.now()
        let tokensToAdd = (now - latestAccrual) * tokensPerSecond
        // Cap the bucket so accrued-but-unused capacity never exceeds the
        // current concurrency headroom (maxDispatches minus in-flight count).
        tokensInBucket = min(Double(maxDispatches - nDispatched), tokensInBucket + tokensToAdd)
        latestAccrual = now

        // print("runqueue \(now.rawValue), nDispatched = \(nDispatched), tokens = \(tokensInBucket), undispatched = \(undispatched.count)")

        var didDispatch = false
        while tokensInBucket >= 1.0 && !undispatched.isEmpty && nDispatched < maxDispatches {
            didDispatch = true
            tokensInBucket -= 1.0
            nDispatched += 1
            let body = undispatched.dequeue()
            queue.dispatch {
                // Report the actual start back on the serializer queue; the
                // closure itself runs here, on the target queue.
                self.serializer.async {
                    self.recordActualStart()
                }
                body()
            }
        }

        if !didDispatch {
            scheduleRetry()
        }

    }

    // Schedules a one-shot wake-up for the moment the bucket should next hold
    // a whole token. No-op if a retry is already pending, there is nothing
    // queued, or the concurrency budget is exhausted anyway.
    private func scheduleRetry() {
        guard retryWorkItem == nil && !undispatched.isEmpty && nDispatched < maxDispatches else { return }
        let tokenDeficit = 1 - tokensInBucket
        let secondsToGo = tokenDeficit / tokensPerSecond
        let deadline = latestAccrual + secondsToGo + 1.0E-6 // Epsilon avoids waking a hair too early
        retryWorkItem = DispatchWorkItem { [weak self] in
            self?.retryWorkItem = nil
            self?.dispatchFromQueue()
        }
        serializer.asyncAfter(deadline: deadline, execute: retryWorkItem!)
    }

    override func cleanup(_ nonce: Int64) {
        super.cleanup(nonce)
        // A changed nonce means new work arrived after this cleanup was
        // scheduled; in that case the reset below would be incorrect.
        guard nonce == cleanupNonce else { return }
        tokensInBucket = Double(maxDispatches) // Avoid accumulating roundoff errors
    }

}

internal extension DispatchTime {
    /// Signed difference between two `DispatchTime`s, in seconds.
    ///
    /// `uptimeNanoseconds` is a `UInt64`, so subtracting the raw values
    /// directly (as the previous implementation did) traps on overflow
    /// whenever `a < b`. Compute the magnitude in the safe direction first,
    /// then apply the sign, so the operator is total over its inputs.
    ///
    /// - Returns: `a - b` as a `TimeInterval`; negative when `a` is earlier.
    static func -(a: DispatchTime, b: DispatchTime) -> TimeInterval {
        let an = a.uptimeNanoseconds
        let bn = b.uptimeNanoseconds
        let magnitude = an >= bn ? TimeInterval(an - bn) : -TimeInterval(bn - an)
        return magnitude / 1_000_000_000
    }
}

57 changes: 57 additions & 0 deletions Sources/RateLimitedDispatcherBase.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import Foundation

/// Common machinery shared by rate-limited dispatchers: a serializing queue,
/// a FIFO of not-yet-dispatched closures, in-flight accounting, and deferred
/// cleanup of idle storage. Subclasses supply the pacing policy by overriding
/// `dispatchFromQueue()`.
public class RateLimitedDispatcherBase: Dispatcher {

    // Maximum number of closures that may be dispatched per `interval` seconds.
    let maxDispatches: Int
    // Length of the reference interval, in seconds.
    let interval: TimeInterval
    // The underlying Dispatcher that actually runs the closures.
    let queue: Dispatcher

    // Serial queue protecting all mutable state below; every access to that
    // state funnels through it.
    internal let serializer = DispatchQueue(label: "RLD serializer")

    // Closures counted against the current dispatch budget; decremented in
    // recordActualStart() when a closure actually begins executing.
    internal var nDispatched = 0
    // Closures accepted by dispatch(_:) but not yet handed to `queue`.
    internal var undispatched = Queue<() -> Void>()

    // Bumped on new activity; lets a scheduled cleanup detect that it has
    // gone stale (see cleanup(_:)).
    internal var cleanupNonce: Int64 = 0
    // Pending idle cleanup; assigning a replacement cancels the previous one.
    internal var cleanupWorkItem: DispatchWorkItem? { willSet { cleanupWorkItem?.cancel() }}

    /// - Parameters:
    ///   - maxDispatches: The number of executions that may be dispatched within a given interval.
    ///   - perInterval: The length of the reference interval, in seconds.
    ///   - queue: The DispatchQueue or Dispatcher on which to perform executions.
    public init(maxDispatches: Int, perInterval interval: TimeInterval, queue: Dispatcher = DispatchQueue.global()) {
        self.maxDispatches = maxDispatches
        self.interval = interval
        self.queue = queue
    }

    /// Enqueues `body` and lets the subclass dispatch whatever its pacing
    /// policy currently allows. Thread safe.
    public func dispatch(_ body: @escaping () -> Void) {
        serializer.async {
            self.undispatched.enqueue(body)
            self.dispatchFromQueue()
        }
    }

    // Pacing-policy hook; always invoked on `serializer`.
    internal func dispatchFromQueue() {
        fatalError("Subclass responsibility")
    }

    // Invoked (on `serializer`) when a dispatched closure actually starts.
    // Pacing is by start time, so the closure stops counting against the
    // budget here rather than on completion; when everything has drained,
    // idle cleanup is scheduled.
    internal func recordActualStart() {
        nDispatched -= 1
        dispatchFromQueue()
        if nDispatched == 0 && undispatched.isEmpty {
            scheduleCleanup()
        }
    }

    // Arranges to release idle storage after one full interval of inactivity.
    // The current nonce is captured so the cleanup can detect staleness.
    internal func scheduleCleanup() {
        cleanupWorkItem = DispatchWorkItem { [ weak self, nonce = self.cleanupNonce ] in
            self?.cleanup(nonce)
        }
        serializer.asyncAfter(deadline: DispatchTime.now() + interval, execute: cleanupWorkItem!)
    }

    internal func cleanup(_ nonce: Int64) {
        // Calls to cleanup() have to go through the serializer queue, so by the time
        // we get here, more activity may have occurred. Ergo, verify nonce.
        guard nonce == cleanupNonce else { return }
        undispatched.compactStorage()
        cleanupWorkItem = nil
    }

}
Loading