JS中的反应式流如何工作

How do reactive streams in JS work?

本文关键字:工作 何工作 反应式 JS      更新时间:2023-09-26

我是反应式流的新手,现在试图理解它们。这个想法看起来非常清晰和简单,但在实践中,我无法理解那里到底发生了什么。

现在我正在玩大多数.js,试图实现一个简单的调度程序。扫描方法似乎正是我所需要的。

我的代码:

var dispatch;
// expose method for pushing events to stream:
var events = require("most").create(add => dispatch = add); 
// initialize stream, so callback in `create` above is actually called
events.drain();
events.observe(v => console.log("new event", v));
dispatch(1);
var scaner = events.scan(
  (state, patch) => {
    console.log("scaner", patch);
    // update state here
    return state;
  },
  { foo: 0 }
);
scaner.observe(v => console.log("scaner state", v));
dispatch(2);

据我了解,第一个观察者应该调用两次(每个事件一次),扫描器回调和第二个观察者 – 各一次(因为它们是在触发第一个事件后添加的)。

但是,在实践中,控制台显示了这一点:

new event 1
new event 2
scaner state { foo: 0 }

无论我在流中推送多少事件,都不会调用 Scaner。

但是,如果我删除第一个dispatch调用(在创建扫描器之前),一切都按我的预期工作。

这是为什么呢?我正在阅读文档,阅读文章,但到目前为止还没有发现任何与此问题类似的内容。我的建议哪里错了?

最有可能的是,您已经从 API 中研究了这样的示例:

most.from(['a', 'b', 'c', 'd'])
    .scan(function(string, letter) {
         return string + letter;
        }, '')
    .forEach(console.log.bind(console));

他们建议像这样逐步执行:

  1. 获取一个数组['a', 'b', 'c', 'd']并将其值馈送到流中。
  2. 馈送的值由 scan() 转换。
  3. 。并被forEach()消耗.

但这并不完全正确。这就是您的代码不起作用的原因。

在大多数源代码.js,您可以在第 1340 ff 行看到:

exports.from = from;
function from(a) {
    if(Array.isArray(a) || isArrayLike(a)) {
        return fromArray(a);
    }  
...

所以from()转发到一些fromArray().然后,fromArray()(在下面的代码中)正在创建一个新Stream

...
function fromArray (a) {
    return new Stream(new ArraySource(a));
}
...

如果你坚持到底,你会从Streamsink.event(0, array[i]);,0表示超时毫秒。代码中没有setTimeout,但是如果您进一步搜索代码以查找.event = function,您会发现许多额外的代码可以发现更多。特别是,在第 4692 行周围有带有delay()和时间戳的Scheduler

总结一下:上面示例中的数组在一段时间后异步馈送到流中,即使时间似乎是 0 毫秒。

这意味着您必须假设以某种方式首先构建流,然后使用。即使程序代码看起来不是这样。但是,嘿,隐藏复杂性并不总是目标吗:-)?

现在您可以使用自己的代码进行检查。这是基于您的片段的小提琴:

https://jsfiddle.net/aak18y0m/1/

看看你在小提琴中的dispatch()叫声。我用setTimeout()包裹了它们:

setTimeout( function() { dispatch( 1 /* or 2 */); }, 0);

通过这样做,我强制它们也是异步调用,就像示例中的数组值一样。

为了运行小提琴,您需要打开浏览器调试器(以查看控制台),然后按上面的运行按钮。控制台输出显示您的扫描仪现在被调用三次:

doc ready
(index):61 Most loaded: [object Object]
(index):82 scanner state Object {foo: 0}
(index):75 scanner! 1
(index):82 scanner state Object {foo: 0}
(index):75 scanner! 2
(index):82 scanner state Object {foo: 0}

首先是drain(),然后是每个事件。

如果你同步使用dispatch(),在 JavaScript 能够构建整个流之后,在最后添加它们,你也可以得到一个有效的结果(但在幕后不一样)。只需取消注释// Alternative solution后的行,再次运行并观察结果。

好吧,我的问题似乎并不像听起来那么笼统。这只是一个特定于库的。

首先 – 主题的方法对most.js无效。他们主张"采取声明性而非命令性方法"。

其次 - 我尝试Kefir.js库,并且主题中的代码可以完美地工作。只是工作。更重要的是,most.js中不支持的相同方法明确推荐用于Kefir.js

所以,问题出在一个特定的库实现中,而不是在我的脑海中。