POSIX Message Queues
POSIX Message Queues
POSIX Message Queues (deutsch: *POSIX-Nachrichtenwarteschlangen*) sind ein Mechanismus zur Interprozesskommunikation (IPC), der es Prozessen erlaubt, Nachrichten über eine systemweite Warteschlange auszutauschen. Sie sind in der POSIX-Norm (IEEE Std 1003.1b-1993) definiert und bieten eine modernere und leistungsfähigere Alternative zu den älteren System-V-Nachrichtenwarteschlangen.
---
Grundprinzip
Eine POSIX Message Queue (Nachrichtenwarteschlange) funktioniert ähnlich wie eine FIFO-Queue: Prozesse können Nachrichten anhängen (senden) und entnehmen (empfangen). Im Gegensatz zu klassischen Pipes oder Sockets werden hier strukturierte Nachrichten mit einer Priorität verwaltet.
+-------------------------+ | Warteschlange | (z. B. /myqueue) |-------------------------| | Priorität 10: "Start" | | Priorität 5: "Ping" | | Priorität 1: "Stop" | +-------------------------+
→ Nachrichten werden in der Reihenfolge ihrer Priorität (höchste zuerst) abgearbeitet.
---
Eigenschaften
- Kommunikation zwischen beliebigen Prozessen über Namen (z. B.
/myqueue) - Nachrichtenbasierte Kommunikation (keine Bytestreams)
- Nachrichten enthalten Nutzdaten + Priorität
- Thread-sicher und asynchron nutzbar
- Optionale Benachrichtigung via Signal oder Thread
---
Header-Dateien
#include <mqueue.h>
#include <fcntl.h> // O_CREAT, O_RDWR
#include <sys/stat.h> // mode_t
Beim Linken ist meist die Echtzeitbibliothek erforderlich:
gcc main.c -o main -lrt
---
Aufbau einer Message Queue
Eine POSIX Message Queue besitzt folgende Parameter:
| Attribut | Beschreibung |
|---|---|
| Name | Beginnt immer mit / (z. B. /demoqueue)
|
| Nachrichtenkapazität | Maximale Anzahl gespeicherter Nachrichten |
| Nachrichtenlänge | Maximale Größe einer Nachricht (Bytes) |
| Flags | Öffnungs- und Zugriffsmodi (z. B. O_RDONLY, O_NONBLOCK) |
Diese Parameter können beim Anlegen mit mq_open() übergeben werden.
---
Wichtige Funktionen
| Funktion | Beschreibung |
|---|---|
mq_open() |
Öffnet oder erzeugt eine Message Queue |
mq_send() |
Sendet eine Nachricht in die Queue |
mq_receive() |
Empfängt die älteste oder höchstpriorisierte Nachricht |
mq_close() |
Schließt die Queue im aktuellen Prozess |
mq_unlink() |
Löscht die Queue (wie unlink() für Dateien)
|
mq_getattr() / mq_setattr() |
Liest oder ändert Attribute |
mq_notify() |
Registriert Benachrichtigung bei neuen Nachrichten |
---
Beispiel: Einfacher Sender
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <fcntl.h>
#include <string.h>
#define QUEUE_NAME "/demoqueue"
int main() {
mqd_t mq;
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = 10; // maximal 10 Nachrichten
attr.mq_msgsize = 256; // maximal 256 Byte pro Nachricht
attr.mq_curmsgs = 0;
mq = mq_open(QUEUE_NAME, O_CREAT | O_WRONLY, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(EXIT_FAILURE);
}
char message[256];
sprintf(message, "Hallo vom Sender!");
if (mq_send(mq, message, strlen(message) + 1, 5) == -1) {
perror("mq_send");
} else {
printf("Nachricht gesendet.\n");
}
mq_close(mq);
return 0;
}
---
Beispiel: Einfacher Empfänger
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <fcntl.h>
#include <string.h>
#define QUEUE_NAME "/demoqueue"
int main() {
mqd_t mq;
char buffer[256];
unsigned int prio;
mq = mq_open(QUEUE_NAME, O_RDONLY);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(EXIT_FAILURE);
}
if (mq_receive(mq, buffer, sizeof(buffer), &prio) >= 0) {
printf("Empfangen (Priorität %u): %s\n", prio, buffer);
} else {
perror("mq_receive");
}
mq_close(mq);
mq_unlink(QUEUE_NAME); // Queue entfernen
return 0;
}
Beispielausgabe:
$ ./sender Nachricht gesendet. $ ./empfaenger Empfangen (Priorität 5): Hallo vom Sender!
---
Beispiel: Nicht-blockierende Nutzung
mq = mq_open("/demoqueue", O_RDONLY | O_NONBLOCK);
if (mq_receive(mq, buffer, sizeof(buffer), &prio) == -1) {
if (errno == EAGAIN)
printf("Keine Nachrichten vorhanden.\n");
}
---
Beispiel: Benachrichtigung über neue Nachrichten
Ein Prozess kann sich mit mq_notify() registrieren, um ein Signal oder einen Thread-Callback zu erhalten, wenn eine neue Nachricht eintrifft.
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <signal.h>
#define QUEUE_NAME "/demoqueue"
void handler(union sigval sv) {
mqd_t mq = *((mqd_t*)sv.sival_ptr);
char msg[256];
unsigned int prio;
mq_receive(mq, msg, sizeof(msg), &prio);
printf("Neue Nachricht (Prio %u): %s\n", prio, msg);
}
int main() {
mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDONLY, 0644, NULL);
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_notify_function = handler;
sev.sigev_notify_attributes = NULL;
sev.sigev_value.sival_ptr = &mq;
mq_notify(mq, &sev); // registrieren
printf("Warte auf Nachrichten...\n");
while (1) pause();
}
---
Attribute einer Queue
struct mq_attr attr;
mq_getattr(mq, &attr);
printf("Max. Nachrichten: %ld\n", attr.mq_maxmsg);
printf("Nachrichtengröße: %ld\n", attr.mq_msgsize);
printf("Aktuelle Nachrichten: %ld\n", attr.mq_curmsgs);
---
Vergleich mit anderen IPC-Mechanismen
| Mechanismus | Kommunikationstyp | Besonderheiten |
|---|---|---|
| Pipe (Unix) | Stream (Bytestrom) | einfach, unstrukturiert |
| FIFO (Named Pipe) | Stream | über Dateisystemname erreichbar |
| POSIX Message Queues | Nachricht (strukturiert, priorisiert) | speicherbasiert, modern |
| System-V Message Queues | Nachricht | älter, weniger portabel |
| Unix Domain Sockets | Stream oder Datagram | flexibel, duplex, bidirektional |
| Shared Memory | Speicherzugriff | extrem schnell, aber Synchronisation nötig |
---
Vorteile
- Nachrichtenorientiert (keine Bytestreams)
- Priorisierung integriert
- Blockierendes oder nicht-blockierendes Verhalten möglich
- Ereignisbenachrichtigung (mq_notify)
- POSIX-standardisiert und portabel
Nachteile
- Begrenzte Nachrichtenlänge
- Verwaltung durch Kernel (etwas Overhead)
- Nur auf demselben System verfügbar (nicht über Netzwerk)
---
Typische Einsatzgebiete
- Kommunikation zwischen Prozessen mit klarer Nachrichtenstruktur
- Ereignisbasierte Systeme (Produzent–Konsument)
- GUI–Daemon-Kommunikation
- Steuer- und Regelprozesse in Embedded-Systemen
- Ersatz für Pipes bei strukturierter Datenübertragung
---
Siehe auch
- Interprozesskommunikation (IPC)
- POSIX Shared Memory
- Unix Domain Sockets
- Named Pipe (FIFO)
- System-V Message Queue
- POSIX Threads
---
Quellen
- IEEE Std 1003.1-2001 – POSIX Realtime Extensions
- Stevens, W. Richard: UNIX Network Programming, Vol. 2: Interprocess Communications, Prentice Hall
- The Linux Programming Interface – Michael Kerrisk, 2010
- Linux man pages:
man mq_overview,man mq_open - GNU C Library Documentation