RxJS5 - error - TypeError: You provided an invalid object where a stream was expected

December 30, 2016, at 12:19 PM

My code worked with Rx version 4. After switching to RxJS 5, I get this error:

TypeError: You provided an invalid object where a stream was expected. 
You can provide an Observable, Promise, Array, or Iterable.

Unfortunately the stack trace is useless: it is all RxJS internals, so I cannot pinpoint where the problem is in my code (my biggest gripe with RxJS). I would be happy to provide the full trace if it might help someone help me.

The code that throws the error looks like so:

this.init = function (isPublish) {
    if (this.isReady) {
        return makeGenericObservable(null, {isPublish: isPublish});
    }
    if (!callable) {
        return makeEEObservable(ee, {isPublish: isPublish});
    }
    callable = false;
    const promise = lmUtils.conditionallyLaunchSocketServer({port: port});
    return Rx.Observable.fromPromise(promise)
        .flatMap(() => {
            console.log('creating client...'); // works
            this.client = new Client({port: port});
            return acquireLock(this)
                .filter(obj => !obj.error);
        })
        .flatMap(obj => {
            console.log(' space truncate...');
            return ifFileExistAndIsAllWhiteSpaceThenTruncate(this)
                .map(() => obj)
        })
        .flatMap(obj => {
            console.log(' generic append file...');
            return genericAppendFile(this, '')
                .map(() => obj)
        })
        .flatMap(obj => {
            console.log(' release lock...');
            return releaseLock(this, obj.id)
        }).map(() => {
            ee.emit('ready');
            this.isReady = true;
            //start tailing, only after we know that the file exists, etc.
            tail(fp).on('data', data => {
                debug('\n', colors.cyan(' => raw data (well, trimmed) from tail => '), '\n', String(data).trim());
                data = String(data).split('\n')
                    .filter(ln => String(ln).trim().length > 0)
                    .map(ln => String(ln).trim());
                data.map(function (d) {
                    try {
                        return JSON.parse(d);
                    }
                    catch (err) {
                        console.log('\n', colors.red(' => bad data from tail => '), '\n', d);
                        return '';
                    }
                }).filter(function (d) {
                    return String(d).trim().length > 0;
                }).forEach(d => {
                    this.push(d);
                    // this.obsEnqueue.onNext(d);
                });
            });
        })
        .catch(e => {
            console.error(e.stack || e);
            return releaseLock(this, true);
        });
};

It seems to fail at acquireLock(this), which looks as follows:

exports.acquireLock = function _acquireLock(queue) {
    const lock = queue.lock;
    const client = queue.client;
    return Rx.Observable.create(obs => {
        client.lock(lock, function (err, unlock, id) {
            obs.onNext({
                error: err ? (err.stack || err) : undefined,
                id: id
            });
            obs.onCompleted();
        });
        return function () {
            // console.log('disposing acquireLock()');
        }
    });
};
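
Separately from the TypeError itself, one thing that may be worth checking in acquireLock is the observer method names: RxJS 5 renamed onNext / onError / onCompleted to next / error / complete. The sketch below shows only the subscribe function with the RxJS 5 names. makeLockSubscriber is a hypothetical name introduced for illustration, and the client.lock callback signature is assumed to be the one shown above.

```javascript
// Sketch only: the subscribe function from acquireLock, written with the
// RxJS 5 observer method names. `makeLockSubscriber` is a hypothetical helper.
function makeLockSubscriber(client, lock) {
  return function subscribe(obs) {
    client.lock(lock, function (err, unlock, id) {
      // RxJS 4: obs.onNext(...)    ->  RxJS 5: obs.next(...)
      obs.next({
        error: err ? (err.stack || err) : undefined,
        id: id
      });
      // RxJS 4: obs.onCompleted()  ->  RxJS 5: obs.complete()
      obs.complete();
    });
    // Teardown function; nothing to clean up in this sketch.
    return function () {};
  };
}

// Intended use (assumes RxJS 5 is loaded as `Rx`):
//   Rx.Observable.create(makeLockSubscriber(queue.client, queue.lock))
```

Keeping the subscribe function separate from Rx.Observable.create also makes it possible to exercise it with a plain object that has next and complete methods, without loading RxJS at all.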

Does anybody know what I am doing wrong?

My guess is that the problem is in this part:

.flatMap(() => {
    console.log('creating client...'); // works
    this.client = new Client({port: port});
    return acquireLock(this)
        .filter(obj => !obj.error);
})

Does .filter used inside .flatMap no longer work in version 5?

Perhaps calling .filter does not return an observable?
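
Since the stack trace does not say which step is at fault, one way to narrow it down is to wrap each flatMap callback and log what it actually returned. RxJS 5 raises this TypeError when a flatMap callback returns something it does not recognize as an Observable, Promise, Array, or Iterable (undefined, for instance). The helper below is a hypothetical debugging aid, not part of RxJS; the names describeValue and labelled are made up for this sketch.

```javascript
// Hypothetical debugging helpers, not part of RxJS.

// Summarize a value: constructor name plus the stream-like traits it has.
function describeValue(value) {
  if (value === null || value === undefined) {
    return String(value);
  }
  const ctor = value.constructor && value.constructor.name;
  const traits = [];
  if (typeof value.subscribe === 'function') traits.push('subscribe()');
  if (typeof value.then === 'function') traits.push('then()');
  if (typeof value[Symbol.iterator] === 'function') traits.push('iterable');
  return (ctor || typeof value) + ' [' + traits.join(', ') + ']';
}

// Wrap a flatMap callback so every return value is logged under a label,
// and a null/undefined return fails with a message naming the step.
function labelled(label, fn) {
  return function () {
    const result = fn.apply(this, arguments);
    if (result === null || result === undefined) {
      throw new TypeError(label + ': callback returned ' + describeValue(result));
    }
    console.log(label + ' returned ' + describeValue(result));
    return result;
  };
}

// Intended use inside the chain from the question:
//   .flatMap(labelled('acquireLock', () => {
//       this.client = new Client({port: port});
//       return acquireLock(this).filter(obj => !obj.error);
//   }))
```

If a step logs an object that has subscribe() but whose constructor does not come from the RxJS 5 package that built the outer chain, that step would be the first place to look. This is only a diagnostic sketch; it does not establish what the cause is here.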
