<!--- TEST_NAME ChannelsGuideTest -->

**Table of contents**

<!--- TOC -->

* [Channels](#channels)
  * [Channel basics](#channel-basics)
  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
  * [Building channel producers](#building-channel-producers)
  * [Pipelines](#pipelines)
  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
  * [Fan-out](#fan-out)
  * [Fan-in](#fan-in)
  * [Buffered channels](#buffered-channels)
  * [Channels are fair](#channels-are-fair)
  * [Ticker channels](#ticker-channels)

<!--- END -->

## Channels

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

### Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt).

The output of this code is:

```text
1
4
9
16
25
Done!
```

<!--- TEST -->

### Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so all elements sent
before the close are guaranteed to be received:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt).

<!--- TEST
1
4
9
16
25
Done!
-->
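
As a minimal sketch of the close guarantee, the hypothetical `drainAfterClose` helper below (not part of the library) fills a channel, closes it, and only then starts receiving. It assumes an unlimited buffer (`Channel.UNLIMITED`) so that `send` does not suspend while nobody is receiving. The elements sent before `close` should still be delivered, while a `send` after `close` is expected to fail with `ClosedSendChannelException`:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper for illustration; not part of kotlinx.coroutines.
suspend fun drainAfterClose(): List<Int> {
    // Assumption: an unlimited buffer, so send() never suspends here.
    val channel = Channel<Int>(Channel.UNLIMITED)
    for (x in 1..3) channel.send(x)
    channel.close() // no more elements are coming

    val received = mutableListOf<Int>()
    for (y in channel) received += y // iteration ends once the close token is reached

    try {
        channel.send(4) // sending to a closed channel is an error
    } catch (e: ClosedSendChannelException) {
        println("send after close failed")
    }
    return received
}

fun main() = runBlocking {
    println(drainAfterClose())
}
```

Closing only stops new elements from being accepted; it does not discard what is already in the channel.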

### Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common.
This is part of the _producer-consumer_ pattern that is often found in concurrent code.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes against
the common-sense expectation that functions return their results.

There is a convenient coroutine builder named [produce] that makes it easy to do it right on the producer side,
and an extension function [consumeEach] that replaces a `for` loop on the consumer side:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
//sampleStart
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt).

<!--- TEST
1
4
9
16
25
Done!
-->
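
For comparison, here is a rough sketch of what the same producer might look like when written by hand with `Channel` and `launch`. The name `squaresByHand` is hypothetical, and the sketch only approximates `produce`: it covers closing the channel when the coroutine finishes, not every detail of how `produce` ties the channel to the coroutine's lifecycle.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical hand-written counterpart of produceSquares(), for comparison only.
fun CoroutineScope.squaresByHand(): ReceiveChannel<Int> {
    val channel = Channel<Int>()
    launch {
        try {
            for (x in 1..5) channel.send(x * x)
        } finally {
            channel.close() // without this the consumer's `for` loop would never end
        }
    }
    return channel // exposed as a receive-only view
}

fun main() = runBlocking {
    for (y in squaresByHand()) println(y)
    println("Done!")
}
```

The builder version is shorter and makes it harder to forget the `close` call.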

### Pipelines

A pipeline is a pattern where one coroutine is producing a possibly infinite stream of values:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

</div>

And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
In the example below, the numbers are just squared:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
```

</div>

The main code starts and connects the whole pipeline:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    repeat(5) {
        println(squares.receive()) // print first five
    }
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt).

<!--- TEST
1
4
9
16
25
Done!
-->

> All functions that create coroutines are defined as extensions on [CoroutineScope],
so that we can rely on [structured concurrency](https://kotlinlang.org/docs/reference/coroutines/composing-suspending-functions.html#structured-concurrency-with-async) to make
sure that we don't have lingering global coroutines in our application.
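
To illustrate how the `CoroutineScope` extensions compose, the sketch below wraps the same pipeline in a suspending function that opens its own scope with `coroutineScope`. The helper name `firstSquares` is hypothetical. Because both stages are children of that scope, cancelling the children is enough to stop the whole pipeline before the function returns:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

// Hypothetical helper: runs the pipeline inside its own scope and returns the first `count` squares.
suspend fun firstSquares(count: Int): List<Int> = coroutineScope {
    val squares = square(produceNumbers())
    val result = mutableListOf<Int>()
    repeat(count) { result += squares.receive() }
    // Both stages are infinite, so the scope would never complete unless they are cancelled.
    coroutineContext.cancelChildren()
    result
}

fun main() = runBlocking {
    println(firstSquares(5))
}
```

No coroutine outlives the call to `firstSquares`, which is the property structured concurrency is meant to provide.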

### Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers.

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

</div>

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

</div>

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found:

```
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
```

The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread. Since all the coroutines are launched in
the scope of the main [runBlocking] coroutine,
we don't have to keep an explicit list of all the coroutines we have started.
We use the [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren]
extension function to cancel all the children coroutines after we have printed
the first ten prime numbers.

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt).

The output of this code is:

```text
2
3
5
7
11
13
17
19
23
29
```

<!--- TEST -->

Note that you can build the same pipeline using the
[`iterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/iterator.html)
coroutine builder from the standard library.
Replace `produce` with `iterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the [Dispatchers.Default] context.

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
built using `sequence`/`iterator`, because they do not allow arbitrary suspension, unlike
`produce`, which is fully asynchronous.
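
The `iterator`-based variant described above can be sketched as follows. It applies exactly the listed substitutions; the `firstPrimes` helper is a hypothetical wrapper added here so that the result can be returned as a list. Everything runs synchronously on the calling thread:

```kotlin
// Same stages as the channel pipeline, built with the standard library's `iterator` builder.
fun numbersFrom(start: Int): Iterator<Int> = iterator {
    var x = start
    while (true) yield(x++) // infinite stream of integers from start
}

fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical helper: collects the first `count` primes instead of printing them.
fun firstPrimes(count: Int): List<Int> {
    var cur = numbersFrom(2)
    val primes = mutableListOf<Int>()
    repeat(count) {
        val prime = cur.next()
        primes += prime
        cur = filter(cur, prime) // add one more filtering stage per prime found
    }
    return primes
}

fun main() {
    println(firstPrimes(10)) // [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
}
```

Nothing needs to be cancelled here: the iterators are lazy and simply stop being pulled once the list is complete.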

### Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

</div>

Then we can have several processor coroutines. In this example, they just print their id and
the received number:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
```

</div>

Now let us launch five processors and let them work for almost a second. See what happens:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt).

The output will be similar to the following one, although the processor ids that receive
each specific integer may be different:

```
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->

Note that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
over the channel that processor coroutines are doing.

Also, pay attention to how we explicitly iterate over the channel with a `for` loop to perform fan-out in the `launchProcessor` code.
Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor
coroutines fails, then others would still be processing the channel, while a processor that is written via `consumeEach`
always consumes (cancels) the underlying channel on its normal or abnormal completion.
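
The work-distribution property can be checked with a small sketch: each element sent by the producer should be received by exactly one worker. The `fanOut` helper and its `results` channel are hypothetical names introduced for this illustration, and the producer here is finite so that the workers' `for` loops end on their own:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: fans `items` numbers out to `workers` coroutines and
// returns everything the workers received, sorted.
suspend fun fanOut(workers: Int, items: Int): List<Int> = coroutineScope {
    val source = produce<Int> { for (x in 1..items) send(x) }
    // Assumption: an unlimited buffer, so workers never suspend when reporting results.
    val results = Channel<Int>(Channel.UNLIMITED)
    val jobs = List(workers) {
        launch { for (x in source) results.send(x) } // safe to run from several coroutines
    }
    jobs.joinAll() // the loops end once the producer finishes and its channel is closed
    results.close()
    val received = mutableListOf<Int>()
    for (x in results) received += x
    received.sorted()
}

fun main() = runBlocking {
    println(fanOut(workers = 3, items = 10))
}
```

If any element were delivered twice or dropped, the sorted result would differ from `1..items`.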

### Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

</div>

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread as main coroutine's children):

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt).

The output is:

```text
foo
foo
BAR!
foo
foo
BAR!
```

<!--- TEST -->
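
When the senders are finite rather than endless, someone has to decide when the shared channel is closed. One possible approach, sketched below under the assumption that the channel should be closed only after every sender has finished, is to launch a small coroutine that joins the senders and then closes the channel. The `fanIn` helper is a hypothetical name:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: two finite senders share one channel, and the channel
// is closed only after both of them have completed.
suspend fun fanIn(): List<String> = coroutineScope {
    val channel = Channel<String>()
    val senders = listOf(
        launch { repeat(3) { channel.send("foo") } },
        launch { repeat(2) { channel.send("BAR!") } }
    )
    launch {
        senders.joinAll() // wait for every sender
        channel.close()   // then let the receiver's loop finish
    }
    val received = mutableListOf<String>()
    for (s in channel) received += s
    received
}

fun main() = runBlocking {
    println(fanIn())
}
```

The order in which the strings are interleaved is not something this sketch tries to control; only the total counts per sender are fixed.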

### Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

Both the [Channel()] factory function and the [produce] builder take an optional `capacity` parameter to
specify the _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt).

It prints "Sending" _five_ times using a buffered channel with a capacity of _four_:

```text
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

<!--- TEST -->

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
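
To make the difference between a started `send` and a completed one explicit, the sketch below counts how many `send` calls return when nobody ever receives. The `sendsBeforeSuspension` helper is hypothetical, and the plain `var` counter is only safe because the example is assumed to run on the single thread of `runBlocking`:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: counts the sends that complete while nothing is received.
suspend fun sendsBeforeSuspension(capacity: Int): Int = coroutineScope {
    val channel = Channel<Int>(capacity)
    var completed = 0 // assumption: accessed from a single thread only
    val sender = launch {
        repeat(10) {
            channel.send(it) // suspends once the buffer is full
            completed++
        }
    }
    delay(100) // give the sender time to fill the buffer
    sender.cancel()
    completed
}

fun main() = runBlocking {
    println(sendsBeforeSuspension(4)) // 4: the fifth send starts but never completes
    println(sendsBeforeSuspension(0)) // 0: a rendezvous channel has no buffer at all
}
```

This matches the output above: "Sending 4" is printed because the message is printed before the call to `send`, not because the fifth element was accepted.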

### Channels are fair

Send and receive operations to channels are _fair_ with respect to the order of their invocation from
multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
gets the element. In the following example two coroutines "ping" and "pong" are
receiving the "ball" object from the shared "table" channel.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt).

The "ping" coroutine is started first, so it is the first one to receive the ball. Even though the "ping"
coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
received by the "pong" coroutine, because it was already waiting for it:

```text
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
```

<!--- TEST -->

Note that sometimes channels may produce executions that look unfair due to the nature of the executor
that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.

### Ticker channels

A ticker channel is a special rendezvous channel that produces `Unit` every time the given delay passes since the last consumption from this channel.
Though it may seem useless on its own, it is a useful building block for creating complex time-based [produce]
pipelines and operators that do windowing and other time-dependent processing.
A ticker channel can be used in [select] to perform an "on tick" action.

To create such a channel, use the [ticker] factory method.
To indicate that no further elements are needed, call [ReceiveChannel.cancel] on it.

Now let's see how it works in practice:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // no initial delay

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt).

It prints the following lines:

```text
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
```

<!--- TEST -->

Note that [ticker] is aware of possible consumer pauses and, by default, adjusts the delay of the next produced
element if a pause occurs, trying to maintain a fixed rate of produced elements.

Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed
delay between elements.
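
The difference between the two modes can be observed with a sketch like the one below. The `tickAfterPause` helper and the specific timings (a 200 ms ticker, a 300 ms pause, a 150 ms wait) are assumptions chosen for illustration, and the `@OptIn` annotation is added on the assumption that `ticker` is marked as an obsolete API in the library version in use. Because the result depends on real time, treat it as indicative rather than guaranteed:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: after a long consumer pause, waits up to 150 ms for the
// following tick and returns it, or null if it does not arrive in time.
@OptIn(ObsoleteCoroutinesApi::class)
suspend fun tickAfterPause(mode: TickerMode): Unit? {
    val tickerChannel = ticker(delayMillis = 200, initialDelayMillis = 0, mode = mode)
    tickerChannel.receive() // initial element, no initial delay
    delay(300)              // consumer pause longer than the ticker delay
    tickerChannel.receive() // the element that became ready during the pause
    val next = withTimeoutOrNull(150) { tickerChannel.receive() }
    tickerChannel.cancel()  // indicate that no more elements are needed
    return next
}

fun main() = runBlocking {
    println("FIXED_PERIOD: ${tickAfterPause(TickerMode.FIXED_PERIOD)}")
    println("FIXED_DELAY: ${tickAfterPause(TickerMode.FIXED_DELAY)}")
}
```

With the default fixed-period mode the next tick is expected about 100 ms after the pause, so the wait should normally succeed. With `FIXED_DELAY` the full 200 ms is counted from the moment the previous element was taken, so the 150 ms wait should normally time out and yield `null`.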

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/kotlin.coroutines.-coroutine-context/cancel-children.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
<!--- INDEX kotlinx.coroutines.channels -->
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
[Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html
[ticker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html
[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
[TickerMode.FIXED_DELAY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y.html
<!--- INDEX kotlinx.coroutines.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
<!--- END -->