Async Programming in Flutter With Streams

Asynchronous programming is a style that lets part of your program keep running while it waits for some other activity to finish in your application thread. It allows portions of your code to run independently of the main workflow.

A couple of examples of asynchronous processes are fetching data from a network and looping over a very large chunk of data.

In Flutter, there are several ways to write asynchronous code. Some popular ones are:

  • Future.
  • Async/Await.
  • Streams.
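To make the difference between these concrete, here is a minimal sketch in plain Dart. The names fetchGreeting and countTo are hypothetical, made up for this illustration: a Future delivers a single value, while a Stream can deliver many values over time.

```dart
// Hypothetical helper: a Future completes once, with a single value.
Future<String> fetchGreeting() async {
  return 'hello';
}

// Hypothetical helper: an async* function returns a Stream that can
// emit many values, one per yield.
Stream<int> countTo(int max) async* {
  for (var i = 1; i <= max; i++) {
    yield i;
  }
}

Future<void> main() async {
  // async/await waits for the single result of a Future.
  final greeting = await fetchGreeting();
  print(greeting);

  // await for receives each event of a Stream as it arrives.
  await for (final n in countTo(3)) {
    print(n);
  }
}
```

Running this should print the greeting first and then the numbers 1 to 3, one per line.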

The purpose of this article is to get you started using streams in your Flutter apps. I expect you to have prior knowledge of writing Flutter apps. Let’s get to it!

What is Reactive Programming?

According to Wikipedia:

Reactive programming is a programming paradigm oriented around data flows and the propagation of change. This means that it should be possible to express static or dynamic data flows with ease in the programming languages used, and that the underlying execution model will automatically propagate changes through the data flow.

In reactive programming, data flows in a structure set by the language, framework, or library you use in such a way that when data is passed into the structure on one end, the output will always update itself if the value of that data changes.

This is exactly what streams do. A stream is a flow of asynchronous events. View it as a pipe containing a liquid: you pour the liquid in at one end of the pipe and it comes out the other end.

Streams can carry and process different types of data, like an object, a function, a map, a list, even streams of streams. Pretty cool, eh?
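As a rough sketch of that idea in plain Dart, the snippet below builds a stream of maps and a stream of streams. The names nestedStreams and flatten, and the sample data, are hypothetical and only here for illustration. asyncExpand is one way to flatten a stream of streams into a single stream.

```dart
// Hypothetical example data: a stream whose events are themselves streams.
Stream<Stream<int>> nestedStreams() {
  return Stream.fromIterable([
    Stream.fromIterable([1, 2]),
    Stream.fromIterable([3]),
  ]);
}

// asyncExpand listens to each inner stream in turn and forwards its events.
Future<List<int>> flatten() {
  return nestedStreams().asyncExpand((inner) => inner).toList();
}

Future<void> main() async {
  // A stream can also carry collections such as maps.
  final Stream<Map<String, int>> scores = Stream.fromIterable([
    {'alice': 1},
    {'bob': 2},
  ]);
  await for (final score in scores) {
    print(score);
  }

  print(await flatten());
}
```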

Now, Code Examples

The first example creates a simple stream from a Future that returns a string, and prints the string when it arrives:

@override
void initState() {
  super.initState();
  // Wrap the Future in a stream that emits its result and then closes.
  Stream<String> stream = Stream.fromFuture(inputData());
  stream.listen((data) {
    print("Our data: " + data);
  }, onDone: () {
    print("Done");
  }, onError: (error) {
    print("Error returned");
  });
}

Future<String> inputData() async {
  print("Fetching Data...");
  return "Let's Use Streams!";
}

Output:

Fetching Data...

Our data: Let's Use Streams!

Done

Here, the stream gets its data from the inputData() function, which returns the text “Let’s Use Streams!”. Then we use stream.listen() to work on the data entering the stream. We also pass in an onDone and an onError callback. onDone runs once the stream has closed and no more data will arrive, and onError runs if the stream emits an error, so you can handle it there.
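To see the onError callback actually fire, here is a small sketch in plain Dart where the Future fails instead of returning text. failingInput and collectEvents are hypothetical names invented for this example, and the events are collected in a list so they are easy to inspect.

```dart
import 'dart:async';

// Hypothetical data source that always fails.
Future<String> failingInput() async {
  throw Exception('network down');
}

// Listens to the stream and records every callback that fires.
Future<List<String>> collectEvents() {
  final log = <String>[];
  final finished = Completer<List<String>>();

  Stream<String>.fromFuture(failingInput()).listen(
    (data) => log.add('data: $data'),
    // The error from the Future arrives here instead of as data.
    onError: (Object error) => log.add('error: $error'),
    // A stream created from a Future closes after its single event.
    onDone: () {
      log.add('done');
      finished.complete(log);
    },
  );

  return finished.future;
}

Future<void> main() async {
  print(await collectEvents());
}
```

The log should contain one error entry followed by the done entry, and no data entry.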

Second Example

// StreamController lives in dart:async: import 'dart:async';
final StreamController controller = StreamController();
controller.stream.listen((data) {
  print("received data: $data");
}, onDone: () {
  print("Stream done");
}, onError: (error) {
  // onError must accept the error object as a parameter
  print("Error occurred");
});
// data flowing into the stream
controller.sink.add('stream controllers are awesome');
controller.sink.add("Because You can do more");
controller.sink.add('random string');
// Close the StreamController when done to avoid memory leaks
controller.close();

Output:

received data: stream controllers are awesome

received data: Because You can do more

received data: random string

Stream done

The StreamController class gives you a controller for a stream that it creates, making it easier to push data into the stream and to use multiple listeners (we’ll get to that soon). Here we’re adding our data using sink.add(); the sink is the object that accepts data being entered into the stream. Also, you should close your controller after using it to prevent memory leaks.
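As a variation on this, the sketch below uses a typed StreamController<String> and also pushes an error through the sink with addError. It is plain Dart rather than Flutter code, and runController is a hypothetical name used only for this illustration.

```dart
import 'dart:async';

// Feeds data and an error into a typed controller and records what
// the listener sees.
Future<List<String>> runController() async {
  final log = <String>[];
  final controller = StreamController<String>();

  controller.stream.listen(
    (data) => log.add('data: $data'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () => log.add('done'),
  );

  controller.sink.add('first');
  // Errors travel through the same pipe and reach onError.
  controller.sink.addError('something went wrong');
  // By default the subscription stays active after an error.
  controller.sink.add('second');

  // close() returns a Future; awaiting it here waits for the listener
  // to receive the done event.
  await controller.close();
  return log;
}

Future<void> main() async {
  print(await runController());
}
```

Giving the controller a type argument lets the analyzer reject events of the wrong type at compile time, instead of leaving every event as dynamic.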

Broadcast Streams

This type of stream allows you to have multiple listeners. The streams we created before could only have one listener; they’re known as single-subscription streams.
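The difference can be checked with a short sketch in plain Dart. secondListenFails is a hypothetical helper written for this illustration: listening twice to a single-subscription stream throws a StateError, while a broadcast stream accepts the second listener.

```dart
import 'dart:async';

// Hypothetical helper: returns true if a second listen() call on the
// controller's stream throws a StateError.
bool secondListenFails(StreamController<int> controller) {
  controller.stream.listen((_) {});
  try {
    controller.stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  }
}

void main() {
  final single = StreamController<int>();
  final broadcast = StreamController<int>.broadcast();

  print(secondListenFails(single)); // true
  print(secondListenFails(broadcast)); // false

  single.close();
  broadcast.close();
}
```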

Code example

@override
void initState() {
  super.initState();
  stream();
}

// add a .broadcast()
final StreamController controller = StreamController.broadcast();

void stream() {
  controller.stream.where((data) => (data is String)).listen((data) {
    print("DataReceived: " + data);
  }, onDone: () {
    print("Task 1 Done");
  }, onError: (error) {
    print("Some Error");
  });
  controller.stream.where((data) => (data is int)).listen((data) {
    print("DataReceived: " + data.toString());
  }, onDone: () {
    print("Task 2 Done");
  });
  controller.stream.where((data) => (data is Map)).listen((data) {
    print(data);
  }, onDone: () {
    print("Task 3 Done");
  });
  controller.sink.add('random string');
  controller.sink.add(1234);
  controller.sink.add({'key 1': 'value A', 'key 2': 'value B'});
  controller.close();
}

Output:

DataReceived: random string

DataReceived: 1234

{key 1: value A, key 2: value B}

Task 1 Done

Task 2 Done

Task 3 Done

Here, we initialize a StreamController with .broadcast(), then we listen to the stream three different times. Notice that we call .where() before listening; this lets each listener receive only the data in the stream that meets a given condition. First we print only strings, then integers, then maps.
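.where() can also be chained with other stream methods such as .map(). The following is a minimal sketch in plain Dart, with evenLabels as a hypothetical name and made-up sample numbers: it keeps only the even values and then turns each one into a label.

```dart
// Filters a stream with where(), then transforms the remaining
// events with map().
Future<List<String>> evenLabels() {
  return Stream<int>.fromIterable([1, 2, 3, 4])
      .where((n) => n.isEven)
      .map((n) => 'even: $n')
      .toList();
}

Future<void> main() async {
  print(await evenLabels()); // [even: 2, even: 4]
}
```

Each method returns a new stream, so the original stream is left untouched and the filtering happens before any listener sees the data.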

Now you know how to create and use a stream 🎉.

You can find me on Twitter.