1 /++
2     The Pipeline plugin opens a Posix named pipe in a temporary directory or
3     the current directory, to which you can pipe text and have it be sent
4     verbatim to the server. There is also syntax to manually send bus messages
5     to plugins.
6 
7     It has no commands; it doesn't listen to [dialect.defs.IRCEvent|IRCEvent]s
8     at all, only to what is sent to it via the named FIFO pipe.
9 
10     This requires version `Posix`, which is true for UNIX-like systems (like
11     Linux and macOS).
12 
13     See_Also:
14         https://github.com/zorael/kameloso/wiki/Current-plugins#pipeline,
15         [kameloso.plugins.common.core],
16         [kameloso.plugins.common.misc]
17 
18     Copyright: [JR](https://github.com/zorael)
19     License: [Boost Software License 1.0](https://www.boost.org/users/license.html)
20 
21     Authors:
22         [JR](https://github.com/zorael)
23  +/
24 module kameloso.plugins.pipeline;
25 
26 version(Posix):
27 version(WithPipelinePlugin):
28 
29 private:
30 
31 import kameloso.plugins;
32 import kameloso.plugins.common.core;
33 import kameloso.common : logger;
34 import kameloso.messaging;
35 import dialect.defs;
36 import std.typecons : Flag, No, Yes;
37 
38 
39 /+
40     For storage location of the FIFO it makes sense to default to /tmp;
41     Posix defines a variable `$TMPDIR`, which should take precedence.
42     However, this supposedly makes the file really difficult to access on macOS
43     where it translates to some really long, programmatically generated path.
44     macOS naturally does support /tmp though. So shrug and version it to
45     default-ignore `$TMPDIR` on macOS but obey it on other platforms.
46  +/
47 //version = OSXTMPDIR;
48 
49 
50 // PipelineSettings
51 /++
52     All settings for a [PipelinePlugin], aggregated.
53  +/
54 @Settings struct PipelineSettings
55 {
56 private:
57     import lu.uda : Unserialisable;
58 
59 public:
60     /// Whether or not the Pipeline plugin should do anything at all.
61     @Enabler bool enabled = true;
62 
63     /++
64         Whether or not to place the FIFO in the working directory. If false, it
65         will be saved in `/tmp` or wherever `$TMPDIR` points. If macOS, then there
66         only if version `OSXTMPDIR`.
67      +/
68     bool fifoInWorkingDir = false;
69 
70     /++
71         Whether or not to always use a unique filename for the FIFO; if one exists
72         with the wanted name, simply append a number to make a new, unique one.
73      +/
74     bool bumpFilenameIfItExists = true;
75 
76     /// Custom, full path to use as FIFO filename, specified with --set pipeline.path.
77     @Unserialisable string path;
78 }
79 
80 
81 // pipereader
82 /++
83     Reads a FIFO (named pipe) and relays lines received there to the main
84     thread, to send to the server.
85 
86     It is to be run in a separate thread.
87 
88     Params:
89         newState = The [kameloso.plugins.common.core.IRCPluginState|IRCPluginState]
90             of the original [PipelinePlugin], to provide the main thread's
91             [core.thread.Tid|Tid] for concurrency messages, made `shared` to
92             allow being sent between threads.
93         filename = String filename of the FIFO to read from.
94  +/
95 void pipereader(shared IRCPluginState newState, const string filename)
96 in (filename.length, "Tried to set up a pipereader with an empty filename")
97 {
98     import kameloso.thread : ThreadMessage, boxed, setThreadName;
99     import std.concurrency : OwnerTerminated, receiveTimeout, send;
100     import std.format : format;
101     import std.stdio : File;
102     import std.variant : Variant;
103     import core.time : Duration;
104     static import kameloso.common;
105 
106     // The whole module is version Posix, no need to encase here
107     setThreadName("pipeline");
108 
109     auto state = cast()newState;
110 
111     // Set the global settings so messaging functions don't segfault us
112     kameloso.common.settings = &state.settings;
113 
114     // Creating the File struct blocks, so do it after reporting.
115     enum pattern = "Pipe text to the <i>%s</> file to send raw commands to the server.";
116     state.askToLog(pattern.format(filename));
117 
118     File fifo = File(filename, "r");
119 
120     static void tryRemove(const string filename)
121     {
122         import std.file : exists, remove;
123 
124         if (filename.exists)
125         {
126             try
127             {
128                 remove(filename);
129             }
130             catch (Exception _)
131             {
132                 // Race, ignore
133             }
134         }
135     }
136 
137     scope(exit) tryRemove(filename);
138 
139     while (true)
140     {
141         // foreach but always break after processing one line, to be responsive
142         // and retaining the ability to break out of it.
143         foreach (immutable line; fifo.byLineCopy)
144         {
145             import std.algorithm.searching : startsWith;
146             import std.uni : asLowerCase;
147 
148             if (!line.length) break;
149 
150             if (line[0] == ':')
151             {
152                 import kameloso.thread : boxed;
153                 import lu.string : contains, nom;
154 
155                 if (line.contains(' '))
156                 {
157                     string slice = line[1..$];
158                     immutable header = slice.nom(' ');
159                     state.mainThread.send(ThreadMessage.busMessage(header, boxed(slice)));
160                 }
161                 else
162                 {
163                     state.mainThread.send(ThreadMessage.busMessage(line[1..$]));
164                 }
165                 break;
166             }
167             else if (line.asLowerCase.startsWith("quit"))
168             {
169                 if ((line.length > 6) && (line[4..6] == " :"))
170                 {
171                     quit(state, line[6..$]);
172                 }
173                 else
174                 {
175                     quit(state);
176                 }
177                 return;
178             }
179             else
180             {
181                 immutable slice = (line[0] == ' ') ? line[1..$] : line;
182                 raw(state, slice);
183             }
184             break;
185         }
186 
187         bool halt;
188 
189         void checkMessages()
190         {
191             cast(void)receiveTimeout(Duration.zero,
192                 (ThreadMessage message)
193                 {
194                     if (message.type == ThreadMessage.Type.teardown)
195                     {
196                         halt = true;
197                     }
198                 },
199                 (OwnerTerminated _)
200                 {
201                     halt = true;
202                 },
203                 (Variant v)
204                 {
205                     enum variantPattern = "Pipeline plugin received Variant: <l>%s";
206                     state.askToError(variantPattern.format(v.toString));
207                     state.mainThread.send(ThreadMessage.busMessage("pipeline", boxed("halted")));
208                     halt = true;
209                 }
210             );
211         }
212 
213         checkMessages();
214         if (halt) return;
215 
216         import std.exception : ErrnoException;
217 
218         try
219         {
220             fifo.reopen(filename);
221         }
222         catch (ErrnoException e)
223         {
224             checkMessages();
225             if (halt) return;
226 
227             enum fifoPattern = "Pipeline plugin failed to reopen FIFO: <l>%s";
228             state.askToError(fifoPattern.format(e.msg));
229             version(PrintStacktraces) state.askToTrace(e.info.toString);
230             state.mainThread.send(ThreadMessage.busMessage("pipeline", boxed("halted")));
231             return;
232         }
233         catch (Exception e)
234         {
235             checkMessages();
236             if (halt) return;
237 
238             state.askToError("Pipeline plugin saw unexpected exception");
239             version(PrintStacktraces) state.askToTrace(e.toString);
240             return;
241         }
242     }
243 }
244 
245 
246 // createFIFO
247 /++
248     Creates a FIFO (named pipe) in the filesystem.
249 
250     It will be named a passed filename.
251 
252     Params:
253         filename = String filename of FIFO to create.
254 
255     Throws:
256         [lu.common.ReturnValueException|ReturnValueException] if the FIFO
257         could not be created.
258 
259         [lu.common.FileExistsException|FileExistsException] if a FIFO with
260         the same filename already exists, suggesting concurrent conflicting
261         instances of the program (or merely a zombie FIFO after a crash).
262 
263         [lu.common.FileTypeMismatchException|FileTypeMismatchException] if a file or directory
264         exists with the same name as the FIFO we want to create.
265  +/
266 void createFIFO(const string filename)
267 in (filename.length, "Tried to create a FIFO with an empty filename")
268 {
269     import lu.common : FileExistsException, FileTypeMismatchException, ReturnValueException;
270     import std.file : exists;
271 
272     if (!filename.exists)
273     {
274         import std.process : execute;
275 
276         immutable mkfifo = execute([ "mkfifo", filename ]);
277 
278         if (mkfifo.status != 0)
279         {
280             enum message = "Could not create FIFO";
281             throw new ReturnValueException(
282                 message,
283                 "mkfifo",
284                 mkfifo.status);
285         }
286     }
287     else
288     {
289         import std.file : getAttributes;
290         import core.sys.posix.sys.stat : S_ISFIFO;
291 
292         immutable attrs = cast(ushort)getAttributes(filename);
293 
294         if (S_ISFIFO(attrs))
295         {
296             enum message = "A FIFO with that name already exists";
297             throw new FileExistsException(
298                 message,
299                 filename,
300                 __FILE__,
301                 __LINE__);
302         }
303         else
304         {
305             enum message = "Wanted to create a FIFO but a file or directory " ~
306                 "with the desired name already exists";
307             throw new FileTypeMismatchException(
308                 message,
309                 filename,
310                 attrs,
311                 __FILE__,
312                 __LINE__);
313         }
314     }
315 }
316 
317 
318 // onWelcome
319 /++
320     Initialises the fifo pipe and thus the purpose of the plugin, by leveraging [initPipe].
321  +/
322 @(IRCEventHandler()
323     .onEvent(IRCEvent.Type.RPL_WELCOME)
324 )
325 void onWelcome(PipelinePlugin plugin)
326 {
327     initPipe(plugin);
328 }
329 
330 
331 // reload
332 /++
333     Reloads the plugin, initialising the fifo pipe if it was not already initialised.
334 
335     This lets us remedy the "A FIFO with that name already exists" error.
336  +/
337 void reload(PipelinePlugin plugin)
338 {
339     if (!plugin.workerRunning)
340     {
341         initPipe(plugin);
342     }
343 }
344 
345 
346 // initPipe
347 /++
348     Spawns the pipereader thread.
349 
350     Snapshots the filename to use, as we base it on the bot's nickname, which
351     may change during the connection's lifetime.
352 
353     Params:
354         plugin = The current [PipelinePlugin].
355  +/
356 void initPipe(PipelinePlugin plugin)
357 in (!plugin.workerRunning, "Tried to double-initialise the pipereader")
358 {
359     if (plugin.pipelineSettings.path.length)
360     {
361         // Custom filename specified with --set pipeline.path=xyz
362         plugin.fifoFilename = plugin.pipelineSettings.path;
363     }
364     else
365     {
366         import std.conv : text;
367 
368         // Save the filename *once* so it persists across nick changes.
369         // If !fifoInWorkingDir then in /tmp or $TMPDIR
370         plugin.fifoFilename = text(plugin.state.client.nickname, '@', plugin.state.server.address);
371 
372         if (!plugin.pipelineSettings.fifoInWorkingDir)
373         {
374             // See notes at the top of module.
375             version(OSX)
376             {
377                 version(OSXTMPDIR)
378                 {
379                     enum useTMPDIR = true;
380                 }
381                 else
382                 {
383                     enum useTMPDIR = false;
384                 }
385             }
386             else // Implicitly not Windows since Posix-only plugin
387             {
388                 enum useTMPDIR = true;
389             }
390 
391             static if (useTMPDIR)
392             {
393                 import std.process : environment;
394                 immutable tempdir = environment.get("TMPDIR", "/tmp");
395             }
396             else
397             {
398                 enum tempdir = "/tmp";
399             }
400 
401             import std.path : buildNormalizedPath;
402             plugin.fifoFilename = buildNormalizedPath(tempdir, plugin.fifoFilename);
403         }
404     }
405 
406     import std.file : exists;
407 
408     if (plugin.pipelineSettings.bumpFilenameIfItExists && plugin.fifoFilename.exists)
409     {
410         import std.string : succ;
411 
412         plugin.fifoFilename ~= "-1";
413 
414         while (plugin.fifoFilename.exists)
415         {
416             plugin.fifoFilename = plugin.fifoFilename.succ;
417 
418             if (plugin.fifoFilename[$-2..$] == "-0")
419             {
420                 plugin.fifoFilename = plugin.fifoFilename[0..$-2] ~ "10";
421             }
422             else if (plugin.fifoFilename[$-3..$] == "-99")
423             {
424                 // Don't infinitely loop, should realistically never happen though
425                 break;
426             }
427         }
428     }
429 
430     import lu.common : FileExistsException, FileTypeMismatchException, ReturnValueException;
431 
432     try
433     {
434         import std.concurrency : spawn;
435 
436         createFIFO(plugin.fifoFilename);
437         plugin.fifoThread = spawn(&pipereader, cast(shared)plugin.state, plugin.fifoFilename);
438         plugin.workerRunning = true;
439     }
440     catch (ReturnValueException e)
441     {
442         enum pattern = "Failed to initialise the Pipeline plugin: <l>%s</> (<l>%s</> returned <l>%d</>)";
443         logger.warningf(pattern, e.msg, e.command, e.retval);
444         //version(PrintStacktraces) logger.trace(e.info);
445     }
446     catch (FileExistsException e)
447     {
448         enum pattern = "Failed to initialise the Pipeline plugin: <l>%s</> [<l>%s</>]";
449         logger.warningf(pattern, e.msg, e.filename);
450         //version(PrintStacktraces) logger.trace(e.info);
451     }
452     catch (FileTypeMismatchException e)
453     {
454         enum pattern = "Failed to initialise the Pipeline plugin: <l>%s</> [<l>%s</>]";
455         logger.warningf(pattern, e.msg, e.filename);
456         //version(PrintStacktraces) logger.trace(e.info);
457     }
458 
459     // Let other Exceptions pass
460 }
461 
462 
463 // teardown
464 /++
465     De-initialises the Pipeline plugin. Shuts down the pipereader thread.
466  +/
467 void teardown(PipelinePlugin plugin)
468 {
469     import kameloso.thread : ThreadMessage;
470     import std.concurrency : send;
471     import std.file : exists, isDir;
472     import std.stdio : File;
473 
474     if (!plugin.workerRunning) return;
475 
476     plugin.fifoThread.send(ThreadMessage.teardown());
477 
478     if (plugin.fifoFilename.exists && !plugin.fifoFilename.isDir)
479     {
480         // Tell the reader of the pipe to exit
481         auto fifo = File(plugin.fifoFilename, "w");
482         fifo.writeln();
483     }
484 }
485 
486 
487 import kameloso.thread : Sendable;
488 
489 // onBusMessage
490 /++
491     Receives a passed [kameloso.thread.BusMessage|BusMessage] with the "`pipeline`" header,
492     and performs actions based on the payload message.
493 
494     This is used to let the worker thread signal the main context that it halted.
495 
496     Params:
497         plugin = The current [PipelinePlugin].
498         header = String header describing the passed content payload.
499         content = Message content.
500  +/
501 void onBusMessage(PipelinePlugin plugin, const string header, shared Sendable content)
502 {
503     if (!plugin.isEnabled) return;
504     if (header != "pipeline") return;
505 
506     import kameloso.thread : Boxed;
507 
508     auto message = cast(Boxed!string)content;
509     assert(message, "Incorrectly cast message: " ~ typeof(message).stringof);
510 
511     if (message.payload == "halted")
512     {
513         plugin.workerRunning = false;
514     }
515     else
516     {
517         logger.error("[pipeline] Unimplemented bus message verb: <i>", message.payload);
518     }
519 }
520 
521 
522 mixin PluginRegistration!PipelinePlugin;
523 
524 public:
525 
526 
527 // PipelinePlugin
528 /++
529     The Pipeline plugin reads from a local named pipe (FIFO) for messages to
530     send to the server, as well as to live-control the bot to a certain degree.
531  +/
532 final class PipelinePlugin : IRCPlugin
533 {
534 private:
535     import std.concurrency : Tid;
536 
537     /// All Pipeline settings gathered.
538     PipelineSettings pipelineSettings;
539 
540     /// Thread ID of the thread reading the named pipe.
541     Tid fifoThread;
542 
543     /// Filename of the created FIFO.
544     string fifoFilename;
545 
546     /// Whether or not the worker is running in the background.
547     bool workerRunning;
548 
549     mixin IRCPluginImpl;
550 }