Articles of rxjs

听stream言承诺

我一直在阅读有关Observables 。 有一件事我读到,就是它能够像stream一样听。 我试着用下面的代码。 const Rx = require('rxjs/Rx'); Promise = require('bluebird'); let i = 0; function calculate() { return new Promise((resolve, reject) => { setInterval(() => { console.log(++i); resolve(i); }, 5000); }) } let source = Rx.Observable.fromPromise(calculate()); source.subscribe( next => { console.log(next, ' next '); }, err => { console.error(err) }, () => { console.log('done!'); […]

RxJS:将Node.js套接字转换为Observable并将它们合并成一个stream

我试图将Node的套接字转换为使用RxJS的stream。 目标是让每个套接字创build它自己的stream,并将所有stream合并成一个。 当新的套接字连接时,将使用socketStream = Rx.Observable.fromEvent(socket, 'message')创build一个stream。 然后这个stream被合并到一个类似的主stream中 mainStream = mainStream.merge(socketStream) 这似乎工作正常,问题是,200-250客户端连接后,服务器引发RangeError: Maximum call stack size exceeded 。 我有示例服务器和客户端代码,在这里主要演示此行为: 示例服务器和客户端 我怀疑,随着客户端连接/断开连接,主stream不能正确清理。

静态资产不在Heroku上,但在本地?

我有一个nodeJS + expressJS + Angular2-RC.5的网站,在我的本地Windows 10机器上正常工作。 但是,当我尝试部署( 这里的日志)到Heroku并访问该网站时,前端控制台(Chrome)抛出以下错误,并且不加载站点。 RXJs似乎没有在Heroku上提供一些问题? zone.js:101 GET https://ns-docs.herokuapp.com/node_modules/rxjs/RX.js 404(Not Found) 我的systemjs.config.js包含这个: (function(global) { // map tells the System loader where to look for things var map = { 'app': '/app', // 'dist', '@angular': '/node_modules/@angular', 'angular2-in-memory-web-api': '/node_modules/angular2-in-memory-web-api', 'rxjs': '/node_modules/rxjs' }; // packages tells the System loader how to load when no filename […]

有没有办法与RxJSpipe理并发?

TL; DR – 我正在寻找一种方法来控制在使用RxJS的同时连接到REST API的HTTP请求的数量。 我的Node.js应用程序将向第三方提供商进行几千个REST API调用。 但是,我知道,如果我立即提出所有这些请求,则由于DDoS攻击,服务可能会closures或拒绝我的请求。 所以,我想在任何给定的时间设置最大并发连接数。 我曾经通过利用Throat Package来实现Promises的并发控制,但是我还没有find类似的方法来实现它。 我试图使用merge与本文中build议的并发1 如何限制flatMap的并发性? ,但所有的请求都是一次发送的。 这是我的代码: var Rx = require('rx'), rp = require('request-promise'); var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3' ]; var source = Rx.Observable.fromArray(array).map(httpGet).merge(1); function httpGet(url) { return rp.get(url); } var results = []; var subscription = source.subscribe( function (x) { console.log('=====', x, '======'); […]

RxJS方法导致callback地狱

我想在下面的用例中使用ReactiveJS Observable方法。 IF MAIN_CACHE EXIST RETURN OUTPUT ELSE IF DB CONNECTION EXIST CACHE MAIN_CACHE (1 Hour) CACHE FALLBACK_CACHE (3 Days) RETURN OUTPUT ELSE IF FALLBACK_CACHE EXIST RETURN OUTPUT 我得到了预期的输出,但我觉得这导致了Callback Hell ,我认为,仍然不是一个好的方法,我错过了ReactiveJS Observable关键好处。 下面是我的代码,整个代码在JS Bin Link中 mainCache.subscribe(function (response) { console.log(response); }, function (error) { dbData.subscribe(function (response) { console.log(response); }, function (error) { console.log('DB CAL Log info', […]

如何将RxJs observable的结果作为来自Node.js的rest响应返回

场景:来自多个rest调用的数据必须汇总为一个单独的对象,并作为通过Node.js服务的初始请求的其余响应返回。 问题:其余响应不等待observable完成,因此在调度剩余响应之后实现突变(聚合)。 //teamsController class invoked via GET /teams import * as Rx from 'rxjs/Rx' import http from 'axios' import Teams from '../models/teams' const teamsAPI = "http://localhost:8081/api/v1/teams/players/"; const usersAPI = "http://localhost:8082/api/v1/users/"; exports.getTeamByPlayer = function (req, res) { let username= req.get("username"); Rx.Observable.fromPromise(fetchTeam(username)) .map(team => { Rx.Observable.from(team.players).subscribe(player => { console.log(`Player name is ${player.username}`); Rx.Observable.fromPromise(fetchUser(player.username)) .map(avatar => avatar.avatar) .subscribe(avatar […]

“TypeError:this._subscribe不是函数”在简单的Node模块中尝试RxJS时

我有以下节点代码 //index.js var username = process.argv[2], password = process.argv[3], factories = require('./factories')(username, password); factories.subscribe(function(data){ console.log(data.length); }); factories.refresh(); //factories.js var Rx = require('rx'); var factories = function(username, password){ var factories = []; var source = Rx.Observable.from(factories); var baseUrl = "<url>", factoriesOpt = { 'uri': baseUrl+"<path>", 'method': 'GET', 'auth': { 'user': username, 'pass': password } }, response="", […]

Angular:按照调用顺序接收响应

嗨,我很新angular度和观察 我正在试图通过一个循环获取他们的ID的对象。 但是不要收到我的回应。 例 get ID(1) get ID(2) get ID(3) Receive Object ID(2) Receive Object ID(3) Receive Object ID(1) 是否有可能让我的对象恢复秩序? 以下是我多次拨打我的服务function的地方: conferences-attendance.component.ts ExportExcelAttendance() { for (var i = 0; i < this.contactsAttendance.length; i++) { this.practiceService.GetPracticebyDBID(this.contactsAttendance[i].practiceId) .subscribe( (practice: Practice) => { this.practicesAttendance.push(practice); if (this.practicesAttendance.length == this.contactsAttendance.length) { this.ExportExcelAttendance2(); } }, error => this.errorMessage = <any>error ); […]

如何链接之前在forkjoin()操作中的每个observable的组

我在MySQL中有一个订单表,每个订单都有一些关联的文档,不pipe它们是报价单,发票等等。因此有第二个称为“documents”的表,它具有“document_id”主键和“ order_id“外键; 以类似的方式,我还有另一个技术人员对每辆车进行检查的情况,然后是另一张车辆图片。 我正在创build一个使用Node和Express的Web服务,需要返回类似于这个的Json … [ { "order_id": 1003, "customer_id": 8000, "csi": 90, "date_admitted": "2016-10-28T05:00:00.000Z", "plates": "YZG-5125", … documents: { "type": "invoice", "number": "1234", … }, checks: { "scanner": "good", "battery": "average", … }, vehicle_pictures: { "title": "a title…", "path": "the file path" … } }, { … }, … ] 正如你所看到的,有必要为每个订单做三个查询,一个用于检查,另一个用于文档,第三个用于图片,然后我需要将这些子结果添加到最终返回数组中的顺序。 在旧版的同步编程中这将是一件非常容易的事情,但是由于mysql库的连接对象中的query()方法的asynchronous性质,这个威胁变成了一个真正的地狱。 在需要处理单个订单的情况下,使用forkJoin()在服务器上使用RxJS库就足以一次处理所有三个结果,但我不确定如何“链接”每个订单(使用forkJoin来pipe理3个查询),所以一切都得到处理,最后我可以调用res.json(结果),一切整齐。 注意:我想用RxJS来解决这个问题,而不是使用像node-mysql-libmysqlclient这样的同步库软件包。 […]

我怎样才能缓冲一个简单的Rxjs观察?

我想通过一些任意的(但是简单的)标准来缓冲一个观察值。 我在这里设置了一个简单的例子: const observable = Rx.Observable.from([1,2,3]) const filtered = observable.filter((n) => n === 3); observable .buffer(filtered) .subscribe((n) => { // Why is this empty? console.log(n); }); 宾在这里 试图filter做这只会产生一个空的数组。 我期待[1,2,3]的数组,但是这似乎不是如何工作的。 所有缓冲区的文档使用asynchronous事件,如计时器,但这不是我想要的。 我只想根据我自己决定的任意标准来取最后的n个项目。 帮助非常感谢!