How to limit concurrent live URLSessions with Combine?

0

I have a lot (~200) urls for images, and I need to download each one, then process (resize) it, then update the cache. The thing is - I only want to have at max 3 requests at once, and since the images are heavy, I also don't want a lot of responses "hanging" waiting to be processed (and taking memory...).

TLDR I want to call the next (4th) network request only after the receiveValue in the sink is called on one of the first 3 requests... (ie after the network response & processing are both done...).

Will this flow work, and will it hold on to the waiting urls and not drop them on the floor?

Also do I need that buffer() call? I use it after seeing this answer: https://stackoverflow.com/a/67011837/2242359

wayTooManyURLsToHandleAtOnce // this is a `[URL]`
    .publisher
    .buffer(size: .max, prefetch: .byRequest, whenFull: .dropNewest) // NEEDED?
    .flatMap(maxPublishers: .max(3)) { url in
       URLSession.shared
           .dataTaskPublisher(for: url)
           .map { (data: Data, _) -> Picture in
               Picture(from: data)
           }
    }
    .tryCompactMap {
        resizeImage(picture: $0) // takes a while and might fail
    }
    .receive(on: DispatchQueue.main)
    .sink { completion
        // handling completion... 
    } receiveValue: { resizedImage
        self.cache.append(resizedImage)
    }
    .store(...)
combine swift urlsession
2021-11-23 22:14:45
1

0

I would use a subject. This not an optimal solution but it looks working and maybe will trigger other ideas

var cancellable: AnyCancellable?

var urls: [String] = (0...6).map { _ in "http://httpbin.org/delay/" + String((0...2).randomElement()!) }

var subject: PassthroughSubject<[String], Never> = .init()

let maxConcurrentRequests = 3

override func viewDidAppear(_ animated: Bool) {
    super.viewDidAppear(animated)
    
    print(urls)
    
    cancellable = subject
        .flatMap({ urls -> AnyPublisher<[URLSession.DataTaskPublisher.Output], URLError> in
            let requests = urls.map { URLSession.shared.dataTaskPublisher(for: URL.init(string: $0)!) }
            return Publishers.MergeMany(requests)
                .collect().eraseToAnyPublisher()
        })
        .print()
        .sink(receiveCompletion: { completion in
            print(completion)
        }, receiveValue: { value in
            print(value)
            if self.urls.count <= self.maxConcurrentRequests {
                self.urls.removeAll()
                self.subject.send(completion: .finished)
            } else {
                self.urls.removeLast(self.maxConcurrentRequests)
                self.subject.send(self.urls.suffix(self.maxConcurrentRequests))
            }
        })
    
    subject.send(urls.suffix(maxConcurrentRequests))
}
2021-11-24 11:30:11

Wouldn't calling self.subject.send(completion: .finished) on the sink end my subscription forever? (ie ignoring future values emitted)
Aviel Gross

@AvielGross Yes, it does. I understood that your collection of urls created once per view controller present / dismiss. If its not true then do not send ".finished" but you have a trigger like didSet on urls array or another to relaunch with subject.send after emptying the array and refilling it.
Blazej SLEBODA

In other languages

This page is in other languages

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................