1 /++
2     Structures and functions related to concurrency message passing, threads and
3     [core.thread.fiber.Fiber|Fiber]s.
4 
5     Example:
6     ---
7     import std.concurrency;
8 
9     mainThread.send(ThreadMessage.sendline("Message to send to server"));
10     mainThread.send(ThreadMessage.pong("irc.libera.chat"));
    mainThread.send(OutputRequest(OutputRequest.Level.writeln, "writeln this for me please"));
12     mainThread.send(ThreadMessage.busMessage("header", boxed("payload")));
13 
14     auto fiber = new CarryingFiber!string(&someDelegate, BufferSize.fiberStack);
15     fiber.payload = "This string is carried by the Fiber and can be accessed from within it";
16     fiber.call();
17     fiber.payload = "You can change it in between calls to pass information to it";
18     fiber.call();
19 
20     // As such we can make Fibers act like they're taking new arguments each call
21     auto fiber2 = new CarryingFiber!IRCEvent(&otherDelegate, BufferSize.fiberStack);
22     fiber2.payload = newIncomingIRCEvent;
23     fiber2.call();
24     // [...]
25     fiber2.payload = evenNewerIncomingIRCEvent;
26     fiber2.call();
27     ---
28 
29     See_Also:
30         [kameloso.messaging]
31 
32     Copyright: [JR](https://github.com/zorael)
33     License: [Boost Software License 1.0](https://www.boost.org/users/license.html)
34 
35     Authors:
36         [JR](https://github.com/zorael)
37  +/
38 module kameloso.thread;
39 
40 private:
41 
42 import core.thread : Fiber;
43 
44 public:
45 
46 
// ScheduledFiber
/++
    Pairs a [core.thread.fiber.Fiber|Fiber] with the `long` timestamp at which
    it is meant to be called.

    Keeping the two in one value lets a point in time be associated with a
    [core.thread.fiber.Fiber|Fiber] without resorting to an associative array
    keyed by timestamp.

    Example:
    ---
    import std.datetime.systime : Clock;
    import core.thread : Fiber;

    void dg() { /* ... */ }

    auto scheduledFiber = ScheduledFiber(new Fiber(&dg, BufferSize.fiberStack),
        Clock.currTime.stdTime + 10 * 10_000_000);  // ten seconds in hnsecs
    ---
 +/
struct ScheduledFiber
{
    /++
        The Fiber to call once [timestamp] has been reached.
     +/
    Fiber fiber;

    /++
        Point in time at which [fiber] should be called, expressed in hnsecs.

        NOTE(review): this was previously described as hnsecs since midnight
        January 1st 1970, yet the example above passes `Clock.currTime.stdTime`;
        confirm which epoch the code consuming this value expects.
     +/
    long timestamp;
}
74 
75 
// ScheduledDelegate
/++
    Pairs a `void delegate()` with the `long` timestamp at which it is meant
    to be called.

    Keeping the two in one value lets a point in time be associated with a
    delegate without resorting to an associative array keyed by timestamp.

    Example:
    ---
    import std.datetime.systime : Clock;

    void dg() { /* ... */ }

    auto scheduledDg = ScheduledDelegate(&dg, Clock.currTime.stdTime + 10 * 10_000_000);
    ---
 +/
struct ScheduledDelegate
{
    /++
        The delegate to call once [timestamp] has been reached.
     +/
    void delegate() dg;

    /++
        Point in time at which [dg] should be called, expressed in hnsecs.

        NOTE(review): this was previously described as hnsecs since midnight
        January 1st 1970, yet the example above passes `Clock.currTime.stdTime`;
        confirm which epoch the code consuming this value expects.
     +/
    long timestamp;
}
101 
102 
version(Posix)
{
    private import core.sys.posix.pthread : pthread_t;

    // pthread_setname_np
    /++
        Hand-written prototype for `pthread`'s thread-naming function, so that
        it can be linked against and called from [setThreadName].

        NOTE(review): this two-argument form is declared for every Posix
        target; confirm it matches the C signature on each platform built for.
     +/
    extern(C) private int pthread_setname_np(pthread_t, const char*);


    // setThreadName
    /++
        Gives the calling thread a name, so that it shows up named in process
        managers (like `top`).

        The return value of the underlying `pthread_setname_np` call is
        deliberately discarded, so a failure to set the name goes unreported.

        Params:
            name = String name to assign to the current thread.
     +/
    void setThreadName(const string name)
    {
        import core.thread : Thread;
        import std.string : toStringz;

        // The C function wants a thread handle and a NUL-terminated string.
        const currentThreadID = Thread.getThis().id;
        const cName = name.toStringz;

        cast(void)pthread_setname_np(currentThreadID, cName);
    }
}
130 
131 
// ThreadMessage
/++
    A single message type for everything sent between threads, plus generated
    static functions for constructing such messages.

    Funnelling information of different kinds through one type stops
    [std.concurrency.send] from requiring so much compilation memory.

    What a message means is decided by its [ThreadMessage.type], which is a
    [ThreadMessage.Type|Type]. Recipients will have to do a (final) switch over
    that enum to deal with messages accordingly.
 +/
struct ThreadMessage
{
    /++
        Different thread message types.
     +/
    enum Type
    {
        /// Request to send a server [dialect.defs.IRCEvent.Type.PONG|PONG] response.
        pong,

        /// Request to send a server [dialect.defs.IRCEvent.Type.PING|PING] query.
        ping,

        /// Request to send an outgoing normal line.
        sendline,

        /// Request to send a quiet normal line.
        quietline,

        /// Request to send a line immediately, bypassing queues.
        immediateline,

        /// Request to quit the program.
        quit,

        /// Request to teardown (destroy) a plugin.
        teardown,

        /// Request to save configuration to file.
        save,

        /// Request to reload resources from disk.
        reload,

        /// Request to disconnect and reconnect to the server.
        reconnect,

        /// A bus message.
        busMessage,

        /// Request to print a connection summary to the local terminal.
        wantLiveSummary,

        /// Request to abort and exit the program.
        abort,

        /// Request to lower receive timeout briefly and improve
        /// responsiveness/precision during that time.
        shortenReceiveTimeout,

        /// Removes an entry from the custom settings array populated at program
        /// start with the `--set` parameter.
        popCustomSetting,

        /// Request to put an [dialect.defs.IRCUser|IRCUser] into each plugin's
        /// (and service's)
        /// [kameloso.plugins.common.core.IRCPluginState.users|IRCPluginState.users]
        /// associative array.
        putUser,
    }

    /// The [Type] of this thread message.
    Type type;

    /// String content body of message, where applicable.
    string content;

    /// Bundled `shared` [Sendable] payload, where applicable.
    shared Sendable payload;

    /// Whether or not the action requested should be done quietly.
    bool quiet;

    /++
        An `opDispatch` that effectively provides one static constructor
        function per member of [Type]; `ThreadMessage.pong(...)` builds a
        message of type [Type.pong], and so on.

        What the parameters functionally do is contextual to each [Type].

        Params:
            memberstring = String name of a member of [Type].
            content = Optional content string.
            payload = Optional boxed [Sendable] payload.
            quiet = Whether or not to pass a flag for the action to be done quietly.

        Returns:
            A [ThreadMessage] whose members have the passed values.
     +/
    static auto opDispatch(string memberstring)
        (const string content = string.init,
        shared Sendable payload = null,
        const bool quiet = false)
    {
        // Resolve the enum member by name at compile time; a name that is not
        // part of Type fails to instantiate.
        enum messageType = __traits(getMember, Type, memberstring);
        return ThreadMessage(messageType, content, payload, quiet);
    }
}
276 
277 
// OutputRequest
/++
    A request to have a line of text output to the local terminal.

    It is nothing more than a [OutputRequest.Level|Level] bundled with a
    `string` message line. The level picked decides what log level is passed to
    the [kameloso.logger.KamelosoLogger|KamelosoLogger] instance, and dictates
    things like what colour to tint the message with (if any).
 +/
struct OutputRequest
{
    /++
        Output log levels.

        See_Also:
            [kameloso.logger.LogLevel]
     +/
    enum Level
    {
        /// writeln the line.
        writeln,

        /// Log at [kameloso.logger.LogLevel.trace].
        trace,

        /// Log at [kameloso.logger.LogLevel.all] (log).
        log,

        /// Log at [kameloso.logger.LogLevel.info].
        info,

        /// Log at [kameloso.logger.LogLevel.warning].
        warning,

        /// Log at [kameloso.logger.LogLevel.error].
        error,

        /// Log at [kameloso.logger.LogLevel.critical].
        critical,

        /// Log at [kameloso.logger.LogLevel.fatal].
        fatal,
    }

    /// Log level of the message.
    Level logLevel;

    /// String line to request to be output to the local terminal.
    string line;
}
317 
318 
// Sendable
/++
    Marker interface for values that can be sent through the message bus.

    It declares no members of its own; implementing it merely gives otherwise
    unrelated payload classes a common type to be passed around as.
 +/
interface Sendable {}
324 
325 
// Boxed
/++
    A class implementing [Sendable] whose only content is a payload of type `T`.
    Values are boxed in it so that they can be sent via the message bus.

    Params:
        T = Type to embed into the [Boxed] as the type of the payload.
 +/
final class Boxed(T) : Sendable
{
    /++
        The boxed value itself.
     +/
    T payload;

    /++
        Creates a *shared* `Boxed` that stores the passed value as its
        internal [payload].

        Params:
            payload = Value to box.
     +/
    this(T payload) shared
    {
        // The field is shared(T) inside a shared constructor, hence the cast.
        this.payload = cast(shared)payload;
    }
}
348 
349 
// BusMessage
/++
    Deprecated name for [Boxed], kept as an alias so that code still referring
    to `BusMessage!T` compiles (with a deprecation message).
 +/
deprecated("Use `Boxed!T` instead")
alias BusMessage = Boxed;
356 
357 
// boxed
/++
    Convenience function that boxes a value into a new `shared` [Boxed],
    inferring the template type from the argument and stripping its qualifiers.

    Example:
    ---
    IRCEvent event;  // ...
    mainThread.send(ThreadMessage.busMessage("header", boxed(event)));
    mainThread.send(ThreadMessage.busMessage("other header", boxed("text payload")));
    mainThread.send(ThreadMessage.busMessage("ladida", boxed(42)));
    ---

    Params:
        payload = Payload whose type to instantiate the [Boxed] with, and
            then assign to its internal `payload`.

    Returns:
        A `shared` `Boxed!T`, where `T` is the unqualified type of the payload,
        typed as a `shared` [Sendable].
 +/
shared(Sendable) boxed(T)(T payload)
{
    import std.traits : Unqual;

    // Box as the unqualified type, so that a later cast to Boxed!T need not
    // take qualifiers such as const into account.
    alias Box = Boxed!(Unqual!T);
    return new shared Box(payload);
}
383 
384 
// sendable
/++
    Deprecated name for [boxed], kept as an alias so that code still calling
    `sendable(...)` compiles (with a deprecation message).
 +/
deprecated("Use `boxed` instead")
alias sendable = boxed;
391 
///
unittest
{
    // In each case, box a value and verify that the resulting Sendable can be
    // cast back to the concrete Boxed!T it was created as. The assert must
    // inspect the *result of the cast*; the boxed message itself is never null,
    // so testing that would not detect a failed cast.
    {
        auto msg = boxed("asdf");
        auto asCast = cast(Boxed!string)msg;
        assert((asCast !is null), "Incorrectly cast message: " ~ typeof(asCast).stringof);
        assert((asCast.payload == "asdf"), asCast.payload);

        // A cast to a Boxed of a different type must fail.
        assert((cast(Boxed!int)msg is null), "Cast to wrong Boxed type unexpectedly succeeded");
    }
    {
        auto msg = boxed(123_456);
        auto asCast = cast(Boxed!int)msg;
        assert((asCast !is null), "Incorrectly cast message: " ~ typeof(asCast).stringof);
        assert(asCast.payload == 123_456);
    }
    {
        struct Foo {}
        auto msg = boxed(Foo());
        auto asCast = cast(Boxed!Foo)msg;
        assert((asCast !is null), "Incorrectly cast message: " ~ typeof(asCast).stringof);
    }
}
415 
416 
// CarryingFiber
/++
    A [core.thread.fiber.Fiber|Fiber] subclass that additionally carries a
    payload of type `T`.

    It can be used wherever a [core.thread.fiber.Fiber|Fiber] is expected. Code
    that needs the `payload` member casts the Fiber to `CarryingFiber!T`; the
    cast yields `null` if the Fiber is not of that exact type.

    Example:
    ---
    void dg()
    {
        CarryingFiber!bool fiber = cast(CarryingFiber!bool)(Fiber.getThis);
        assert(fiber !is null);  // Correct cast

        assert(fiber.payload);
        Fiber.yield();
        assert(!fiber.payload);
    }

    auto fiber = new CarryingFiber!bool(true, &dg, BufferSize.fiberStack);
    fiber.call();
    fiber.payload = false;
    fiber.call();
    ---

    Params:
        T = Type to embed into the class as the type of [CarryingFiber.payload].
 +/
final class CarryingFiber(T) : Fiber
{
    /++
        The value carried by this Fiber; what sets it apart from a plain `Fiber`.
     +/
    T payload;

    /++
        Constructs a Fiber without setting a payload, leaving it at `T.init`.

        All arguments are forwarded as-is to the `Fiber` constructor.

        Params:
            fn = Function or delegate pointer to call when this Fiber is
                invoked (via `.call()`).
            args = Any further arguments to pass to the `Fiber` constructor.
     +/
    this(Fn, Args...)(Fn fn, Args args)
    {
        super(fn, args);
    }

    /++
        Constructs a Fiber and stores the passed `payload` in it.

        The remaining arguments are forwarded as-is to the `Fiber` constructor.

        Params:
            payload = Value to assign to the internal `this.payload`.
            fn = Function or delegate pointer to call when this Fiber is
                invoked (via `.call()`).
            args = Any further arguments to pass to the `Fiber` constructor.
     +/
    this(Fn, Args...)(T payload, Fn fn, Args args)
    {
        super(fn, args);
        this.payload = payload;
    }

    /++
        Resets the payload to its initial value, `T.init`.
     +/
    void resetPayload()
    {
        this.payload = T.init;
    }
}
482 
483 
484 private import core.time : Duration;
485 
// interruptibleSleep
/++
    Sleeps for the passed duration in slices of at most 100 milliseconds,
    checking the passed `abort` bool before each slice and returning early if
    it is set.

    This is useful when a different signal handler has been set up, as triggering
    it won't break sleeps. This way it does, assuming the `abort` bool is the
    same one the signal handler monitors. As such, take it by `ref`.

    A zero or negative duration returns immediately without sleeping.

    Example:
    ---
    interruptibleSleep(1.seconds, abort);
    ---

    Params:
        dur = Duration to sleep for.
        abort = Reference to the bool flag which, if set, means we should
            interrupt and return early.
 +/
void interruptibleSleep(const Duration dur, const ref bool abort) @system
{
    import core.thread : Thread, msecs;

    static immutable slice = 100.msecs;

    Duration remaining = dur;

    while (!abort && (remaining > Duration.zero))
    {
        if (remaining > slice)
        {
            // More than one slice to go; sleep a full one and check again.
            Thread.sleep(slice);
            remaining -= slice;
        }
        else
        {
            // Last (possibly partial) slice; sleep what is left and be done.
            Thread.sleep(remaining);
            break;
        }
    }
}
526 
527 
// exhaustMessages
/++
    Empties the concurrency message mailbox of the current thread, discarding
    every message in it.

    Messages are received (and dropped) one by one until a receive attempt
    times out after 10 milliseconds without anything arriving.

    This is done between connection attempts to get a fresh start.
 +/
void exhaustMessages()
{
    import std.concurrency : receiveTimeout, thisTid;
    import std.variant : Variant;
    import core.time : msecs;

    // Touch thisTid before receiving anything, or we may trip this assert:
    // core.exception.AssertError@std/concurrency.d(910): Cannot receive a message
    // until a thread was spawned or thisTid was passed to a running thread.
    cast(void)thisTid;

    static immutable pollWindow = 10.msecs;

    // A Variant handler matches any message; keep going for as long as
    // something was received within the window.
    while (receiveTimeout(pollWindow, (Variant _) scope {})) {}
}
555 
///
unittest
{
    import std.concurrency : receiveTimeout, send, thisTid;
    import std.variant : Variant;
    import core.time : Duration;

    // Queue up a handful of messages to ourselves...
    foreach (immutable number; 0..10) thisTid.send(number);

    // ...drain the mailbox...
    exhaustMessages();

    // ...and verify that a non-blocking receive now comes up empty.
    immutable gotLeftovers = receiveTimeout(Duration.zero, (Variant _) {});
    assert(!gotLeftovers);
}