// Written in the D programming language.

/**
 * MessagePack RPC UDP transport layer
 */
module msgpackrpc.transport.udp;

import msgpackrpc.common;
import msgpackrpc.server;

import msgpack;
import vibe.core.net;

import std.conv;


/**
 * Base class for a UDP socket that decodes incoming MessagePack-RPC
 * messages and dispatches them to the $(D on*) handlers.
 *
 * Subclasses override the handlers they support; the default
 * implementations throw.
 */
abstract class BaseSocket
{
  private:
    UDPConnection _connection;
    StreamingUnpacker _unpacker;

  public:
    /**
     * Params:
     *  connection = the UDP connection datagrams are read from and written to.
     */
    this(UDPConnection connection)
    {
        _connection = connection;
        _unpacker = StreamingUnpacker([], 2048);
    }

    /**
     * Called for each decoded request message.
     *
     * Params:
     *  id      = message id taken from element 1 of the message.
     *  method  = method name taken from element 2 of the message.
     *  params  = argument array taken from element 3 of the message.
     *  netAddr = address the datagram was received from.
     *
     * Throws: Exception unless overridden.
     */
    void onRequest(size_t id, string method, Value[] params, ref NetworkAddress netAddr)
    {
        throw new Exception("Not implemented yet");
    }

    /**
     * Called for each decoded response message.
     *
     * Params:
     *  id     = message id taken from element 1 of the message.
     *  error  = element 2 of the message.
     *  result = element 3 of the message.
     *
     * Throws: Exception unless overridden.
     */
    void onResponse(size_t id, Value error, Value result)
    {
        throw new Exception("Not implemented yet");
    }

    /**
     * Called for each decoded notification message.
     *
     * Params:
     *  method = method name taken from element 1 of the message.
     *  params = argument array taken from element 2 of the message.
     *
     * Throws: Exception unless overridden.
     */
    void onNotify(string method, Value[] params)
    {
        throw new Exception("Not implemented yet");
    }

    /**
     * Receives data from the connection together with the sender's address
     * and dispatches every message decoded from it.
     */
    void onRead()
    {
        NetworkAddress netAddr;
        auto data = _connection.recv(null, &netAddr);
        proccessRequest(data, netAddr);
    }

  private:
    void sendMessage(ubyte[] message)
    {
        _connection.send(message);
    }

    // Rejects a message whose element count does not match what its type
    // requires. Uses the same exception as the generic length check so
    // callers see a single kind of failure for malformed messages.
    static void enforceLength(size_t actual, size_t expected)
    {
        if (actual != expected)
            throw new Exception("Mismatched");
    }

    // TODO: Merge TCP proccessRequest
    /*
     * Feeds data into the unpacker and dispatches each decoded message to
     * onRequest / onResponse / onNotify according to element 0.
     *
     * Throws:
     *  Exception when the element count is wrong for the message type,
     *  RPCException when the message type is unknown.
     */
    void proccessRequest(const(ubyte)[] data, ref NetworkAddress netAddr)
    {
        _unpacker.feed(data);
        foreach (ref unpacked; _unpacker) {
            immutable msgSize = unpacked.length;
            if (msgSize != 4 && msgSize != 3)
                throw new Exception("Mismatched");

            immutable type = unpacked[0].as!uint;
            switch (type) {
            case MessageType.request:
                // Element 3 is read below, so a 3-element message must be
                // rejected here instead of indexing out of bounds.
                enforceLength(msgSize, 4);
                onRequest(unpacked[1].as!size_t, unpacked[2].as!string, unpacked[3].via.array, netAddr);
                break;
            case MessageType.response:
                enforceLength(msgSize, 4);
                onResponse(unpacked[1].as!size_t, unpacked[2], unpacked[3]);
                break;
            case MessageType.notify:
                // Only elements 1 and 2 are used; a fourth element means the
                // message is not a well-formed notification.
                enforceLength(msgSize, 3);
                onNotify(unpacked[1].as!string, unpacked[2].via.array);
                break;
            default:
                throw new RPCException("Unknown message type: type = " ~ to!string(type));
            }
        }
    }
}


/**
 * Server side socket: forwards decoded requests and notifications to a
 * $(D Server) instance.
 */
class ServerSocket(Server) : BaseSocket
{
  private:
    Server _server;

  public:
    /**
     * Params:
     *  connection = the UDP connection to serve.
     *  server     = receiver of the decoded requests and notifications.
     */
    this(UDPConnection connection, Server server)
    {
        super(connection);
        _server = server;
    }

    /// Forwards the request to the server along with a Sender bound to the peer address.
    override void onRequest(size_t id, string method, Value[] params, ref NetworkAddress netAddr)
    {
        _server.onRequest(Sender(_connection, &netAddr), id, method, params);
    }

    /// Forwards the notification to the server.
    override void onNotify(string method, Value[] params)
    {
        _server.onNotify(method, params);
    }

    // Wraps a client endpoint to send the response
    static struct Sender
    {
      private:
        // Held by value: the address handed to the constructor is a local of
        // BaseSocket.onRead, so a stored pointer would dangle if this Sender
        // were kept beyond that call.
        NetworkAddress _netAddr;
        bool _hasAddr;
        UDPConnection _connection;

      public:
        /**
         * Params:
         *  connection = connection the response is written to.
         *  netAddr    = destination address; copied when non-null. When null,
         *               null is passed on to send as the destination.
         */
        this(UDPConnection connection, NetworkAddress* netAddr)
        {
            _connection = connection;
            if (netAddr !is null) {
                _netAddr = *netAddr;
                _hasAddr = true;
            }
        }

        /// Packs a response message from args and sends it to the stored address.
        void sendResponse(Args...)(const Args args)
        {
            _connection.send(pack(MessageType.response, args), _hasAddr ? &_netAddr : null);
        }
    }
}


/**
 * UDP server transport: listens on an endpoint and feeds received data
 * to a ServerSocket.
 */
final class ServerTransport(Server)
{
  private:
    Endpoint _endpoint;

  public:
    /**
     * Params:
     *  endpoint = address and port to listen on.
     */
    this(Endpoint endpoint)
    {
        _endpoint = endpoint;
    }

    /**
     * Starts a task that opens the UDP listener and reads from it in an
     * endless loop.
     *
     * Params:
     *  server = receiver of the decoded requests and notifications.
     */
    void listen(Server server)
    {
        runTask({
            auto socket = new ServerSocket!Server(listenUDP(_endpoint.port, _endpoint.address), server);
            // NOTE(review): nothing here catches exceptions, so one thrown by
            // onRead (e.g. for a malformed message) leaves this loop.
            while (true)
                socket.onRead();
        });
    }

    /**
     * Currently does nothing.
     *
     * NOTE(review): the listener created in listen() is not stored on this
     * object, so there is nothing here to close it with.
     */
    void close()
    {
    }
}