1 /++ 2 Functionality related to connecting to a server over the Internet. 3 4 Includes [core.thread.fiber.Fiber|Fiber]s that help with resolving the 5 address of, connecting to, and reading full string lines from a server. 6 7 Having them as [core.thread.fiber.Fiber|Fiber]s means a program can do 8 address resolution, connecting and reading while retaining the ability to do 9 other stuff concurrently. This means you can conveniently run code in between 10 each connection attempt, for instance, without breaking the program's flow. 11 12 Example: 13 --- 14 import std.concurrency : Generator; 15 16 Connection conn; 17 bool abort; // Set to true if something goes wrong 18 19 conn.reset(); 20 21 bool useIPv6 = false; 22 enum resolveAttempts = 10; 23 24 auto resolver = new Generator!ResolveAttempt(() => 25 resolveFiber(conn, "irc.libera.chat", 6667, useIPv6, resolveAttempts, abort)); 26 27 resolver.call(); 28 29 resolveloop: 30 foreach (const attempt; resolver) 31 { 32 // attempt is a yielded `ResolveAttempt` 33 // switch on `attempt.state`, deal with it accordingly 34 } 35 36 // Resolution done 37 38 enum connectionRetries = 10; 39 40 auto connector = new Generator!ConnectionAttempt(() => 41 connectFiber(conn, false, connectionRetries, abort)); 42 43 connector.call(); 44 45 connectorloop: 46 foreach (const attempt; connector) 47 { 48 // attempt is a yielded `ConnectionAttempt` 49 // switch on `attempt.state`, deal with it accordingly 50 } 51 52 // Connection established 53 54 enum timeoutSeconds = 600; 55 56 auto listener = new Generator!ListenAttempt(() => listenFiber(conn, abort, timeoutSecond)); 57 58 listener.call(); 59 60 foreach (const attempt; listener) 61 { 62 // attempt is a yielded `ListenAttempt` 63 doThingsWithLineFromServer(attempt.line); 64 // program logic goes here 65 } 66 --- 67 68 Copyright: [JR](https://github.com/zorael) 69 License: [Boost Software License 1.0](https://www.boost.org/users/license.html) 70 71 Authors: 72 [JR](https://github.com/zorael) 73 +/ 74 module kameloso.net; 75 76 private: 77 78 import kameloso.constants : BufferSize, Timeout; 79 80 public: 81 82 @safe: 83 84 85 // Connection 86 /++ 87 Functions and state needed to maintain a connection. 88 89 This is simply to decrease the amount of globals and to create some 90 convenience functions. 91 +/ 92 struct Connection 93 { 94 private: 95 import requests.ssl_adapter : SSL, SSL_CTX, openssl; 96 import std.socket : Address, Socket, SocketOption; 97 98 /// Real IPv4 and IPv6 sockets to connect through. 99 Socket socket4, socket6; 100 101 /// Private cached send timeout setting. 102 uint _sendTimeout; 103 104 /// Private cached received timeout setting. 105 uint _receiveTimeout; 106 107 /// Private SSL context. 108 SSL_CTX* sslContext; 109 110 /++ 111 OpenSSL [requests.ssl_adapter.SSL] instance, for use with SSL connections. 112 +/ 113 SSL* sslInstance; 114 115 116 // setTimemout 117 /++ 118 Sets the [std.socket.SocketOption.RCVTIMEO|SocketOption.RCVTIMEO] of the 119 *current* [std.socket.Socket|Socket] [socket] to the specified duration. 120 121 Params: 122 option = The [std.socket.SocketOption|SocketOption] to set. 123 dur = The duration to assign for the option, in number of milliseconds. 124 +/ 125 void setTimeout(const SocketOption option, const uint dur) 126 { 127 import std.socket : SocketOptionLevel; 128 import core.time : msecs; 129 130 with (socket) 131 with (SocketOptionLevel) 132 { 133 setOption(SOCKET, option, dur.msecs); 134 } 135 } 136 137 public: 138 /++ 139 Pointer to the socket of the [std.socket.AddressFamily|AddressFamily] we 140 want to connect with. 141 +/ 142 Socket socket; 143 144 /++ 145 Whether or not this [Connection] should use SSL when sending and receiving. 146 +/ 147 bool ssl; 148 149 /// IPs already resolved using [kameloso.net.resolveFiber|resolveFiber]. 150 Address[] ips; 151 152 /++ 153 Implicitly proxies calls to the current [std.socket.Socket|Socket]. 154 This successfully proxies to [std.socket.Socket.receive|Socket.receive]. 155 +/ 156 alias socket this; 157 158 /++ 159 Whether we are connected or not. 160 +/ 161 bool connected; 162 163 /++ 164 Path to a (`.pem`) SSL certificate file. 165 +/ 166 string certFile; 167 168 /++ 169 Path to a private SSL key file. 170 +/ 171 string privateKeyFile; 172 173 // sendTimeout 174 /++ 175 Accessor; returns the current send timeout. 176 177 Returns: 178 A copy of [_sendTimeout]. 179 +/ 180 pragma(inline, true) 181 auto sendTimeout() const @property pure @nogc nothrow 182 { 183 return _sendTimeout; 184 } 185 186 // sendTimeout 187 /++ 188 Mutator; sets the send timeout socket option to the passed duration. 189 190 Params: 191 dur = The duration to assign as send timeout, in number of milliseconds. 192 +/ 193 pragma(inline, true) 194 void sendTimeout(const uint dur) @property 195 { 196 setTimeout(SocketOption.SNDTIMEO, dur); 197 _sendTimeout = dur; 198 } 199 200 // receiveTimeout 201 /++ 202 Accessor; returns the current receive timeout. 203 204 Returns: 205 A copy of [_receiveTimeout]. 206 +/ 207 pragma(inline, true) 208 auto receiveTimeout() const @property pure @nogc nothrow 209 { 210 return _receiveTimeout; 211 } 212 213 // sendTimeout 214 /++ 215 Mutator; sets the receive timeout socket option to the passed duration. 216 217 Params: 218 dur = The duration to assign as receive timeout, in number of milliseconds. 219 +/ 220 void receiveTimeout(const uint dur) @property 221 { 222 setTimeout(SocketOption.RCVTIMEO, dur); 223 _receiveTimeout = dur; 224 } 225 226 // reset 227 /++ 228 (Re-)initialises the sockets and sets the IPv4 one as the active one. 229 230 If we ever change this to a class, this should be the default constructor. 231 +/ 232 void reset() 233 { 234 teardown(); 235 setup(); 236 connected = false; 237 } 238 239 // resetSSL 240 /++ 241 Resets the SSL context and resources of this [Connection]. 242 +/ 243 void resetSSL() @system 244 in (ssl, "Tried to reset SSL on a non-SSL `Connection`") 245 { 246 teardownSSL(); 247 setupSSL(); 248 } 249 250 // getSSLErrorMessage 251 /++ 252 Returns the SSL error message for the passed SSL error code. 253 254 Params: 255 code = SSL error code to translate to string. 256 257 Returns: 258 A string with the last SSL error code translated into humanly-readable text. 259 +/ 260 auto getSSLErrorMessage(const int code) @system 261 in (ssl, "Tried to get SSL error message on a non-SSL `Connection`") 262 { 263 import std.string : fromStringz; 264 265 immutable errorCode = openssl.SSL_get_error(sslInstance, code); 266 267 return openssl.ERR_reason_error_string(errorCode) 268 .fromStringz 269 .idup; 270 } 271 272 // setDefaultOptions 273 /++ 274 Sets up sockets with the [std.socket.SocketOption|SocketOption]s needed. 275 These include timeouts and buffer sizes. 276 277 Params: 278 socketToSetup = Reference to the [std.socket.Socket|Socket] to modify. 279 +/ 280 void setDefaultOptions(Socket socketToSetup) 281 { 282 import std.socket : SocketOption, SocketOptionLevel; 283 import core.time : msecs; 284 285 with (socketToSetup) 286 with (SocketOption) 287 with (SocketOptionLevel) 288 { 289 setOption(SOCKET, RCVBUF, BufferSize.socketOptionReceive); 290 setOption(SOCKET, SNDBUF, BufferSize.socketOptionSend); 291 setOption(SOCKET, RCVTIMEO, Timeout.receiveMsecs.msecs); 292 setOption(SOCKET, SNDTIMEO, Timeout.sendMsecs.msecs); 293 294 _receiveTimeout = Timeout.receiveMsecs; 295 _sendTimeout = Timeout.sendMsecs; 296 blocking = true; 297 } 298 } 299 300 // setupSSL 301 /++ 302 Sets up the SSL context for this connection. 303 304 Throws: 305 [SSLException] if the SSL context could not be set up. 306 307 [SSLFileException] if any specified certificate or private key could not be found. 308 +/ 309 void setupSSL() @system 310 in (ssl, "Tried to set up SSL context on a non-SSL `Connection`") 311 in (socket, "Tried to set up an SSL context on a null `Socket`") 312 { 313 import std.file : exists; 314 import std.path : extension; 315 import std.string : toStringz; 316 317 sslContext = openssl.SSL_CTX_new(openssl.TLS_method); 318 openssl.SSL_CTX_set_verify(sslContext, 0, null); 319 320 if (certFile.length) 321 { 322 // Before SSL_new 323 if (!certFile.exists) 324 { 325 enum message = "No such certificate file"; 326 throw new SSLFileException( 327 message, 328 certFile, 329 __FILE__, 330 __LINE__); 331 } 332 333 immutable filetype = (certFile.extension == ".pem") ? 1 : 0; 334 immutable code = openssl.SSL_CTX_use_certificate_file( 335 sslContext, 336 toStringz(certFile), 337 filetype); 338 if (code != 1) throw new SSLException("Failed to set certificate", code); 339 } 340 341 if (privateKeyFile.length) 342 { 343 // Ditto 344 if (!privateKeyFile.exists) 345 { 346 enum message = "No such private key file"; 347 throw new SSLFileException( 348 message, 349 privateKeyFile, 350 __FILE__, 351 __LINE__); 352 } 353 354 immutable filetype = (privateKeyFile.extension == ".pem") ? 1 : 0; 355 immutable code = openssl.SSL_CTX_use_PrivateKey_file( 356 sslContext, 357 toStringz(privateKeyFile), 358 filetype); 359 if (code != 1) throw new SSLException("Failed to set private key", code); 360 } 361 362 sslInstance = openssl.SSL_new(sslContext); 363 immutable code = openssl.SSL_set_fd(sslInstance, cast(int)socket.handle); 364 if (code != 1) throw new SSLException("Failed to attach socket handle", code); 365 } 366 367 // teardownSSL 368 /++ 369 Frees SSL context and resources. 370 +/ 371 void teardownSSL() 372 in (ssl, "Tried to teardown SSL on a non-SSL `Connection`") 373 { 374 if (sslInstance) openssl.SSL_free(sslInstance); 375 if (sslContext) openssl.SSL_CTX_free(sslContext); 376 } 377 378 // teardown 379 /++ 380 Shuts down and closes the internal [std.socket.Socket|Socket]s. 381 +/ 382 void teardown() 383 { 384 import std.range : only; 385 import std.socket : SocketShutdown; 386 387 foreach (thisSocket; only(socket4, socket6)) 388 { 389 if (!thisSocket) continue; 390 391 thisSocket.shutdown(SocketShutdown.BOTH); 392 thisSocket.close(); 393 } 394 } 395 396 // setup 397 /++ 398 Initialises new [std.socket.Socket|Socket]s and sets their options. 399 +/ 400 void setup() 401 { 402 import std.socket : TcpSocket, AddressFamily, SocketType; 403 404 socket4 = new TcpSocket; 405 socket6 = new Socket(AddressFamily.INET6, SocketType.STREAM); 406 socket = socket4; 407 408 setDefaultOptions(socket4); 409 setDefaultOptions(socket6); 410 } 411 412 // sendline 413 /++ 414 Sends a line to the server. 415 416 Intended for servers that deliminates lines by linebreaks, such as IRC servers. 417 418 Example: 419 --- 420 conn.sendline("NICK foobar"); 421 conn.sendline("PRIVMSG #channel :text"); 422 conn.sendline("PRIVMSG " ~ channel ~ " :" ~ content); 423 conn.sendline(longerLine, 1024L); // Now with custom line lengths 424 --- 425 426 Params: 427 rawline = Line to send. May contain substrings separated by newline 428 characters. A final linebreak is added to the end of the send. 429 maxLineLength = Maximum line length before the sent message will be truncated. 430 linebreak = Characters to use as linebreak, marking the end of a line to send. 431 432 Throws: 433 [SocketSendException] if the call to send data through the socket 434 returns [std.socket.Socket.ERROR|Socket.ERROR]. 435 +/ 436 void sendline( 437 const string rawline, 438 const uint maxLineLength = 512, 439 const string linebreak = "\r\n") @system 440 in (connected, "Tried to send a line on an unconnected `Connection`") 441 { 442 import std.string : indexOf; 443 444 if (!rawline.length) return; 445 446 immutable maxAvailableLength = (maxLineLength - linebreak.length); 447 448 void sendlineImpl(const string line) 449 { 450 import std.algorithm.comparison : min; 451 452 immutable lineLength = min(line.length, maxAvailableLength); 453 size_t totalSent; 454 455 auto sendSubstring(const string substring) 456 in (substring.length, "Tried to send empty substring to server") 457 { 458 immutable bytesSent = ssl ? 459 openssl.SSL_write(sslInstance, substring.ptr, cast(int)substring.length) : 460 socket.send(substring); 461 462 if (bytesSent == Socket.ERROR) 463 { 464 enum message = "Socket.ERROR returned when sending data to server"; 465 throw new SocketSendException(message); 466 } 467 468 return bytesSent; 469 } 470 471 while (totalSent < lineLength) 472 { 473 totalSent += sendSubstring(line[totalSent..lineLength]); 474 } 475 476 // Always end the line with a linebreak 477 sendSubstring(linebreak); 478 } 479 480 auto newlinePos = rawline.indexOf('\n'); // mutable 481 482 if (newlinePos != -1) 483 { 484 // Line incorrectly has at least one newline, so send up until 485 // the first and discard the remainder 486 487 if ((newlinePos > 0) && (rawline[newlinePos-1] == '\r')) 488 { 489 // It was actually "\r\n", so omit the '\r' too 490 --newlinePos; 491 } 492 493 sendlineImpl(rawline[0..newlinePos]); 494 } 495 else 496 { 497 // Plain line 498 sendlineImpl(rawline); 499 } 500 } 501 } 502 503 504 // ListenAttempt 505 /++ 506 Embodies the idea of a listening attempt. 507 +/ 508 struct ListenAttempt 509 { 510 /++ 511 The various states a listening attempt may be in. 512 +/ 513 enum State 514 { 515 unset, /// Init value. 516 prelisten, /// About to listen. 517 isEmpty, /// Empty result; nothing read or similar. 518 hasString, /// String read, ready for processing. 519 timeout, /// Connection read timed out. 520 warning, /// Recoverable exception thrown; warn and continue. 521 error, /// Unrecoverable exception thrown; abort. 522 } 523 524 /// The current state of the attempt. 525 State state; 526 527 /// The last read line of text sent by the server. 528 string line; 529 530 /// The [std.socket.lastSocketError|lastSocketError] at the last point of error. 531 string error; 532 533 /// [core.stdc.errno.errno|errno] at time of read. 534 int errno; 535 536 /// The amount of bytes received this attempt. 537 long bytesReceived; 538 } 539 540 541 // listenFiber 542 /++ 543 A [std.socket.Socket|Socket]-reading [std.concurrency.Generator|Generator]. 544 It reads and yields full string lines. 545 546 It maintains its own buffer into which it receives from the server, though 547 not necessarily full lines. It thus keeps filling the buffer until it 548 finds a newline character, yields [ListenAttempt]s back to the caller of 549 the Fiber, checks for more lines to yield, and if none yields an attempt 550 with a [ListenAttempt.State] denoting that nothing was read and that a new 551 attempt should be made later. 552 553 Example: 554 --- 555 //Connection conn; // Address previously connected established with 556 557 enum timeoutSeconds = 600; 558 559 auto listener = new Generator!ListenAttempt(() => listenFiber(conn, abort, timeoutSeconds)); 560 561 listener.call(); 562 563 foreach (const attempt; listener) 564 { 565 // attempt is a yielded `ListenAttempt` 566 567 with (ListenAttempt.State) 568 final switch (attempt.state) 569 { 570 case prelisten: 571 assert(0, "shouldn't happen"); 572 573 case isEmpty: 574 case timeout: 575 // Reading timed out or nothing was read, happens 576 break; 577 578 case hasString: 579 // A line was successfully read! 580 // program logic goes here 581 doThings(attempt.line); 582 break; 583 584 case warning: 585 // Recoverable 586 warnAboutSomething(attempt.error); 587 break; 588 589 case error: 590 // Unrecoverable 591 dealWithError(attempt.error); 592 return; 593 } 594 } 595 --- 596 597 Params: 598 bufferSize = What size static array to use as buffer. Defaults to twice of 599 [kameloso.constants.BufferSize.socketReceive|BufferSize.socketReceive] for now. 600 conn = [Connection] whose [std.socket.Socket|Socket] it reads from the server with. 601 abort = Reference "abort" flag, which -- if set -- should make the 602 function return and the [core.thread.fiber.Fiber|Fiber] terminate. 603 connectionLost = How many seconds may pass before we consider the connection lost. 604 Optional, defaults to 605 [kameloso.constants.Timeout.connectionLost|Timeout.connectionLost]. 606 607 Yields: 608 [ListenAttempt]s with information about the line received in its member values. 609 +/ 610 void listenFiber(size_t bufferSize = BufferSize.socketReceive*2) 611 (Connection conn, 612 ref bool abort, 613 const int connectionLost = Timeout.connectionLost) @system 614 in ((conn.connected), "Tried to set up a listening fiber on a dead connection") 615 in ((connectionLost > 0), "Tried to set up a listening fiber with connection timeout of <= 0") 616 { 617 import kameloso.constants : BufferSize; 618 import std.concurrency : yield; 619 import std.datetime.systime : Clock; 620 import std.socket : Socket, lastSocketError; 621 import std.string : indexOf; 622 623 if (abort) return; 624 625 ubyte[bufferSize] buffer; 626 long timeLastReceived = Clock.currTime.toUnixTime; 627 size_t start; 628 629 alias State = ListenAttempt.State; 630 631 // The Generator we use this function with popFronts the first thing it does 632 // after being instantiated. To work around our main loop popping too we 633 // yield an initial empty value; else the first thing to happen will be a 634 // double pop, and the first line is missed. 635 yield(ListenAttempt.init); 636 637 /// How many consecutive warnings to allow before yielding an error. 638 enum maxConsecutiveWarningsUntilError = 20; 639 640 /// Current consecutive warnings count. 641 uint consecutiveWarnings; 642 643 while (!abort) 644 { 645 version(Posix) 646 { 647 import core.stdc.errno; 648 649 // https://www-numi.fnal.gov/offline_software/srt_public_context/WebDocs/Errors/unix_system_errors.html 650 651 enum Errno 652 { 653 timedOut = EAGAIN, 654 wouldBlock = EWOULDBLOCK, 655 netDown = ENETDOWN, 656 netUnreachable = ENETUNREACH, 657 endpointNotConnected = ENOTCONN, 658 connectionReset = ECONNRESET, 659 connectionAborted = ECONNABORTED, 660 interrupted = EINTR, 661 } 662 } 663 else version(Windows) 664 { 665 import core.sys.windows.winsock2; 666 667 alias errno = WSAGetLastError; 668 669 // https://www.hardhats.org/cs/broker/docs/winsock.html 670 // https://infosys.beckhoff.com/english.php?content=../content/1033/tcpipserver/html/tcplclibtcpip_e_winsockerror.htm 671 672 enum Errno 673 { 674 unexpectedEOF = 0, 675 timedOut = WSAETIMEDOUT, 676 wouldBlock = WSAEWOULDBLOCK, 677 netDown = WSAENETDOWN, 678 netUnreachable = WSAENETUNREACH, 679 endpointNotConnected = WSAENOTCONN, 680 connectionReset = WSAECONNRESET, 681 connectionAborted = WSAECONNABORTED, 682 interrupted = WSAEINTR, 683 overlappedIO = 997, 684 } 685 } 686 else 687 { 688 static assert(0, "Unsupported platform, please file a bug."); 689 } 690 691 ListenAttempt attempt; 692 693 if (conn.ssl) 694 { 695 import requests.ssl_adapter : openssl; 696 attempt.bytesReceived = openssl.SSL_read( 697 conn.sslInstance, 698 cast(void*)buffer.ptr+start, 699 cast(int)(buffer.length-start)); 700 } 701 else 702 { 703 attempt.bytesReceived = conn.receive(buffer[start..$]); 704 } 705 706 attempt.errno = errno; 707 708 if (!attempt.bytesReceived) 709 { 710 attempt.state = State.error; 711 attempt.error = lastSocketError; 712 yield(attempt); 713 // Should never get here 714 assert(0, "Dead `listenFiber` resumed after yield (no bytes received)"); 715 } 716 717 if (attempt.errno == Errno.interrupted) 718 { 719 // Interrupted read; try again 720 // Unlucky callgrind_control -d timing 721 attempt.state = State.isEmpty; 722 attempt.error = lastSocketError; 723 consecutiveWarnings = 0; 724 yield(attempt); 725 continue; 726 } 727 728 if (attempt.bytesReceived == Socket.ERROR) 729 { 730 if ((Clock.currTime.toUnixTime - timeLastReceived) > connectionLost) 731 { 732 attempt.state = State.timeout; 733 yield(attempt); 734 735 // Should never get here 736 enum message = "Timed out `listenFiber` resumed after yield " ~ 737 "(received error, elapsed > timeout)"; 738 assert(0, message); 739 } 740 741 with (Errno) 742 switch (attempt.errno) 743 { 744 case timedOut: 745 // Resource temporarily unavailable 746 /* 747 A connection attempt failed because the connected party did not 748 properly respond after a period of time, or established connection 749 failed because connected host has failed to respond. 750 */ 751 // Timed out, nothing received 752 attempt.state = State.isEmpty; 753 consecutiveWarnings = 0; 754 yield(attempt); 755 continue; 756 757 static if (int(timedOut) != int(wouldBlock)) 758 { 759 case wouldBlock: 760 /+ 761 Portability Note: In many older Unix systems ... 762 [EWOULDBLOCK was] a distinct error code different from 763 EAGAIN. To make your program portable, you should check 764 for both codes and treat them the same. 765 +/ 766 // A non-blocking socket operation could not be completed immediately. 767 goto case timedOut; 768 } 769 770 version(Windows) 771 { 772 case overlappedIO: 773 // "Overlapped I/O operation is in progress." 774 // seems benign 775 goto case timedOut; 776 777 case unexpectedEOF: 778 /+ 779 If you're getting 0 from WSAGetLastError, then this is 780 most likely due to an unexpected EOF occurring on the socket, 781 i.e. the client has gracefully closed the connection 782 without sending a close_notify alert. 783 +/ 784 // "The operation completed successfully." 785 goto case; 786 } 787 788 case netDown: 789 case netUnreachable: 790 case endpointNotConnected: 791 case connectionReset: 792 case connectionAborted: 793 attempt.state = State.error; 794 attempt.error = lastSocketError; 795 yield(attempt); 796 // Should never get here 797 assert(0, "Dead `listenFiber` resumed after yield (`lastSocketError` error)"); 798 799 default: 800 attempt.error = lastSocketError; 801 802 if (++consecutiveWarnings >= maxConsecutiveWarningsUntilError) 803 { 804 attempt.state = State.error; 805 yield(attempt); 806 // Should never get here 807 assert(0, "Dead `listenFiber` resumed after yield (exceeded max consecutive errors)"); 808 } 809 else 810 { 811 attempt.state = State.warning; 812 yield(attempt); 813 continue; 814 } 815 } 816 } 817 818 timeLastReceived = Clock.currTime.toUnixTime; 819 consecutiveWarnings = 0; 820 821 immutable ptrdiff_t end = cast(ptrdiff_t)(start + attempt.bytesReceived); 822 ptrdiff_t newline = (cast(char[])buffer[0..end]).indexOf('\n'); 823 size_t pos; 824 825 while (newline != -1) 826 { 827 attempt.state = State.hasString; 828 attempt.line = (cast(char[])buffer[pos..pos+newline-1]).idup; // eat \r before \n 829 yield(attempt); 830 pos += (newline + 1); // eat remaining newline 831 newline = (cast(char[])buffer[pos..end]).indexOf('\n'); 832 } 833 834 attempt.state = State.isEmpty; 835 yield(attempt); 836 837 if (pos >= end) 838 { 839 // can be end or end+1 840 start = 0; 841 continue; 842 } 843 844 start = (end - pos); 845 846 // writefln("REMNANT:|%s|", cast(string)buffer[pos..end]); 847 import core.stdc.string : memmove; 848 memmove(buffer.ptr, (buffer.ptr + pos), (ubyte.sizeof * start)); 849 } 850 } 851 852 853 // ConnectionAttempt 854 /++ 855 Embodies the idea of a connection attempt. 856 +/ 857 struct ConnectionAttempt 858 { 859 private: 860 import std.socket : Address; 861 862 public: 863 /++ 864 The various states a connection attempt may be in. 865 +/ 866 enum State 867 { 868 unset, /// Init value. 869 preconnect, /// About to connect. 870 connected, /// Successfully connected. 871 delayThenReconnect, /// Failed to connect; should delay and retry. 872 delayThenNextIP, /// Failed to reconnect several times; next IP. 873 //noMoreIPs, /// Exhausted all IPs and could not connect. 874 ipv6Failure, /// IPv6 connection failed. 875 transientSSLFailure, /// Transient failure establishing an SSL connection, safe to retry. 876 fatalSSLFailure, /// Fatal failure establishing an SSL connection, should abort. 877 invalidConnectionError, /// The current IP cannot be connected to. 878 error, /// Error connecting; should abort. 879 } 880 881 /// The current state of the attempt. 882 State state; 883 884 /// The IP that the attempt is trying to connect to. 885 Address ip; 886 887 /// The error message as thrown by an exception. 888 string error; 889 890 /// [core.stdc.errno.errno|errno] at time of connect. 891 int errno; 892 893 /// The number of retries so far towards this [ip]. 894 uint retryNum; 895 } 896 897 898 // connectFiber 899 /++ 900 Fiber function that tries to connect to IPs in the `ips` array of the passed 901 [Connection], yielding at certain points throughout the process to let the 902 calling function do stuff in between connection attempts. 903 904 Example: 905 --- 906 //Connection conn; // Address previously resolved with `resolveFiber` 907 908 auto connector = new Generator!ConnectionAttempt(() => 909 connectFiber(conn, false, 10, abort)); 910 911 connector.call(); 912 913 connectorloop: 914 foreach (const attempt; connector) 915 { 916 // attempt is a yielded `ConnectionAttempt` 917 918 with (ConnectionAttempt.State) 919 final switch (attempt.state) 920 { 921 case preconnect: 922 assert(0, "shouldn't happen"); 923 924 case connected: 925 // Socket is connected, continue with normal routine 926 break connectorloop; 927 928 case delayThenReconnect: 929 case delayThenNextIP: 930 // Delay and retry 931 Thread.sleep(5.seconds); 932 break; 933 934 case ipv6Failure: 935 // Deal with it 936 dealWithIPv6(attempt.error); 937 break; 938 939 case sslFailure: 940 case error: 941 // Failed to connect 942 return; 943 } 944 } 945 946 // Connection established 947 --- 948 949 Params: 950 conn = Reference to the current, unconnected [Connection]. 951 connectionRetries = How many times to attempt to connect before signalling 952 that we should move on to the next IP. 953 abort = Reference "abort" flag, which -- if set -- should make the 954 function return and the [core.thread.fiber.Fiber|Fiber] terminate. 955 +/ 956 void connectFiber( 957 ref Connection conn, 958 const uint connectionRetries, 959 ref bool abort) @system 960 in (!conn.connected, "Tried to set up a connecting fiber on an already live connection") 961 in ((conn.ips.length > 0), "Tried to connect to an unresolved connection") 962 { 963 import std.concurrency : yield; 964 import std.socket : AddressFamily, SocketException; 965 966 if (abort) return; 967 968 alias State = ConnectionAttempt.State; 969 970 yield(ConnectionAttempt.init); 971 972 scope(exit) 973 { 974 conn.teardown(); 975 if (conn.ssl) conn.teardownSSL(); 976 } 977 978 do 979 { 980 iploop: 981 foreach (immutable i, ip; conn.ips) 982 { 983 immutable isIPv6 = (ip.addressFamily == AddressFamily.INET6); 984 985 ConnectionAttempt attempt; 986 attempt.ip = ip; 987 988 attemptloop: 989 foreach (immutable retry; 0..connectionRetries) 990 { 991 if (abort) return; 992 993 conn.reset(); 994 conn.socket = isIPv6 ? conn.socket6 : conn.socket4; 995 996 try 997 { 998 if (conn.ssl) 999 { 1000 // *After* conn.socket has been changed. 1001 conn.resetSSL(); 1002 } 1003 1004 attempt.retryNum = retry; 1005 attempt.state = State.preconnect; 1006 attempt.errno = 0; // reset 1007 yield(attempt); 1008 1009 conn.socket.connect(ip); 1010 1011 if (conn.ssl) 1012 { 1013 import requests.ssl_adapter : openssl; 1014 1015 immutable code = openssl.SSL_connect(conn.sslInstance); 1016 1017 if (code != 1) 1018 { 1019 enum message = "Failed to establish SSL connection " ~ 1020 "after successful connect"; 1021 throw new SSLException(message, code); 1022 } 1023 } 1024 1025 // If we're here no exception was thrown and we didn't yield 1026 // out of SSL errors, so we're connected 1027 1028 attempt.state = State.connected; 1029 conn.connected = true; 1030 yield(attempt); 1031 // Should never get here 1032 assert(0, "Finished `connectFiber` resumed after yield"); 1033 } 1034 catch (SocketException e) 1035 { 1036 version(Posix) 1037 { 1038 import core.stdc.errno : EAFNOSUPPORT, ECONNREFUSED, 1039 EHOSTUNREACH, ENETUNREACH, errno; 1040 1041 // https://www-numi.fnal.gov/offline_software/srt_public_context/WebDocs/Errors/unix_system_errors.html 1042 1043 enum Errno 1044 { 1045 addressFamilyNoSupport = EAFNOSUPPORT, 1046 connectionRefused = ECONNREFUSED, 1047 noRouteToHost = EHOSTUNREACH, 1048 networkUnreachable = ENETUNREACH, 1049 } 1050 1051 attempt.errno = errno; 1052 } 1053 else version(Windows) 1054 { 1055 import core.sys.windows.winsock2 : WSAEAFNOSUPPORT, WSAECONNREFUSED, 1056 WSAEHOSTUNREACH, WSAENETUNREACH, WSAGetLastError; 1057 1058 enum Errno 1059 { 1060 addressFamilyNoSupport = WSAEAFNOSUPPORT, 1061 connectionRefused = WSAECONNREFUSED, 1062 noRouteToHost = WSAEHOSTUNREACH, 1063 networkUnreachable = WSAENETUNREACH, 1064 } 1065 1066 attempt.errno = WSAGetLastError(); 1067 } 1068 else 1069 { 1070 static assert(0, "Unsupported platform, please file a bug."); 1071 } 1072 1073 with (Errno) 1074 switch (attempt.errno) 1075 { 1076 case addressFamilyNoSupport: 1077 // Address family not supported by protocol 1078 // An address incompatible with the requested protocol was used. 1079 if (isIPv6) 1080 { 1081 attempt.state = State.ipv6Failure; 1082 attempt.error = e.msg; 1083 yield(attempt); 1084 1085 // Remove IPv6 addresses from conn.ips 1086 foreach_reverse (immutable n, const arrayIP; conn.ips) 1087 { 1088 if (n == i) break; // caught up to current 1089 1090 if (arrayIP.addressFamily == AddressFamily.INET6) 1091 { 1092 import std.algorithm.mutation : SwapStrategy, remove; 1093 conn.ips = conn.ips 1094 .remove!(SwapStrategy.unstable)(n); 1095 } 1096 } 1097 continue iploop; 1098 } 1099 else 1100 { 1101 // Just treat it as a normal error 1102 goto case; 1103 } 1104 1105 case connectionRefused: 1106 // Connection refused 1107 // No connection could be made because the target machine actively refused it. 1108 attempt.state = State.invalidConnectionError; 1109 attempt.error = e.msg; 1110 yield(attempt); 1111 continue iploop; 1112 1113 //case noRouteToHost: 1114 // No route to host 1115 // A socket operation was attempted to an unreachable host. 1116 //case networkUnreachable: 1117 // Network is unreachable 1118 // A socket operation was attempted to an unreachable network. 1119 default: 1120 // Don't delay for retrying on the last retry, drop down below 1121 if (retry+1 < connectionRetries) 1122 { 1123 attempt.state = State.delayThenReconnect; 1124 attempt.error = e.msg; 1125 yield(attempt); 1126 } 1127 continue attemptloop; 1128 } 1129 } 1130 catch (SSLException e) 1131 { 1132 import std.format : format; 1133 1134 enum pattern = "%s (%s)"; 1135 attempt.state = State.transientSSLFailure; 1136 attempt.error = pattern.format(e.msg, conn.getSSLErrorMessage(e.code)); 1137 yield(attempt); 1138 continue attemptloop; 1139 } 1140 catch (SSLFileException e) 1141 { 1142 import std.format : format; 1143 1144 enum pattern = "%s: %s"; 1145 attempt.state = State.fatalSSLFailure; 1146 attempt.error = pattern.format(e.msg, e.filename); 1147 yield(attempt); 1148 continue attemptloop; 1149 } 1150 } 1151 1152 // foreach ended; connectionRetries reached. 1153 // Move on to next IP (or same again if only one) 1154 attempt.state = (conn.ips.length > 1) ? 1155 State.delayThenNextIP : 1156 State.delayThenReconnect; 1157 yield(attempt); 1158 } 1159 } 1160 while (!abort); 1161 } 1162 1163 1164 // ResolveAttempt 1165 /++ 1166 Embodies the idea of an address resolution attempt. 1167 +/ 1168 struct ResolveAttempt 1169 { 1170 /++ 1171 The various states an address resolution attempt may be in. 1172 +/ 1173 enum State 1174 { 1175 unset, /// Init value. 1176 preresolve, /// About to resolve. 1177 success, /// Successfully resolved. 1178 exception, /// Failure, recoverable exception thrown. 1179 failure, /// Resolution failure; should abort. 1180 error, /// Failure, unrecoverable exception thrown. 1181 } 1182 1183 /// The current state of the attempt. 1184 State state; 1185 1186 /// The error message as thrown by an exception. 1187 string error; 1188 1189 /// [core.stdc.errno.errno|errno] at time of resolve. 1190 int errno; 1191 1192 /// The number of retries so far towards this address. 1193 uint retryNum; 1194 } 1195 1196 1197 // resolveFiber 1198 /++ 1199 Given an address and a port, resolves these and populates the array of unique 1200 [std.socket.Address|Address] IPs inside the passed [Connection]. 1201 1202 Example: 1203 --- 1204 import std.concurrency : Generator; 1205 1206 Connection conn; 1207 conn.reset(); 1208 1209 auto resolver = new Generator!ResolveAttempt(() => 1210 resolveFiber(conn, "irc.libera.chat", 6667, false, 10, abort)); 1211 1212 resolver.call(); 1213 1214 resolveloop: 1215 foreach (const attempt; resolver) 1216 { 1217 // attempt is a yielded `ResolveAttempt` 1218 1219 with (ResolveAttempt.State) 1220 final switch (attempt.state) 1221 { 1222 case preresolve: 1223 assert(0, "shouldn't happen"); 1224 1225 case success: 1226 // Address was resolved, the passed `conn` was modified 1227 break resolveloop; 1228 1229 case exception: 1230 // Recoverable 1231 dealWithException(attempt.error); 1232 break; 1233 1234 case failure: 1235 // Resolution failed without errors 1236 failGracefully(attempt.error); 1237 break; 1238 1239 case error: 1240 // Unrecoverable 1241 dealWithError(attempt.error); 1242 return; 1243 } 1244 } 1245 1246 // Address resolved 1247 --- 1248 1249 Params: 1250 conn = Reference to the current [Connection]. 1251 address = String address to look up. 1252 port = Remote port build into the [std.socket.Address|Address]. 1253 useIPv6 = Whether to include resolved IPv6 addresses or not. 1254 abort = Reference "abort" flag, which -- if set -- should make the 1255 function return and the [core.thread.fiber.Fiber|Fiber] terminate. 1256 +/ 1257 void resolveFiber( 1258 ref Connection conn, 1259 const string address, 1260 const ushort port, 1261 const bool useIPv6, 1262 ref bool abort) @system 1263 in (!conn.connected, "Tried to set up a resolving fiber on an already live connection") 1264 in (address.length, "Tried to set up a resolving fiber on an empty address") 1265 { 1266 import std.concurrency : yield; 1267 import std.socket : AddressFamily, SocketOSException, getAddress; 1268 1269 if (abort) return; 1270 1271 alias State = ResolveAttempt.State; 1272 1273 yield(ResolveAttempt(State.preresolve)); 1274 1275 for (uint i; (i >= 0); ++i) 1276 { 1277 if (abort) return; 1278 1279 ResolveAttempt attempt; 1280 attempt.retryNum = i; 1281 1282 try 1283 { 1284 import std.algorithm.iteration : filter, uniq; 1285 import std.array : array; 1286 1287 conn.ips = getAddress(address, port) 1288 .filter!(ip => (ip.addressFamily == AddressFamily.INET) || 1289 ((ip.addressFamily == AddressFamily.INET6) && useIPv6)) 1290 .uniq!((a,b) => a.toAddrString == b.toAddrString) 1291 .array; 1292 1293 attempt.state = State.success; 1294 yield(attempt); 1295 // Should never get here 1296 assert(0, "Dead `resolveFiber` resumed after yield"); 1297 } 1298 catch (SocketOSException e) 1299 { 1300 attempt.errno = e.errorCode; 1301 1302 version(Posix) 1303 { 1304 import core.sys.posix.netdb : EAI_AGAIN, EAI_FAIL, EAI_FAMILY, 1305 EAI_NONAME, EAI_SOCKTYPE, EAI_SYSTEM; 1306 1307 enum EAI_NODATA = -5; 1308 1309 // https://stackoverflow.com/questions/4395919/linux-system-call-getaddrinfo-return-2 1310 1311 enum AddrInfoErrors 1312 { 1313 //badFlags = EAI_BADFLAGS, /** Invalid value for `ai_flags` field. */ 1314 noName = EAI_NONAME, /** NAME or SERVICE is unknown. */ 1315 again = EAI_AGAIN, /** Temporary failure in name resolution. */ 1316 fail = EAI_FAIL, /** Non-recoverable failure in name res. */ 1317 noData = EAI_NODATA, /** No address associated with NAME. (GNU) */ 1318 family = EAI_FAMILY, /** `ai_family` not supported. */ 1319 sockType = EAI_SOCKTYPE, /** `ai_socktype` not supported. */ 1320 //service = EAI_SERVICE, /** SERVICE not supported for `ai_socktype`. */ 1321 //addrFamily = EAI_ADDRFAMILY, /** Address family for NAME not supported. (GNU) */ 1322 //memory = EAI_MEMORY, /** Memory allocation failure. */ 1323 system = EAI_SYSTEM, /** System error returned in `errno`. */ 1324 //overflow = EAI_OVERFLOW, /** Argument buffer overflow. */ 1325 } 1326 } 1327 else version(Windows) 1328 { 1329 import core.sys.windows.winsock2 : WSAEAFNOSUPPORT, WSAESOCKTNOSUPPORT, 1330 WSAHOST_NOT_FOUND, WSANO_DATA, WSANO_RECOVERY, WSATRY_AGAIN; 1331 1332 // https://docs.microsoft.com/en-us/windows/win32/api/ws2tcpip/nf-ws2tcpip-getaddrinfo 1333 1334 enum AddrInfoErrors 1335 { 1336 //badFlags = WSAEINVAL, /** An invalid value was provided for the `ai_flags` member of the `pHints` parameter. */ 1337 noName = WSAHOST_NOT_FOUND, /** The name does not resolve for the supplied parameters or the `pNodeName` and `pServiceName` parameters were not provided. */ 1338 again = WSATRY_AGAIN, /** A temporary failure in name resolution occurred. */ 1339 fail = WSANO_RECOVERY, /** A nonrecoverable failure in name resolution occurred. */ 1340 noData = WSANO_DATA, 1341 family = WSAEAFNOSUPPORT, /** The 'ai_family' member of the `pHints` parameter is not supported. */ 1342 sockType = WSAESOCKTNOSUPPORT, /** The `ai_socktype` member of the `pHints` parameter is not supported. */ 1343 //service = WSATYPE_NOT_FOUND, /** The `pServiceName` parameter is not supported for the specified `ai_socktype` member of the `pHints` parameter. */ 1344 //addrFamily = ?, 1345 //memory = WSANOT_ENOUGH_MEMORY, /** A memory allocation failure occurred. */ 1346 //system = ?, 1347 //overflow = ?, 1348 } 1349 } 1350 else 1351 { 1352 static assert(0, "Unsupported platform, please file a bug."); 1353 } 1354 1355 with (AddrInfoErrors) 1356 switch (attempt.errno) 1357 { 1358 case noName: 1359 case again: 1360 // Assume net down, wait and try again 1361 attempt.state = State.exception; 1362 attempt.error = e.msg; 1363 yield(attempt); 1364 continue; 1365 1366 version(Posix) 1367 { 1368 case system: 1369 import core.stdc.errno : errno; 1370 attempt.errno = errno; 1371 goto default; 1372 } 1373 1374 //case noData: 1375 //case fail: 1376 //case family: 1377 //case sockType: 1378 default: 1379 attempt.state = State.error; 1380 attempt.error = e.msg; 1381 yield(attempt); 1382 // Should never get here 1383 assert(0, "Dead `resolveFiber` resumed after yield"); 1384 } 1385 } 1386 } 1387 1388 // This doesn't really happen at present. Subject to change, so keep it here. 1389 /*ResolveAttempt endAttempt; 1390 endAttempt.state = State.failure; 1391 yield(endAttempt);*/ 1392 assert(0, "Broke out of unending `for` loop in `resolveFiber`"); 1393 } 1394 1395 1396 // SSLException 1397 /++ 1398 Exception thrown when OpenSSL functions return a non-`1` error code, such as 1399 when the OpenSSL context could not be setup, or when it could not establish 1400 an SSL connection from an otherwise live connection. 1401 1402 The attached `code` should be the error integer yielded from the failing SSL call. 1403 +/ 1404 final class SSLException : Exception 1405 { 1406 /// SSL error code. 1407 int code; 1408 1409 /// Constructor attaching an error code. 1410 this( 1411 const string msg, 1412 const int code, 1413 const string file = __FILE__, 1414 const size_t line = __LINE__, 1415 Throwable nextInChain = null) pure nothrow @nogc @safe 1416 { 1417 this.code = code; 1418 super(msg, file, line, nextInChain); 1419 } 1420 1421 /// Passthrough constructor. 1422 this( 1423 const string msg, 1424 const string file = __FILE__, 1425 const size_t line = __LINE__, 1426 Throwable nextInChain = null) pure nothrow @nogc @safe 1427 { 1428 super(msg, file, line, nextInChain); 1429 } 1430 } 1431 1432 1433 // SSLFileException 1434 /++ 1435 Exception thrown when a certificate or a private key file could not be found. 1436 +/ 1437 final class SSLFileException : Exception 1438 { 1439 /// Filename that doesn't exist. 1440 string filename; 1441 1442 /// Constructor attaching an error code. 1443 this( 1444 const string msg, 1445 const string filename, 1446 const string file = __FILE__, 1447 const size_t line = __LINE__, 1448 Throwable nextInChain = null) pure nothrow @nogc @safe 1449 { 1450 this.filename = filename; 1451 super(msg, file, line, nextInChain); 1452 } 1453 1454 /// Passthrough constructor. 1455 this( 1456 const string msg, 1457 const string file = __FILE__, 1458 const size_t line = __LINE__, 1459 Throwable nextInChain = null) pure nothrow @nogc @safe 1460 { 1461 super(msg, file, line, nextInChain); 1462 } 1463 } 1464 1465 1466 // SocketSendException 1467 /++ 1468 Exception thrown when a socket send action returned [std.socket.Socket.ERROR|Socket.ERROR]. 1469 +/ 1470 final class SocketSendException : Exception 1471 { 1472 /// Passthrough constructor. 1473 this( 1474 const string msg, 1475 const string file = __FILE__, 1476 const size_t line = __LINE__, 1477 Throwable nextInChain = null) pure nothrow @nogc @safe 1478 { 1479 super(msg, file, line, nextInChain); 1480 } 1481 }