Search

RxSwift - Buffering Operator

 Buffering Operator

Buffer, 즉 임시 저장 공간은 데이터를 한 곳에서 다른 한 곳으로 전송하는 동안 일시적으로 그 데이터를 보관하는 메모리 영역입니다.
Buffering Operator도 임시 저장 공간같이 과거의 요소를 구독자가 지정해둔 버퍼 크기만큼 일시적으로 보관해뒀다가 전달해주는 역할을 합니다.

 Buffering Operator의 종류

share

한 번 생성한 Observable을 공유
Observable을 share하지 않으면 subscribe할 때마다 새로운 Observable 시퀀스가 생성됩니다.
하지만, share를 사용하게 되면 Observable에서 발생한 이벤트가 버퍼에 저장되어 하나의 시퀀스에서 방출되는 아이템을 subscribe한 곳에서 공유해서 사용하게 됩니다.
sharecompleted되지 않는 Observable에 사용하는 것이 안전합니다. 또한, 공유하는 시퀀스가 completed된 후 새로운 Observable이 생성되지 않는다고 확신할 수 있을 때 사용하는 것이 좋습니다.
share를 사용하지 않는 경우
let nonShareObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance) .do(onNext: { print($0) }) .take(3) .debug("nonShare") nonShareObservable .map { $0 > 0 } .subscribe { print($0) } .disposed(by: disposeBag) nonShareObservable .map { "Count:\($0)" } .subscribe { print($0) } .disposed(by: disposeBag) // subscribe할 때마다 새로운 Observable를 subscribe함 /* nonShare -> subscribed nonShare -> subscribed 0 nonShare -> Event next(0) next(false) 0 nonShare -> Event next(0) next(Count:0) 1 nonShare -> Event next(1) next(true) 1 nonShare -> Event next(1) next(Count:1) 2 nonShare -> Event next(2) next(true) nonShare -> Event completed completed nonShare -> isDisposed 2 nonShare -> Event next(2) next(Count:2) nonShare -> Event completed completed nonShare -> isDisposed */
Swift
복사
share를 사용하는 경우
let shareObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance) .do(onNext: { print($0) }) .take(3) .debug("share") .share() shareObservable .map { $0 > 0 } .subscribe { print($0) } .disposed(by: disposeBag) shareObservable .map { "Count:\($0)" } .subscribe { print($0) } .disposed(by: disposeBag) /* share -> subscribed 0 share -> Event next(0) next(false) next(Count:0) 1 share -> Event next(1) next(true) next(Count:1) 2 share -> Event next(2) next(true) next(Count:2) share -> Event completed completed completed share -> isDisposed */
Swift
복사

share(replay:, scope:)

다른 시퀀스에서 share된 Observable를 구독했을 때, 가장 최근 방출했던 아이템을 버퍼의 크기만큼 새로운 구독 시퀀스에 전달
replay에는 버퍼의 크기를 받습니다. scope에는 버퍼의 생명주기가 들어갑니다.
버퍼의 생명주기는 .forever.whileConnected 두 가지가 있습니다.
.forever
subscription이 0이 되더라도 버퍼가 유지됩니다. 새로운 subscription이 subscribe를 하면 마지막에 버퍼에 남아있던 replay 개수만큼 값을 수신합니다.
stream을 캐시에서 삭제하지 않습니다.
.whileConnected
1개 이상의 subscriber가 존재하는 동안만 버퍼가 유지됩니다. subscription이 0이 되면 버퍼가 비워지고 새로운 subscription은 버퍼에 남아있던 값이 없으므로 replay 시에 새 값을 요청해 수신하게 됩니다.
공유되고 있던 stream을 캐시에서 삭제합니다.
let shareObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance) .do(onNext: { print($0) }) .take(5) .debug("share") .share(replay: 2, scope: .forever) shareObservable .map { $0 > 0 } .subscribe { print($0) } .disposed(by: disposeBag) shareObservable .map { "Count:\($0)" } .subscribe { print($0) } .disposed(by: disposeBag) DispatchQueue.main.asyncAfter(deadline: .now() + 5, execute: { shareObservable .map { "new: \($0)"} .subscribe { print($0) } .disposed(by: self.disposeBag) }) /* share -> subscribed 0 share -> Event next(0) next(false) next(Count:0) ... share -> Event completed share -> isDisposed completed completed next(new: 3) // replay를 2로 설정해두어 2개의 아이템을 전달 next(new: 4) completed */
Swift
복사

replay

subscription을 공유시 지정한 버퍼 크기만큼 이벤트를 저장하고 전달
share에서 매개변수로 사용한 replay와 뜻하는 바가 동일합니다. 매개변수로 이벤트를 저장하고 싶은 만큼 버퍼 크기를 지정하면 됩니다.
ConnectableObservable이기 때문에 connect 메서드를 통해서 불리기 전까지는 구독자 수와 관계 없이 아무것도 방출하지 않습니다.
let bellyClock = Observable<Int> .interval(.seconds(1), scheduler: MainScheduler.instance) .take(until: { $0 == 6 }, behavior: .inclusive) let replay = bellyClock.replay(2) // replayAll()를 사용하게 되면 모든 이벤트를 방출 _ = replay.connect() replay .subscribe(onNext: { time in print("배꼽 시계 울린다!! \(time)") }) .disposed(by: disposeBag) replay .delaySubscription(.seconds(3), scheduler: MainScheduler.instance) .subscribe(onNext: { time in print("두번째 배꼽 시계 울린다!! \(time)") }) .disposed(by: disposeBag) /* 배꼽 시계 울린다!! 0 배꼽 시계 울린다!! 1 배꼽 시계 울린다!! 2 두번째 배꼽 시계 울린다!! 1 두번째 배꼽 시계 울린다!! 2 배꼽 시계 울린다!! 3 두번째 배꼽 시계 울린다!! 3 배꼽 시계 울린다!! 4 두번째 배꼽 시계 울린다!! 4 배꼽 시계 울린다!! 5 두번째 배꼽 시계 울린다!! 5 배꼽 시계 울린다!! 6 두번째 배꼽 시계 울린다!! 6 */
Swift
복사

publish

Observable의 시퀀스를 하나의 Subject를 통해 multicast로 이벤트를 전달
publishmulticastpublishSubject가 합쳐진 방식입니다. publishmulticastpublishSubject를 함께 사용하는 방식을 하나의 메서드로 함축시켜줍니다.
multicast + publishSubject를 사용하는 방식
let bellyClock = Observable<Int> .interval(.seconds(1), scheduler: MainScheduler.instance) .take(until: { $0 == 6 }, behavior: .inclusive) let subject = PublishSubject<Int>() let multicast = bellyClock.multicast(subject) _ = multicast.connect() multicast .subscribe(onNext: { time in print("배꼽 시계 울린다!! \(time)") }) .disposed(by: disposeBag) multicast .delaySubscription(.seconds(3), scheduler: MainScheduler.instance) .subscribe(onNext: { time in print("두번째 배꼽 시계 울린다!! \(time)") }) .disposed(by: disposeBag)
Swift
복사
publish를 사용하는 방식
let bellyClock = Observable<Int> .interval(.seconds(1), scheduler: MainScheduler.instance) .take(until: { $0 == 6 }, behavior: .inclusive) let multicast = bellyClock.publish() _ = multicast.connect() multicast .subscribe(onNext: { time in print("배꼽 시계 울린다!! \(time)") }) .disposed(by: disposeBag) multicast .delaySubscription(.seconds(3), scheduler: MainScheduler.instance) .subscribe(onNext: { time in print("두번째 배꼽 시계 울린다!! \(time)") }) .disposed(by: disposeBag)
Swift
복사
두 가지 방식 모두 같은 Output을 방출합니다.
/* 배꼽 시계 울린다!! 0 배꼽 시계 울린다!! 1 배꼽 시계 울린다!! 2 배꼽 시계 울린다!! 3 두번째 배꼽 시계 울린다!! 3 배꼽 시계 울린다!! 4 두번째 배꼽 시계 울린다!! 4 배꼽 시계 울린다!! 5 두번째 배꼽 시계 울린다!! 5 배꼽 시계 울린다!! 6 두번째 배꼽 시계 울린다!! 6 */
Swift
복사

buffer, window

이벤트를 묶음으로 전달
두 연산자의 인자는 완전히 동일하지만, 반환값이 다르게 나타납니다.
func buffer(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) -> Observable<[Element]> func window(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) -> Observable<Observable<Element>>
Swift
복사
buffer배열 형태로 이벤트를 방출하지만, window다른 Observable 형태로 이벤트를 방출합니다.
bufferwindow는 두 가지 조건 중 1가지가 만족되기 전까지는 이벤트를 방출하지 않습니다.
⓵ 첫 구독 시점 혹은 가장 최근 이벤트 발생 후로 timespan으로 지정한 시간이 지났을 경우
⓶ 첫 구독 시점 혹은 가장 최근 이벤트 발생 후로 원본 Observable에서 count만큼의 이벤트가 발생한 경우
하나의 조건을 만족하기 전까지는 원본 Observable의 이벤트는 내부 버퍼에 저장됩니다. 이벤트 발생 시점이 되면 해당 버퍼의 이벤트들을 모두 내보내고 버퍼를 비우고 다시 타이머(timespan)를 시작합니다.
buffer, window를 사용하면 이벤트를 특정 횟수나 시간 단위로 묶어서 한꺼번에 처리할 수 있게 됩니다.
buffer
let timer1 = Observable<Int> .interval(.seconds(1), scheduler: MainScheduler.instance) .take(until: { $0 == 5 }, behavior: .inclusive) .map({ "세고 있다 \($0)" }) timer1 .buffer(timeSpan: .seconds(3), count: 2, scheduler: MainScheduler.instance) .subscribe { event in switch event { case let .next(value): print(value) default: print("finished") } }.disposed(by: disposeBag) /* ["세고 있다 0", "세고 있다 1"] ["세고 있다 2", "세고 있다 3"] ["세고 있다 4", "세고 있다 5"] [] finished */
Swift
복사
window
let timer1 = Observable<Int> .interval(.seconds(1), scheduler: MainScheduler.instance) .take(until: { $0 == 5 }, behavior: .inclusive) .map({ "세고 있다 \($0)" }) timer1 .window(timeSpan: .seconds(3), count: 2, scheduler: MainScheduler.instance) .subscribe { [weak self] event in switch event { case let .next(observable): observable .subscribe { e in switch e { case let .next(value): print(value) default: print("inner finished") } } .disposed(by: self?.disposeBag ?? DisposeBag()) default: print("finished") } }.disposed(by: disposeBag) /* 세고 있다 0 세고 있다 1 inner finished 세고 있다 2 세고 있다 3 inner finished 세고 있다 4 세고 있다 5 inner finished inner finished finished */
Swift
복사