Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CombineExt.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
1970A8B42524730500799AB6 /* FilterManyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1970A8B32524730400799AB6 /* FilterManyTests.swift */; };
712E36C82711B79000A2AAFE /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = 712E36C72711B79000A2AAFE /* RetryWhen.swift */; };
7182326F26DAAF230026BAD3 /* RetryWhenTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7182326E26DAAF230026BAD3 /* RetryWhenTests.swift */; };
63FEBC9327E9FCDB00E934AD /* FlatMapFirst.swift in Sources */ = {isa = PBXBuildFile; fileRef = 63FEBC9227E9FCDB00E934AD /* FlatMapFirst.swift */; };
63FEBC9527E9FE9000E934AD /* FlatMapFirstTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 63FEBC9427E9FE9000E934AD /* FlatMapFirstTests.swift */; };
BF330EF624F1FFFE001281FC /* CombineSchedulers in Frameworks */ = {isa = PBXBuildFile; productRef = BF330EF524F1FFFE001281FC /* CombineSchedulers */; };
BF330EF924F20032001281FC /* Timer.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF330EF824F20032001281FC /* Timer.swift */; };
BF330EFB24F20080001281FC /* Lock.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF330EFA24F20080001281FC /* Lock.swift */; };
BF3D3B5D253B83F300D830ED /* IgnoreFailure.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF3D3B5C253B83F300D830ED /* IgnoreFailure.swift */; };
Expand Down Expand Up @@ -120,6 +123,8 @@
1970A8B32524730400799AB6 /* FilterManyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FilterManyTests.swift; sourceTree = "<group>"; };
712E36C72711B79000A2AAFE /* RetryWhen.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RetryWhen.swift; sourceTree = "<group>"; };
7182326E26DAAF230026BAD3 /* RetryWhenTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RetryWhenTests.swift; sourceTree = "<group>"; };
63FEBC9227E9FCDB00E934AD /* FlatMapFirst.swift */ = {isa = PBXFileReference; indentWidth = 4; lastKnownFileType = sourcecode.swift; path = FlatMapFirst.swift; sourceTree = "<group>"; tabWidth = 2; };
63FEBC9427E9FE9000E934AD /* FlatMapFirstTests.swift */ = {isa = PBXFileReference; indentWidth = 4; lastKnownFileType = sourcecode.swift; path = FlatMapFirstTests.swift; sourceTree = "<group>"; };
BF330EF824F20032001281FC /* Timer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Timer.swift; sourceTree = "<group>"; };
BF330EFA24F20080001281FC /* Lock.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Lock.swift; sourceTree = "<group>"; };
BF3D3B5C253B83F300D830ED /* IgnoreFailure.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = IgnoreFailure.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -284,6 +289,9 @@
OBJ_31 /* Toggle.swift */,
OBJ_32 /* WithLatestFrom.swift */,
OBJ_33 /* ZipMany.swift */,
1970A8A925246FBD00799AB6 /* FilterMany.swift */,
BFADDC8025BCE4C200465E9B /* FlatMapBatches.swift */,
63FEBC9227E9FCDB00E934AD /* FlatMapFirst.swift */,
);
path = Operators;
sourceTree = "<group>";
Expand Down Expand Up @@ -341,6 +349,8 @@
OBJ_59 /* ToggleTests.swift */,
OBJ_60 /* WithLatestFromTests.swift */,
OBJ_61 /* ZipManyTests.swift */,
BFADDC8A25BCE91E00465E9B /* FlatMapBatchesTests.swift */,
63FEBC9427E9FE9000E934AD /* FlatMapFirstTests.swift */,
);
path = Tests;
sourceTree = SOURCE_ROOT;
Expand Down Expand Up @@ -599,6 +609,7 @@
EA0D86D6287D19DC0085356E /* MapToValueTests.swift in Sources */,
OBJ_139 /* ShareReplayTests.swift in Sources */,
EA0D86D5287D19DC0085356E /* MapToResultTests.swift in Sources */,
63FEBC9527E9FE9000E934AD /* FlatMapFirstTests.swift in Sources */,
BFADDC8B25BCE91E00465E9B /* FlatMapBatchesTests.swift in Sources */,
OBJ_140 /* ToggleTests.swift in Sources */,
OBJ_141 /* WithLatestFromTests.swift in Sources */,
Expand Down Expand Up @@ -637,6 +648,7 @@
BF43CC1525008B4F005AFA28 /* IgnoreOutputSetOutputType.swift in Sources */,
OBJ_93 /* Partition.swift in Sources */,
OBJ_94 /* PrefixDuration.swift in Sources */,
63FEBC9327E9FCDB00E934AD /* FlatMapFirst.swift in Sources */,
OBJ_95 /* RemoveAllDuplicates.swift in Sources */,
OBJ_96 /* SetOutputType.swift in Sources */,
OBJ_97 /* ShareReplay.swift in Sources */,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 52 additions & 0 deletions Sources/Operators/FlatMapFirst.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// FlatMapFirst.swift
// CombineExt
//
// Created by Martin Troup on 22/03/2022.
// Copyright © 2020 Combine Community. All rights reserved.
//

#if canImport(Combine)
import Combine
import Foundation

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publisher {
/// The operator is a special case of `flatMap` operator.
///
/// Like `flatMapLatest`, it only allows one inner publisher at a time. Unlike `flatMapLatest`, it will not cancel an ongoing inner publisher.
/// Instead it ignores events from the source until the inner publisher is done. It creates another inner publisher only when the previous one is done.
///
/// - Returns: A publisher emitting the values of a single inner publisher at a time (until the inner publisher finishes).
func flatMapFirst<P: Publisher>(
_ transform: @escaping (Output) -> P
) -> Publishers.FlatMap<Publishers.HandleEvents<P>, Publishers.Filter<Self>>
where Self.Failure == P.Failure {
var isRunning = false
let lock = NSRecursiveLock()

func set(isRunning newValue: Bool) {
defer { lock.unlock() }
lock.lock()

isRunning = newValue
}

return filter { _ in !isRunning }
.flatMap { output in
transform(output)
.handleEvents(
receiveSubscription: { _ in
set(isRunning: true)
},
receiveCompletion: { _ in
set(isRunning: false)
},
receiveCancel: {
set(isRunning: false)
}
)
}
}
}
#endif
128 changes: 128 additions & 0 deletions Tests/FlatMapFirstTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
//
// FlatMapFirstTests.swift
// CombineExtTests
//
// Created by Martin Troup on 22/03/2022.
// Copyright © 2020 Combine Community. All rights reserved.
//

#if !os(watchOS)
import Combine
import CombineSchedulers
import Foundation
import XCTest

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
class FlatMapFirstTests: XCTestCase {
var cancellables: Set<AnyCancellable>!

override func setUp() {
super.setUp()

cancellables = []
}

struct TestError: Error, Equatable {}

func testSingleUpstreamSingleFlatMap() {
let testScheduler = DispatchQueue.test

var innerPublisherSubscriptionCount = 0
var innerPublisherCompletionCount = 0
var isUpstreamCompleted = false

Just("").setFailureType(to: Never.self)
.delay(for: 1, scheduler: testScheduler)
.flatMapFirst { _ -> AnyPublisher<Date, Never> in
return Just(Date())
.delay(for: 1, scheduler: testScheduler)
.handleEvents(
receiveSubscription: { _ in innerPublisherSubscriptionCount += 1 },
receiveCompletion: { _ in innerPublisherCompletionCount += 1 }
)
.eraseToAnyPublisher()
}
.sink(
receiveCompletion: { completion in
if case .finished = completion {
isUpstreamCompleted = true
}
},
receiveValue: { _ in }
)
.store(in: &cancellables)

testScheduler.advance(by: 2)

XCTAssertEqual(innerPublisherSubscriptionCount, 1)
XCTAssertEqual(innerPublisherCompletionCount, 1)
XCTAssertTrue(isUpstreamCompleted)
}

func testErrorUpstreamSkippingFlatMap() {
let testScheduler = DispatchQueue.test

var innerPublisherSubscriptionCount = 0
var isUpstreamCompleted = false

Fail(error: TestError()).eraseToAnyPublisher()
.delay(for: 1, scheduler: testScheduler)
.flatMapFirst { (_: String) -> AnyPublisher<Date, TestError> in
return Just(Date()).setFailureType(to: TestError.self)
.handleEvents(receiveSubscription: { _ in innerPublisherSubscriptionCount += 1 })
.eraseToAnyPublisher()
}
.sink(
receiveCompletion: { completion in
if case let .failure(error) = completion {
XCTAssertEqual(error, TestError())
isUpstreamCompleted = true
}
},
receiveValue: { _ in }
)
.store(in: &cancellables)

testScheduler.advance(by: 1)

XCTAssertEqual(innerPublisherSubscriptionCount, 0)
XCTAssertTrue(isUpstreamCompleted)
}

func testStandardProcessingOfFlatMapFirst() {
let testScheduler = DispatchQueue.test

var innerPublisherSubscriptionCount = 0
var innerPublisherCompletionCount = 0
var isUpstreamCompleted = false

testScheduler.timerPublisher(every: 1)
.autoconnect()
.prefix(100)
.flatMapFirst { _ -> AnyPublisher<Date, Never> in
return Just(Date())
.handleEvents(
receiveSubscription: { _ in innerPublisherSubscriptionCount += 1 },
receiveCompletion: { _ in innerPublisherCompletionCount += 1 }
)
.delay(for: 10, scheduler: testScheduler)
.eraseToAnyPublisher()
}
.sink(
receiveCompletion: { completion in
if case .finished = completion {
isUpstreamCompleted = true
}
},
receiveValue: { _ in }
)
.store(in: &cancellables)

testScheduler.advance(by: 110)

XCTAssertEqual(innerPublisherSubscriptionCount, 10)
XCTAssertEqual(innerPublisherCompletionCount, 10)
XCTAssertTrue(isUpstreamCompleted)
}
}
#endif