1 /++ 2 Structures and functions related to concurrency message passing, threads and 3 [core.thread.fiber.Fiber|Fiber]s. 4 5 Example: 6 --- 7 import std.concurrency; 8 9 mainThread.send(ThreadMessage.sendline("Message to send to server")); 10 mainThread.send(ThreadMessage.pong("irc.libera.chat")); 11 mainThread.send(OutputRequest(ThreadMessage.TerminalOutput.writeln, "writeln this for me please")); 12 mainThread.send(ThreadMessage.busMessage("header", boxed("payload"))); 13 14 auto fiber = new CarryingFiber!string(&someDelegate, BufferSize.fiberStack); 15 fiber.payload = "This string is carried by the Fiber and can be accessed from within it"; 16 fiber.call(); 17 fiber.payload = "You can change it in between calls to pass information to it"; 18 fiber.call(); 19 20 // As such we can make Fibers act like they're taking new arguments each call 21 auto fiber2 = new CarryingFiber!IRCEvent(&otherDelegate, BufferSize.fiberStack); 22 fiber2.payload = newIncomingIRCEvent; 23 fiber2.call(); 24 // [...] 25 fiber2.payload = evenNewerIncomingIRCEvent; 26 fiber2.call(); 27 --- 28 29 See_Also: 30 [kameloso.messaging] 31 32 Copyright: [JR](https://github.com/zorael) 33 License: [Boost Software License 1.0](https://www.boost.org/users/license.html) 34 35 Authors: 36 [JR](https://github.com/zorael) 37 +/ 38 module kameloso.thread; 39 40 private: 41 42 import core.thread : Fiber; 43 44 public: 45 46 47 // ScheduledFiber 48 /++ 49 A [core.thread.fiber.Fiber|Fiber] paired with a `long` UNIX timestamp. 50 51 If we bundle the two together like this, we can associate a point in time 52 with a [core.thread.fiber.Fiber|Fiber] without having to to use an associative 53 array (with UNIX timestamp keys). 54 55 Example: 56 --- 57 import std.datetime.systime : Clock; 58 import core.thread : Fiber; 59 60 void dg() { /* ... */ } 61 62 auto scheduledFiber = ScheduledFiber(new Fiber(&dg, BufferSize.fiberStack), 63 Clock.currTime.stdTime + 10 * 10_000_000); // ten seconds in hnsecs 64 --- 65 +/ 66 struct ScheduledFiber 67 { 68 /// Fiber to trigger at the point in time [timestamp]. 69 Fiber fiber; 70 71 /// When [fiber] is scheduled to be called, in hnsecs from midnight Jan 1st 1970. 72 long timestamp; 73 } 74 75 76 // ScheduledDelegate 77 /++ 78 A delegate paired with a `long` UNIX timestamp. 79 80 If we bundle the two together like this, we can associate a point in time 81 with a delegate without having to to use an associative array (with UNIX 82 timestamp keys). 83 84 Example: 85 --- 86 import std.datetime.systime : Clock; 87 88 void dg() { /* ... */ } 89 90 auto scheduledDg = ScheduledDelegate(&dg, Clock.currTime.stdTime + 10 * 10_000_000); 91 --- 92 +/ 93 struct ScheduledDelegate 94 { 95 /// Delegate to trigger at the point in time [timestamp]. 96 void delegate() dg; 97 98 /// When [dg] is scheduled to be called, in hnsecs from midnight Jan 1st 1970. 99 long timestamp; 100 } 101 102 103 version(Posix) 104 { 105 private import core.sys.posix.pthread : pthread_t; 106 107 // pthread_setname_np 108 /++ 109 Prototype to allow linking to `pthread`'s function for naming threads. 110 +/ 111 extern(C) private int pthread_setname_np(pthread_t, const char*); 112 113 114 // setThreadName 115 /++ 116 Sets the thread name of the current thread, so they will show up named 117 in process managers (like `top`). 118 119 Params: 120 name = String name to assign to the current thread. 121 +/ 122 void setThreadName(const string name) 123 { 124 import std.string : toStringz; 125 import core.thread : Thread; 126 127 cast(void)pthread_setname_np(Thread.getThis().id, name.toStringz); 128 } 129 } 130 131 132 // ThreadMessage 133 /++ 134 Collection of static functions used to construct thread messages, for passing 135 information of different kinds yet still as one type, to stop [std.concurrency.send] 136 from requiring so much compilation memory. 137 138 The type of the message is defined as a [ThreadMessage.Type|Type] in 139 [ThreadMessage.type]. Recipients will have to do a (final) switch over that 140 enum to deal with messages accordingly. 141 +/ 142 struct ThreadMessage 143 { 144 /++ 145 Different thread message types. 146 +/ 147 enum Type 148 { 149 /++ 150 Request to send a server [dialect.defs.IRCEvent.Type.PONG|PONG] response. 151 +/ 152 pong, 153 154 /++ 155 Request to send a server [dialect.defs.IRCEvent.Type.PING|PING] query. 156 +/ 157 ping, 158 159 /++ 160 Request to send an outgoing normal line. 161 +/ 162 sendline, 163 164 /++ 165 Request to send a quiet normal line. 166 +/ 167 quietline, 168 169 /++ 170 Request to send a line immediately, bypassing queues. 171 +/ 172 immediateline, 173 174 /++ 175 Request to quit the program. 176 +/ 177 quit, 178 179 /++ 180 Request to teardown (destroy) a plugin. 181 +/ 182 teardown, 183 184 /++ 185 Request to save configuration to file. 186 +/ 187 save, 188 189 /++ 190 Request to reload resources from disk. 191 +/ 192 reload, 193 194 /++ 195 Request to disconnect and reconnect to the server. 196 +/ 197 reconnect, 198 199 /++ 200 A bus message. 201 +/ 202 busMessage, 203 204 /++ 205 Request to print a connection summary to the local terminal. 206 +/ 207 wantLiveSummary, 208 209 /++ 210 Request to abort and exit the program. 211 +/ 212 abort, 213 214 /++ 215 Request to lower receive timeout briefly and improve 216 responsiveness/precision during that time. 217 +/ 218 shortenReceiveTimeout, 219 220 /++ 221 Removes an entry from the custom settings array popualted at program 222 start with the `--set` parameter. 223 +/ 224 popCustomSetting, 225 226 /++ 227 Request to put an [dialect.defs.IRCUser|IRCUser] into each plugin's (and service's) 228 [kameloso.plugins.common.core.IRCPluginState.users|IRCPluginState.users] 229 associative array. 230 +/ 231 putUser, 232 } 233 234 /++ 235 The [Type] of this thread message. 236 +/ 237 Type type; 238 239 /++ 240 String content body of message, where applicable. 241 +/ 242 string content; 243 244 /++ 245 Bundled `shared` [Sendable] payload, where applicable. 246 +/ 247 shared Sendable payload; 248 249 /++ 250 Whether or not the action requested should be done quietly. 251 +/ 252 bool quiet; 253 254 /++ 255 An `opDispatch`, constructing one function for each member in [Type]. 256 257 What the parameters functionally do is contextual to each [Type]. 258 259 Params: 260 memberstring = String name of a member of [Type]. 261 content = Optional content string. 262 payload = Optional boxed [Sendable] payloda. 263 quiet = Whether or not to pass a flag for the action to be done quietly. 264 265 Returns: 266 A [ThreadMessage] whose members have the passed values. 267 +/ 268 static auto opDispatch(string memberstring) 269 (const string content = string.init, 270 shared Sendable payload = null, 271 const bool quiet = false) 272 { 273 mixin("return ThreadMessage(Type." ~ memberstring ~ ", content, payload, quiet);"); 274 } 275 } 276 277 278 // OutputRequest 279 /++ 280 Embodies the notion of a request to output something to the local terminal. 281 282 Merely bundles a [OutputRequest.Level|Level] log level and 283 a `string` message line. What log level is picked decides what log level is 284 passed to the [kameloso.logger.KamelosoLogger|KamelosoLogger] instance, and 285 dictates things like what colour to tint the message with (if any). 286 +/ 287 struct OutputRequest 288 { 289 /++ 290 Output log levels. 291 292 See_Also: 293 [kameloso.logger.LogLevel] 294 +/ 295 enum Level 296 { 297 writeln, /// writeln the line. 298 trace, /// Log at [kameloso.logger.LogLevel.trace]. 299 log, /// Log at [kameloso.logger.LogLevel.all] (log). 300 info, /// Log at [kameloso.logger.LogLevel.info]. 301 warning, /// Log at [kameloso.logger.LogLevel.warning]. 302 error, /// Log at [kameloso.logger.LogLevel.error]. 303 critical, /// Log at [kameloso.logger.LogLevel.critical]. 304 fatal, /// Log at [kameloso.logger.LogLevel.fatal]. 305 } 306 307 /++ 308 Log level of the message. 309 +/ 310 Level logLevel; 311 312 /++ 313 String line to request to be output to the local terminal. 314 +/ 315 string line; 316 } 317 318 319 // Sendable 320 /++ 321 Interface for a message sendable through the message bus. 322 +/ 323 interface Sendable {} 324 325 326 // Boxed 327 /++ 328 A payload of type `T` wrapped in a class implementing the [Sendable] interface. 329 Used to box values for sending via the message bus. 330 331 Params: 332 T = Type to embed into the [Boxed] as the type of the payload. 333 +/ 334 final class Boxed(T) : Sendable 335 { 336 /// Payload value embedded in this message. 337 T payload; 338 339 /++ 340 Constructor that adds a passed payload to the internal stored [payload], 341 creating a *shared* `Boxed`. 342 +/ 343 auto this(T payload) shared 344 { 345 this.payload = cast(shared)payload; 346 } 347 } 348 349 350 // BusMessage 351 /++ 352 Deprecated alias to [Boxed]. 353 +/ 354 deprecated("Use `Boxed!T` instead") 355 alias BusMessage = Boxed; 356 357 358 // boxed 359 /++ 360 Constructor function to create a `shared` [Boxed] with an unqualified 361 template type. 362 363 Example: 364 --- 365 IRCEvent event; // ... 366 mainThread.send(ThreadMessage.busMessage("header", boxed(event))); 367 mainThread.send(ThreadMessage.busMessage("other header", boxed("text payload"))); 368 mainThread.send(ThreadMessage.busMessage("ladida", boxed(42))); 369 --- 370 371 Params: 372 payload = Payload whose type to instantiate the [Boxed] with, and 373 then assign to its internal `payload`. 374 375 Returns: 376 A `shared` [Boxed]!T` where `T` is the unqualified type of the payload. 377 +/ 378 shared(Sendable) boxed(T)(T payload) 379 { 380 import std.traits : Unqual; 381 return new shared Boxed!(Unqual!T)(payload); 382 } 383 384 385 // sendable 386 /++ 387 Deprecated alias to [boxed]. 388 +/ 389 deprecated("Use `boxed` instead") 390 alias sendable = boxed; 391 392 /// 393 unittest 394 { 395 { 396 auto msg = boxed("asdf"); 397 auto asCast = cast(Boxed!string)msg; 398 assert((msg !is null), "Incorrectly cast message: " ~ typeof(asCast).stringof); 399 asCast = null; // silence dscanner 400 } 401 { 402 auto msg = boxed(123_456); 403 auto asCast = cast(Boxed!int)msg; 404 assert((msg !is null), "Incorrectly cast message: " ~ typeof(asCast).stringof); 405 asCast = null; // silence dscanner 406 } 407 { 408 struct Foo {} 409 auto msg = boxed(Foo()); 410 auto asCast = cast(Boxed!Foo)msg; 411 assert((msg !is null), "Incorrectly cast message: " ~ typeof(asCast).stringof); 412 asCast = null; // silence dscanner 413 } 414 } 415 416 417 // CarryingFiber 418 /++ 419 A [core.thread.fiber.Fiber|Fiber] carrying a payload of type `T`. 420 421 Used interchangeably with [core.thread.fiber.Fiber|Fiber], but allows for 422 casting to true `CarryingFiber!T`-ness to access the `payload` member. 423 424 Example: 425 --- 426 void dg() 427 { 428 CarryingFiber!bool fiber = cast(CarryingFiber!bool)(Fiber.getThis); 429 assert(fiber !is null); // Correct cast 430 431 assert(fiber.payload); 432 Fiber.yield(); 433 assert(!fiber.payload); 434 } 435 436 auto fiber = new CarryingFiber!bool(true, &dg, BufferSize.fiberStack); 437 fiber.call(); 438 fiber.payload = false; 439 fiber.call(); 440 --- 441 442 Params: 443 T = Type to embed into the class as the type of [CarryingFiber.payload]. 444 +/ 445 final class CarryingFiber(T) : Fiber 446 { 447 /++ 448 Embedded payload value in this Fiber; what distinguishes it from plain `Fiber`s. 449 +/ 450 T payload; 451 452 /++ 453 Constructor function merely taking a function/delegate pointer, to call 454 when invoking this Fiber (via `.call()`). 455 +/ 456 this(Fn, Args...)(Fn fn, Args args) 457 { 458 // fn is a pointer 459 super(fn, args); 460 } 461 462 /++ 463 Constructor function taking a `T` `payload` to assign to its own 464 internal `this.payload`, as well as a function/delegate pointer to call 465 when invoking this Fiber (via `.call()`). 466 +/ 467 this(Fn, Args...)(T payload, Fn fn, Args args) 468 { 469 this.payload = payload; 470 // fn is a pointer 471 super(fn, args); 472 } 473 474 /++ 475 Resets the payload to its initial value. 476 +/ 477 void resetPayload() 478 { 479 payload = T.init; 480 } 481 } 482 483 484 private import core.time : Duration; 485 486 // interruptibleSleep 487 /++ 488 Sleep in small periods, checking the passed `abort` bool in between to see 489 if we should break and return. 490 491 This is useful when a different signal handler has been set up, as triggering 492 it won't break sleeps. This way it does, assuming the `abort` bool is the 493 same one the signal handler monitors. As such, take it by `ref`. 494 495 Example: 496 --- 497 interruptibleSleep(1.seconds, abort); 498 --- 499 500 Params: 501 dur = Duration to sleep for. 502 abort = Reference to the bool flag which, if set, means we should 503 interrupt and return early. 504 +/ 505 void interruptibleSleep(const Duration dur, const ref bool abort) @system 506 { 507 import core.thread : Thread, msecs; 508 509 static immutable step = 100.msecs; 510 static immutable nothing = 0.msecs; 511 512 Duration left = dur; 513 514 while (left > nothing) 515 { 516 if (abort) return; 517 518 immutable nextStep = (left > step) ? step : left; 519 520 if (nextStep <= nothing) break; 521 522 Thread.sleep(nextStep); 523 left -= step; 524 } 525 } 526 527 528 // exhaustMessages 529 /++ 530 Exhausts the concurrency message mailbox. 531 532 This is done between connection attempts to get a fresh start. 533 +/ 534 void exhaustMessages() 535 { 536 import std.concurrency : receiveTimeout, thisTid; 537 import std.variant : Variant; 538 import core.time : msecs; 539 540 // core.exception.AssertError@std/concurrency.d(910): Cannot receive a message 541 // until a thread was spawned or thisTid was passed to a running thread. 542 cast(void)thisTid; 543 544 bool receivedSomething; 545 static immutable almostInstant = 10.msecs; 546 547 do 548 { 549 receivedSomething = receiveTimeout(almostInstant, 550 (Variant _) scope {} 551 ); 552 } 553 while (receivedSomething); 554 } 555 556 /// 557 unittest 558 { 559 import std.concurrency : receiveTimeout, send, thisTid; 560 import std.variant : Variant; 561 import core.time : Duration; 562 563 foreach (immutable i; 0..10) 564 { 565 thisTid.send(i); 566 } 567 568 exhaustMessages(); 569 570 immutable receivedSomething = receiveTimeout(Duration.zero, 571 (Variant _) {}, 572 ); 573 574 assert(!receivedSomething); 575 }