1 // Written in the D programming language.
2 
3 /**
4  * MessagePack RPC TCP transport layer
5  */
6 module msgpackrpc.transport.tcp;
7 
8 import msgpackrpc.common;
9 import msgpackrpc.server;
10 
11 import msgpack;
12 import vibe.core.net;
13 import vibe.core.driver;
14 
15 import std.conv;
16 
17 
/**
 * Common base for the client and server side of a MessagePack-RPC TCP
 * connection.
 *
 * Owns the TCP connection and a streaming unpacker, reads raw bytes from the
 * connection, and dispatches every decoded message to one of the
 * $(D onRequest) / $(D onResponse) / $(D onNotify) hooks. The default hooks
 * throw; subclasses override the ones they support.
 */
abstract class BaseSocket
{
  private:
    TCPConnection _connection;
    StreamingUnpacker _unpacker;

  public:
    /**
     * Wraps an already established connection.
     *
     * Params:
     *  connection = the connection this socket reads from and writes to.
     */
    this(TCPConnection connection)
    {
        _connection = connection;
        // Starts with an empty buffer; 2048 is the second constructor
        // argument (presumably the buffer size -- confirm against msgpack-d).
        _unpacker = StreamingUnpacker([], 2048);
    }

    /// Closes the underlying connection.
    void close()
    {
        _connection.close();
    }

    /**
     * Packs $(D MessageType.response) followed by $(D_PARAM args) and writes
     * the result to the connection.
     */
    void sendResponse(Args...)(const Args args)
    {
        sendMessage(pack(MessageType.response, args));
    }

    /**
     * Hook invoked for every decoded request message.
     *
     * Throws: Exception unless overridden.
     */
    void onRequest(size_t id, string method, Value[] params)
    {
        throw new Exception("Not implemented yet");
    }

    /**
     * Hook invoked for every decoded response message.
     *
     * Throws: Exception unless overridden.
     */
    void onResponse(size_t id, Value error, Value result)
    {
        throw new Exception("Not implemented yet");
    }

    /**
     * Hook invoked for every decoded notify message.
     *
     * Throws: Exception unless overridden.
     */
    void onNotify(string method, Value[] params)
    {
        throw new Exception("Not implemented yet");
    }

    /**
     * Reads from the connection and dispatches decoded messages until the
     * stream reports no more data or the connection is no longer connected.
     */
    void onRead()
    {
        InputStream input = _connection;

        do {
            // On 32-bit targets this narrows the reported size to size_t;
            // on 64-bit targets the cast does not change the value.
            immutable size = cast(size_t) input.leastSize;

            // A zero size means nothing more can be read. Leave the loop
            // instead of spinning while `connected` still reports true.
            if (size == 0)
                break;

            ubyte[] data = new ubyte[](size);

            input.read(data);
            proccessRequest(data);
        } while (_connection.connected);
    }

  private:
    /// Writes an already packed message to the connection and flushes it.
    void sendMessage(ubyte[] message)
    {
        OutputStream output = _connection;
        output.write(message);
        output.flush();
    }

    /**
     * Feeds raw bytes to the unpacker and dispatches every complete message.
     *
     * A request or response must have exactly 4 elements and a notify exactly
     * 3, because the dispatch below indexes elements 0..3 and 0..2
     * respectively.
     *
     * Throws:
     *  Exception if the element count does not fit the message type;
     *  RPCException if the message type is unknown.
     */
    void proccessRequest(const(ubyte)[] data)
    {
        _unpacker.feed(data);
        foreach (ref unpacked; _unpacker) {
            immutable msgSize = unpacked.length;
            if (msgSize != 4 && msgSize != 3)
                throw new Exception("Mismatched");

            immutable type = unpacked[0].as!uint;
            switch (type) {
            case MessageType.request:
                // Guard the access to unpacked[3] below.
                if (msgSize != 4)
                    throw new Exception("Mismatched");
                onRequest(unpacked[1].as!size_t, unpacked[2].as!string, unpacked[3].via.array);
                break;
            case MessageType.response:
                // Guard the access to unpacked[3] below.
                if (msgSize != 4)
                    throw new Exception("Mismatched");
                onResponse(unpacked[1].as!size_t, unpacked[2], unpacked[3]);
                break;
            case MessageType.notify:
                // A notify carries no message id, so it has one element less.
                if (msgSize != 3)
                    throw new Exception("Mismatched");
                onNotify(unpacked[1].as!string, unpacked[2].via.array);
                break;
            default:
                throw new RPCException("Unknown message type: type = " ~ to!string(type));
            }
        }
    }
}
109 
110 
/**
 * Client side of a connection: reads incoming data after a request was sent
 * and forwards decoded responses to the owning client object.
 */
class ClientSocket(Client) : BaseSocket
{
  private:
    Client _client;

  public:
    /**
     * Params:
     *  connection = the connection to the server.
     *  client     = receiver of $(D onResponse) callbacks.
     */
    this(TCPConnection connection, Client client)
    {
        super(connection);
        _client = client;
    }

    /**
     * Reads and processes incoming data until the unpacker reports a size of
     * zero after processing, or the connection is no longer connected.
     */
    override void onRead()
    {
        InputStream stream = _connection;

        while (true) {
            // size_t is uint on 32-bit targets, so this is the same narrowing
            // the 32-bit build needs and a no-op on 64-bit targets.
            auto chunk = new ubyte[](cast(size_t) stream.leastSize);

            stream.read(chunk);
            proccessRequest(chunk);

            // CHECKME
            if (_unpacker.size == 0 || !_connection.connected)
                break;
        }
    }

    /// Forwards a decoded response to the client object.
    override void onResponse(size_t id, Value error, Value result)
    {
        _client.onResponse(id, error, result);
    }
}
145 
146 
/**
 * Client-side transport: connects to an endpoint on construction and sends
 * packed messages over the resulting socket.
 */
final class ClientTransport(Client)
{
  private:
    Endpoint _endpoint;
    Client _client;
    ClientSocket!Client _socket;

  public:
    /**
     * Connects to $(D_PARAM endpoint) and binds the resulting socket to
     * $(D_PARAM client).
     */
    this(Client client, Endpoint endpoint)
    {
        _client = client;
        _endpoint = endpoint;

        auto connection = connectTCP(_endpoint.address, _endpoint.port);
        _socket = new ClientSocket!Client(connection, client);
    }

    /**
     * Sends an already packed message.
     *
     * Params:
     *  message = the packed bytes to write.
     *  request = when true, read from the socket afterwards; when false,
     *            run the event driver once instead.
     */
    void sendMessage(ubyte[] message, bool request = true)
    {
        _socket.sendMessage(message);

        if (!request) {
            // force notify event to send
            getEventDriver().processEvents();
            return;
        }

        _socket.onRead();
    }

    /// Closes the underlying socket.
    void close()
    {
        _socket.close();
    }
}
176 
177 
/**
 * Server side of a connection: forwards decoded requests and notifications
 * to the server object.
 */
class ServerSocket(Server) : BaseSocket
{
  private:
    Server _server;

  public:
    /**
     * Params:
     *  connection = the accepted client connection.
     *  server     = receiver of $(D onRequest) and $(D onNotify) callbacks.
     */
    this(TCPConnection connection, Server server)
    {
        super(connection);
        _server = server;
    }

    /// Forwards a request to the server, passing this socket along with it.
    override void onRequest(size_t id, string method, Value[] params)
    {
        _server.onRequest(this, id, method, params);
    }

    /// Forwards a notification to the server.
    override void onNotify(string method, Value[] params)
    {
        _server.onNotify(method, params);
    }
}
200 
201 
/**
 * Server-side transport: listens on an endpoint and creates one
 * $(D ServerSocket) per accepted connection.
 */
final class ServerTransport(Server)
{
  private:
    Endpoint _endpoint;

  public:
    /// Remembers the endpoint to listen on; nothing is opened yet.
    this(Endpoint endpoint)
    {
        _endpoint = endpoint;
    }

    /**
     * Starts listening on the stored endpoint. Every accepted connection is
     * wrapped in a new socket bound to $(D_PARAM server) and read from.
     */
    void listen(Server server)
    {
        listenTCP(_endpoint.port, (TCPConnection conn) {
            auto accepted = new ServerSocket!Server(conn, server);
            accepted.onRead();
        }, _endpoint.address);
    }

    /// Currently does nothing.
    void close()
    {
    }
}