1 // Written in the D programming language.
2 
3 /**
4  * MessagePack RPC UDP transport layer
5  */
6 module msgpackrpc.transport.udp;
7 
8 import msgpackrpc.common;
9 import msgpackrpc.server;
10 
11 import msgpack;
12 import vibe.core.net;
13 
14 import std.conv;
15 
16 
/**
 * Common base for UDP sockets that speak MessagePack RPC.
 *
 * Received datagrams are fed into a streaming unpacker and every decoded
 * message is dispatched to onRequest, onResponse or onNotify according to
 * its leading type tag. The default handlers throw; subclasses override
 * the ones they support.
 */
abstract class BaseSocket
{
  private:
    UDPConnection _connection;    // connection used for both receiving and sending
    StreamingUnpacker _unpacker;  // decoder that all received datagrams are fed into

  public:
    /**
     * Params:
     *  connection = the UDP connection to read from and write to.
     */
    this(UDPConnection connection)
    {
        _connection = connection;
        // NOTE(review): 2048 is presumably the unpacker's buffer size -- confirm against msgpack-d.
        _unpacker = StreamingUnpacker([], 2048);
    }

    /**
     * Handler for a request message. Throws unless overridden.
     *
     * Params:
     *  id      = message id taken from the request.
     *  method  = name of the method to call.
     *  params  = arguments of the call.
     *  netAddr = address the datagram was received from.
     */
    void onRequest(size_t id, string method, Value[] params, ref NetworkAddress netAddr)
    {
        throw new Exception("Not implemented yet");
    }

    /**
     * Handler for a response message. Throws unless overridden.
     *
     * Params:
     *  id     = message id taken from the response.
     *  error  = error field of the response.
     *  result = result field of the response.
     */
    void onResponse(size_t id, Value error, Value result)
    {
        throw new Exception("Not implemented yet");
    }

    /**
     * Handler for a notification message. Throws unless overridden.
     *
     * Params:
     *  method = name of the notified method.
     *  params = arguments of the notification.
     */
    void onNotify(string method, Value[] params)
    {
        throw new Exception("Not implemented yet");
    }

    /**
     * Receives one datagram from the connection and dispatches every
     * message decoded from it.
     */
    void onRead()
    {
        NetworkAddress netAddr;
        auto data = _connection.recv(null, &netAddr);
        proccessRequest(data, netAddr);
    }

  private:
    /// Sends an already serialized message over the connection without an explicit destination.
    void sendMessage(ubyte[] message)
    {
        _connection.send(message);
    }

    // TODO: Merge TCP proccessRequest
    /**
     * Feeds data into the unpacker and dispatches each complete message.
     *
     * Params:
     *  data    = raw bytes of the received datagram.
     *  netAddr = address the datagram was received from; handed to onRequest.
     *
     * Throws:
     *  Exception when a message is not an array or its element count does
     *  not fit its type (request/response: 4, notify: 3);
     *  RPCException when the type tag is unknown.
     */
    void proccessRequest(const(ubyte)[] data, ref NetworkAddress netAddr)
    {
        _unpacker.feed(data);
        foreach (ref unpacked; _unpacker) {
            // length and indexing below are only meaningful for array values,
            // so anything else coming off the wire is rejected up front.
            if (unpacked.type != Value.Type.array)
                throw new Exception("Mismatched");

            immutable msgSize = unpacked.length;
            if (msgSize != 4 && msgSize != 3)
                throw new Exception("Mismatched");

            immutable type = unpacked[0].as!uint;
            switch (type) {
            case MessageType.request:
                // Without this check a 3-element request would index past the end.
                if (msgSize != 4)
                    throw new Exception("Mismatched: request requires 4 elements");
                onRequest(unpacked[1].as!size_t, unpacked[2].as!string, unpacked[3].via.array, netAddr);
                break;
            case MessageType.response:
                // Same out-of-bounds hazard as for requests.
                if (msgSize != 4)
                    throw new Exception("Mismatched: response requires 4 elements");
                onResponse(unpacked[1].as!size_t, unpacked[2], unpacked[3]);
                break;
            case MessageType.notify:
                if (msgSize != 3)
                    throw new Exception("Mismatched: notify requires 3 elements");
                onNotify(unpacked[1].as!string, unpacked[2].via.array);
                break;
            default:
                throw new RPCException("Unknown message type: type = " ~ to!string(type));
            }
        }
    }
}
84 
85 
/**
 * Server side UDP socket: forwards decoded requests and notifications to
 * a Server instance.
 *
 * Params:
 *  Server = type of the object that receives onRequest / onNotify calls.
 */
class ServerSocket(Server) : BaseSocket
{
  private:
    Server _server;  // receiver of the decoded messages

  public:
    /**
     * Params:
     *  connection = the UDP connection to serve.
     *  server     = object that handles requests and notifications.
     */
    this(UDPConnection connection, Server server)
    {
        super(connection);
        _server = server;
    }

    /// Hands the request to the server together with a Sender bound to the requester's address.
    override void onRequest(size_t id, string method, Value[] params, ref NetworkAddress netAddr)
    {
        _server.onRequest(Sender(_connection, &netAddr), id, method, params);
    }

    /// Hands the notification to the server.
    override void onNotify(string method, Value[] params)
    {
        _server.onNotify(method, params);
    }

    // Wraps a client endpoint to send the response
    static struct Sender
    {
      private:
        // The address is stored by value: the pointer given to the constructor
        // refers to a variable owned by the caller, which may be gone or
        // overwritten by the time a deferred response is sent.
        NetworkAddress _netAddr;
        bool _hasAddr;  // false when constructed with a null address
        UDPConnection _connection;

      public:
        /**
         * Params:
         *  connection = connection used to send the response.
         *  netAddr    = destination address; copied. When null, responses
         *               are sent without an explicit destination.
         */
        this(UDPConnection connection, NetworkAddress* netAddr)
        {
            _connection = connection;
            if (netAddr !is null) {
                _netAddr = *netAddr;
                _hasAddr = true;
            }
        }

        /**
         * Packs MessageType.response followed by args and sends the
         * result to the stored address.
         */
        void sendResponse(Args...)(const Args args)
        {
            _connection.send(pack(MessageType.response, args), _hasAddr ? &_netAddr : null);
        }
    }
}
128 
129 
/**
 * UDP transport for a server: listens on an endpoint and feeds every
 * received datagram to the given server.
 *
 * Params:
 *  Server = type of the object that handles incoming messages.
 */
final class ServerTransport(Server)
{
  private:
    Endpoint _endpoint;  // address and port to listen on

  public:
    /**
     * Params:
     *  endpoint = the endpoint this transport listens on.
     */
    this(Endpoint endpoint)
    {
        this._endpoint = endpoint;
    }

    /**
     * Starts a task that opens the UDP listener and reads from it forever,
     * dispatching to server.
     *
     * Params:
     *  server = receiver of the decoded requests and notifications.
     */
    void listen(Server server)
    {
        runTask({
            auto connection = listenUDP(_endpoint.port, _endpoint.address);
            auto socket = new ServerSocket!Server(connection, server);

            // The loop has no exit condition; it ends only if onRead throws.
            for (;;) {
                socket.onRead();
            }
        });
    }

    /// Currently does nothing.
    void close()
    {
    }
}