1 /*
2  * Copyright (c) 2018-2020 sel-project
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in all
12  * copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20  * SOFTWARE.
21  *
22  */
23 /**
24  * Copyright: Copyright (c) 2018-2020 sel-project
25  * License: MIT
26  * Authors: Kripth
27  * Source: $(HTTP github.com/sel-project/sel-util/stream/sel/stream.d, sel/stream.d)
28  */
29 module sel.stream;
30 
31 import std.conv : to;
32 import std.socket : getAddress;
33 import std.system : Endian;
34 import std.typetuple : TypeTuple;
35 import std.zlib : Compress, UnCompress;
36 
37 import kiss.event : EventLoop;
38 import kiss.net : TcpStream;
39 
40 import xbuffer.buffer : Buffer, BufferOverflowException;
41 import xbuffer.varint : isVar;
42 
/**
 * Generic stream built on top of a `TcpStream`.
 *
 * Incoming bytes are passed through every registered `Modifier` (in
 * registration order) and then given to `handler`; outgoing buffers are
 * passed through the modifiers in reverse order and written to the
 * connection.
 */
class Stream {

	/// Underlying connection.
	TcpStream conn;

	/// Buffer used for the data being received and decoded.
	Buffer buffer;

	/// Called with the decoded buffer whenever it is not empty.
	public void delegate(Buffer) handler;

	/// Called from the connection's `onConnected` and `onClosed` callbacks.
	public void delegate() onConnect, onClose;

	private Modifier[] modifiers;

	/**
	 * Wraps an existing connection and registers the stream's callbacks on it.
	 * Params:
	 * 		conn = connection to read from and write to
	 * 		handler = delegate that receives the decoded data
	 */
	this(TcpStream conn, void delegate(Buffer) handler) {
		this.conn = conn;
		this.conn.onDataReceived = &this.handle;
		// the delegates read the members when they are invoked, so replacing
		// `onConnect`/`onClose` after construction takes effect
		this.conn.onConnected((bool success){ onConnect(); });
		this.conn.onClosed({ onClose(); });
		this.buffer = new Buffer(1024);
		this.handler = handler;
		this.onConnect = {};
		this.onClose = {};
	}

	/**
	 * Wraps an existing connection with a handler that does nothing.
	 */
	this(TcpStream conn) {
		this(conn, (Buffer buffer){});
	}

	/**
	 * Creates a new connection on the given event loop and connects it to
	 * the first address returned by `getAddress(ip, port)`.
	 */
	this(EventLoop eventLoop, string ip, ushort port) {
		this(new TcpStream(eventLoop));
		this.conn.connect(getAddress(ip, port)[0]);
	}

	/**
	 * Entry point for received bytes: loads them in the receive buffer and
	 * runs the decoding loop.
	 */
	public void handle(in ubyte[] data) {
		this.buffer.data = data;
		this.handleData();
	}

	/**
	 * Runs every modifier's `decode` on the receive buffer and calls the
	 * handler if something is left in it. The process is repeated for as
	 * long as at least one modifier reports that it has more data pending.
	 */
	public void handleData() {
		bool more;
		do {
			more = false;
			foreach(modifier ; this.modifiers) {
				more |= modifier.decode(buffer);
			}
			if(buffer.data.length) this.handler(buffer);
			if(more) {
				// Whatever the handler did not read belongs to the packet that
				// has just been dispatched: it must not be fed back to the
				// modifiers, which would join it to their pending data.
				ubyte[] nothing;
				this.buffer.data = nothing;
			}
		} while(more);
	}

	/**
	 * Encodes and sends raw bytes.
	 *
	 * A dedicated buffer is used instead of the receive buffer, so that
	 * sending from inside the handler does not overwrite (and then reset)
	 * the packet that is still being read.
	 */
	public void send(ubyte[] data) {
		Buffer outgoing = new Buffer(1024);
		outgoing.data = data;
		this.send(outgoing);
	}

	/**
	 * Applies the modifiers in reverse registration order, writes the result
	 * to the connection and resets the given buffer.
	 */
	public void send(Buffer buffer) {
		foreach_reverse(modifier ; this.modifiers) {
			modifier.encode(buffer);
		}
		this.sendData(buffer);
		buffer.reset();
	}

	/**
	 * Writes the buffer's data to the connection without applying any
	 * modifier.
	 */
	public void sendData(Buffer buffer) {
		this.conn.write(buffer.data!ubyte);
	}

	/**
	 * Registers a new modifier of type `M`, constructed with `args`, after
	 * the ones already registered.
	 */
	public void modify(M:Modifier, E...)(E args) {
		this.modifiers ~= new M(args);
	}
	
}
115 
/**
 * Base class for the transformations that a stream applies to its data.
 *
 * A stream calls `encode` on the buffer that is about to be sent and
 * `decode` on the buffer that has just been received.
 */
abstract class Modifier {

	abstract {

		/**
		 * Transforms, in place, the buffer that is going to be sent.
		 */
		void encode(Buffer buffer);

		/**
		 * Transforms, in place, the buffer that has been received.
		 * Returns: whether the caller should run the decoding again
		 * 		because more data is pending in the modifier
		 */
		bool decode(Buffer buffer);

	}

}
123 
/**
 * Modifier that frames the data with its length.
 *
 * When encoding, the length of the buffer is inserted at index 0; when
 * decoding, the received bytes are accumulated until a whole frame is
 * available and only then exposed through the buffer given to `decode`.
 * Params:
 * 		T = type of the length prefix (a fixed-width type or a var type)
 * 		endianness = endianness of the prefix, used when `T` is not a var type
 */
class LengthPrefixedModifier(T, Endian endianness=Endian.bigEndian) : Modifier {

	static if(!isVar!T) alias E = TypeTuple!(T, endianness);
	else alias E = T;

	// length of the frame being received; 0 means that no prefix has been read yet
	private size_t length = 0;

	// bytes received but not yet handed out as a complete frame
	private Buffer buffer;

	this() {
		this.buffer = new Buffer(1024);
	}

	/**
	 * Inserts the length of the buffer's data at the beginning of the buffer.
	 */
	override void encode(Buffer buffer) {
		static if(isVar!T) buffer.write!E(buffer.data.length.to!(T.Base), 0);
		else buffer.write!(endianness, T)(buffer.data.length.to!T, 0);
	}

	/**
	 * Joins the pending bytes with the new ones and tries to extract a frame.
	 *
	 * On return the given buffer contains either one complete frame or
	 * nothing at all; incomplete data is never left in it.
	 * Returns: true if more bytes are pending after the extracted frame
	 */
	override bool decode(Buffer buffer) {
		buffer.data = this.buffer.data ~ buffer.data;
		if(this.length == 0) {
			return this.parseLength(buffer);
		} else {
			return this.parseImpl(buffer);
		}
	}

	// Reads the length prefix and, if it is complete, goes on with the payload.
	private bool parseLength(Buffer buffer) {
		// Copy taken before reading: a read that fails halfway through the
		// prefix may already have consumed some bytes, which would be lost
		// if only the remaining data was kept.
		void[] pending = buffer.data.dup;
		try {
			static if(isVar!T) this.length = buffer.read!E();
			else this.length = buffer.read!(endianness, T)();
		} catch(BufferOverflowException) {
			// cannot read the length yet
			this.stash(buffer, pending);
			return false;
		}
		// a length of 0 is handled like any other: it yields an empty frame
		// and the bytes that follow it are kept for the next frame
		return this.parseImpl(buffer);
	}

	// Extracts `length` bytes from the buffer if they are all available.
	private bool parseImpl(Buffer buffer) {
		if(!buffer.canRead(this.length)) {
			// not enough data to read
			this.stash(buffer, buffer.data.dup);
			return false;
		}
		// copies are taken before reassigning so that the result does not
		// depend on whether the slices alias the buffer's storage
		auto frame = buffer.readData(this.length).dup;
		auto rest = buffer.data.dup;
		this.length = 0;
		this.buffer.data = rest;
		buffer.data = frame;
		return rest.length > 0;
	}

	// Keeps the incomplete bytes for the next call and empties the given
	// buffer, so that the caller does not treat them as a decoded packet.
	private void stash(Buffer buffer, void[] pending) {
		this.buffer.data = pending;
		ubyte[] nothing;
		buffer.data = nothing;
	}

}
179 
/**
 * Modifier that compresses the data with zlib when it reaches a threshold.
 *
 * The encoded data is prefixed with the length of the uncompressed data,
 * or with 0 when the data that follows is not compressed.
 * Params:
 * 		T = type of the length prefix (a fixed-width type or a var type)
 * 		endianness = endianness of the prefix, used when `T` is not a var type
 */
class CompressedModifier(T, Endian endianness=Endian.bigEndian) : Modifier {
	
	static if(!isVar!T) alias E = TypeTuple!(T, endianness);
	else alias E = T;

	// minimum length, in bytes, that the data must have to be compressed
	private size_t thresold;

	/**
	 * Params:
	 * 		thresold = minimum length of the data to be compressed
	 */
	this(size_t thresold) {
		this.thresold = thresold;
	}

	/**
	 * Compresses the buffer's data if its length reaches the threshold and
	 * inserts the prefix at the beginning of the buffer.
	 */
	override void encode(Buffer buffer) {
		immutable length = buffer.data.length;
		// Empty data is never compressed, even with a threshold of 0: its
		// prefix would be 0, which the decoder reads as "not compressed".
		if(length != 0 && length >= this.thresold) {
			Compress c = new Compress();
			auto data = c.compress(buffer.data);
			data ~= c.flush();
			buffer.data = data;
			this.writePrefix(buffer, length);
		} else {
			this.writePrefix(buffer, 0);
		}
	}

	/**
	 * Reads the prefix and, if it is not 0, replaces the buffer's data with
	 * the uncompressed data.
	 * Returns: always false, this modifier never holds pending data
	 */
	override bool decode(Buffer buffer) {
		try {
			size_t length;
			static if(isVar!T) length = buffer.read!E();
			else length = buffer.read!(endianness, T)();
			if(length != 0) {
				// NOTE(review): `length` comes from the peer and is used as the
				// size of the destination buffer; consider enforcing an upper
				// bound if the peer is not trusted.
				UnCompress uc = new UnCompress(length.to!uint);
				auto data = uc.uncompress(buffer.data);
				data ~= uc.flush();
				buffer.data = data;
			}
		} catch(BufferOverflowException) {
			// the prefix cannot be read (for example because the buffer is
			// empty): there is nothing to decode and the buffer is left as is
		}
		return false;
	}

	// Inserts the prefix at index 0, in the same way LengthPrefixedModifier
	// writes its length.
	private void writePrefix(Buffer buffer, size_t value) {
		static if(isVar!T) buffer.write!E(value.to!(T.Base), 0);
		else buffer.write!(endianness, T)(value.to!T, 0);
	}

}