Flutter: Future, Stream, RxDart

Future

Future is a class from the Dart core library (dart:async) that represents the result of an asynchronous operation: a value or an error that becomes available later. The two keywords used with it are async and await.

  • async: This keyword is placed after the parameter list and before the body of a function to mark the function as asynchronous. An async function normally returns a Future. It runs until its first await and then yields, so the caller and the UI thread are not blocked while the operation is pending.
  • await: This keyword waits for a Future to complete and yields its result. It can only be used inside a function marked async.
import 'package:flutter/material.dart';

class _MyHomePageState extends State<MyHomePage> {
  String string = '';

  @override
  Widget build(BuildContext context) {
    return Scaffold(
        appBar: AppBar(
          backgroundColor: Theme.of(context).colorScheme.inversePrimary,
          title: Text(widget.title),
        ),
        body: Center(
          child: ElevatedButton(
            onPressed: () async {
              print("Start getting data");
              // This callback pauses here; the UI stays responsive meanwhile
              await fetchData();
              print(string);
            },
            child: const Text("get data"),
          ),
        ));
  }

  // Simulates a two-second request, then stores the result
  Future<void> fetchData() async {
    await Future.delayed(const Duration(seconds: 2));
    string = 'Data acquisition complete';
  }
}
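
The ordering that async and await produce can also be seen without Flutter. The following is a minimal sketch in plain Dart; fetchGreeting and runDemo are hypothetical names used only for illustration. It records the order in which the steps run, showing that the caller keeps going while the Future is pending.

```dart
import 'dart:async';

/// Hypothetical helper: simulates a slow request with a short delay.
Future<String> fetchGreeting(List<String> log) async {
  log.add('fetch started'); // runs synchronously, before the first await
  await Future.delayed(const Duration(milliseconds: 50));
  log.add('fetch finished');
  return 'hello';
}

/// Runs the demo and returns the order in which the steps happened.
Future<List<String>> runDemo() async {
  final log = <String>[];
  final pending = fetchGreeting(log); // started, but not awaited yet
  log.add('caller continues'); // recorded while the request is pending
  final value = await pending; // pause here until the result arrives
  log.add('got $value');
  return log;
}

Future<void> main() async {
  // [fetch started, caller continues, fetch finished, got hello]
  print(await runDemo());
}
```

The call to fetchGreeting returns a Future as soon as it reaches its first await, which is why 'caller continues' is logged before 'fetch finished'.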


Future also works together with FutureBuilder. For example, when you request a piece of data, a loading indicator is shown while the request is pending, and the data is shown once the request succeeds.

// Simulates a request that takes three seconds to return
Future<String> getValue() async {
  await Future.delayed(const Duration(seconds: 3));
  return "100";
}

FutureBuilder<String>(
    // The asynchronous operation to wait for. In a real widget, create the
    // Future once (for example in initState) so that a rebuild does not restart it.
    future: getValue(),
    builder: (context, snapshot) {
      // Widget shown while waiting
      if (snapshot.connectionState == ConnectionState.waiting) {
        return const Center(
          child: CircularProgressIndicator(),
        );
        // Widget shown on error
      } else if (snapshot.hasError) {
        return const Text('Error');
        // Data shown after loading is complete
      } else {
        return Text(snapshot.data ?? 'No data');
      }
    })

Stream

In Flutter, a Stream is used to process an asynchronous sequence of events. Its common applications include:

  • Asynchronous data acquisition: Stream is often used to acquire data from asynchronous sources such as servers or local databases. You can use Stream to listen for changes in the data source and respond when data is available.

  • State management: Stream can be used as a state management tool for applications. You can encapsulate the state in the application into a Stream, and update the user interface by listening to the Stream. For example, you can store the login status of the application in a Stream, and notify the interface to update the corresponding UI when the login status changes.

  • Event Bus: Stream can be used as an event bus for passing events and data between different parts of the application. You can create a global Stream, and subscribers can listen to the Stream and receive events. This enables decoupling and communication between different components.

  • User Input: Stream is also useful when dealing with user input. You can use Stream to monitor user operations in the application, such as clicking buttons or swiping the screen. By converting user input into Stream events, the application can react to each interaction as it happens.

  • File read and write: Stream can also be used to handle file read and write operations. You can read and write files with Stream for asynchronously processing large files or streaming data.

Streams are created and managed with Dart's Stream class. A StreamController lets you create a stream, add data to it, and close it. The Stream class also provides many methods for transforming and processing a stream, such as map, where, and transform.
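
As a rough illustration of these pieces working together, the sketch below pushes a few numbers through a StreamController and processes them with where and map. The function name evenTimesTen is made up for this example.

```dart
import 'dart:async';

/// Sends 1..5 through a StreamController, keeps the even values,
/// and multiplies each of them by ten.
Future<List<int>> evenTimesTen() async {
  final controller = StreamController<int>();

  // toList() subscribes right away and completes when the stream closes.
  final result = controller.stream
      .where((n) => n.isEven)
      .map((n) => n * 10)
      .toList();

  for (var i = 1; i <= 5; i++) {
    controller.add(i); // add data to the stream
  }
  await controller.close(); // signal that no more data will come

  return result;
}

Future<void> main() async {
  print(await evenTimesTen()); // [20, 40]
}
```

Each call to where or map returns a new Stream, so the steps can be chained in the order the data should flow.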

Asynchronous data acquisition

import 'dart:async';

import 'package:flutter/material.dart';

class _MyHomePageState extends State<MyHomePage> {
  // Create the controller; it emits int values
  final StreamController<int> _streamController = StreamController<int>();

  @override
  void dispose() {
    // Close the controller before the State is torn down
    _streamController.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
        appBar: AppBar(
          backgroundColor: Theme.of(context).colorScheme.inversePrimary,
          title: Text(widget.title),
        ),
        body: Column(
          children: [
            ElevatedButton(
                onPressed: () {
                  fetchData();
                },
                child: const Text("get data")),
            // Rebuilds every time the stream emits a value or an error
            StreamBuilder<int>(
                stream: _streamController.stream,
                builder: (BuildContext context, AsyncSnapshot<int> snapshot) {
                  if (snapshot.hasData) {
                    return Text('Asynchronous data: ${snapshot.data}');
                  } else if (snapshot.hasError) {
                    return Text('An error occurred: ${snapshot.error}');
                  } else {
                    return const Text('Loading...');
                  }
                })
          ],
        ));
  }

  Future<void> fetchData() async {
    await Future.delayed(const Duration(seconds: 1));
    _streamController.sink.add(1); // send the first value
    await Future.delayed(const Duration(seconds: 2));
    _streamController.sink.add(2); // send the second value
  }
}

Stream and async/await handle asynchronous work differently in several ways:

  1. Control flow: With a Stream, you handle results by listening to its events, and your logic runs each time new data arrives. With async/await, the function pauses at each await until the operation completes, and the code reads sequentially.

  2. Data processing: A Stream carries a series of values over time, such as data streams and event streams. A Future awaited with async/await produces a single result.

  3. Usage scenario: Stream suits sources that keep producing data, such as WebSocket messages or sensor readings. async/await suits operations that return a result once, such as a single network request or reading a whole file.

  4. Code structure: A Stream often requires you to create a StreamController and manage sending and subscribing yourself. With async/await, you simply await the result inside an async function, so the code is more concise.

In general, Stream is better for asynchronous operations that continuously generate data, and makes it easy to process and transform that data. async/await is better for operations that return a result once, and gives simpler, clearer code. Which one to use depends on your needs and the complexity of your code. The two can also be combined, for example by awaiting a Future and then turning its result into a Stream for further processing.
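
The sketch below shows one way the two styles can be combined, assuming a hypothetical one-shot operation called loadInitialValue. An async* generator awaits the Future and then keeps emitting values, and await for consumes the resulting Stream in sequential style.

```dart
import 'dart:async';

/// Hypothetical one-shot operation that completes with a single value.
Future<int> loadInitialValue() async {
  await Future.delayed(const Duration(milliseconds: 10));
  return 1;
}

/// An async* generator: emits the awaited Future result, then more values.
Stream<int> values() async* {
  yield await loadInitialValue();
  yield* Stream.fromIterable([2, 3]);
}

/// Consumes a stream sequentially with `await for`.
Future<int> sum(Stream<int> stream) async {
  var total = 0;
  await for (final v in stream) {
    total += v;
  }
  return total;
}

Future<void> main() async {
  print(await sum(values())); // 6
  // A Future can also be turned into a one-event Stream directly.
  print(await loadInitialValue().asStream().toList()); // [1]
}
```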

State management

The example above can also be seen as state management: when a state changes, the change is sent through the Stream, and the listening view is updated with the new state.

How does this differ from Provider? I think the most important difference is that with Provider the state is stored and can be read at any time, whereas a Stream only delivers events and does not keep the current value. You can decide which one to use based on this.
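
A small experiment makes the "not stored" point concrete. This is only a sketch with a broadcast StreamController standing in for a login-status stream; lateListenerSees is a made-up name. A listener that subscribes after an event was sent never receives that event.

```dart
import 'dart:async';

/// Returns what a listener sees when it subscribes after 'logged in' was sent.
Future<List<String>> lateListenerSees() async {
  final controller = StreamController<String>.broadcast();
  final seen = <String>[];

  controller.add('logged in'); // no listener yet: a broadcast stream drops it
  controller.stream.listen(seen.add); // subscribe late
  controller.add('logged out'); // delivered to the listener

  await controller.close(); // completes after the listener has been notified
  return seen;
}

Future<void> main() async {
  print(await lateListenerSees()); // [logged out]
}
```

If a widget needs the current value when it is first built, that value has to be kept somewhere else, for example in a field next to the controller.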

Event Bus

import 'dart:async';

class EventBus {
  static final EventBus _instance = EventBus._internal();

  factory EventBus() => _instance;

  EventBus._internal();

  // broadcast() creates a StreamController whose stream allows multiple listeners
  final _controller = StreamController<dynamic>.broadcast();

  Stream<dynamic> get stream => _controller.stream;

  void fire(dynamic event) {
    _controller.sink.add(event);
  }

  void dispose() {
    _controller.close();
  }
}

// Subscribe to events
EventBus().stream.listen((event) {
  // handle events
  print('Received event: $event');
});

// Send an event
EventBus().fire('Event data');
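
Because every subscriber of a dynamic event bus receives every event, a common refinement is to filter by event type. The following is one possible sketch, not part of the class above; TypedEventBus, LoginEvent, and LogoutEvent are hypothetical names.

```dart
import 'dart:async';

/// Hypothetical event types, used only for this sketch.
class LoginEvent {
  LoginEvent(this.user);
  final String user;
}

class LogoutEvent {}

class TypedEventBus {
  final _controller = StreamController<Object>.broadcast();

  /// Only events of type T reach the subscriber.
  Stream<T> on<T>() => _controller.stream.where((e) => e is T).cast<T>();

  void fire(Object event) => _controller.add(event);

  Future<void> dispose() => _controller.close();
}

/// Fires three events and returns the user names seen by a LoginEvent listener.
Future<List<String>> loginNames() async {
  final bus = TypedEventBus();
  final names = <String>[];
  bus.on<LoginEvent>().listen((e) => names.add(e.user));

  bus.fire(LoginEvent('alice'));
  bus.fire(LogoutEvent()); // filtered out by on<LoginEvent>()
  bus.fire(LoginEvent('bob'));

  await bus.dispose(); // completes after the listener has received the events
  return names;
}

Future<void> main() async {
  print(await loginNames()); // [alice, bob]
}
```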

User input

I don't quite understand this use case yet; so far it has not seemed useful to me.

File reading

import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:flutter/material.dart';

class _MyHomePageState extends State<MyHomePage> {
  // Create the controller; it emits one String per line of the file
  final StreamController<String> _streamController = StreamController<String>();

  @override
  void dispose() {
    // Close the controller before the State is torn down
    _streamController.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
        appBar: AppBar(
          backgroundColor: Theme.of(context).colorScheme.inversePrimary,
          title: Text(widget.title),
        ),
        body: Column(
          children: [
            ElevatedButton(
                onPressed: () {
                  fetchData();
                },
                child: const Text("get data")),
            const SizedBox(
              height: 30,
            ),
            StreamBuilder<String>(
                stream: _streamController.stream,
                builder: (BuildContext context, AsyncSnapshot<String> snapshot) {
                  if (snapshot.hasData) {
                    return Text('Asynchronous data: ${snapshot.data}');
                  } else if (snapshot.hasError) {
                    return Text('An error occurred: ${snapshot.error}');
                  } else {
                    return const Text('Loading...');
                  }
                })
          ],
        ));
  }

  Future<void> fetchData() async {
    // File refers to a path on the device file system, so it cannot be used
    // to read files bundled with the app as assets.
    File file = File('a.txt');

    Stream<String> fileStream = file
        .openRead()
        .transform(utf8.decoder) // decode bytes to text
        .transform(const LineSplitter()); // split the text into lines

    fileStream.listen((String line) {
      // Forward each line to the controller's stream
      _streamController.add(line);
    }, onDone: () {
      // File reading is complete, close the stream. After this, pressing the
      // button again fails, because a closed controller accepts no more events.
      _streamController.close();
    }, onError: (error) {
      // Forward the error to the controller's stream
      _streamController.addError(error);
    });
  }
}

That's about it. Note that File cannot read files that are bundled with the project.
Either use path_provider to get a path on the device, or declare the file as an asset and read it with rootBundle.loadString.

Benefits

  • Processing a file as a Stream handles large amounts of data without loading the entire file into memory at once, which is very useful for large files or real-time data streams.
  • While the file is being read, the Stream emits events when data is available, when reading completes, or when an error occurs. You can listen for these events to take appropriate action, such as updating the UI or handling errors.
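
To illustrate the memory benefit outside of a widget, here is a minimal sketch that counts the non-empty lines of a file while it streams in. The helper name countNonEmptyLines and the temporary file are assumptions made for the example.

```dart
import 'dart:convert';
import 'dart:io';

/// Counts non-empty lines without loading the whole file into memory.
Future<int> countNonEmptyLines(File file) {
  return file
      .openRead() // Stream<List<int>>: the file arrives in chunks
      .transform(utf8.decoder) // decode bytes to text
      .transform(const LineSplitter()) // one event per line
      .where((line) => line.trim().isNotEmpty)
      .length; // Future<int> that completes when the stream ends
}

Future<void> main() async {
  // A temporary file stands in for real data (assumption for this sketch).
  final dir = await Directory.systemTemp.createTemp('stream_demo');
  final file = File('${dir.path}/a.txt');
  await file.writeAsString('first\n\nsecond\nthird\n');

  print(await countNonEmptyLines(file)); // 3

  await dir.delete(recursive: true);
}
```

Only one chunk of the file is held at a time, so the same code works for files much larger than available memory.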

Common methods

Look up the details of each method as your needs require.

  • map: Transforms each event in the data stream into a new event. For example, you can use the map method to double each number in the data stream.

  • where: Keeps only the events that satisfy the given condition. For example, you can use the where method to keep only the even numbers in a data stream.

  • expand: Converts each event into multiple events and flattens them into one data stream. For example, each string event can be split into individual character events using the expand method.

  • take: Takes only the first n events from the stream. For example, the first 5 events can be obtained using the take method.

  • skip: Skips the first n events in the data stream and starts receiving subsequent events. For example, the first 3 events can be skipped using the skip method.

  • distinct: Skips an event if it equals the previous event, so consecutive duplicates are removed. For example, you can use the distinct method to drop a string event that repeats the one just before it.

  • merge: Merges multiple data streams into one data stream. The built-in Stream class has no merge method; it is available as StreamGroup.merge in package:async or Rx.merge in RxDart. For example, you can combine two streams of integer data into a single stream of integer data.

  • zip: Pairs events from two data streams one by one and combines each pair into a new event. This is not built in either; it is available as StreamZip in package:async or Rx.zip2 in RxDart. For example, a stream of strings and a stream of integers can be paired into a new stream.
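
The built-in methods from this list can be tried on a small stream. The sketch below uses Stream.fromIterable as a stand-in data source; numbers is a hypothetical helper.

```dart
import 'dart:async';

/// A fresh stream of 1..6 for each experiment.
Stream<int> numbers() => Stream.fromIterable([1, 2, 3, 4, 5, 6]);

Future<void> main() async {
  print(await numbers().map((n) => n * 2).toList()); // [2, 4, 6, 8, 10, 12]
  print(await numbers().where((n) => n.isEven).toList()); // [2, 4, 6]
  print(await numbers().take(2).toList()); // [1, 2]
  print(await numbers().skip(4).toList()); // [5, 6]

  // expand: one string event becomes several character events.
  final chars = Stream.fromIterable(['ab', 'c']).expand((s) => s.split(''));
  print(await chars.toList()); // [a, b, c]

  // distinct only compares with the previous event, so the last 1 survives.
  final deduped = Stream.fromIterable([1, 1, 2, 1]).distinct();
  print(await deduped.toList()); // [1, 2, 1]
}
```

merge and zip are left out here because they need package:async or RxDart rather than the Stream class itself.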

RxDart

RxDart is a reactive programming library for Dart that extends and enhances Stream. In most cases Dart's built-in Stream is completely sufficient, so this is only a brief introduction. If you are interested, you can check the documentation yourself.

How to choose

When to use Stream:

  • Simple asynchronous operations: If you only need to handle simple asynchronous operations, such as monitoring network request results or handling user input events, Stream is enough. Stream provides a basic asynchronous programming mechanism that meets most needs.
  • Little data conversion and processing: If you don't need complex data conversion, and only need to monitor changes in the data stream and perform simple operations such as filtering or mapping, Stream is enough.

When to use RxDart:

  • Complex data processing: If you need to perform complex data processing and conversion operations, such as mapping, filtering, combining, and flattening, RxDart provides a wealth of operators that can greatly simplify code and improve development efficiency.
  • Reactive requirements: If you want to follow the reactive programming model, that is, split the data flow into stages and observe and respond to the data at each stage, RxDart is a good fit. Its Subjects and Stream extension operators help you build reactive data stream processing chains.
  • Error handling: RxDart adds convenient error-handling operators, such as onErrorReturn and onErrorResumeNext, on top of the built-in handleError, which makes it easier to catch and recover from exceptions.

Official Documentation
https://pub-web.flutter-io.cn/packages/rxdart

Install

flutter pub add rxdart

Asynchronous data acquisition

import 'package:flutter/material.dart';
import 'package:rxdart/rxdart.dart';

class _MyHomePageState extends State<MyHomePage> {
  // Create the controller. A BehaviorSubject is a broadcast controller
  // that replays its latest value to new listeners.
  final BehaviorSubject<int> _streamController = BehaviorSubject<int>();

  @override
  void dispose() {
    // Close the subject before the State is torn down
    _streamController.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
        appBar: AppBar(
          backgroundColor: Theme.of(context).colorScheme.inversePrimary,
          title: Text(widget.title),
        ),
        body: Column(
          children: [
            ElevatedButton(
                onPressed: () {
                  fetchData();
                },
                child: const Text("get data")),
            StreamBuilder<int>(
                stream: _streamController.stream,
                builder: (BuildContext context, AsyncSnapshot<int> snapshot) {
                  if (snapshot.hasData) {
                    return Text('Asynchronous data: ${snapshot.data}');
                  } else if (snapshot.hasError) {
                    return Text('An error occurred: ${snapshot.error}');
                  } else {
                    return const Text('Loading...');
                  }
                })
          ],
        ));
  }

  Future<void> fetchData() async {
    await Future.delayed(const Duration(seconds: 1));
    _streamController.add(1); // send the first value
    await Future.delayed(const Duration(seconds: 2));
    _streamController.add(2); // send the second value
  }
}

File reading

Use the File class to open a file for reading.

final file = File('data.txt');

Older RxDart tutorials wrap the file's stream with Observable.fromStream. The Observable class was removed in RxDart 0.23; since then RxDart adds its operators to Stream as extension methods, so the stream returned by openRead can be used directly. For example:

final stream = file.openRead();

Process the stream with the built-in methods or the operators provided by rxdart. For example, use the listen method to subscribe to the stream and perform an action each time data is available.

stream.listen((data) {
  // Process the data here; each event is a chunk of bytes (List<int>)
  print(data);
}, onError: (error) {
  // Handle errors
  print(error);
}, onDone: () {
  // Handle the completion event
  print('Read completed');
});