diff --git a/flow-typed/npm/rsocket-flowable_vx.x.x.js b/flow-typed/npm/rsocket-flowable_vx.x.x.js new file mode 100644 index 000000000..9d5e0da6b --- /dev/null +++ b/flow-typed/npm/rsocket-flowable_vx.x.x.js @@ -0,0 +1,81 @@ +// flow-typed signature: 06cae6786a7bab166b894590a59b8780 +// flow-typed version: <>/rsocket-flowable_v0.0.6/flow_v0.76.0 + +/** + * This is an autogenerated libdef stub for: + * + * 'rsocket-flowable' + * + * Fill this stub out by replacing all the `any` types. + * + * Once filled out, we encourage you to share your work with the + * community by sending a pull request to: + * https://github.com/flowtype/flow-typed + */ + +declare module 'rsocket-flowable' { + declare module.exports: any; +} + +/** + * We include stubs for each file inside this npm package in case you need to + * require those files directly. Feel free to delete any files that aren't + * needed. + */ +declare module 'rsocket-flowable/build/Flowable' { + declare module.exports: any; +} + +declare module 'rsocket-flowable/build/FlowableMapOperator' { + declare module.exports: any; +} + +declare module 'rsocket-flowable/build/FlowableRequestOperator' { + declare module.exports: any; +} + +declare module 'rsocket-flowable/build/FlowableTakeOperator' { + declare module.exports: any; +} + +declare module 'rsocket-flowable/build/FlowableTimer' { + declare module.exports: any; +} + +declare module 'rsocket-flowable/build/haste/rsocket-flowable' { + declare module.exports: any; +} + +declare module 'rsocket-flowable/build/index' { + declare module.exports: any; +} + +declare module 'rsocket-flowable/build/Single' { + declare module.exports: any; +} + +// Filename aliases +declare module 'rsocket-flowable/build/Flowable.js' { + declare module.exports: $Exports<'rsocket-flowable/build/Flowable'>; +} +declare module 'rsocket-flowable/build/FlowableMapOperator.js' { + declare module.exports: $Exports<'rsocket-flowable/build/FlowableMapOperator'>; +} +declare module 'rsocket-flowable/build/FlowableRequestOperator.js' { + declare module.exports: $Exports<'rsocket-flowable/build/FlowableRequestOperator'>; +} +declare module 'rsocket-flowable/build/FlowableTakeOperator.js' { + declare module.exports: $Exports<'rsocket-flowable/build/FlowableTakeOperator'>; +} +declare module 'rsocket-flowable/build/FlowableTimer.js' { + declare module.exports: $Exports<'rsocket-flowable/build/FlowableTimer'>; +} +declare module 'rsocket-flowable/build/haste/rsocket-flowable.js' { + declare module.exports: $Exports<'rsocket-flowable/build/haste/rsocket-flowable'>; +} +declare module 'rsocket-flowable/build/index.js' { + declare module.exports: $Exports<'rsocket-flowable/build/index'>; +} +declare module 'rsocket-flowable/build/Single.js' { + declare module.exports: $Exports<'rsocket-flowable/build/Single'>; +} diff --git a/src/server.js b/src/server.js index 5c946f9e5..b244fcaf9 100644 --- a/src/server.js +++ b/src/server.js @@ -12,6 +12,7 @@ import type {ClientQuery} from './Client.js'; import CertificateProvider from './utils/CertificateProvider'; import {RSocketServer, ReactiveSocket} from 'rsocket-core'; import RSocketTCPServer from 'rsocket-tcp-server'; +import {Single} from 'rsocket-flowable'; import Client from './Client.js'; import {RecurringError} from './utils/errors'; @@ -155,6 +156,48 @@ export default class Server extends EventEmitter { } return { + requestResponse: (payload: {data: string}) => { + if (typeof payload.data !== 'string') { + return; + } + + let rawData; + try { + rawData = JSON.parse(payload.data); + } catch (err) { + console.error(`Invalid JSON: ${payload.data}`, 'clientMessage'); + return; + } + + const json: {| + method: 'signCertificate', + csr: string, + destination: string, + |} = rawData; + if (json.method === 'signCertificate') { + console.warn('CSR received from device', 'server'); + const {csr, destination} = json; + return new Single(subscriber => { + subscriber.onSubscribe(); + this.certificateProvider + .processCertificateSigningRequest(csr, clientData.os, destination) + .then(_ => { + subscriber.onComplete({ + data: JSON.stringify({}), + metadata: '', + }); + }) + .catch(e => { + console.error(e); + subscriber.onError(e); + }); + }); + } + }, + + // Leaving this here for a while for backwards compatibility, + // but for up to date SDKs it will no longer used. + // We can delete it after the SDK change has been using requestResponse for a few weeks. fireAndForget: (payload: {data: string}) => { if (typeof payload.data !== 'string') { return; @@ -176,11 +219,11 @@ export default class Server extends EventEmitter { if (json.method === 'signCertificate') { console.warn('CSR received from device', 'server'); const {csr, destination} = json; - this.certificateProvider.processCertificateSigningRequest( - csr, - clientData.os, - destination, - ); + this.certificateProvider + .processCertificateSigningRequest(csr, clientData.os, destination) + .catch(e => { + console.error(e); + }); } }, }; diff --git a/src/utils/CertificateProvider.js b/src/utils/CertificateProvider.js index 1f71bac9a..355015c74 100644 --- a/src/utils/CertificateProvider.js +++ b/src/utils/CertificateProvider.js @@ -100,8 +100,7 @@ export default class CertificateProvider { csr, os, ), - ) - .catch(e => console.error(e)); + ); } ensureOpenSSLIsAvailable(): void {