This repository was archived by the owner on Feb 17, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.ts
66 lines (55 loc) · 1.99 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import _IrisAMQP from './amqp';
import {LibOpts as IrisAMQPLibOpts} from './amqp';
import {parse, serialize} from './serialization';
import {is, identity, ifElse, map, curry, pipeP, lensProp, over} from 'ramda';
import {RPCError} from './errors';
export const IrisAMQP = _IrisAMQP;
import {RegisterActiveContext, RegisterHandler, Iris, RegisterInput, EmitInput, CollectInput,RequestInput} from './types';
import { Observable } from 'rxjs';
export * from './types';
export * from './utils';
type F1<T, R> = (t: T) => R;
export const toPromise = curry(function toPromise<T, R>(f: F1<T, R>, val: T) {
return Promise.resolve(f(val));
});
const lensPayload = lensProp('payload');
const lensHandler = lensProp('handler');
const serializePayload = toPromise(over(lensPayload, serialize));
const parsePayload = toPromise(over(lensPayload, parse));
const serializeP = toPromise(serialize);
const parseArray = toPromise(map(ifElse(is(Buffer), parse, identity))) as any;
const transformHandler = toPromise(over(lensHandler, (handler: RegisterHandler<any, any>) => pipeP(parsePayload, handler, serializeP)));
export default function(opts: (IrisAMQPLibOpts & {
_IrisAMQP?: typeof IrisAMQP
})): Observable<Iris> {
const __IrisAMQP = opts._IrisAMQP || IrisAMQP;
return __IrisAMQP(opts).map(backend => {
const request: <T, R>(o: RequestInput<T>) => Promise<R | undefined> = pipeP(
serializePayload,
backend.request,
parse
);
const register: <T, R> (o: RegisterInput<T, R>) => Promise<RegisterActiveContext> = pipeP(
transformHandler,
backend.register
);
const emit: <T>(o: EmitInput<T>) => Promise<undefined> = pipeP(
serializePayload,
backend.emit
);
const collect: <T, R> (o: CollectInput<T>) => Promise<(R | RPCError)[]> = pipeP(
serializePayload,
backend.collect,
parseArray
);
return {
backend,
request,
register,
emit,
collect,
observe: backend.observe,
stream: backend.stream
};
});
}