POSIX Message Queues

Aus dev.kaibel.net
Zur Navigation springen Zur Suche springen

POSIX Message Queues

POSIX Message Queues (deutsch: *POSIX-Nachrichtenwarteschlangen*) sind ein Mechanismus zur Interprozesskommunikation (IPC), der es Prozessen erlaubt, Nachrichten über eine systemweite Warteschlange auszutauschen. Sie sind in der POSIX-Norm (IEEE Std 1003.1b-1993) definiert und bieten eine modernere und leistungsfähigere Alternative zu den älteren System-V-Nachrichtenwarteschlangen.

---

Grundprinzip

Eine POSIX Message Queue (Nachrichtenwarteschlange) funktioniert ähnlich wie eine FIFO-Queue: Prozesse können Nachrichten anhängen (senden) und entnehmen (empfangen). Im Gegensatz zu klassischen Pipes oder Sockets werden hier strukturierte Nachrichten mit einer Priorität verwaltet.

+-------------------------+
|   Warteschlange         |   (z. B. /myqueue)
|-------------------------|
|  Priorität 10: "Start"  |
|  Priorität  5: "Ping"   |
|  Priorität  1: "Stop"   |
+-------------------------+

→ Nachrichten werden in der Reihenfolge ihrer Priorität (höchste zuerst) abgearbeitet.

---

Eigenschaften

  • Kommunikation zwischen beliebigen Prozessen über Namen (z. B. /myqueue)
  • Nachrichtenbasierte Kommunikation (keine Bytestreams)
  • Nachrichten enthalten Nutzdaten + Priorität
  • Thread-sicher und asynchron nutzbar
  • Optionale Benachrichtigung via Signal oder Thread

---

Header-Dateien

#include <mqueue.h>
#include <fcntl.h>      // O_CREAT, O_RDWR
#include <sys/stat.h>   // mode_t

Beim Linken ist meist die Echtzeitbibliothek erforderlich:

gcc main.c -o main -lrt

---

Aufbau einer Message Queue

Eine POSIX Message Queue besitzt folgende Parameter:

Attribut Beschreibung
Name Beginnt immer mit / (z. B. /demoqueue)
Nachrichtenkapazität Maximale Anzahl gespeicherter Nachrichten
Nachrichtenlänge Maximale Größe einer Nachricht (Bytes)
Flags Öffnungs- und Zugriffsmodi (z. B. O_RDONLY, O_NONBLOCK)

Diese Parameter können beim Anlegen mit mq_open() übergeben werden.

---

Wichtige Funktionen

Funktion Beschreibung
mq_open() Öffnet oder erzeugt eine Message Queue
mq_send() Sendet eine Nachricht in die Queue
mq_receive() Empfängt die älteste oder höchstpriorisierte Nachricht
mq_close() Schließt die Queue im aktuellen Prozess
mq_unlink() Löscht die Queue (wie unlink() für Dateien)
mq_getattr() / mq_setattr() Liest oder ändert Attribute
mq_notify() Registriert Benachrichtigung bei neuen Nachrichten

---

Beispiel: Einfacher Sender

#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <fcntl.h>
#include <string.h>

#define QUEUE_NAME "/demoqueue"

int main() {
    mqd_t mq;
    struct mq_attr attr;
    attr.mq_flags = 0;
    attr.mq_maxmsg = 10;     // maximal 10 Nachrichten
    attr.mq_msgsize = 256;   // maximal 256 Byte pro Nachricht
    attr.mq_curmsgs = 0;

    mq = mq_open(QUEUE_NAME, O_CREAT | O_WRONLY, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        exit(EXIT_FAILURE);
    }

    char message[256];
    sprintf(message, "Hallo vom Sender!");

    if (mq_send(mq, message, strlen(message) + 1, 5) == -1) {
        perror("mq_send");
    } else {
        printf("Nachricht gesendet.\n");
    }

    mq_close(mq);
    return 0;
}

---

Beispiel: Einfacher Empfänger

#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <fcntl.h>
#include <string.h>

#define QUEUE_NAME "/demoqueue"

int main() {
    mqd_t mq;
    char buffer[256];
    unsigned int prio;

    mq = mq_open(QUEUE_NAME, O_RDONLY);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        exit(EXIT_FAILURE);
    }

    if (mq_receive(mq, buffer, sizeof(buffer), &prio) >= 0) {
        printf("Empfangen (Priorität %u): %s\n", prio, buffer);
    } else {
        perror("mq_receive");
    }

    mq_close(mq);
    mq_unlink(QUEUE_NAME); // Queue entfernen
    return 0;
}

Beispielausgabe:

$ ./sender
Nachricht gesendet.
$ ./empfaenger
Empfangen (Priorität 5): Hallo vom Sender!

---

Beispiel: Nicht-blockierende Nutzung

mq = mq_open("/demoqueue", O_RDONLY | O_NONBLOCK);
if (mq_receive(mq, buffer, sizeof(buffer), &prio) == -1) {
    if (errno == EAGAIN)
        printf("Keine Nachrichten vorhanden.\n");
}

---

Beispiel: Benachrichtigung über neue Nachrichten

Ein Prozess kann sich mit mq_notify() registrieren, um ein Signal oder einen Thread-Callback zu erhalten, wenn eine neue Nachricht eintrifft.

#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <signal.h>

#define QUEUE_NAME "/demoqueue"

void handler(union sigval sv) {
    mqd_t mq = *((mqd_t*)sv.sival_ptr);
    char msg[256];
    unsigned int prio;
    mq_receive(mq, msg, sizeof(msg), &prio);
    printf("Neue Nachricht (Prio %u): %s\n", prio, msg);
}

int main() {
    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDONLY, 0644, NULL);

    struct sigevent sev;
    sev.sigev_notify = SIGEV_THREAD;
    sev.sigev_notify_function = handler;
    sev.sigev_notify_attributes = NULL;
    sev.sigev_value.sival_ptr = &mq;

    mq_notify(mq, &sev);  // registrieren

    printf("Warte auf Nachrichten...\n");
    while (1) pause();
}

---

Attribute einer Queue

struct mq_attr attr;
mq_getattr(mq, &attr);

printf("Max. Nachrichten: %ld\n", attr.mq_maxmsg);
printf("Nachrichtengröße: %ld\n", attr.mq_msgsize);
printf("Aktuelle Nachrichten: %ld\n", attr.mq_curmsgs);

---

Vergleich mit anderen IPC-Mechanismen

Mechanismus Kommunikationstyp Besonderheiten
Pipe (Unix) Stream (Bytestrom) einfach, unstrukturiert
FIFO (Named Pipe) Stream über Dateisystemname erreichbar
POSIX Message Queues Nachricht (strukturiert, priorisiert) speicherbasiert, modern
System-V Message Queues Nachricht älter, weniger portabel
Unix Domain Sockets Stream oder Datagram flexibel, duplex, bidirektional
Shared Memory Speicherzugriff extrem schnell, aber Synchronisation nötig

---

Vorteile

  • Nachrichtenorientiert (keine Bytestreams)
  • Priorisierung integriert
  • Blockierendes oder nicht-blockierendes Verhalten möglich
  • Ereignisbenachrichtigung (mq_notify)
  • POSIX-standardisiert und portabel

Nachteile

  • Begrenzte Nachrichtenlänge
  • Verwaltung durch Kernel (etwas Overhead)
  • Nur auf demselben System verfügbar (nicht über Netzwerk)

---

Typische Einsatzgebiete

  • Kommunikation zwischen Prozessen mit klarer Nachrichtenstruktur
  • Ereignisbasierte Systeme (Produzent–Konsument)
  • GUI–Daemon-Kommunikation
  • Steuer- und Regelprozesse in Embedded-Systemen
  • Ersatz für Pipes bei strukturierter Datenübertragung

---

Siehe auch

---

Quellen

  • IEEE Std 1003.1-2001 – POSIX Realtime Extensions
  • Stevens, W. Richard: UNIX Network Programming, Vol. 2: Interprocess Communications, Prentice Hall
  • The Linux Programming Interface – Michael Kerrisk, 2010
  • Linux man pages: man mq_overview, man mq_open
  • GNU C Library Documentation