1 // Written in the D programming language.
2 
3 /**
4  * MessagePack RPC Client
5  */
6 module msgpackrpc.client;
7 
8 public import msgpackrpc.common;
9 import msgpackrpc.transport.tcp;
10 
11 import msgpack;
12 import vibe.vibe;
13 
14 import std.array;
15 import std.traits;
16 
17 
18 /**
19  * MessagePack RPC Client
20  */
21 class Client(alias Protocol)
22 {
23   private:
24     alias Protocol.ClientTransport!(typeof(this)) Transport;
25 
26     Transport _transport;
27     IDGenerater _generator;
28     Future[size_t] _table;
29 
30   public:
31     this(Endpoint endpoint)
32     {
33         _transport = new Transport(this, endpoint);
34     }
35 
36     this(string endpoint)
37     {
38         _transport = new Transport(this, Endpoint(endpoint));
39     }
40 
41     void close()
42     {
43         _transport.close();
44         _table.destroy();
45     }
46 
47     T call(T, Args...)(string method, Args args)
48     {
49         return sendRequest(method, args).get!T;
50     }
51 
52     Future callAsync(Args...)(string method, Args args)
53     {
54         return sendRequest(method, args);
55     }
56 
57     void notify(Args...)(string method, Args args)
58     {
59         auto packer = packer(Appender!(ubyte[])());
60 
61         packer.beginArray(3).pack(MessageType.notify, method).packArray(args);
62         _transport.sendMessage(packer.stream.data, false);
63     }
64 
65     void onResponse(size_t id, ref Value error, ref Value result)
66     {
67         auto future = id in _table;
68         if (future is null)
69             return;
70 
71         if (error.type == Value.Type.nil) {
72             future.result = result;
73         } else {
74             future.error = error;
75         }
76         version( noExitEventloop) {} else
77         { getEventDriver().exitEventLoop(); }
78     }
79 
80   private:
81     Future sendRequest(Args...)(string method, Args args)
82     {
83         import std.array;
84 
85         auto id = ++_generator;
86         auto future = new Future();
87         auto packer = packer(Appender!(ubyte[])());
88 
89         _table[id] = future;
90         packer.beginArray(4).pack(MessageType.request, id, method).packArray(args);
91         _transport.sendMessage(packer.stream.data);
92 
93         return future;
94     }
95 }
96 
97 alias Client!(msgpackrpc.transport.tcp) TCPClient;
98 
99 /**
100  * Compose the future value
101  */
102 class Future
103 {
104     alias void delegate(Future) Callback;
105 
106   private:
107     Value _value;
108     Callback _callback;
109     bool _err;
110     bool _yet = true;
111 
112   public:
113     void join()
114     {
115         while (_yet)
116             getEventDriver().runEventLoopOnce();
117     }
118 
119     @property
120     T get(T = Value)()
121     {
122         join();
123 
124         if (_err) {
125             RPCException.rethrow(_value);
126             return T.init;
127         }
128 
129         static if (is(T : Value))
130         {
131             return _value;
132         }
133         else
134         {
135             return _value.as!T;
136         }
137     }
138 
139     @property
140     {
141         ref Value result()
142         {
143             return _value;
144         }
145 
146         void result(ref Value res)
147         {
148             _yet = false;
149             _value = res;
150 
151             if (_callback !is null)
152                 _callback(this);
153         }
154 
155         bool errorOccurred()
156         {
157             return _err;
158         }
159 
160         ref Value error()
161         {
162             return _value;
163         }
164 
165         void error(ref Value err)
166         {
167             _yet = false;
168             _err = true;
169             _value = err;
170 
171             if (_callback !is null)
172                 _callback(this);
173         }
174 
175         void callback(Callback callback)
176         {
177             _callback = callback;
178         }
179     }
180 }
181 
182 private:
183 
184 // TODO: Make shared
185 struct IDGenerater
186 {
187   private:
188     size_t _id;
189 
190   public:
191     size_t opUnary(string op)() if (op == "++" || op == "--")
192     {
193         static if (op == "++")
194         {
195             return ++_id;
196         }
197         else
198         {
199             return ++_id;
200         }
201     }
202 }