1 /* 2 * Copyright (c) 2018-2020 sel-project 3 * 4 * Permission is hereby granted, free of charge, to any person obtaining a copy 5 * of this software and associated documentation files (the "Software"), to deal 6 * in the Software without restriction, including without limitation the rights 7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 * copies of the Software, and to permit persons to whom the Software is 9 * furnished to do so, subject to the following conditions: 10 * 11 * The above copyright notice and this permission notice shall be included in all 12 * copies or substantial portions of the Software. 13 * 14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 20 * SOFTWARE. 21 * 22 */ 23 /** 24 * Copyright: Copyright (c) 2018-2020 sel-project 25 * License: MIT 26 * Authors: Kripth 27 * Source: $(HTTP github.com/sel-project/sel-util/stream/sel/stream.d, sel/stream.d) 28 */ 29 module sel.stream; 30 31 import std.conv : to; 32 import std.socket : getAddress; 33 import std.system : Endian; 34 import std.typetuple : TypeTuple; 35 import std.zlib : Compress, UnCompress; 36 37 import kiss.event : EventLoop; 38 import kiss.net : TcpStream; 39 40 import xbuffer.buffer : Buffer, BufferOverflowException; 41 import xbuffer.varint : isVar; 42 43 /** 44 * Generic stream. 45 */ 46 class Stream { 47 48 TcpStream conn; 49 Buffer buffer; 50 51 public void delegate(Buffer) handler; 52 53 public void delegate() onConnect, onClose; 54 55 private Modifier[] modifiers; 56 57 this(TcpStream conn, void delegate(Buffer) handler) { 58 this.conn = conn; 59 this.conn.onDataReceived = &this.handle; 60 this.conn.onConnected((bool success){ onConnect(); }); 61 this.conn.onClosed({ onClose(); }); 62 this.buffer = new Buffer(1024); 63 this.handler = handler; 64 this.onConnect = {}; 65 this.onClose = {}; 66 } 67 68 this(TcpStream conn) { 69 this(conn, (Buffer buffer){}); 70 } 71 72 this(EventLoop eventLoop, string ip, ushort port) { 73 this(new TcpStream(eventLoop)); 74 this.conn.connect(getAddress(ip, port)[0]); 75 } 76 77 public void handle(in ubyte[] data) { 78 this.buffer.data = data; 79 this.handleData(); 80 } 81 82 public void handleData() { 83 bool more; 84 do { 85 more = false; 86 foreach(modifier ; this.modifiers) { 87 more |= modifier.decode(buffer); 88 } 89 if(buffer.data.length) this.handler(buffer); 90 } while(more); 91 } 92 93 public void send(ubyte[] data) { 94 this.buffer.data = data; 95 this.send(this.buffer); 96 } 97 98 public void send(Buffer buffer) { 99 foreach_reverse(modifier ; this.modifiers) { 100 modifier.encode(buffer); 101 } 102 this.sendData(buffer); 103 buffer.reset(); 104 } 105 106 public void sendData(Buffer buffer) { 107 this.conn.write(buffer.data!ubyte); 108 } 109 110 public void modify(M:Modifier, E...)(E args) { 111 this.modifiers ~= new M(args); 112 } 113 114 } 115 116 abstract class Modifier { 117 118 abstract void encode(Buffer buffer); 119 120 abstract bool decode(Buffer buffer); 121 122 } 123 124 class LengthPrefixedModifier(T, Endian endianness=Endian.bigEndian) : Modifier { 125 126 static if(!isVar!T) alias E = TypeTuple!(T, endianness); 127 else alias E = T; 128 129 private size_t length = 0; 130 private Buffer buffer; 131 132 this() { 133 this.buffer = new Buffer(1024); 134 } 135 136 override void encode(Buffer buffer) { 137 static if(isVar!T) buffer.write!E(buffer.data.length.to!(T.Base), 0); 138 else buffer.write!(endianness, T)(buffer.data.length.to!T, 0); 139 } 140 141 override bool decode(Buffer buffer) { 142 buffer.data = this.buffer.data ~ buffer.data; 143 if(this.length == 0) { 144 return this.parseLength(buffer); 145 } else { 146 return this.parseImpl(buffer); 147 } 148 } 149 150 private bool parseLength(Buffer buffer) { 151 try { 152 static if(isVar!T) this.length = buffer.read!E(); 153 else this.length = buffer.read!(endianness, T)(); 154 if(this.length != 0) return this.parseImpl(buffer); 155 else return false; 156 } catch(BufferOverflowException) { 157 // cannot read the length 158 this.buffer.data = buffer.data; 159 return false; 160 } 161 } 162 163 private bool parseImpl(Buffer buffer) { 164 if(buffer.canRead(this.length)) { 165 this.buffer.data = buffer.readData(this.length); 166 this.length = 0; 167 void[] rest = buffer.data; 168 buffer.data = this.buffer.data; 169 this.buffer.data = rest; 170 return this.buffer.data.length > 0; 171 } else { 172 // not enough data to read 173 this.buffer.data = buffer.data; 174 return false; 175 } 176 } 177 178 } 179 180 class CompressedModifier(T, Endian endianness=Endian.bigEndian) : Modifier { 181 182 static if(!isVar!T) alias E = TypeTuple!(T, endianness); 183 else alias E = T; 184 185 private size_t thresold; 186 187 this(size_t thresold) { 188 this.thresold = thresold; 189 } 190 191 override void encode(Buffer buffer) { 192 if(buffer.data.length >= this.thresold) { 193 immutable length = buffer.data.length; 194 Compress c = new Compress(); 195 auto data = c.compress(buffer.data); 196 data ~= c.flush(); 197 buffer.data = data; 198 static if(isVar!T) buffer.write!E(length.to!(T.Base), 0); 199 else buffer.write!E(length.to!T, 0); 200 } else { 201 buffer.write!T(0, 0); 202 } 203 } 204 205 override bool decode(Buffer buffer) { 206 try { 207 size_t length = buffer.read!E(); 208 if(length != 0) { 209 UnCompress uc = new UnCompress(length.to!uint); 210 auto data = uc.uncompress(buffer.data); 211 data ~= uc.flush(); 212 buffer.data = data; 213 } 214 } catch(BufferOverflowException) {} 215 return false; 216 } 217 218 }