1 /++
2     Functionality related to connecting to a server over the Internet.
3 
4     Includes [core.thread.fiber.Fiber|Fiber]s that help with resolving the
5     address of, connecting to, and reading full string lines from a server.
6 
7     Having them as [core.thread.fiber.Fiber|Fiber]s means a program can do
8     address resolution, connecting and reading while retaining the ability to do
9     other stuff concurrently. This means you can conveniently run code in between
10     each connection attempt, for instance, without breaking the program's flow.
11 
12     Example:
13     ---
14     import std.concurrency : Generator;
15 
16     Connection conn;
17     bool abort;  // Set to true if something goes wrong
18 
19     conn.reset();
20 
21     bool useIPv6 = false;
22     enum resolveAttempts = 10;
23 
24     auto resolver = new Generator!ResolveAttempt(() =>
25         resolveFiber(conn, "irc.libera.chat", 6667, useIPv6, resolveAttempts, abort));
26 
27     resolver.call();
28 
29     resolveloop:
30     foreach (const attempt; resolver)
31     {
32         // attempt is a yielded `ResolveAttempt`
33         // switch on `attempt.state`, deal with it accordingly
34     }
35 
36     // Resolution done
37 
38     enum connectionRetries = 10;
39 
40     auto connector = new Generator!ConnectionAttempt(() =>
41         connectFiber(conn, false, connectionRetries, abort));
42 
43     connector.call();
44 
45     connectorloop:
46     foreach (const attempt; connector)
47     {
48         // attempt is a yielded `ConnectionAttempt`
49         // switch on `attempt.state`, deal with it accordingly
50     }
51 
52     // Connection established
53 
54     enum timeoutSeconds = 600;
55 
56     auto listener = new Generator!ListenAttempt(() => listenFiber(conn, abort, timeoutSecond));
57 
58     listener.call();
59 
60     foreach (const attempt; listener)
61     {
62         // attempt is a yielded `ListenAttempt`
63         doThingsWithLineFromServer(attempt.line);
64         // program logic goes here
65     }
66     ---
67 
68     Copyright: [JR](https://github.com/zorael)
69     License: [Boost Software License 1.0](https://www.boost.org/users/license.html)
70 
71     Authors:
72         [JR](https://github.com/zorael)
73  +/
74 module kameloso.net;
75 
76 private:
77 
78 import kameloso.constants : BufferSize, Timeout;
79 
80 public:
81 
82 @safe:
83 
84 
85 // Connection
86 /++
87     Functions and state needed to maintain a connection.
88 
89     This is simply to decrease the amount of globals and to create some
90     convenience functions.
91  +/
92 struct Connection
93 {
94 private:
95     import requests.ssl_adapter : SSL, SSL_CTX, openssl;
96     import std.socket : Address, Socket, SocketOption;
97 
98     /// Real IPv4 and IPv6 sockets to connect through.
99     Socket socket4, socket6;
100 
101     /// Private cached send timeout setting.
102     uint _sendTimeout;
103 
104     /// Private cached received timeout setting.
105     uint _receiveTimeout;
106 
107     /// Private SSL context.
108     SSL_CTX* sslContext;
109 
110     /++
111         OpenSSL [requests.ssl_adapter.SSL] instance, for use with SSL connections.
112      +/
113     SSL* sslInstance;
114 
115 
116     // setTimemout
117     /++
118         Sets the [std.socket.SocketOption.RCVTIMEO|SocketOption.RCVTIMEO] of the
119         *current* [std.socket.Socket|Socket] [socket] to the specified duration.
120 
121         Params:
122             option = The [std.socket.SocketOption|SocketOption] to set.
123             dur = The duration to assign for the option, in number of milliseconds.
124      +/
125     void setTimeout(const SocketOption option, const uint dur)
126     {
127         import std.socket : SocketOptionLevel;
128         import core.time : msecs;
129 
130         with (socket)
131         with (SocketOptionLevel)
132         {
133             setOption(SOCKET, option, dur.msecs);
134         }
135     }
136 
137 public:
138     /++
139         Pointer to the socket of the [std.socket.AddressFamily|AddressFamily] we
140         want to connect with.
141      +/
142     Socket socket;
143 
144     /++
145         Whether or not this [Connection] should use SSL when sending and receiving.
146      +/
147     bool ssl;
148 
149     /// IPs already resolved using [kameloso.net.resolveFiber|resolveFiber].
150     Address[] ips;
151 
152     /++
153         Implicitly proxies calls to the current [std.socket.Socket|Socket].
154         This successfully proxies to [std.socket.Socket.receive|Socket.receive].
155      +/
156     alias socket this;
157 
158     /++
159         Whether we are connected or not.
160      +/
161     bool connected;
162 
163     /++
164         Path to a (`.pem`) SSL certificate file.
165      +/
166     string certFile;
167 
168     /++
169         Path to a private SSL key file.
170      +/
171     string privateKeyFile;
172 
173     // sendTimeout
174     /++
175         Accessor; returns the current send timeout.
176 
177         Returns:
178             A copy of [_sendTimeout].
179      +/
180     pragma(inline, true)
181     auto sendTimeout() const @property pure @nogc nothrow
182     {
183         return _sendTimeout;
184     }
185 
186     // sendTimeout
187     /++
188         Mutator; sets the send timeout socket option to the passed duration.
189 
190         Params:
191             dur = The duration to assign as send timeout, in number of milliseconds.
192      +/
193     pragma(inline, true)
194     void sendTimeout(const uint dur) @property
195     {
196         setTimeout(SocketOption.SNDTIMEO, dur);
197         _sendTimeout = dur;
198     }
199 
200     // receiveTimeout
201     /++
202         Accessor; returns the current receive timeout.
203 
204         Returns:
205             A copy of [_receiveTimeout].
206      +/
207     pragma(inline, true)
208     auto receiveTimeout() const @property pure @nogc nothrow
209     {
210         return _receiveTimeout;
211     }
212 
213     // sendTimeout
214     /++
215         Mutator; sets the receive timeout socket option to the passed duration.
216 
217         Params:
218             dur = The duration to assign as receive timeout, in number of milliseconds.
219      +/
220     void receiveTimeout(const uint dur) @property
221     {
222         setTimeout(SocketOption.RCVTIMEO, dur);
223         _receiveTimeout = dur;
224     }
225 
226     // reset
227     /++
228         (Re-)initialises the sockets and sets the IPv4 one as the active one.
229 
230         If we ever change this to a class, this should be the default constructor.
231      +/
232     void reset()
233     {
234         teardown();
235         setup();
236         connected = false;
237     }
238 
239     // resetSSL
240     /++
241         Resets the SSL context and resources of this [Connection].
242      +/
243     void resetSSL() @system
244     in (ssl, "Tried to reset SSL on a non-SSL `Connection`")
245     {
246         teardownSSL();
247         setupSSL();
248     }
249 
250     // getSSLErrorMessage
251     /++
252         Returns the SSL error message for the passed SSL error code.
253 
254         Params:
255             code = SSL error code to translate to string.
256 
257         Returns:
258             A string with the last SSL error code translated into humanly-readable text.
259      +/
260     auto getSSLErrorMessage(const int code) @system
261     in (ssl, "Tried to get SSL error message on a non-SSL `Connection`")
262     {
263         import std.string : fromStringz;
264 
265         immutable errorCode = openssl.SSL_get_error(sslInstance, code);
266 
267         return openssl.ERR_reason_error_string(errorCode)
268             .fromStringz
269             .idup;
270     }
271 
272     // setDefaultOptions
273     /++
274         Sets up sockets with the [std.socket.SocketOption|SocketOption]s needed.
275         These include timeouts and buffer sizes.
276 
277         Params:
278             socketToSetup = Reference to the [std.socket.Socket|Socket] to modify.
279      +/
280     void setDefaultOptions(Socket socketToSetup)
281     {
282         import std.socket : SocketOption, SocketOptionLevel;
283         import core.time : msecs;
284 
285         with (socketToSetup)
286         with (SocketOption)
287         with (SocketOptionLevel)
288         {
289             setOption(SOCKET, RCVBUF, BufferSize.socketOptionReceive);
290             setOption(SOCKET, SNDBUF, BufferSize.socketOptionSend);
291             setOption(SOCKET, RCVTIMEO, Timeout.receiveMsecs.msecs);
292             setOption(SOCKET, SNDTIMEO, Timeout.sendMsecs.msecs);
293 
294             _receiveTimeout = Timeout.receiveMsecs;
295             _sendTimeout = Timeout.sendMsecs;
296             blocking = true;
297         }
298     }
299 
300     // setupSSL
301     /++
302         Sets up the SSL context for this connection.
303 
304         Throws:
305             [SSLException] if the SSL context could not be set up.
306 
307             [SSLFileException] if any specified certificate or private key could not be found.
308      +/
309     void setupSSL() @system
310     in (ssl, "Tried to set up SSL context on a non-SSL `Connection`")
311     in (socket, "Tried to set up an SSL context on a null `Socket`")
312     {
313         import std.file : exists;
314         import std.path : extension;
315         import std.string : toStringz;
316 
317         sslContext = openssl.SSL_CTX_new(openssl.TLS_method);
318         openssl.SSL_CTX_set_verify(sslContext, 0, null);
319 
320         if (certFile.length)
321         {
322             // Before SSL_new
323             if (!certFile.exists)
324             {
325                 enum message = "No such certificate file";
326                 throw new SSLFileException(
327                     message,
328                     certFile,
329                     __FILE__,
330                     __LINE__);
331             }
332 
333             immutable filetype = (certFile.extension == ".pem") ? 1 : 0;
334             immutable code = openssl.SSL_CTX_use_certificate_file(
335                 sslContext,
336                 toStringz(certFile),
337                 filetype);
338             if (code != 1) throw new SSLException("Failed to set certificate", code);
339         }
340 
341         if (privateKeyFile.length)
342         {
343             // Ditto
344             if (!privateKeyFile.exists)
345             {
346                 enum message = "No such private key file";
347                 throw new SSLFileException(
348                     message,
349                     privateKeyFile,
350                     __FILE__,
351                     __LINE__);
352             }
353 
354             immutable filetype = (privateKeyFile.extension == ".pem") ? 1 : 0;
355             immutable code = openssl.SSL_CTX_use_PrivateKey_file(
356                 sslContext,
357                 toStringz(privateKeyFile),
358                 filetype);
359             if (code != 1) throw new SSLException("Failed to set private key", code);
360         }
361 
362         sslInstance = openssl.SSL_new(sslContext);
363         immutable code = openssl.SSL_set_fd(sslInstance, cast(int)socket.handle);
364         if (code != 1) throw new SSLException("Failed to attach socket handle", code);
365     }
366 
367     // teardownSSL
368     /++
369         Frees SSL context and resources.
370      +/
371     void teardownSSL()
372     in (ssl, "Tried to teardown SSL on a non-SSL `Connection`")
373     {
374         if (sslInstance) openssl.SSL_free(sslInstance);
375         if (sslContext) openssl.SSL_CTX_free(sslContext);
376     }
377 
378     // teardown
379     /++
380         Shuts down and closes the internal [std.socket.Socket|Socket]s.
381      +/
382     void teardown()
383     {
384         import std.range : only;
385         import std.socket : SocketShutdown;
386 
387         foreach (thisSocket; only(socket4, socket6))
388         {
389             if (!thisSocket) continue;
390 
391             thisSocket.shutdown(SocketShutdown.BOTH);
392             thisSocket.close();
393         }
394     }
395 
396     // setup
397     /++
398         Initialises new [std.socket.Socket|Socket]s and sets their options.
399      +/
400     void setup()
401     {
402         import std.socket : TcpSocket, AddressFamily, SocketType;
403 
404         socket4 = new TcpSocket;
405         socket6 = new Socket(AddressFamily.INET6, SocketType.STREAM);
406         socket = socket4;
407 
408         setDefaultOptions(socket4);
409         setDefaultOptions(socket6);
410     }
411 
412     // sendline
413     /++
414         Sends a line to the server.
415 
416         Intended for servers that deliminates lines by linebreaks, such as IRC servers.
417 
418         Example:
419         ---
420         conn.sendline("NICK foobar");
421         conn.sendline("PRIVMSG #channel :text");
422         conn.sendline("PRIVMSG " ~ channel ~ " :" ~ content);
423         conn.sendline(longerLine, 1024L);  // Now with custom line lengths
424         ---
425 
426         Params:
427             rawline = Line to send. May contain substrings separated by newline
428                 characters. A final linebreak is added to the end of the send.
429             maxLineLength = Maximum line length before the sent message will be truncated.
430             linebreak = Characters to use as linebreak, marking the end of a line to send.
431 
432         Throws:
433             [SocketSendException] if the call to send data through the socket
434             returns [std.socket.Socket.ERROR|Socket.ERROR].
435      +/
436     void sendline(
437         const string rawline,
438         const uint maxLineLength = 512,
439         const string linebreak = "\r\n") @system
440     in (connected, "Tried to send a line on an unconnected `Connection`")
441     {
442         import std.string : indexOf;
443 
444         if (!rawline.length) return;
445 
446         immutable maxAvailableLength = (maxLineLength - linebreak.length);
447 
448         void sendlineImpl(const string line)
449         {
450             import std.algorithm.comparison : min;
451 
452             immutable lineLength = min(line.length, maxAvailableLength);
453             size_t totalSent;
454 
455             auto sendSubstring(const string substring)
456             in (substring.length, "Tried to send empty substring to server")
457             {
458                 immutable bytesSent = ssl ?
459                     openssl.SSL_write(sslInstance, substring.ptr, cast(int)substring.length) :
460                     socket.send(substring);
461 
462                 if (bytesSent == Socket.ERROR)
463                 {
464                     enum message = "Socket.ERROR returned when sending data to server";
465                     throw new SocketSendException(message);
466                 }
467 
468                 return bytesSent;
469             }
470 
471             while (totalSent < lineLength)
472             {
473                 totalSent += sendSubstring(line[totalSent..lineLength]);
474             }
475 
476             // Always end the line with a linebreak
477             sendSubstring(linebreak);
478         }
479 
480         auto newlinePos = rawline.indexOf('\n');  // mutable
481 
482         if (newlinePos != -1)
483         {
484             // Line incorrectly has at least one newline, so send up until
485             // the first and discard the remainder
486 
487             if ((newlinePos > 0) && (rawline[newlinePos-1] == '\r'))
488             {
489                 // It was actually "\r\n", so omit the '\r' too
490                 --newlinePos;
491             }
492 
493             sendlineImpl(rawline[0..newlinePos]);
494         }
495         else
496         {
497             // Plain line
498             sendlineImpl(rawline);
499         }
500     }
501 }
502 
503 
504 // ListenAttempt
505 /++
506     Embodies the idea of a listening attempt.
507  +/
508 struct ListenAttempt
509 {
510     /++
511         The various states a listening attempt may be in.
512      +/
513     enum State
514     {
515         unset,      /// Init value.
516         prelisten,  /// About to listen.
517         isEmpty,    /// Empty result; nothing read or similar.
518         hasString,  /// String read, ready for processing.
519         timeout,    /// Connection read timed out.
520         warning,    /// Recoverable exception thrown; warn and continue.
521         error,      /// Unrecoverable exception thrown; abort.
522     }
523 
524     /// The current state of the attempt.
525     State state;
526 
527     /// The last read line of text sent by the server.
528     string line;
529 
530     /// The [std.socket.lastSocketError|lastSocketError] at the last point of error.
531     string error;
532 
533     /// [core.stdc.errno.errno|errno] at time of read.
534     int errno;
535 
536     /// The amount of bytes received this attempt.
537     long bytesReceived;
538 }
539 
540 
541 // listenFiber
542 /++
543     A [std.socket.Socket|Socket]-reading [std.concurrency.Generator|Generator].
544     It reads and yields full string lines.
545 
546     It maintains its own buffer into which it receives from the server, though
547     not necessarily full lines. It thus keeps filling the buffer until it
548     finds a newline character, yields [ListenAttempt]s back to the caller of
549     the Fiber, checks for more lines to yield, and if none yields an attempt
550     with a [ListenAttempt.State] denoting that nothing was read and that a new
551     attempt should be made later.
552 
553     Example:
554     ---
555     //Connection conn;  // Address previously connected established with
556 
557     enum timeoutSeconds = 600;
558 
559     auto listener = new Generator!ListenAttempt(() => listenFiber(conn, abort, timeoutSeconds));
560 
561     listener.call();
562 
563     foreach (const attempt; listener)
564     {
565         // attempt is a yielded `ListenAttempt`
566 
567         with (ListenAttempt.State)
568         final switch (attempt.state)
569         {
570         case prelisten:
571             assert(0, "shouldn't happen");
572 
573         case isEmpty:
574         case timeout:
575             // Reading timed out or nothing was read, happens
576             break;
577 
578         case hasString:
579             // A line was successfully read!
580             // program logic goes here
581             doThings(attempt.line);
582             break;
583 
584         case warning:
585             // Recoverable
586             warnAboutSomething(attempt.error);
587             break;
588 
589         case error:
590             // Unrecoverable
591             dealWithError(attempt.error);
592             return;
593         }
594     }
595     ---
596 
597     Params:
598         bufferSize = What size static array to use as buffer. Defaults to twice of
599             [kameloso.constants.BufferSize.socketReceive|BufferSize.socketReceive] for now.
600         conn = [Connection] whose [std.socket.Socket|Socket] it reads from the server with.
601         abort = Reference "abort" flag, which -- if set -- should make the
602             function return and the [core.thread.fiber.Fiber|Fiber] terminate.
603         connectionLost = How many seconds may pass before we consider the connection lost.
604             Optional, defaults to
605             [kameloso.constants.Timeout.connectionLost|Timeout.connectionLost].
606 
607     Yields:
608         [ListenAttempt]s with information about the line received in its member values.
609  +/
610 void listenFiber(size_t bufferSize = BufferSize.socketReceive*2)
611     (Connection conn,
612     ref bool abort,
613     const int connectionLost = Timeout.connectionLost) @system
614 in ((conn.connected), "Tried to set up a listening fiber on a dead connection")
615 in ((connectionLost > 0), "Tried to set up a listening fiber with connection timeout of <= 0")
616 {
617     import kameloso.constants : BufferSize;
618     import std.concurrency : yield;
619     import std.datetime.systime : Clock;
620     import std.socket : Socket, lastSocketError;
621     import std.string : indexOf;
622 
623     if (abort) return;
624 
625     ubyte[bufferSize] buffer;
626     long timeLastReceived = Clock.currTime.toUnixTime;
627     size_t start;
628 
629     alias State = ListenAttempt.State;
630 
631     // The Generator we use this function with popFronts the first thing it does
632     // after being instantiated. To work around our main loop popping too we
633     // yield an initial empty value; else the first thing to happen will be a
634     // double pop, and the first line is missed.
635     yield(ListenAttempt.init);
636 
637     /// How many consecutive warnings to allow before yielding an error.
638     enum maxConsecutiveWarningsUntilError = 20;
639 
640     /// Current consecutive warnings count.
641     uint consecutiveWarnings;
642 
643     while (!abort)
644     {
645         version(Posix)
646         {
647             import core.stdc.errno;
648 
649             // https://www-numi.fnal.gov/offline_software/srt_public_context/WebDocs/Errors/unix_system_errors.html
650 
651             enum Errno
652             {
653                 timedOut = EAGAIN,
654                 wouldBlock = EWOULDBLOCK,
655                 netDown = ENETDOWN,
656                 netUnreachable = ENETUNREACH,
657                 endpointNotConnected = ENOTCONN,
658                 connectionReset = ECONNRESET,
659                 connectionAborted = ECONNABORTED,
660                 interrupted = EINTR,
661             }
662         }
663         else version(Windows)
664         {
665             import core.sys.windows.winsock2;
666 
667             alias errno = WSAGetLastError;
668 
669             // https://www.hardhats.org/cs/broker/docs/winsock.html
670             // https://infosys.beckhoff.com/english.php?content=../content/1033/tcpipserver/html/tcplclibtcpip_e_winsockerror.htm
671 
672             enum Errno
673             {
674                 unexpectedEOF = 0,
675                 timedOut = WSAETIMEDOUT,
676                 wouldBlock = WSAEWOULDBLOCK,
677                 netDown = WSAENETDOWN,
678                 netUnreachable = WSAENETUNREACH,
679                 endpointNotConnected = WSAENOTCONN,
680                 connectionReset = WSAECONNRESET,
681                 connectionAborted = WSAECONNABORTED,
682                 interrupted = WSAEINTR,
683                 overlappedIO = 997,
684             }
685         }
686         else
687         {
688             static assert(0, "Unsupported platform, please file a bug.");
689         }
690 
691         ListenAttempt attempt;
692 
693         if (conn.ssl)
694         {
695             import requests.ssl_adapter : openssl;
696             attempt.bytesReceived = openssl.SSL_read(
697                 conn.sslInstance,
698                 cast(void*)buffer.ptr+start,
699                 cast(int)(buffer.length-start));
700         }
701         else
702         {
703             attempt.bytesReceived = conn.receive(buffer[start..$]);
704         }
705 
706         attempt.errno = errno;
707 
708         if (!attempt.bytesReceived)
709         {
710             attempt.state = State.error;
711             attempt.error = lastSocketError;
712             yield(attempt);
713             // Should never get here
714             assert(0, "Dead `listenFiber` resumed after yield (no bytes received)");
715         }
716 
717         if (attempt.errno == Errno.interrupted)
718         {
719             // Interrupted read; try again
720             // Unlucky callgrind_control -d timing
721             attempt.state = State.isEmpty;
722             attempt.error = lastSocketError;
723             consecutiveWarnings = 0;
724             yield(attempt);
725             continue;
726         }
727 
728         if (attempt.bytesReceived == Socket.ERROR)
729         {
730             if ((Clock.currTime.toUnixTime - timeLastReceived) > connectionLost)
731             {
732                 attempt.state = State.timeout;
733                 yield(attempt);
734 
735                 // Should never get here
736                 enum message = "Timed out `listenFiber` resumed after yield " ~
737                     "(received error, elapsed > timeout)";
738                 assert(0, message);
739             }
740 
741             with (Errno)
742             switch (attempt.errno)
743             {
744             case timedOut:
745                 // Resource temporarily unavailable
746                 /*
747                     A connection attempt failed because the connected party did not
748                     properly respond after a period of time, or established connection
749                     failed because connected host has failed to respond.
750                  */
751                 // Timed out, nothing received
752                 attempt.state = State.isEmpty;
753                 consecutiveWarnings = 0;
754                 yield(attempt);
755                 continue;
756 
757             static if (int(timedOut) != int(wouldBlock))
758             {
759                 case wouldBlock:
760                     /+
761                         Portability Note: In many older Unix systems ...
762                         [EWOULDBLOCK was] a distinct error code different from
763                         EAGAIN. To make your program portable, you should check
764                         for both codes and treat them the same.
765                      +/
766                     // A non-blocking socket operation could not be completed immediately.
767                     goto case timedOut;
768             }
769 
770             version(Windows)
771             {
772                 case overlappedIO:
773                     // "Overlapped I/O operation is in progress."
774                     // seems benign
775                     goto case timedOut;
776 
777                 case unexpectedEOF:
778                     /+
779                         If you're getting 0 from WSAGetLastError, then this is
780                         most likely due to an unexpected EOF occurring on the socket,
781                         i.e. the client has gracefully closed the connection
782                         without sending a close_notify alert.
783                      +/
784                     // "The operation completed successfully."
785                     goto case;
786             }
787 
788             case netDown:
789             case netUnreachable:
790             case endpointNotConnected:
791             case connectionReset:
792             case connectionAborted:
793                 attempt.state = State.error;
794                 attempt.error = lastSocketError;
795                 yield(attempt);
796                 // Should never get here
797                 assert(0, "Dead `listenFiber` resumed after yield (`lastSocketError` error)");
798 
799             default:
800                 attempt.error = lastSocketError;
801 
802                 if (++consecutiveWarnings >= maxConsecutiveWarningsUntilError)
803                 {
804                     attempt.state = State.error;
805                     yield(attempt);
806                     // Should never get here
807                     assert(0, "Dead `listenFiber` resumed after yield (exceeded max consecutive errors)");
808                 }
809                 else
810                 {
811                     attempt.state = State.warning;
812                     yield(attempt);
813                     continue;
814                 }
815             }
816         }
817 
818         timeLastReceived = Clock.currTime.toUnixTime;
819         consecutiveWarnings = 0;
820 
821         immutable ptrdiff_t end = cast(ptrdiff_t)(start + attempt.bytesReceived);
822         ptrdiff_t newline = (cast(char[])buffer[0..end]).indexOf('\n');
823         size_t pos;
824 
825         while (newline != -1)
826         {
827             attempt.state = State.hasString;
828             attempt.line = (cast(char[])buffer[pos..pos+newline-1]).idup;  // eat \r before \n
829             yield(attempt);
830             pos += (newline + 1); // eat remaining newline
831             newline = (cast(char[])buffer[pos..end]).indexOf('\n');
832         }
833 
834         attempt.state = State.isEmpty;
835         yield(attempt);
836 
837         if (pos >= end)
838         {
839             // can be end or end+1
840             start = 0;
841             continue;
842         }
843 
844         start = (end - pos);
845 
846         // writefln("REMNANT:|%s|", cast(string)buffer[pos..end]);
847         import core.stdc.string : memmove;
848         memmove(buffer.ptr, (buffer.ptr + pos), (ubyte.sizeof * start));
849     }
850 }
851 
852 
853 // ConnectionAttempt
854 /++
855     Embodies the idea of a connection attempt.
856  +/
857 struct ConnectionAttempt
858 {
859 private:
860     import std.socket : Address;
861 
862 public:
863     /++
864         The various states a connection attempt may be in.
865      +/
866     enum State
867     {
868         unset,                   /// Init value.
869         preconnect,              /// About to connect.
870         connected,               /// Successfully connected.
871         delayThenReconnect,      /// Failed to connect; should delay and retry.
872         delayThenNextIP,         /// Failed to reconnect several times; next IP.
873         //noMoreIPs,             /// Exhausted all IPs and could not connect.
874         ipv6Failure,             /// IPv6 connection failed.
875         transientSSLFailure,     /// Transient failure establishing an SSL connection, safe to retry.
876         fatalSSLFailure,         /// Fatal failure establishing an SSL connection, should abort.
877         invalidConnectionError,  /// The current IP cannot be connected to.
878         error,                   /// Error connecting; should abort.
879     }
880 
881     /// The current state of the attempt.
882     State state;
883 
884     /// The IP that the attempt is trying to connect to.
885     Address ip;
886 
887     /// The error message as thrown by an exception.
888     string error;
889 
890     /// [core.stdc.errno.errno|errno] at time of connect.
891     int errno;
892 
893     /// The number of retries so far towards this [ip].
894     uint retryNum;
895 }
896 
897 
898 // connectFiber
899 /++
900     Fiber function that tries to connect to IPs in the `ips` array of the passed
901     [Connection], yielding at certain points throughout the process to let the
902     calling function do stuff in between connection attempts.
903 
904     Example:
905     ---
906     //Connection conn;  // Address previously resolved with `resolveFiber`
907 
908     auto connector = new Generator!ConnectionAttempt(() =>
909         connectFiber(conn, false, 10, abort));
910 
911     connector.call();
912 
913     connectorloop:
914     foreach (const attempt; connector)
915     {
916         // attempt is a yielded `ConnectionAttempt`
917 
918         with (ConnectionAttempt.State)
919         final switch (attempt.state)
920         {
921         case preconnect:
922             assert(0, "shouldn't happen");
923 
924         case connected:
925             // Socket is connected, continue with normal routine
926             break connectorloop;
927 
928         case delayThenReconnect:
929         case delayThenNextIP:
930             // Delay and retry
931             Thread.sleep(5.seconds);
932             break;
933 
934         case ipv6Failure:
935             // Deal with it
936             dealWithIPv6(attempt.error);
937             break;
938 
939         case sslFailure:
940         case error:
941             // Failed to connect
942             return;
943         }
944     }
945 
946     // Connection established
947     ---
948 
949     Params:
950         conn = Reference to the current, unconnected [Connection].
951         connectionRetries = How many times to attempt to connect before signalling
952             that we should move on to the next IP.
953         abort = Reference "abort" flag, which -- if set -- should make the
954             function return and the [core.thread.fiber.Fiber|Fiber] terminate.
955  +/
956 void connectFiber(
957     ref Connection conn,
958     const uint connectionRetries,
959     ref bool abort) @system
960 in (!conn.connected, "Tried to set up a connecting fiber on an already live connection")
961 in ((conn.ips.length > 0), "Tried to connect to an unresolved connection")
962 {
963     import std.concurrency : yield;
964     import std.socket : AddressFamily, SocketException;
965 
966     if (abort) return;
967 
968     alias State = ConnectionAttempt.State;
969 
970     yield(ConnectionAttempt.init);
971 
972     scope(exit)
973     {
974         conn.teardown();
975         if (conn.ssl) conn.teardownSSL();
976     }
977 
978     do
979     {
980         iploop:
981         foreach (immutable i, ip; conn.ips)
982         {
983             immutable isIPv6 = (ip.addressFamily == AddressFamily.INET6);
984 
985             ConnectionAttempt attempt;
986             attempt.ip = ip;
987 
988             attemptloop:
989             foreach (immutable retry; 0..connectionRetries)
990             {
991                 if (abort) return;
992 
993                 conn.reset();
994                 conn.socket = isIPv6 ? conn.socket6 : conn.socket4;
995 
996                 try
997                 {
998                     if (conn.ssl)
999                     {
1000                         // *After* conn.socket has been changed.
1001                         conn.resetSSL();
1002                     }
1003 
1004                     attempt.retryNum = retry;
1005                     attempt.state = State.preconnect;
1006                     attempt.errno = 0;  // reset
1007                     yield(attempt);
1008 
1009                     conn.socket.connect(ip);
1010 
1011                     if (conn.ssl)
1012                     {
1013                         import requests.ssl_adapter : openssl;
1014 
1015                         immutable code = openssl.SSL_connect(conn.sslInstance);
1016 
1017                         if (code != 1)
1018                         {
1019                             enum message = "Failed to establish SSL connection " ~
1020                                 "after successful connect";
1021                             throw new SSLException(message, code);
1022                         }
1023                     }
1024 
1025                     // If we're here no exception was thrown and we didn't yield
1026                     // out of SSL errors, so we're connected
1027 
1028                     attempt.state = State.connected;
1029                     conn.connected = true;
1030                     yield(attempt);
1031                     // Should never get here
1032                     assert(0, "Finished `connectFiber` resumed after yield");
1033                 }
1034                 catch (SocketException e)
1035                 {
1036                     version(Posix)
1037                     {
1038                         import core.stdc.errno : EAFNOSUPPORT, ECONNREFUSED,
1039                             EHOSTUNREACH, ENETUNREACH, errno;
1040 
1041                         // https://www-numi.fnal.gov/offline_software/srt_public_context/WebDocs/Errors/unix_system_errors.html
1042 
1043                         enum Errno
1044                         {
1045                             addressFamilyNoSupport = EAFNOSUPPORT,
1046                             connectionRefused = ECONNREFUSED,
1047                             noRouteToHost = EHOSTUNREACH,
1048                             networkUnreachable = ENETUNREACH,
1049                         }
1050 
1051                         attempt.errno = errno;
1052                     }
1053                     else version(Windows)
1054                     {
1055                         import core.sys.windows.winsock2 : WSAEAFNOSUPPORT, WSAECONNREFUSED,
1056                             WSAEHOSTUNREACH, WSAENETUNREACH, WSAGetLastError;
1057 
1058                         enum Errno
1059                         {
1060                             addressFamilyNoSupport = WSAEAFNOSUPPORT,
1061                             connectionRefused = WSAECONNREFUSED,
1062                             noRouteToHost = WSAEHOSTUNREACH,
1063                             networkUnreachable = WSAENETUNREACH,
1064                         }
1065 
1066                         attempt.errno = WSAGetLastError();
1067                     }
1068                     else
1069                     {
1070                         static assert(0, "Unsupported platform, please file a bug.");
1071                     }
1072 
1073                     with (Errno)
1074                     switch (attempt.errno)
1075                     {
1076                     case addressFamilyNoSupport:
1077                         // Address family not supported by protocol
1078                         // An address incompatible with the requested protocol was used.
1079                         if (isIPv6)
1080                         {
1081                             attempt.state = State.ipv6Failure;
1082                             attempt.error = e.msg;
1083                             yield(attempt);
1084 
1085                             // Remove IPv6 addresses from conn.ips
1086                             foreach_reverse (immutable n, const arrayIP; conn.ips)
1087                             {
1088                                 if (n == i) break;  // caught up to current
1089 
1090                                 if (arrayIP.addressFamily == AddressFamily.INET6)
1091                                 {
1092                                     import std.algorithm.mutation : SwapStrategy, remove;
1093                                     conn.ips = conn.ips
1094                                         .remove!(SwapStrategy.unstable)(n);
1095                                 }
1096                             }
1097                             continue iploop;
1098                         }
1099                         else
1100                         {
1101                             // Just treat it as a normal error
1102                             goto case;
1103                         }
1104 
1105                     case connectionRefused:
1106                         // Connection refused
1107                         // No connection could be made because the target machine actively refused it.
1108                         attempt.state = State.invalidConnectionError;
1109                         attempt.error = e.msg;
1110                         yield(attempt);
1111                         continue iploop;
1112 
1113                     //case noRouteToHost:
1114                         // No route to host
1115                         // A socket operation was attempted to an unreachable host.
1116                     //case networkUnreachable:
1117                         // Network is unreachable
1118                         // A socket operation was attempted to an unreachable network.
1119                     default:
1120                         // Don't delay for retrying on the last retry, drop down below
1121                         if (retry+1 < connectionRetries)
1122                         {
1123                             attempt.state = State.delayThenReconnect;
1124                             attempt.error = e.msg;
1125                             yield(attempt);
1126                         }
1127                         continue attemptloop;
1128                     }
1129                 }
1130                 catch (SSLException e)
1131                 {
1132                     import std.format : format;
1133 
1134                     enum pattern = "%s (%s)";
1135                     attempt.state = State.transientSSLFailure;
1136                     attempt.error = pattern.format(e.msg, conn.getSSLErrorMessage(e.code));
1137                     yield(attempt);
1138                     continue attemptloop;
1139                 }
1140                 catch (SSLFileException e)
1141                 {
1142                     import std.format : format;
1143 
1144                     enum pattern = "%s: %s";
1145                     attempt.state = State.fatalSSLFailure;
1146                     attempt.error = pattern.format(e.msg, e.filename);
1147                     yield(attempt);
1148                     continue attemptloop;
1149                 }
1150             }
1151 
1152             // foreach ended; connectionRetries reached.
1153             // Move on to next IP (or same again if only one)
1154             attempt.state = (conn.ips.length > 1) ?
1155                 State.delayThenNextIP :
1156                 State.delayThenReconnect;
1157             yield(attempt);
1158         }
1159     }
1160     while (!abort);
1161 }
1162 
1163 
1164 // ResolveAttempt
1165 /++
1166     Embodies the idea of an address resolution attempt.
1167  +/
1168 struct ResolveAttempt
1169 {
1170     /++
1171         The various states an address resolution attempt may be in.
1172      +/
1173     enum State
1174     {
1175         unset,          /// Init value.
1176         preresolve,     /// About to resolve.
1177         success,        /// Successfully resolved.
1178         exception,      /// Failure, recoverable exception thrown.
1179         failure,        /// Resolution failure; should abort.
1180         error,          /// Failure, unrecoverable exception thrown.
1181     }
1182 
1183     /// The current state of the attempt.
1184     State state;
1185 
1186     /// The error message as thrown by an exception.
1187     string error;
1188 
1189     /// [core.stdc.errno.errno|errno] at time of resolve.
1190     int errno;
1191 
1192     /// The number of retries so far towards this address.
1193     uint retryNum;
1194 }
1195 
1196 
1197 // resolveFiber
1198 /++
1199     Given an address and a port, resolves these and populates the array of unique
1200     [std.socket.Address|Address] IPs inside the passed [Connection].
1201 
1202     Example:
1203     ---
1204     import std.concurrency : Generator;
1205 
1206     Connection conn;
1207     conn.reset();
1208 
1209     auto resolver = new Generator!ResolveAttempt(() =>
1210         resolveFiber(conn, "irc.libera.chat", 6667, false, 10, abort));
1211 
1212     resolver.call();
1213 
1214     resolveloop:
1215     foreach (const attempt; resolver)
1216     {
1217         // attempt is a yielded `ResolveAttempt`
1218 
1219         with (ResolveAttempt.State)
1220         final switch (attempt.state)
1221         {
1222         case preresolve:
1223             assert(0, "shouldn't happen");
1224 
1225         case success:
1226             // Address was resolved, the passed `conn` was modified
1227             break resolveloop;
1228 
1229         case exception:
1230             // Recoverable
1231             dealWithException(attempt.error);
1232             break;
1233 
1234         case failure:
1235             // Resolution failed without errors
1236             failGracefully(attempt.error);
1237             break;
1238 
1239         case error:
1240             // Unrecoverable
1241             dealWithError(attempt.error);
1242             return;
1243         }
1244     }
1245 
1246     // Address resolved
1247     ---
1248 
1249     Params:
1250         conn = Reference to the current [Connection].
1251         address = String address to look up.
1252         port = Remote port build into the [std.socket.Address|Address].
1253         useIPv6 = Whether to include resolved IPv6 addresses or not.
1254         abort = Reference "abort" flag, which -- if set -- should make the
1255             function return and the [core.thread.fiber.Fiber|Fiber] terminate.
1256  +/
1257 void resolveFiber(
1258     ref Connection conn,
1259     const string address,
1260     const ushort port,
1261     const bool useIPv6,
1262     ref bool abort) @system
1263 in (!conn.connected, "Tried to set up a resolving fiber on an already live connection")
1264 in (address.length, "Tried to set up a resolving fiber on an empty address")
1265 {
1266     import std.concurrency : yield;
1267     import std.socket : AddressFamily, SocketOSException, getAddress;
1268 
1269     if (abort) return;
1270 
1271     alias State = ResolveAttempt.State;
1272 
1273     yield(ResolveAttempt(State.preresolve));
1274 
1275     for (uint i; (i >= 0); ++i)
1276     {
1277         if (abort) return;
1278 
1279         ResolveAttempt attempt;
1280         attempt.retryNum = i;
1281 
1282         try
1283         {
1284             import std.algorithm.iteration : filter, uniq;
1285             import std.array : array;
1286 
1287             conn.ips = getAddress(address, port)
1288                 .filter!(ip => (ip.addressFamily == AddressFamily.INET) ||
1289                     ((ip.addressFamily == AddressFamily.INET6) && useIPv6))
1290                 .uniq!((a,b) => a.toAddrString == b.toAddrString)
1291                 .array;
1292 
1293             attempt.state = State.success;
1294             yield(attempt);
1295             // Should never get here
1296             assert(0, "Dead `resolveFiber` resumed after yield");
1297         }
1298         catch (SocketOSException e)
1299         {
1300             attempt.errno = e.errorCode;
1301 
1302             version(Posix)
1303             {
1304                 import core.sys.posix.netdb : EAI_AGAIN, EAI_FAIL, EAI_FAMILY,
1305                     EAI_NONAME, EAI_SOCKTYPE, EAI_SYSTEM;
1306 
1307                 enum EAI_NODATA = -5;
1308 
1309                 // https://stackoverflow.com/questions/4395919/linux-system-call-getaddrinfo-return-2
1310 
1311                 enum AddrInfoErrors
1312                 {
1313                     //badFlags   = EAI_BADFLAGS,     /** Invalid value for `ai_flags` field. */
1314                     noName       = EAI_NONAME,       /** NAME or SERVICE is unknown. */
1315                     again        = EAI_AGAIN,        /** Temporary failure in name resolution. */
1316                     fail         = EAI_FAIL,         /** Non-recoverable failure in name res. */
1317                     noData       = EAI_NODATA,       /** No address associated with NAME. (GNU) */
1318                     family       = EAI_FAMILY,       /** `ai_family` not supported. */
1319                     sockType     = EAI_SOCKTYPE,     /** `ai_socktype` not supported. */
1320                     //service    = EAI_SERVICE,      /** SERVICE not supported for `ai_socktype`. */
1321                     //addrFamily = EAI_ADDRFAMILY,   /** Address family for NAME not supported. (GNU) */
1322                     //memory     = EAI_MEMORY,       /** Memory allocation failure. */
1323                     system       = EAI_SYSTEM,       /** System error returned in `errno`. */
1324                     //overflow   = EAI_OVERFLOW,     /** Argument buffer overflow. */
1325                 }
1326             }
1327             else version(Windows)
1328             {
1329                 import core.sys.windows.winsock2 : WSAEAFNOSUPPORT, WSAESOCKTNOSUPPORT,
1330                     WSAHOST_NOT_FOUND, WSANO_DATA, WSANO_RECOVERY, WSATRY_AGAIN;
1331 
1332                 // https://docs.microsoft.com/en-us/windows/win32/api/ws2tcpip/nf-ws2tcpip-getaddrinfo
1333 
1334                 enum AddrInfoErrors
1335                 {
1336                     //badFlags   = WSAEINVAL,            /** An invalid value was provided for the `ai_flags` member of the `pHints` parameter. */
1337                     noName       = WSAHOST_NOT_FOUND,    /** The name does not resolve for the supplied parameters or the `pNodeName` and `pServiceName` parameters were not provided. */
1338                     again        = WSATRY_AGAIN,         /** A temporary failure in name resolution occurred. */
1339                     fail         = WSANO_RECOVERY,       /** A nonrecoverable failure in name resolution occurred. */
1340                     noData       = WSANO_DATA,
1341                     family       = WSAEAFNOSUPPORT,      /** The 'ai_family' member of the `pHints` parameter is not supported. */
1342                     sockType     = WSAESOCKTNOSUPPORT,   /** The `ai_socktype` member of the `pHints` parameter is not supported. */
1343                     //service    = WSATYPE_NOT_FOUND,    /** The `pServiceName` parameter is not supported for the specified `ai_socktype` member of the `pHints` parameter. */
1344                     //addrFamily = ?,
1345                     //memory     = WSANOT_ENOUGH_MEMORY, /** A memory allocation failure occurred. */
1346                     //system     = ?,
1347                     //overflow   = ?,
1348                 }
1349             }
1350             else
1351             {
1352                 static assert(0, "Unsupported platform, please file a bug.");
1353             }
1354 
1355             with (AddrInfoErrors)
1356             switch (attempt.errno)
1357             {
1358             case noName:
1359             case again:
1360                 // Assume net down, wait and try again
1361                 attempt.state = State.exception;
1362                 attempt.error = e.msg;
1363                 yield(attempt);
1364                 continue;
1365 
1366             version(Posix)
1367             {
1368                 case system:
1369                     import core.stdc.errno : errno;
1370                     attempt.errno = errno;
1371                     goto default;
1372             }
1373 
1374             //case noData:
1375             //case fail:
1376             //case family:
1377             //case sockType:
1378             default:
1379                 attempt.state = State.error;
1380                 attempt.error = e.msg;
1381                 yield(attempt);
1382                 // Should never get here
1383                 assert(0, "Dead `resolveFiber` resumed after yield");
1384             }
1385         }
1386     }
1387 
1388     // This doesn't really happen at present. Subject to change, so keep it here.
1389     /*ResolveAttempt endAttempt;
1390     endAttempt.state = State.failure;
1391     yield(endAttempt);*/
1392     assert(0, "Broke out of unending `for` loop in `resolveFiber`");
1393 }
1394 
1395 
1396 // SSLException
1397 /++
1398     Exception thrown when OpenSSL functions return a non-`1` error code, such as
1399     when the OpenSSL context could not be setup, or when it could not establish
1400     an SSL connection from an otherwise live connection.
1401 
1402     The attached `code` should be the error integer yielded from the failing SSL call.
1403  +/
1404 final class SSLException : Exception
1405 {
1406     /// SSL error code.
1407     int code;
1408 
1409     /// Constructor attaching an error code.
1410     this(
1411         const string msg,
1412         const int code,
1413         const string file = __FILE__,
1414         const size_t line = __LINE__,
1415         Throwable nextInChain = null) pure nothrow @nogc @safe
1416     {
1417         this.code = code;
1418         super(msg, file, line, nextInChain);
1419     }
1420 
1421     /// Passthrough constructor.
1422     this(
1423         const string msg,
1424         const string file = __FILE__,
1425         const size_t line = __LINE__,
1426         Throwable nextInChain = null) pure nothrow @nogc @safe
1427     {
1428         super(msg, file, line, nextInChain);
1429     }
1430 }
1431 
1432 
1433 // SSLFileException
1434 /++
1435     Exception thrown when a certificate or a private key file could not be found.
1436  +/
1437 final class SSLFileException : Exception
1438 {
1439     /// Filename that doesn't exist.
1440     string filename;
1441 
1442     /// Constructor attaching an error code.
1443     this(
1444         const string msg,
1445         const string filename,
1446         const string file = __FILE__,
1447         const size_t line = __LINE__,
1448         Throwable nextInChain = null) pure nothrow @nogc @safe
1449     {
1450         this.filename = filename;
1451         super(msg, file, line, nextInChain);
1452     }
1453 
1454     /// Passthrough constructor.
1455     this(
1456         const string msg,
1457         const string file = __FILE__,
1458         const size_t line = __LINE__,
1459         Throwable nextInChain = null) pure nothrow @nogc @safe
1460     {
1461         super(msg, file, line, nextInChain);
1462     }
1463 }
1464 
1465 
1466 // SocketSendException
1467 /++
1468     Exception thrown when a socket send action returned [std.socket.Socket.ERROR|Socket.ERROR].
1469  +/
1470 final class SocketSendException : Exception
1471 {
1472     /// Passthrough constructor.
1473     this(
1474         const string msg,
1475         const string file = __FILE__,
1476         const size_t line = __LINE__,
1477         Throwable nextInChain = null) pure nothrow @nogc @safe
1478     {
1479         super(msg, file, line, nextInChain);
1480     }
1481 }