Adobe Commerce


Adobe Commerce & Message Queue Lösungen – integration review


Author Photo
Alex Lyzun
Lead Developer Adobe Commerce

Adobe Commerce Queuing Systems Integration Review

Adobe Commerce (Magento) verfügt über ein großartiges Framework, das seit Version 2.1.0 verfügbar ist und "Magento Message Queue Framework (MMQF)" heißt. Seit dem ersten Release hat es sich drastisch verbessert mit dem Ergebnis, dass viele Module und Integrationen heute auf die Verwendung von MMQF als Teil ihrer Funktionalität angewiesen sind.

Heute unterstützt Magento 2 verschiedene Warteschlangensysteme:

  • Magento's Datenbank-Queue-Implementierung
  • RabbitMQ 

Die wichtigste Frage, die uns immer wieder gestellt wird, ist: "Welche anderen Warteschlangensysteme werden von Magento unterstützt?"

Das Ziel dieses Artikels ist es, einen genaueren Blick auf andere existierende Queuing-Lösungen zu werfen und eine fundierte Einschätzung der Kandidaten vorzunehmen, die sich leicht in Magento integrieren lassen und daher möglicherweise in naher Zukunft als Teil einer Magento-Kernimplementierung verwendet werden können.


Verarbeitung von Nachrichtenwarteschlangen in Adobe Commerce

Eine einfache Erklärung, wie die Nachrichtenverarbeitung (basierend auf RabbitMQ) in Magento funktioniert, ist in diesem Diagramm dargestellt.

Source: Adobe Commerce Developer Guides

Grundlage für Warteschlangenimplementierungen auf MMQF ist  

\Magento\Framework\MessageQueue\QueueInterface

Beide Standardimplementierungen von Magento (DB und RabbitMQ) implementieren diese Schnittstelle.

Schauen wir uns die Schnittstelle anhand eines Flusses genauer an.

Dieser Fluss stellt die aktuelle Queue-Implementierung dar, und wie wir sehen, nutzt jeder Dienst seine eigene Funktionalität, aber beide verwenden die Queue-Schnittstelle:

Magento Queue Interface

  

Queue Interface

In der nachstehenden Tabelle werden alle Methoden beschrieben, die von dem jeweiligen Warteschlangenanbieter implementiert werden sollen, und es wird erläutert, welche Funktionalität von diesen Methoden erwartet wird.

Method Purpose / Description
dequeue() Get a single message from the queue
acknowledge() Acknowledge message delivery
subscribe()

Wait for messages and dispatch them, this is based on pub/sub mechanism, consumes the messages through callbacks, until connection is closed

reject() Reject message, messages gets returned to the queue
push()

Push message to queue directly without using exchange; it uses publish behind the scenes

Es fällt auf, dass jede Implementierung ihre eigene Logik verwirklicht, die vom verwendeten Warteschlangensystem abhängt:

  • DB nutzt die Verwendung eigener Tabellen in der Datenbank
  • RabbitMQ implementiert die Verbindung zum Dienst und führt die erforderlichen Aktionen aus

Auf der anderen Seite haben wir Konsumenten, die die Nachricht aus der Warteschlange abholen und sie entsprechend der Konfiguration verarbeiten.


Überblick der verfügbaren Message Queue Lösungen

Werfen wir einen Blick darauf, welche anderen Warteschlangentechnologien es gibt und ob wir sie in naher Zukunft in Magento verwenden können. Innerhalb der Messaging-Technologien können wir sie auch in die folgenden Kategorien einteilen, die auf verschiedenen Anwendungsfällen basieren.

Use Case Type Objectives
Portable systems or Standard based protocols Either these systems follow industry standard protocols, or the projects are portable like Kafka
High throughput & data streaming Streaming for near real-time use cases is the key here, or very high throughput
Serverless & rule-based event streaming / routing These are modern systems built with intelligent routing in mind, they also support serverless functions in their ecosystem like Adobe I/O Runtime and Adobe I/O Events
Interservice communication or Microservices Third party connectivity, extensibility and message delivery
Third party connectivity, extensibility and message delivery Suitable for large organizations, usually developed internally for wide scale use cases; they encapsulate modern extensibility requirements like out of process extension functions or in-process extensibility. They may also include message delivery via web-hooks.
Source: Github User Content

  

Message Queue Technologien – Überblick

Platform Pub/Sub Mechanism Connectivity Messge Ordering / FIFO High Throughput Data Streaming Rule Based Filter / Routing
AWS EventBridge *Available
(limited to AWS Targets)
*HTTPS (for publishers only)     Available
AWS MQ Available JMS, NMS, AMQP 1.0, STOMP, MQTT, WebSocket *Available    
AWS SQS   HTTPS *Available    
AWS Kinesis Available HTTP/2 Persistent and HTTP REST *Available Available  
Apache Kafka Available TCP Socket, Kafka Connect, Kafka Streams Apps *Available Available  
Azure Service Bus Available AMQP 1.0 and REST *Available    
Adobe I/O *Available
(limited to Adobe Event Providers)
*HTTP Webhooks (for subscribers only, designed for integration with Adobe SaaS solutions)     Available

*Available with some limitations
**Other Aspects to consider (consumer groups, batching, multi-tenant seggregation, message encryption/security, aggregation, counting, scheduling, dead-letter queue)

  

Integrationsmöglichkeiten mit MMQF

Da die Basis für alle Queueing-Implementierungen in Magento das MMQF und das QueueInterface ist, werden wir prüfen, welche Lösungen das bestehende Interface implementieren und einfach in die bestehende Magento-Architektur integriert werden können.


AWS EventBridge

AWS EventBridge ist ein serverless Event-Bus, der den Empfang von Daten von Ihrer Anwendung und von Drittanbietern an AWS Services erleichtert. Derzeit scheint es, dass die Ziele speziell AWS Services sind. Diese Ziele werden über spezifische Regeln definiert. 

Weitere Informationen finden Sie hier: https://docs.aws.amazon.com/eventbridge/index.html  

Und mehr über verfügbare Ziele finden Sie hier: https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_PutTargets.html  

Aber sehen wir uns an, ob wir die AWS-Integration mit dem MMQF-Framework nutzen können:

Method Evaluation Implementation Readiness
dequeue() Not Possible or N/A Multiple Targets can be set, to receive the messages when they are available asynchronously. There is no concept of fetching the message from the Event Bus on-demand, its more of a serverless architecture.
acknowledge() Not Possible or N/A There is no need to acknowledge the message, AWS internally makesure that the Target receives the message.
subscribe() Not Possible or N/A AWS related Targets can be set or subscribed for the EventBus based on the Rules; but we cannot set PHP Callback as Functions.
reject() Not Possible or N/A The concept is not available or used.
push() Available PutEvents or PutPartnerEvents functions can be used for this purpose.

  


AWS MQ

AWS MQ ist ein Message Broker, der auf dem beliebten Apache ActiveMQ basiert. Er unterstützt mehrere Protokolle für die Konnektivität, zum Beispiel AMQP, JMS, STOMP, NMS, MQTT und WebSocket.

Die meisten Funktionen sind verfügbar, da Magento mit RabbitMQ ebenfalls das AMQP-Protokoll verwendet, aber die Protokollversion ist unterschiedlich: RabbitMQ verwendet 0.9 und Amazon MQ verwendet AMQP 1.0; daher würde jede Migration eine Portierung der Warteschlangen von RabbitMQ auf AWS MQ erfordern.

Sie können hier mehr darüber lesen: https://aws.amazon.com/amazon-mq/

In diesem Fall sieht alles viel besser aus. Der Grund ist wiederum, dass MMQF bereits AMQP verwendet

Method Evaluation Implementation Readiness
dequeue() Available receive()
acknowledge() Available accept() / release()
subscribe() *Workaround Long Polling might need to be implemented, unless we find a good library that supports AMQP 1.0 in PHP; Java has full support though.
reject() Available reject()
push() Available sendMessage(Message, Destination)

  


AWS SQS

AWS SQS ist eine verteilte und fehlertolerante Warteschlangentechnologie, die Punkt-zu-Punkt-Konnektivität bietet. Sie kann mit SNS verwendet werden, um einen Veröffentlichungs- und Abonnementmechanismus hinzuzufügen. Eine einzelne Nachricht wird auf mehreren SQS-Servern repliziert.

Weitere Informationen: https://aws.amazon.com/sqs/

SQS kann, wie wir sehen, auch das nächste Warteschlangensystem in Magento sein, obwohl es einige Anstrengungen erfordert, um in MMQF implementiert zu werden.

Method Evaluation Implementation Readiness
dequeue() Available ReceiveMessage() - possibility with many available options for instance long & short polling.
acknowledge() Possibility DeleteMessage() for positive acknowledge, by default message locked for Visibility Timeout period for other consumers.
subscribe() Workaround ReceiveMessage() - Using polling based mechanism, it can be implemented; but not exactly as true callback mechanism.
reject() Possibility The messages are auto visible again for consumption, if explicit DeleteMessage() is not called before timeout, as explained above.
push() Available SendMessage()

  


AWS Kinesis

AWS Kinesis ist eine auf Streaming basierende distributierte Messaging-Technologie. Sie verwendet den Publish/Subscribe-Mechanismus für eine lose Kopplung zwischen Sendern und Empfängern. Sie ist für einen extrem hohen Durchsatz bei Echtzeitanwendungen ausgelegt.

Streaming und das Konzept des Streams selbst sind die zentrale Idee hinter Kinesis. Es ist dem Apache Kafka sehr ähnlich, mit einigen Unterschieden. Kinesis eignet sich auch für die Implementierung von Event Sourcing und CQRS-Mustern, die häufig in Microservices-Architekturen verwendet werden, da es sofortige Unterstützung für Messaging- und Publish/Subscribe-Mechanismen mit hohem Durchsatz bietet.

Lesen Sie mehr: https://aws.amazon.com/kinesis/

Auch Kinesis sieht wie ein guter Kandidat aus:

Method Evaluation Implementation Readiness
dequeue() Possibility getRecords() - ShardIterator needs to be managed behind the scenes. Stream can be Queue name. It can use getShardIterator(), before the call.
acknowledge() Possibility Need to maintain the ShardIterator & SequenceNumber associated with it, using getShardIterator(). We can save NextShardIterator from getRecords() to acknowledge the message.
subscribe() Workaround Workaround - batch reads with long polling can be implemented, example Long Polling Subscribe Mechanism in AWS Kinesis
reject() Possibility If we don't move the ShardIterator to NextShardIterator, we are pretty much staying on the same message.
push() Possibility Since you will have to provide stream, data & partition; we need to have some strategy to for partition selection; and need to maintain these values for Consumers.

  


Apache Kafka

Apache Kafka ist eine weit verbreitete Open-Source-Plattform für Stream-Verarbeitung und Nachrichtenübermittlung. Sie ist dezentralisiert, repliziert und widerstandsfähig (oder fehlertolerant) und kann einen sehr hohen Durchsatz erreichen.

Der Topic- und Publish/Subscribe-Mechanismus ist der Kern von Kafka. Es eignet sich gut für die Implementierung von Event Sourcing und CQRS-Mustern, die häufig in Microservices-Architekturen verwendet werden. Kafka wird auch für eine Vielzahl von Streaming-Anwendungsfällen verwendet, die eine nahezu Echtzeitverarbeitung von Datensätzen erfordern.

Weitere Informationen:  https://kafka.apache.org/

Kafka scheint im Moment die vielversprechendste Lösung zu sein:

Method Evaluation Implementation Readiness
dequeue() Possibility Initiate poll () or consume(). If there are records available, the call will immediately returns, otherwise it will wait for specified timeout which can be passed as parameter.
acknowledge() Possibility There are several ways to commit the offset, which indicates that a particular consumer has consumed those messages. The way you call commit API controls the delivery semantics.
subscribe() Possibility There are multiple ways in which the subscribtion mechanism can be implemented, the default Kafka subscribtion is telling Kafka which topics a consumer is interested in. But we can also subscribe a callback function; and we can use Kafka Stream API to receive messages in near realtime.
reject() Possibility If we don't auto-commit or manually commit the offset, then we are not moving the needle.
push() Possibility Since you will have to provide topic, data & partition; we need to have some strategy to for partition selection; and need to maintain these values for Consumers.

  


Azure Service Bus

Microsoft Azure Service Bus ist ein Managed Service Message Broker für  Enterprise Integrationen. Er unterstützt bekannte Konzepte wie Warteschlangen, Themen, Regeln/Filter und mehr.

Source: Github User Content

Azure Service Bus unterstützt AMQP 1.0 und einige Sprachen, die PHP-Unterstützung ist für das Protokoll wiederum eingeschränkt.

Hauptseite: https://azure.microsoft.com/en-us/services/service-bus/#overview

Da Azure auch das AMQP-Protokoll verwendet, sieht es auch recht optimistisch aus. Und die Tatsache, dass Magento Cloud ebenfalls Microsoft Azure nutzen wird, macht den Einsatz von ASB noch wahrscheinlicher.

Method Evaluation Implementation Readiness
dequeue() Available receive()
acknowledge() Available accept() / release()
subscribe() *Workaround Long Polling might need to be implemented, unless we find a good library that supports AMQP 1.0 for PHP; Java has full support for required features.
reject() Available reject(errorCondition, errorDescription)
push() Available sendMessage(message, destination)

  


Adobe I/O

Adobe I/O ist eine serverlose, ereignisgesteuerte Plattform, mit der Sie benutzerdefinierte Funktionen/Code schnell und ohne Servereinrichtung in der Cloud bereitstellen können. Diese Funktionen werden über HTTP-Anforderungen oder Adobe I/O-Ereignisse ausgeführt. Diese Ereignisse können mit Sequenzen und Kompositionen orchestriert werden. Die Lösung basiert auf dem Apache OpenWhisk-Framework.

Ereignisse werden von Ereignisanbietern innerhalb der Adobe Services ausgelöst, z. B. Creative Cloud Assets, Adobe Experience Manager und Adobe Analytics. Um Ereignisse für Ihre Anwendung zu empfangen, müssen Sie einen Webhook (URL-Endpunkt) registrieren, der angibt, welche Ereignistypen er von welchen Ereignisanbietern empfangen möchte; Adobe sendet Ereignisse über HTTP-POST-Nachrichten an Ihren Webhook.

Homepage: https://www.adobe.io/

In der aktuellen Magento-Implementierung ist es nicht möglich, Adobe I/O zu verwenden. Daher muss ein völlig neuer Dienst entwickelt werden, um diesen zu unterstützen.

Method Evaluation Implementation Readiness
dequeue() Not Possible or N/A There is not a concept of explicit fetching of event, rather you define a trigger/event and the actions associated with it.
acknowledge() Not Possible or N/A This concept is not used, the architecture is funadementally different
subscribe() Not Possible or N/A A PHP callback function is not possible, although a custom webhook (http endpoint) can be configured to be triggered for a particular Event.
reject() Not Possible or N/A This concept is not used, the architecture is funadementally different
push() Not Possible or N/A Events are triggered by Adobe SaaS Services in the Adobe Cloud as discussed above.

  


Bewertung und Schlussfolgerung

Vergleicht man alle möglichen Lösungen, so ergibt sich folgendes Bild:

Method (1) AWS EventBridge (2) AWS MQ (3) AWS SQS (4) AWS Kinesis (5) Apache Kafka (6) Azure Service Bus (8) Adobe I/O
dequeue() Not Possible or N/A Available Available Possibility Possibility Available Not Possible or N/A
acknowledge() Not Possible or N/A Available Possibility Possibility Possibility Available Not Possible or N/A
subscribe() Not Possible or N/A *Workaround *Workaround *Workaround Possibility *Workaround Not Possible or N/A
reject() Not Possible or N/A Available Possibility Possibility Possibility Available Not Possible or N/A
push() Available Available Available Possibility Possibility Available Not Possible or N/A

 *Workaround - The function may be available, but full support for PHP (library) is not available.

  

Wie wir sehen können, ist Apache Kafka derzeit der Gewinner dieser Untersuchung. Kafka ist ein sehr beliebtes System, das performant ist und alle Anforderungen abdeckt, die wir mit der wachsenden Komplexität von Magento haben. Auch in den letzten Monaten ist Kafka eine der meistdiskutierten Technologien als nächster Dienst, den Magento für die asynchrone Kommunikation nutzen könnte.

Integratoren und Entwickler können sich jedoch durchaus an AWS MQ, AWS SQS, AWS Kinesis und Azure Service Bus orientieren, da die Integration dieser Dienste nicht allzu viel Aufwand erfordert.

Bei diesen Systemen besteht eines der Hauptprobleme darin, dass AMQP 1.0 nicht vollständig von PHP unterstützt wird, aber es gibt PHP Enqueue- und Symfony-Bibliotheken, die eine Abstraktionsschicht für mehrere Broker bieten.

  Supported Brokers / Protocols Notes
Enqueue Apache Kafka, AWS SQS/SNS, AMQP 0.9, Database, MongoDB, Redis etc. Enqueue tries to follow JMS specification as close as possible; although this library does not have very good documentation. Theoretically this library can enable multiple brokers for Magento; with only few deviations in terms of configurations.
Symfony AMQP 0.9, Doctorine, Redis, In Memory, Serializing Messages This is more mature framework with active community, also has better documentation
Symfony via Enqueue Transport Add support for Enqueue Brokers with Symfony GitHub - sroze/messenger-enqueue-transport: Uses Enqueue with Symfony's Messenger component.

  


CO-AUTOR:

Author Photo
Tariq Jawed
System Designer, Architect and Product Strategist at Adobe Inc.

Related Articles