1 // Written in the D programming language. 2 3 /** 4 * MessagePack RPC TCP transport layer 5 */ 6 module msgpackrpc.transport.tcp; 7 8 import msgpackrpc.common; 9 import msgpackrpc.server; 10 11 import msgpack; 12 import vibe.core.net; 13 import vibe.core.driver; 14 15 import std.conv; 16 17 18 abstract class BaseSocket 19 { 20 private: 21 TCPConnection _connection; 22 StreamingUnpacker _unpacker; 23 24 public: 25 this(TCPConnection connection) 26 { 27 _connection = connection; 28 _unpacker = StreamingUnpacker([], 2048); 29 } 30 31 void close() 32 { 33 _connection.close(); 34 } 35 36 void sendResponse(Args...)(const Args args) 37 { 38 sendMessage(pack(MessageType.response, args)); 39 } 40 41 void onRequest(size_t id, string method, Value[] params) 42 { 43 throw new Exception("Not implemented yet"); 44 } 45 46 void onResponse(size_t id, Value error, Value result) 47 { 48 throw new Exception("Not implemented yet"); 49 } 50 51 void onNotify(string method, Value[] params) 52 { 53 throw new Exception("Not implemented yet"); 54 } 55 56 void onRead() 57 { 58 InputStream input = _connection; 59 60 do { 61 static if (size_t.sizeof == 4) 62 auto size = cast(uint)input.leastSize; 63 else 64 auto size = input.leastSize; 65 if (size > 0) { 66 ubyte[] data = new ubyte[](size); 67 68 input.read(data); 69 proccessRequest(data); 70 //if (!_connection.waitForData(dur!"seconds"(10))) 71 // break; 72 } 73 } while (_connection.connected); 74 } 75 76 private: 77 void sendMessage(ubyte[] message) 78 { 79 OutputStream output = _connection; 80 output.write(message); 81 output.flush(); 82 } 83 84 void proccessRequest(const(ubyte)[] data) 85 { 86 _unpacker.feed(data); 87 foreach (ref unpacked; _unpacker) { 88 immutable msgSize = unpacked.length; 89 if (msgSize != 4 && msgSize != 3) 90 throw new Exception("Mismatched"); 91 92 immutable type = unpacked[0].as!uint; 93 switch (type) { 94 case MessageType.request: 95 onRequest(unpacked[1].as!size_t, unpacked[2].as!string, unpacked[3].via.array); 96 break; 97 case MessageType.response: 98 onResponse(unpacked[1].as!size_t, unpacked[2], unpacked[3]); 99 break; 100 case MessageType.notify: 101 onNotify(unpacked[1].as!string, unpacked[2].via.array); 102 break; 103 default: 104 throw new RPCException("Unknown message type: type = " ~ to!string(type)); 105 } 106 } 107 } 108 } 109 110 111 class ClientSocket(Client) : BaseSocket 112 { 113 private: 114 Client _client; 115 116 public: 117 this(TCPConnection connection, Client client) 118 { 119 super(connection); 120 _client = client; 121 } 122 123 override void onRead() 124 { 125 InputStream input = _connection; 126 127 do { 128 static if (size_t.sizeof == 4) 129 ubyte[] data = new ubyte[](cast(uint)input.leastSize); 130 else 131 ubyte[] data = new ubyte[](input.leastSize); 132 input.read(data); 133 proccessRequest(data); 134 // CHECKME 135 if(_unpacker.size == 0) 136 break; 137 } while (_connection.connected); 138 } 139 140 override void onResponse(size_t id, Value error, Value result) 141 { 142 _client.onResponse(id, error, result); 143 } 144 } 145 146 147 final class ClientTransport(Client) 148 { 149 private: 150 Endpoint _endpoint; 151 Client _client; 152 ClientSocket!Client _socket; 153 154 public: 155 this(Client client, Endpoint endpoint) 156 { 157 _client = client; 158 _endpoint = endpoint; 159 _socket = new ClientSocket!Client(connectTCP(_endpoint.address, _endpoint.port), client); 160 } 161 162 void sendMessage(ubyte[] message, bool request = true) 163 { 164 _socket.sendMessage(message); 165 if (request) 166 _socket.onRead(); 167 else 168 getEventDriver().processEvents(); // force notify event to send 169 } 170 171 void close() 172 { 173 _socket.close(); 174 } 175 } 176 177 178 class ServerSocket(Server) : BaseSocket 179 { 180 private: 181 Server _server; 182 183 public: 184 this(TCPConnection connection, Server server) 185 { 186 super(connection); 187 _server = server; 188 } 189 190 override void onRequest(size_t id, string method, Value[] params) 191 { 192 _server.onRequest(this, id, method, params); 193 } 194 195 override void onNotify(string method, Value[] params) 196 { 197 _server.onNotify(method, params); 198 } 199 } 200 201 202 final class ServerTransport(Server) 203 { 204 private: 205 Endpoint _endpoint; 206 207 public: 208 this(Endpoint endpoint) 209 { 210 _endpoint = endpoint; 211 } 212 213 void listen(Server server) 214 { 215 auto callback = (TCPConnection conn) { 216 auto socket = new ServerSocket!Server(conn, server); 217 socket.onRead(); 218 }; 219 listenTCP(_endpoint.port, callback, _endpoint.address); 220 } 221 222 void close() 223 { 224 } 225 }