1 // Written in the D programming language. 2 3 /** 4 * MessagePack RPC Client 5 */ 6 module msgpackrpc.client; 7 8 public import msgpackrpc.common; 9 import msgpackrpc.transport.tcp; 10 11 import msgpack; 12 import vibe.vibe; 13 14 import std.array; 15 import std.traits; 16 17 18 /** 19 * MessagePack RPC Client 20 */ 21 class Client(alias Protocol) 22 { 23 private: 24 alias Protocol.ClientTransport!(typeof(this)) Transport; 25 26 Transport _transport; 27 IDGenerater _generator; 28 Future[size_t] _table; 29 30 public: 31 this(Endpoint endpoint) 32 { 33 _transport = new Transport(this, endpoint); 34 } 35 36 this(string endpoint) 37 { 38 _transport = new Transport(this, Endpoint(endpoint)); 39 } 40 41 void close() 42 { 43 _transport.close(); 44 _table.destroy(); 45 } 46 47 T call(T, Args...)(string method, Args args) 48 { 49 return sendRequest(method, args).get!T; 50 } 51 52 Future callAsync(Args...)(string method, Args args) 53 { 54 return sendRequest(method, args); 55 } 56 57 void notify(Args...)(string method, Args args) 58 { 59 auto packer = packer(Appender!(ubyte[])()); 60 61 packer.beginArray(3).pack(MessageType.notify, method).packArray(args); 62 _transport.sendMessage(packer.stream.data, false); 63 } 64 65 void onResponse(size_t id, ref Value error, ref Value result) 66 { 67 auto future = id in _table; 68 if (future is null) 69 return; 70 71 if (error.type == Value.Type.nil) { 72 future.result = result; 73 } else { 74 future.error = error; 75 } 76 version( noExitEventloop) {} else 77 { getEventDriver().exitEventLoop(); } 78 } 79 80 private: 81 Future sendRequest(Args...)(string method, Args args) 82 { 83 import std.array; 84 85 auto id = ++_generator; 86 auto future = new Future(); 87 auto packer = packer(Appender!(ubyte[])()); 88 89 _table[id] = future; 90 packer.beginArray(4).pack(MessageType.request, id, method).packArray(args); 91 _transport.sendMessage(packer.stream.data); 92 93 return future; 94 } 95 } 96 97 alias Client!(msgpackrpc.transport.tcp) TCPClient; 98 99 /** 100 * Compose the future value 101 */ 102 class Future 103 { 104 alias void delegate(Future) Callback; 105 106 private: 107 Value _value; 108 Callback _callback; 109 bool _err; 110 bool _yet = true; 111 112 public: 113 void join() 114 { 115 while (_yet) 116 getEventDriver().runEventLoopOnce(); 117 } 118 119 @property 120 T get(T = Value)() 121 { 122 join(); 123 124 if (_err) { 125 RPCException.rethrow(_value); 126 return T.init; 127 } 128 129 static if (is(T : Value)) 130 { 131 return _value; 132 } 133 else 134 { 135 return _value.as!T; 136 } 137 } 138 139 @property 140 { 141 ref Value result() 142 { 143 return _value; 144 } 145 146 void result(ref Value res) 147 { 148 _yet = false; 149 _value = res; 150 151 if (_callback !is null) 152 _callback(this); 153 } 154 155 bool errorOccurred() 156 { 157 return _err; 158 } 159 160 ref Value error() 161 { 162 return _value; 163 } 164 165 void error(ref Value err) 166 { 167 _yet = false; 168 _err = true; 169 _value = err; 170 171 if (_callback !is null) 172 _callback(this); 173 } 174 175 void callback(Callback callback) 176 { 177 _callback = callback; 178 } 179 } 180 } 181 182 private: 183 184 // TODO: Make shared 185 struct IDGenerater 186 { 187 private: 188 size_t _id; 189 190 public: 191 size_t opUnary(string op)() if (op == "++" || op == "--") 192 { 193 static if (op == "++") 194 { 195 return ++_id; 196 } 197 else 198 { 199 return ++_id; 200 } 201 } 202 }