
Adobe Commerce & Queue Systems – integration review

Alex Lyzun
Tariq Jawed

Photo by Levi Jones on Unsplash

Adobe Commerce (Magento) ships with a capable framework, available since version 2.1.0, called the “Magento Message Queue Framework (MMQF)”. Since its first release it has improved drastically, with the result that many modules and integrations today depend on MMQF as part of their functionality.

As of today, Magento supports two queuing systems: a database-backed queue (DB) and RabbitMQ.

The main question raised with us on a frequent basis is: “Which other queuing systems are going to be supported by Magento, and when?”

The goal of this article is to take a closer look at other existing queuing solutions and make an educated guess at the candidates that can easily integrate with Magento and therefore possibly become part of a Magento core implementation in the near future.

Message Queue Processing in Adobe Commerce

A simple explanation of how message processing (based on RabbitMQ) works in Magento is illustrated in this diagram.

Figure: Message queue processing in Magento using RabbitMQ. Source: Adobe Commerce Developer Guides

The basis for queue implementations in MMQF is \Magento\Framework\MessageQueue\QueueInterface. Both default Magento implementations (DB and RabbitMQ) implement this interface.

Let’s take a deeper look at it based on a flow.

This flow represents the current queue implementation. As we can see, each service uses its own functionality, but both of them go through the Queue Interface:

Figure: Magento Queue Interface

Queue Interface

The table below describes the methods that each queue provider is expected to implement, and what each method is meant to do.

| Method | Purpose / Description |
| --- | --- |
| dequeue() | Get a single message from the queue |
| acknowledge() | Acknowledge message delivery |
| subscribe() | Wait for messages and dispatch them. This is based on the pub/sub mechanism and consumes messages through callbacks until the connection is closed |
| reject() | Reject a message; the message is returned to the queue |
| push() | Push a message to the queue directly, without using an exchange; it uses publish behind the scenes |
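To make this contract more tangible, here is a minimal sketch of the five methods in Python rather than PHP. It is not Magento's code: the names Queue and InMemoryQueue are hypothetical, messages are plain strings instead of Magento envelopes, and the requeue flag on reject() is an assumption based on the description in the table.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Callable, Optional


class Queue(ABC):
    """Hypothetical Python analogue of the five methods listed in the table."""

    @abstractmethod
    def dequeue(self) -> Optional[str]: ...

    @abstractmethod
    def acknowledge(self, message: str) -> None: ...

    @abstractmethod
    def subscribe(self, callback: Callable[[str], None]) -> None: ...

    @abstractmethod
    def reject(self, message: str, requeue: bool = True) -> None: ...

    @abstractmethod
    def push(self, message: str) -> None: ...


class InMemoryQueue(Queue):
    """Toy provider: one list of ready messages, one list of unacknowledged ones."""

    def __init__(self) -> None:
        self._ready = deque()
        self._in_flight = []

    def push(self, message: str) -> None:
        self._ready.append(message)

    def dequeue(self) -> Optional[str]:
        if not self._ready:
            return None
        message = self._ready.popleft()
        self._in_flight.append(message)  # delivered, but not yet acknowledged
        return message

    def acknowledge(self, message: str) -> None:
        self._in_flight.remove(message)

    def reject(self, message: str, requeue: bool = True) -> None:
        self._in_flight.remove(message)
        if requeue:
            self._ready.appendleft(message)  # message goes back to the queue

    def subscribe(self, callback: Callable[[str], None]) -> None:
        # A real broker would block until the connection closes;
        # this toy version simply drains whatever is currently queued.
        while (message := self.dequeue()) is not None:
            callback(message)
            self.acknowledge(message)
```

Every provider discussed later in this article has to answer the same question: which of its native operations can stand in for each of these five methods.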

Each implementation realizes its own logic, which depends on the queuing system used. On the other side, we have consumers that fetch messages from the queue and process them according to the configuration.

Queuing Landscape

Let's take a look at what other queuing technologies exist and whether we can use them in Magento in the near future. Messaging technologies can be divided into the following categories, based on different high-level use cases.

| Use Case Type | Objectives |
| --- | --- |
| Portable systems or standards-based protocols | These systems either follow industry-standard protocols, or the projects are portable, like Kafka |
| High throughput & data streaming | Streaming for near real-time use cases, or very high throughput, is the key here |
| Serverless & rule-based event streaming / routing | These are modern systems built with intelligent routing in mind; they also support serverless functions in their ecosystem, like Adobe I/O Runtime and Adobe I/O Events |
| Interservice communication or Microservices | Third party connectivity, extensibility and message delivery |
| Third party connectivity, extensibility and message delivery | Suitable for large organizations, usually developed internally for wide-scale use cases; they encapsulate modern extensibility requirements like out-of-process extension functions or in-process extensibility. They may also include message delivery via webhooks. |

Figure: Queuing technologies landscape for Adobe Commerce. Source: Github User Content

Queuing Technologies – Functional Overview

| Platform | Pub/Sub Mechanism | Connectivity | Message Ordering / FIFO | High Throughput Data Streaming | Rule Based Filter / Routing |
| --- | --- | --- | --- | --- | --- |
| AWS EventBridge | *Available (limited to AWS Targets) | *HTTPS (for publishers only) | | | Available |
| AWS MQ | Available | JMS, NMS, AMQP 1.0, STOMP, MQTT, WebSocket | *Available | | |
| AWS SQS | | HTTPS | *Available | | |
| AWS Kinesis | Available | HTTP/2 Persistent and HTTP REST | *Available | Available | |
| Apache Kafka | Available | TCP Socket, Kafka Connect, Kafka Streams Apps | *Available | Available | |
| Azure Service Bus | Available | AMQP 1.0 and REST | *Available | | |
| Adobe I/O | *Available (limited to Adobe Event Providers) | *HTTP Webhooks (for subscribers only, designed for integration with Adobe SaaS solutions) | | | Available |

*Available with some limitations
**Other aspects to consider: consumer groups, batching, multi-tenant segregation, message encryption/security, aggregation, counting, scheduling, dead-letter queue

Integration Options with MMQF

Since the basis for all queuing implementations in Magento is MMQF and its QueueInterface, we will look at whether each of these technologies can implement that interface and be integrated easily into the existing Magento architecture.

AWS EventBridge

AWS EventBridge is a serverless event bus that facilitates the receipt of data from your application and from third-party providers to AWS Services. Currently, it appears that the destinations are specifically AWS Services. These destinations are defined via specific rules.

More information can be found here: https://docs.aws.amazon.com/eventbridge/index.html

And more about available targets can be found here: https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_PutTargets.html

But let's see whether we can integrate AWS EventBridge with MMQF:

| Method | Evaluation | Implementation Readiness |
| --- | --- | --- |
| dequeue() | Not Possible or N/A | Multiple Targets can be set to receive messages asynchronously when they become available. There is no concept of fetching a message from the event bus on demand; it's more of a serverless architecture. |
| acknowledge() | Not Possible or N/A | There is no need to acknowledge the message; AWS internally makes sure that the Target receives it. |
| subscribe() | Not Possible or N/A | AWS-related Targets can be set or subscribed to the event bus based on Rules, but we cannot register PHP callback functions. |
| reject() | Not Possible or N/A | The concept is not available or used. |
| push() | Available | The PutEvents or PutPartnerEvents functions can be used for this purpose. |
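The evaluation above boils down to a push-only adapter. The following Python sketch illustrates that shape under stated assumptions: EventBridgePushOnlyQueue, UnsupportedQueueOperation and RecordingEventsClient are hypothetical names, and the client is expected to offer a put_events(Entries=[...]) call in the style of the boto3 EventBridge client. The recording stub stands in for the real client so that the sketch runs without AWS credentials.

```python
import json


class UnsupportedQueueOperation(Exception):
    """Raised for queue methods that the backend has no concept of."""


class EventBridgePushOnlyQueue:
    """Hypothetical adapter: only push() maps onto EventBridge (PutEvents)."""

    def __init__(self, client, bus_name, source):
        self._client = client      # any object with a boto3-style put_events()
        self._bus_name = bus_name
        self._source = source

    def push(self, topic, payload):
        response = self._client.put_events(Entries=[{
            "EventBusName": self._bus_name,
            "Source": self._source,
            "DetailType": topic,
            "Detail": json.dumps(payload),
        }])
        if response.get("FailedEntryCount", 0) > 0:
            raise RuntimeError("EventBridge did not accept the event")

    def dequeue(self):
        raise UnsupportedQueueOperation("events are delivered to Targets, not fetched")

    def acknowledge(self, message):
        raise UnsupportedQueueOperation("delivery is handled by AWS")

    def subscribe(self, callback):
        raise UnsupportedQueueOperation("Targets are configured through Rules")

    def reject(self, message):
        raise UnsupportedQueueOperation("the concept does not exist")


class RecordingEventsClient:
    """Stand-in for the real client; it only records what would be sent."""

    def __init__(self):
        self.entries = []

    def put_events(self, Entries):
        self.entries.extend(Entries)
        return {"FailedEntryCount": 0, "Entries": [{"EventId": "stub"} for _ in Entries]}


client = RecordingEventsClient()
queue = EventBridgePushOnlyQueue(client, "commerce-bus", "magento.sketch")
queue.push("order.placed", {"id": 7})
```

A consumer written against the queue interface would fail on its first dequeue() call, which is why EventBridge is rated "Not Possible or N/A" for four of the five methods.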

AWS MQ

AWS MQ is a message broker based on the popular Apache ActiveMQ. It supports multiple protocols for connectivity, for example, AMQP, JMS, STOMP, NMS, MQTT, and WebSocket.

Most of the features are available because Magento already uses the AMQP protocol with RabbitMQ. However, the protocol versions differ: RabbitMQ uses AMQP 0-9-1, while Amazon MQ uses AMQP 1.0. Any migration would therefore require porting the queues from RabbitMQ to AWS MQ.

You can read more about this here: https://aws.amazon.com/amazon-mq/

In this case, everything looks much better. The reason, again, is that MMQF already uses AMQP:

| Method | Evaluation | Implementation Readiness |
| --- | --- | --- |
| dequeue() | Available | receive() |
| acknowledge() | Available | accept() / release() |
| subscribe() | *Workaround | Long polling might need to be implemented, unless we find a good library that supports AMQP 1.0 in PHP; Java has full support though. |
| reject() | Available | reject() |
| push() | Available | sendMessage(Message, Destination) |
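The long-polling workaround for subscribe() can be sketched in a few lines. This Python example is an illustration only: polling_subscribe and ListReceiver are hypothetical names, and the receiver is assumed to expose receive(timeout), accept() and release() operations named after the table above, not any specific AMQP 1.0 library.

```python
from typing import Callable, List, Optional


def polling_subscribe(receiver, callback: Callable[[str], None],
                      poll_timeout: float = 1.0, max_empty_polls: int = 3) -> int:
    """Emulate subscribe() by repeatedly calling receive() with a timeout.

    Stops after max_empty_polls consecutive empty polls and returns the
    number of messages that were handled successfully.
    """
    empty_polls = 0
    handled = 0
    while empty_polls < max_empty_polls:
        message = receiver.receive(timeout=poll_timeout)
        if message is None:
            empty_polls += 1
            continue
        empty_polls = 0
        try:
            callback(message)
        except Exception:
            receiver.release(message)   # hand the message back to the broker
        else:
            receiver.accept(message)    # positive acknowledgement
            handled += 1
    return handled


class ListReceiver:
    """Stand-in receiver backed by a list, so the sketch runs without a broker."""

    def __init__(self, messages: List[str]) -> None:
        self._messages = list(messages)
        self.accepted: List[str] = []
        self.released: List[str] = []

    def receive(self, timeout: float) -> Optional[str]:
        # A real receiver would block for up to `timeout` seconds.
        return self._messages.pop(0) if self._messages else None

    def accept(self, message: str) -> None:
        self.accepted.append(message)

    def release(self, message: str) -> None:
        self.released.append(message)


def handle(message: str) -> None:
    if message == "bad":
        raise ValueError("cannot process this message")


receiver = ListReceiver(["a", "bad", "c"])
handled_count = polling_subscribe(receiver, handle, poll_timeout=0.0)
```

The loop is not a true callback subscription: latency depends on the poll timeout, and a stop condition has to be chosen explicitly. That trade-off is what the *Workaround rating reflects.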

AWS SQS

AWS SQS is a distributed and fault-tolerant queuing technology that provides point-to-point connectivity. It can be used with SNS to add a publishing and subscription mechanism. A single message is replicated on multiple SQS servers.

More information: https://aws.amazon.com/sqs/

As we can see, SQS could also be the next queuing system in Magento, although implementing it in MMQF requires some effort:

| Method | Evaluation | Implementation Readiness |
| --- | --- | --- |
| dequeue() | Available | ReceiveMessage(), with many available options, for instance long and short polling. |
| acknowledge() | Possibility | DeleteMessage() for a positive acknowledgement. By default, a received message is locked against other consumers for the Visibility Timeout period. |
| subscribe() | Workaround | ReceiveMessage(): it can be implemented with a polling-based mechanism, but not as a true callback mechanism. |
| reject() | Possibility | Messages automatically become visible for consumption again if DeleteMessage() is not explicitly called before the timeout, as explained above. |
| push() | Available | SendMessage() |
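The mapping in this table can be sketched as a thin adapter. The Python example below is a sketch under assumptions: SqsQueue and FakeSqsClient are hypothetical names, and the client is expected to offer boto3-style send_message, receive_message and delete_message calls. The fake client, including its expire_visibility_timeouts() helper, exists only so that the sketch runs without AWS access.

```python
class SqsQueue:
    """Hypothetical adapter mapping the queue methods onto an SQS-style client."""

    def __init__(self, client, queue_url, wait_seconds=10):
        self._client = client
        self._queue_url = queue_url
        self._wait_seconds = wait_seconds   # > 0 means long polling

    def push(self, body):
        self._client.send_message(QueueUrl=self._queue_url, MessageBody=body)

    def dequeue(self):
        response = self._client.receive_message(
            QueueUrl=self._queue_url,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=self._wait_seconds,
        )
        messages = response.get("Messages", [])
        return messages[0] if messages else None

    def acknowledge(self, message):
        # Deleting the message is the positive acknowledgement.
        self._client.delete_message(
            QueueUrl=self._queue_url, ReceiptHandle=message["ReceiptHandle"])

    def reject(self, message):
        # Deliberately a no-op: without DeleteMessage the message becomes
        # visible again once its visibility timeout has passed.
        return None

    def subscribe(self, callback, max_empty_polls=1):
        # Polling loop, not a true callback subscription.
        empty_polls = 0
        while empty_polls < max_empty_polls:
            message = self.dequeue()
            if message is None:
                empty_polls += 1
                continue
            empty_polls = 0
            callback(message["Body"])
            self.acknowledge(message)


class FakeSqsClient:
    """In-memory stand-in for the real client (illustration only)."""

    def __init__(self):
        self._visible = []
        self._in_flight = {}
        self._counter = 0

    def send_message(self, QueueUrl, MessageBody):
        self._counter += 1
        self._visible.append({"MessageId": str(self._counter), "Body": MessageBody})
        return {"MessageId": str(self._counter)}

    def receive_message(self, QueueUrl, MaxNumberOfMessages=1, WaitTimeSeconds=0):
        if not self._visible:
            return {}
        message = self._visible.pop(0)
        handle = "rh-" + message["MessageId"]
        self._in_flight[handle] = message
        return {"Messages": [dict(message, ReceiptHandle=handle)]}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self._in_flight.pop(ReceiptHandle, None)
        return {}

    def expire_visibility_timeouts(self):
        # Simulates the visibility timeout elapsing for all in-flight messages.
        self._visible.extend(self._in_flight.values())
        self._in_flight.clear()
```

Note that reject() cannot return the message to the queue immediately in this design; the consumer has to wait for the timeout, which is one reason the method is rated "Possibility" rather than "Available".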

AWS Kinesis

AWS Kinesis is a streaming-based distributed messaging technology; it uses the publish/subscribe mechanism for loose coupling between senders and receivers. It is designed for extremely high throughput for real-time applications.

Streaming and the concept of stream itself are the central idea behind Kinesis. It is very similar to Apache Kafka, with some differences. Kinesis is also suitable for implementing event sourcing and CQRS patterns, which are often used in microservices architectures, as it provides immediate support for high-throughput messaging and publish/subscribe mechanisms.

Read more: https://aws.amazon.com/kinesis/

Kinesis also looks quite promising:

| Method | Evaluation | Implementation Readiness |
| --- | --- | --- |
| dequeue() | Possibility | getRecords(): the ShardIterator needs to be managed behind the scenes, and the stream can serve as the queue name. getShardIterator() can be called before the first read. |
| acknowledge() | Possibility | We need to maintain the ShardIterator and the SequenceNumber associated with it, using getShardIterator(). We can save the NextShardIterator from getRecords() to acknowledge the message. |
| subscribe() | Workaround | Batch reads with long polling can be implemented; see the example "Long Polling Subscribe Mechanism in AWS Kinesis". |
| reject() | Possibility | If we don't move the ShardIterator to NextShardIterator, we effectively stay on the same message. |
| push() | Possibility | Since a stream, data and a partition have to be provided, we need a strategy for partition selection and need to maintain these values for consumers. |
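The iterator bookkeeping described in the table can be sketched as follows. This Python example is an illustration under assumptions: KinesisShardQueue and FakeKinesisClient are hypothetical names, the client is expected to offer boto3-style get_shard_iterator, get_records and put_record calls, and the sketch reads a single shard one record at a time. The fake client uses a list index as its shard iterator, which real Kinesis does not.

```python
class KinesisShardQueue:
    """Hypothetical adapter: acknowledge/reject by moving or keeping the iterator."""

    def __init__(self, client, stream_name, shard_id):
        self._client = client
        self._stream_name = stream_name
        response = client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON",   # start at the oldest record
        )
        self._iterator = response["ShardIterator"]
        self._pending_iterator = None

    def push(self, data, partition_key):
        self._client.put_record(
            StreamName=self._stream_name, Data=data, PartitionKey=partition_key)

    def dequeue(self):
        response = self._client.get_records(ShardIterator=self._iterator, Limit=1)
        records = response["Records"]
        if not records:
            self._iterator = response["NextShardIterator"]
            return None
        # Remember where to continue, but do not move there yet.
        self._pending_iterator = response["NextShardIterator"]
        return records[0]["Data"]

    def acknowledge(self):
        # Moving to NextShardIterator marks the record as consumed.
        if self._pending_iterator is not None:
            self._iterator = self._pending_iterator
            self._pending_iterator = None

    def reject(self):
        # Keeping the current iterator means the same record is read again.
        self._pending_iterator = None


class FakeKinesisClient:
    """In-memory stand-in with one shard (illustration only)."""

    def __init__(self):
        self._records = []

    def put_record(self, StreamName, Data, PartitionKey):
        self._records.append(Data)
        return {"ShardId": "shard-0", "SequenceNumber": str(len(self._records) - 1)}

    def get_shard_iterator(self, StreamName, ShardId, ShardIteratorType):
        return {"ShardIterator": "0"}

    def get_records(self, ShardIterator, Limit):
        start = int(ShardIterator)
        batch = self._records[start:start + Limit]
        return {
            "Records": [{"SequenceNumber": str(start + i), "Data": d}
                        for i, d in enumerate(batch)],
            "NextShardIterator": str(start + len(batch)),
        }
```

The sketch keeps the iterator only in memory. A real consumer would have to persist its position (for example the last SequenceNumber) so that it can resume after a restart, which is the maintenance burden the table refers to.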

Apache Kafka

Apache Kafka is a widely used open source platform for stream processing and message passing. It is decentralized, replicated, and resilient (or fault-tolerant) and can achieve very high throughput.

The topic and publish/subscribe mechanism is the core of Kafka. It is effective for implementing event sourcing and CQRS patterns commonly used in microservices architectures. Kafka is also used for a variety of streaming use cases that require near real-time processing of data sets.

More information: https://kafka.apache.org/

Kafka seems to be the most promising and valuable solution at the moment:

| Method | Evaluation | Implementation Readiness |
| --- | --- | --- |
| dequeue() | Possibility | Initiate poll() or consume(). If records are available, the call returns immediately; otherwise it waits for the specified timeout, which can be passed as a parameter. |
| acknowledge() | Possibility | There are several ways to commit the offset, which indicates that a particular consumer has consumed those messages. The way the commit API is called controls the delivery semantics. |
| subscribe() | Possibility | There are multiple ways to implement the subscription mechanism. By default, a Kafka subscription tells Kafka which topics a consumer is interested in, but we can also subscribe a callback function, and we can use the Kafka Streams API to receive messages in near real time. |
| reject() | Possibility | If we neither auto-commit nor manually commit the offset, the consumer's committed position does not advance. |
| push() | Possibility | Since a topic, data and a partition have to be provided, we need a strategy for partition selection and need to maintain these values for consumers. |
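Two ideas from this table, committing offsets and choosing a partition, can be shown with a toy model. The Python sketch below does not use a Kafka client at all: ToyTopic, ToyConsumer and choose_partition are hypothetical names, and the CRC32-based partition choice is only a stand-in for whatever partitioning strategy a real integration would adopt.

```python
import zlib


def choose_partition(key: str, partition_count: int) -> int:
    """Stable key-to-partition mapping (CRC32 is an illustrative stand-in)."""
    return zlib.crc32(key.encode("utf-8")) % partition_count


class ToyTopic:
    """A topic modeled as a list of append-only partition logs."""

    def __init__(self, partition_count: int) -> None:
        self.logs = [[] for _ in range(partition_count)]

    def push(self, key: str, value: str) -> int:
        partition = choose_partition(key, len(self.logs))
        self.logs[partition].append(value)
        return partition


class ToyConsumer:
    """Tracks a read position and a committed offset for one partition."""

    def __init__(self, topic: ToyTopic, partition: int) -> None:
        self._log = topic.logs[partition]
        self._position = 0    # next record to read
        self._committed = 0   # everything before this offset is acknowledged

    def dequeue(self):
        if self._position >= len(self._log):
            return None
        message = self._log[self._position]
        self._position += 1
        return message

    def acknowledge(self) -> None:
        # Committing the offset marks all records read so far as consumed.
        self._committed = self._position

    def reject(self) -> None:
        # Without a commit, reading resumes from the last committed offset.
        self._position = self._committed


topic = ToyTopic(partition_count=3)
partition = topic.push("order-1", "created")
topic.push("order-1", "paid")           # same key, therefore same partition
consumer = ToyConsumer(topic, partition)
```

The toy reject() rewinds the read position explicitly. With a real consumer, simply not committing only takes effect after a restart or rebalance, so a running consumer would additionally have to seek back to the committed offset.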

Azure Service Bus

Microsoft Azure Service Bus is a fully managed message broker for enterprise integration. It supports familiar concepts such as queues, topics, rules/filters, and more.

Figure: Azure Service Bus Schema. Source: Github User Content

Azure Service Bus supports AMQP 1.0 and a number of languages; PHP support for the protocol is, again, limited.

Main page: https://azure.microsoft.com/en-us/services/service-bus/#overview

Since Azure Service Bus also uses the AMQP protocol, it looks quite promising as well. The fact that Magento Cloud will also use Microsoft Azure makes the use of ASB even more likely.

| Method | Evaluation | Implementation Readiness |
| --- | --- | --- |
| dequeue() | Available | receive() |
| acknowledge() | Available | accept() / release() |
| subscribe() | *Workaround | Long polling might need to be implemented, unless we find a good library that supports AMQP 1.0 for PHP; Java has full support for the required features. |
| reject() | Available | reject(errorCondition, errorDescription) |
| push() | Available | sendMessage(message, destination) |

Adobe I/O

Adobe I/O is a serverless, event-driven platform that lets you quickly deploy custom functions/code to the cloud without server setup. These functions are executed through HTTP requests or Adobe I/O events. These events can be orchestrated with sequences and compositions. The solution is based on the Apache OpenWhisk framework.

Events are triggered by event providers within Adobe Services, for example, Creative Cloud Assets, Adobe Experience Manager, and Adobe Analytics. To receive events for your application, you must register a webhook (URL endpoint) that specifies which event types it wants to receive from which event providers; Adobe sends events to your webhook via HTTP POST messages.

Home page: https://www.adobe.io/

In the current Magento implementation, it is not possible to use Adobe I/O. Therefore, an entirely new service needs to be developed to support it.

| Method | Evaluation | Implementation Readiness |
| --- | --- | --- |
| dequeue() | Not Possible or N/A | There is no concept of explicitly fetching an event; rather, you define a trigger/event and the actions associated with it. |
| acknowledge() | Not Possible or N/A | This concept is not used; the architecture is fundamentally different. |
| subscribe() | Not Possible or N/A | A PHP callback function is not possible, although a custom webhook (HTTP endpoint) can be configured to be triggered for a particular event. |
| reject() | Not Possible or N/A | This concept is not used; the architecture is fundamentally different. |
| push() | Not Possible or N/A | Events are triggered by Adobe SaaS services in the Adobe Cloud, as discussed above. |

Evaluation and Conclusion

If we compare all possible solutions, the following picture emerges:

| Method | (1) AWS EventBridge | (2) AWS MQ | (3) AWS SQS | (4) AWS Kinesis | (5) Apache Kafka | (6) Azure Service Bus | (7) Adobe I/O |
| --- | --- | --- | --- | --- | --- | --- | --- |
| dequeue() | Not Possible or N/A | Available | Available | Possibility | Possibility | Available | Not Possible or N/A |
| acknowledge() | Not Possible or N/A | Available | Possibility | Possibility | Possibility | Available | Not Possible or N/A |
| subscribe() | Not Possible or N/A | *Workaround | *Workaround | *Workaround | Possibility | *Workaround | Not Possible or N/A |
| reject() | Not Possible or N/A | Available | Possibility | Possibility | Possibility | Available | Not Possible or N/A |
| push() | Available | Available | Available | Possibility | Possibility | Available | Not Possible or N/A |

*Workaround - The function may be available, but full support for PHP (library) is not available.

As we can see, Apache Kafka is currently the winner of this research. Kafka is a very popular system that is performant and covers the requirements that come with the growing complexity of Magento. In recent months, Kafka has also been one of the most discussed technologies as the next service Magento could use for asynchronous communication.

However, integrators and developers can definitely look towards AWS MQ, AWS SQS, AWS Kinesis and Azure Service Bus integrations, as integrating these services does not require too much effort.

For these systems, one of the main problems is that AMQP 1.0 is not fully supported in PHP. However, the PHP Enqueue and Symfony libraries provide an abstraction layer across multiple brokers.

| Library | Supported Brokers / Protocols | Notes |
| --- | --- | --- |
| Enqueue | Apache Kafka, AWS SQS/SNS, AMQP 0.9, Database, MongoDB, Redis etc. | Enqueue tries to follow the JMS specification as closely as possible, although the library is not very well documented. Theoretically, it can enable multiple brokers for Magento with only a few deviations in configuration. |
| Symfony | AMQP 0.9, Doctrine, Redis, In Memory, Serializing Messages | This is a more mature framework with an active community; it also has better documentation. |
| Symfony via Enqueue Transport | Adds support for Enqueue brokers in Symfony | GitHub - sroze/messenger-enqueue-transport: Uses Enqueue with Symfony's Messenger component. |

In any case, since Magento's service isolation approach insists on module independence and extensibility, we must be careful about the future direction of Magento's asynchronous communication development: the interfaces and architecture we build or extend now must be easily reusable and extensible in the future.

