/++
    The Pipeline plugin opens a Posix named pipe in a temporary directory or
    the current directory, to which you can pipe text and have it be sent
    verbatim to the server. There is also syntax to manually send bus messages
    to plugins.

    It has no commands; it doesn't listen to [dialect.defs.IRCEvent|IRCEvent]s
    at all, only to what is sent to it via the named FIFO pipe.

    This requires version `Posix`, which is true for UNIX-like systems (like
    Linux and macOS).

    See_Also:
        https://github.com/zorael/kameloso/wiki/Current-plugins#pipeline,
        [kameloso.plugins.common.core],
        [kameloso.plugins.common.misc]

    Copyright: [JR](https://github.com/zorael)
    License: [Boost Software License 1.0](https://www.boost.org/users/license.html)

    Authors:
        [JR](https://github.com/zorael)
 +/
module kameloso.plugins.pipeline;

version(Posix):
version(WithPipelinePlugin):

private:

import kameloso.plugins;
import kameloso.plugins.common.core;
import kameloso.common : logger;
import kameloso.messaging;
import dialect.defs;
import std.typecons : Flag, No, Yes;


/+
    For storage location of the FIFO it makes sense to default to /tmp;
    Posix defines a variable `$TMPDIR`, which should take precedence.
    However, this supposedly makes the file really difficult to access on macOS
    where it translates to some really long, programmatically generated path.
    macOS naturally does support /tmp though. So shrug and version it to
    default-ignore `$TMPDIR` on macOS but obey it on other platforms.
 +/
//version = OSXTMPDIR;


// PipelineSettings
/++
    All settings for a [PipelinePlugin], aggregated.
 +/
@Settings struct PipelineSettings
{
private:
    import lu.uda : Unserialisable;

public:
    /// Whether or not the Pipeline plugin should do anything at all.
    @Enabler bool enabled = true;

    /++
        Whether or not to place the FIFO in the working directory. If false, it
        will be saved in `/tmp` or wherever `$TMPDIR` points. If macOS, then there
        only if version `OSXTMPDIR`.
     +/
    bool fifoInWorkingDir = false;

    /++
        Whether or not to always use a unique filename for the FIFO; if one exists
        with the wanted name, simply append a number to make a new, unique one.
     +/
    bool bumpFilenameIfItExists = true;

    /// Custom, full path to use as FIFO filename, specified with --set pipeline.path.
    @Unserialisable string path;
}


// pipereader
/++
    Reads a FIFO (named pipe) and relays lines received there to the main
    thread, to send to the server.

    It is to be run in a separate thread.

    Lines are interpreted as follows:

    * A line starting with `:` is sent to the main thread as a bus message;
      the text up to the first space is the header and the remainder is the
      boxed payload. Without a space, the whole remainder is the header.
    * A line starting with "quit" (case-insensitively) quits the bot, with an
      optional reason after `quit :`, and ends this thread.
    * Anything else is sent raw to the server, less one leading space if present.

    Whenever the thread stops because of an error (as opposed to being asked to
    tear down, or the bot quitting), it sends a "`pipeline`" bus message with the
    payload "`halted`" to the main thread, so that [onBusMessage] can flag the
    worker as no longer running and [reload] can restart it.

    The FIFO file is removed when the function exits.

    Params:
        newState = The [kameloso.plugins.common.core.IRCPluginState|IRCPluginState]
            of the original [PipelinePlugin], to provide the main thread's
            [core.thread.Tid|Tid] for concurrency messages, made `shared` to
            allow being sent between threads.
        filename = String filename of the FIFO to read from.
 +/
void pipereader(shared IRCPluginState newState, const string filename)
in (filename.length, "Tried to set up a pipereader with an empty filename")
{
    import kameloso.thread : ThreadMessage, boxed, setThreadName;
    import std.concurrency : OwnerTerminated, receiveTimeout, send;
    import std.format : format;
    import std.stdio : File;
    import std.variant : Variant;
    import core.time : Duration;
    static import kameloso.common;

    // The whole module is version Posix, no need to encase here
    setThreadName("pipeline");

    auto state = cast()newState;

    // Set the global settings so messaging functions don't segfault us
    kameloso.common.settings = &state.settings;

    // Creating the File struct blocks, so do it after reporting.
    enum pattern = "Pipe text to the <i>%s</> file to send raw commands to the server.";
    state.askToLog(pattern.format(filename));

    File fifo = File(filename, "r");

    // Best-effort removal of the FIFO; failing to remove it is not an error.
    static void tryRemove(const string filename)
    {
        import std.file : exists, remove;

        if (filename.exists)
        {
            try
            {
                remove(filename);
            }
            catch (Exception _)
            {
                // Race, ignore
            }
        }
    }

    scope(exit) tryRemove(filename);

    // Tells the main thread that this worker stopped on its own accord, so the
    // plugin can clear its "worker running" flag and be reinitialised later.
    void reportHalted()
    {
        state.mainThread.send(ThreadMessage.busMessage("pipeline", boxed("halted")));
    }

    while (true)
    {
        // foreach but always break after processing one line, to be responsive
        // and retaining the ability to break out of it.
        foreach (immutable line; fifo.byLineCopy)
        {
            import std.algorithm.searching : startsWith;
            import std.uni : asLowerCase;

            // An empty line carries nothing to relay; teardown writes one
            // merely to wake this thread up.
            if (!line.length) break;

            if (line[0] == ':')
            {
                import lu.string : contains, nom;

                if (line.contains(' '))
                {
                    // ":header payload with spaces"
                    string slice = line[1..$];
                    immutable header = slice.nom(' ');
                    state.mainThread.send(ThreadMessage.busMessage(header, boxed(slice)));
                }
                else
                {
                    // ":header" with no payload
                    state.mainThread.send(ThreadMessage.busMessage(line[1..$]));
                }
                break;
            }
            else if (line.asLowerCase.startsWith("quit"))
            {
                if ((line.length > 6) && (line[4..6] == " :"))
                {
                    // "quit :reason"
                    quit(state, line[6..$]);
                }
                else
                {
                    quit(state);
                }
                return;
            }
            else
            {
                // Allow a leading space to escape lines that would otherwise
                // be interpreted as one of the above.
                immutable slice = (line[0] == ' ') ? line[1..$] : line;
                raw(state, slice);
            }
            break;
        }

        bool halt;

        // Polls the thread's mailbox without blocking and sets `halt` if
        // we were asked to stop or something unexpected was received.
        void checkMessages()
        {
            cast(void)receiveTimeout(Duration.zero,
                (ThreadMessage message)
                {
                    if (message.type == ThreadMessage.Type.teardown)
                    {
                        halt = true;
                    }
                },
                (OwnerTerminated _)
                {
                    halt = true;
                },
                (Variant v)
                {
                    enum variantPattern = "Pipeline plugin received Variant: <l>%s";
                    state.askToError(variantPattern.format(v.toString));
                    reportHalted();
                    halt = true;
                }
            );
        }

        checkMessages();
        if (halt) return;

        import std.exception : ErrnoException;

        try
        {
            // The writer closed its end; reopen to wait for the next one.
            fifo.reopen(filename);
        }
        catch (ErrnoException e)
        {
            checkMessages();
            if (halt) return;

            enum fifoPattern = "Pipeline plugin failed to reopen FIFO: <l>%s";
            state.askToError(fifoPattern.format(e.msg));
            version(PrintStacktraces) state.askToTrace(e.info.toString);
            reportHalted();
            return;
        }
        catch (Exception e)
        {
            checkMessages();
            if (halt) return;

            state.askToError("Pipeline plugin saw unexpected exception");
            version(PrintStacktraces) state.askToTrace(e.toString);

            // Without this the plugin would keep believing the worker is
            // alive and reload would never be able to restart it.
            reportHalted();
            return;
        }
    }
}


// createFIFO
/++
    Creates a FIFO (named pipe) in the filesystem.

    It will be named a passed filename.

    Params:
        filename = String filename of FIFO to create.

    Throws:
        [lu.common.ReturnValueException|ReturnValueException] if the FIFO
        could not be created.

        [lu.common.FileExistsException|FileExistsException] if a FIFO with
        the same filename already exists, suggesting concurrent conflicting
        instances of the program (or merely a zombie FIFO after a crash).

        [lu.common.FileTypeMismatchException|FileTypeMismatchException] if a file or directory
        exists with the same name as the FIFO we want to create.
 +/
void createFIFO(const string filename)
in (filename.length, "Tried to create a FIFO with an empty filename")
{
    import lu.common : FileExistsException, FileTypeMismatchException, ReturnValueException;
    import std.file : exists;

    if (!filename.exists)
    {
        import std.process : execute;

        // Shell out to the mkfifo utility and treat a non-zero exit as failure.
        immutable mkfifo = execute([ "mkfifo", filename ]);

        if (mkfifo.status != 0)
        {
            enum message = "Could not create FIFO";
            throw new ReturnValueException(
                message,
                "mkfifo",
                mkfifo.status);
        }
    }
    else
    {
        import std.file : getAttributes;
        import core.sys.posix.sys.stat : S_ISFIFO;

        // Something is already there; figure out what to report the right error.
        immutable attrs = cast(ushort)getAttributes(filename);

        if (S_ISFIFO(attrs))
        {
            enum message = "A FIFO with that name already exists";
            throw new FileExistsException(
                message,
                filename,
                __FILE__,
                __LINE__);
        }
        else
        {
            enum message = "Wanted to create a FIFO but a file or directory " ~
                "with the desired name already exists";
            throw new FileTypeMismatchException(
                message,
                filename,
                attrs,
                __FILE__,
                __LINE__);
        }
    }
}


// onWelcome
/++
    Initialises the fifo pipe and thus the purpose of the plugin, by leveraging [initPipe].
 +/
@(IRCEventHandler()
    .onEvent(IRCEvent.Type.RPL_WELCOME)
)
void onWelcome(PipelinePlugin plugin)
{
    initPipe(plugin);
}


// reload
/++
    Reloads the plugin, initialising the fifo pipe if it was not already initialised.

    This lets us remedy the "A FIFO with that name already exists" error.
 +/
void reload(PipelinePlugin plugin)
{
    if (!plugin.workerRunning)
    {
        initPipe(plugin);
    }
}


// initPipe
/++
    Spawns the pipereader thread.

    Snapshots the filename to use, as we base it on the bot's nickname, which
    may change during the connection's lifetime.

    If the filename is already taken and
    [PipelineSettings.bumpFilenameIfItExists] is set, a numeric suffix from
    `-1` up to and including `-99` is appended, picking the first one that is
    free. If all of them are taken the last candidate is used as-is and the
    subsequent FIFO creation is left to report the conflict.

    Failures to create the FIFO are logged as warnings and leave the plugin
    without a running worker; other exceptions are let through.

    Params:
        plugin = The current [PipelinePlugin].
 +/
void initPipe(PipelinePlugin plugin)
in (!plugin.workerRunning, "Tried to double-initialise the pipereader")
{
    if (plugin.pipelineSettings.path.length)
    {
        // Custom filename specified with --set pipeline.path=xyz
        plugin.fifoFilename = plugin.pipelineSettings.path;
    }
    else
    {
        import std.conv : text;

        // Save the filename *once* so it persists across nick changes.
        // If !fifoInWorkingDir then in /tmp or $TMPDIR
        plugin.fifoFilename = text(plugin.state.client.nickname, '@', plugin.state.server.address);

        if (!plugin.pipelineSettings.fifoInWorkingDir)
        {
            // See notes at the top of module.
            version(OSX)
            {
                version(OSXTMPDIR)
                {
                    enum useTMPDIR = true;
                }
                else
                {
                    enum useTMPDIR = false;
                }
            }
            else // Implicitly not Windows since Posix-only plugin
            {
                enum useTMPDIR = true;
            }

            static if (useTMPDIR)
            {
                import std.process : environment;
                immutable tempdir = environment.get("TMPDIR", "/tmp");
            }
            else
            {
                enum tempdir = "/tmp";
            }

            import std.path : buildNormalizedPath;
            plugin.fifoFilename = buildNormalizedPath(tempdir, plugin.fifoFilename);
        }
    }

    import std.file : exists;

    if (plugin.pipelineSettings.bumpFilenameIfItExists && plugin.fifoFilename.exists)
    {
        import std.conv : text;

        // Highest suffix to try; don't infinitely loop, should realistically
        // never be reached though.
        enum maxSuffix = 99;

        immutable baseFilename = plugin.fifoFilename;

        // Build each candidate from the untouched base name so the dash
        // separator is always kept ("name-9" is followed by "name-10").
        foreach (immutable i; 1..maxSuffix+1)
        {
            plugin.fifoFilename = text(baseFilename, '-', i);
            if (!plugin.fifoFilename.exists) break;
        }
    }

    import lu.common : FileExistsException, FileTypeMismatchException, ReturnValueException;

    try
    {
        import std.concurrency : spawn;

        createFIFO(plugin.fifoFilename);
        plugin.fifoThread = spawn(&pipereader, cast(shared)plugin.state, plugin.fifoFilename);
        plugin.workerRunning = true;
    }
    catch (ReturnValueException e)
    {
        enum pattern = "Failed to initialise the Pipeline plugin: <l>%s</> (<l>%s</> returned <l>%d</>)";
        logger.warningf(pattern, e.msg, e.command, e.retval);
        //version(PrintStacktraces) logger.trace(e.info);
    }
    catch (FileExistsException e)
    {
        enum pattern = "Failed to initialise the Pipeline plugin: <l>%s</> [<l>%s</>]";
        logger.warningf(pattern, e.msg, e.filename);
        //version(PrintStacktraces) logger.trace(e.info);
    }
    catch (FileTypeMismatchException e)
    {
        enum pattern = "Failed to initialise the Pipeline plugin: <l>%s</> [<l>%s</>]";
        logger.warningf(pattern, e.msg, e.filename);
        //version(PrintStacktraces) logger.trace(e.info);
    }

    // Let other Exceptions pass
}


// teardown
/++
    De-initialises the Pipeline plugin. Shuts down the pipereader thread.

    Does nothing if no worker is flagged as running. Otherwise a teardown
    message is sent to the worker thread, and an empty line is written to the
    FIFO (if it still exists and is not a directory) so that the worker wakes
    up from its read and gets to see the message.
 +/
void teardown(PipelinePlugin plugin)
{
    import kameloso.thread : ThreadMessage;
    import std.concurrency : send;
    import std.file : exists, isDir;
    import std.stdio : File;

    if (!plugin.workerRunning) return;

    plugin.fifoThread.send(ThreadMessage.teardown());

    if (plugin.fifoFilename.exists && !plugin.fifoFilename.isDir)
    {
        // Tell the reader of the pipe to exit
        auto fifo = File(plugin.fifoFilename, "w");
        fifo.writeln();
    }
}


import kameloso.thread : Sendable;

// onBusMessage
/++
    Receives a passed [kameloso.thread.BusMessage|BusMessage] with the "`pipeline`" header,
    and performs actions based on the payload message.

    This is used to let the worker thread signal the main context that it halted.

    Params:
        plugin = The current [PipelinePlugin].
        header = String header describing the passed content payload.
        content = Message content.
 +/
void onBusMessage(PipelinePlugin plugin, const string header, shared Sendable content)
{
    if (!plugin.isEnabled) return;
    if (header != "pipeline") return;

    import kameloso.thread : Boxed;

    auto message = cast(Boxed!string)content;
    assert(message, "Incorrectly cast message: " ~ typeof(message).stringof);

    if (message.payload == "halted")
    {
        // The worker thread stopped; allow reload to spawn a new one.
        plugin.workerRunning = false;
    }
    else
    {
        logger.error("[pipeline] Unimplemented bus message verb: <i>", message.payload);
    }
}


mixin PluginRegistration!PipelinePlugin;

public:


// PipelinePlugin
/++
    The Pipeline plugin reads from a local named pipe (FIFO) for messages to
    send to the server, as well as to live-control the bot to a certain degree.
 +/
final class PipelinePlugin : IRCPlugin
{
private:
    import std.concurrency : Tid;

    /// All Pipeline settings gathered.
    PipelineSettings pipelineSettings;

    /// Thread ID of the thread reading the named pipe.
    Tid fifoThread;

    /// Filename of the created FIFO.
    string fifoFilename;

    /// Whether or not the worker is running in the background.
    bool workerRunning;

    mixin IRCPluginImpl;
}