)]}'
{"/PATCHSET_LEVEL":[{"author":{"_account_id":11583,"name":"Arnaud Morin","email":"arnaud.morin@gmail.com","username":"arnaudmorin"},"change_message_id":"538e03bad51cf24275f5b6e1a48cdb977d7e3cfa","unresolved":true,"context_lines":[],"source_content_type":"","patch_set":4,"id":"5eaf512c_e81496d4","updated":"2024-12-13 15:34:03.000000000","message":"I will reconsider my last patchset to enable this for both stream AND quorum queues.","commit_id":"5c8d60c34cd1ba1dba036657945b1cef96c2cffc"}],"oslo_messaging/_drivers/impl_rabbit.py":[{"author":{"_account_id":9816,"name":"Takashi Kajinami","email":"kajinamit@oss.nttdata.com","username":"kajinamit"},"change_message_id":"bcbbd4acad253fbb6ea65eb84bed58fcbe01995e","unresolved":true,"context_lines":[{"line_number":588,"context_line":"        If on_cancel callback is not defined, cancel notification will be"},{"line_number":589,"context_line":"        treated as an error and will break connection."},{"line_number":590,"context_line":"        \"\"\""},{"line_number":591,"context_line":"        LOG.warning(\"Basic.cancel received on queue %s (consumer tag %s). \""},{"line_number":592,"context_line":"                    \"Redeclare consumer on same channel.\","},{"line_number":593,"context_line":"                    self.queue_name, tag)"},{"line_number":594,"context_line":""},{"line_number":595,"context_line":"        # NOTE(jcos): When queue is deleted, offset is reset to -1, current"},{"line_number":596,"context_line":"        # tracked offset for this consumer does not exist anymore, we need to"},{"line_number":597,"context_line":"        # reset it."},{"line_number":598,"context_line":"        # Add a random sleep will avoid all stream consumers to redeclare the"},{"line_number":599,"context_line":"        # same queue at same time."},{"line_number":600,"context_line":"        if self.queue_arguments.get(\u0027x-queue-type\u0027) \u003d\u003d \u0027stream\u0027:"},{"line_number":601,"context_line":"            self.reset_stream_offset()"},{"line_number":602,"context_line":"            time.sleep("},{"line_number":603,"context_line":"                random.uniform(0, self._conn.kombu_reconnect_splay))  # nosec"},{"line_number":604,"context_line":""},{"line_number":605,"context_line":"        self.declare(self._conn)"},{"line_number":606,"context_line":"        self._conn.set_qos(self._conn.channel)"},{"line_number":607,"context_line":"        self.consume(self._conn, tag)"},{"line_number":608,"context_line":""},{"line_number":609,"context_line":"    def consume(self, conn, tag):"},{"line_number":610,"context_line":"        \"\"\"Actually declare the consumer on the amqp channel.  This will"}],"source_content_type":"text/x-python","patch_set":2,"id":"52fee4d7_f37f35dc","line":607,"range":{"start_line":591,"start_character":0,"end_line":607,"end_character":37},"updated":"2024-10-17 01:26:47.000000000","message":"The callback may be called regardless of stream queue and I\u0027m concerned with breaking deployment without stream queue. I wonder if we should add the switch to run these logic ONLY WHEN stream queue is enabled.","commit_id":"7951069f23f227bba3eb7409fba8fc6a5152a5df"},{"author":{"_account_id":28627,"name":"Julien","email":"julien.cosmao@ovhcloud.com","username":"jcosmao"},"change_message_id":"2f0c59679bc6371c7643a0208ded5e3a21a31928","unresolved":false,"context_lines":[{"line_number":588,"context_line":"        If on_cancel callback is not defined, cancel notification will be"},{"line_number":589,"context_line":"        treated as an error and will break connection."},{"line_number":590,"context_line":"        \"\"\""},{"line_number":591,"context_line":"        LOG.warning(\"Basic.cancel received on queue %s (consumer tag %s). \""},{"line_number":592,"context_line":"                    \"Redeclare consumer on same channel.\","},{"line_number":593,"context_line":"                    self.queue_name, tag)"},{"line_number":594,"context_line":""},{"line_number":595,"context_line":"        # NOTE(jcos): When queue is deleted, offset is reset to -1, current"},{"line_number":596,"context_line":"        # tracked offset for this consumer does not exist anymore, we need to"},{"line_number":597,"context_line":"        # reset it."},{"line_number":598,"context_line":"        # Add a random sleep will avoid all stream consumers to redeclare the"},{"line_number":599,"context_line":"        # same queue at same time."},{"line_number":600,"context_line":"        if self.queue_arguments.get(\u0027x-queue-type\u0027) \u003d\u003d \u0027stream\u0027:"},{"line_number":601,"context_line":"            self.reset_stream_offset()"},{"line_number":602,"context_line":"            time.sleep("},{"line_number":603,"context_line":"                random.uniform(0, self._conn.kombu_reconnect_splay))  # nosec"},{"line_number":604,"context_line":""},{"line_number":605,"context_line":"        self.declare(self._conn)"},{"line_number":606,"context_line":"        self._conn.set_qos(self._conn.channel)"},{"line_number":607,"context_line":"        self.consume(self._conn, tag)"},{"line_number":608,"context_line":""},{"line_number":609,"context_line":"    def consume(self, conn, tag):"},{"line_number":610,"context_line":"        \"\"\"Actually declare the consumer on the amqp channel.  This will"}],"source_content_type":"text/x-python","patch_set":2,"id":"dbb07218_58461634","line":607,"range":{"start_line":591,"start_character":0,"end_line":607,"end_character":37},"in_reply_to":"1e8738fa_8bdb17d7","updated":"2024-12-03 09:17:35.000000000","message":"It should behave the same on all queue type, but if you prefer executing this callback only on stream queue, it should be defined (or not) in  self.queue.consume(.. on_cancel\u003dxx).\nIf you do it like that, i think quorum queue consumer will never consume back because it basic.cancel, it will just be ignored.","commit_id":"7951069f23f227bba3eb7409fba8fc6a5152a5df"},{"author":{"_account_id":11583,"name":"Arnaud Morin","email":"arnaud.morin@gmail.com","username":"arnaudmorin"},"change_message_id":"9ddf9739f0199883a7ab80c8180a14e4dcfaae89","unresolved":false,"context_lines":[{"line_number":588,"context_line":"        If on_cancel callback is not defined, cancel notification will be"},{"line_number":589,"context_line":"        treated as an error and will break connection."},{"line_number":590,"context_line":"        \"\"\""},{"line_number":591,"context_line":"        LOG.warning(\"Basic.cancel received on queue %s (consumer tag %s). \""},{"line_number":592,"context_line":"                    \"Redeclare consumer on same channel.\","},{"line_number":593,"context_line":"                    self.queue_name, tag)"},{"line_number":594,"context_line":""},{"line_number":595,"context_line":"        # NOTE(jcos): When queue is deleted, offset is reset to -1, current"},{"line_number":596,"context_line":"        # tracked offset for this consumer does not exist anymore, we need to"},{"line_number":597,"context_line":"        # reset it."},{"line_number":598,"context_line":"        # Add a random sleep will avoid all stream consumers to redeclare the"},{"line_number":599,"context_line":"        # same queue at same time."},{"line_number":600,"context_line":"        if self.queue_arguments.get(\u0027x-queue-type\u0027) \u003d\u003d \u0027stream\u0027:"},{"line_number":601,"context_line":"            self.reset_stream_offset()"},{"line_number":602,"context_line":"            time.sleep("},{"line_number":603,"context_line":"                random.uniform(0, self._conn.kombu_reconnect_splay))  # nosec"},{"line_number":604,"context_line":""},{"line_number":605,"context_line":"        self.declare(self._conn)"},{"line_number":606,"context_line":"        self._conn.set_qos(self._conn.channel)"},{"line_number":607,"context_line":"        self.consume(self._conn, tag)"},{"line_number":608,"context_line":""},{"line_number":609,"context_line":"    def consume(self, conn, tag):"},{"line_number":610,"context_line":"        \"\"\"Actually declare the consumer on the amqp channel.  This will"}],"source_content_type":"text/x-python","patch_set":2,"id":"d72c6ec1_d680c779","line":607,"range":{"start_line":591,"start_character":0,"end_line":607,"end_character":37},"in_reply_to":"52fee4d7_f37f35dc","updated":"2024-12-02 19:46:45.000000000","message":"Done","commit_id":"7951069f23f227bba3eb7409fba8fc6a5152a5df"},{"author":{"_account_id":11583,"name":"Arnaud Morin","email":"arnaud.morin@gmail.com","username":"arnaudmorin"},"change_message_id":"7429bb3187c1dd70c83735c67a1dba73c2567ef8","unresolved":false,"context_lines":[{"line_number":588,"context_line":"        If on_cancel callback is not defined, cancel notification will be"},{"line_number":589,"context_line":"        treated as an error and will break connection."},{"line_number":590,"context_line":"        \"\"\""},{"line_number":591,"context_line":"        LOG.warning(\"Basic.cancel received on queue %s (consumer tag %s). \""},{"line_number":592,"context_line":"                    \"Redeclare consumer on same channel.\","},{"line_number":593,"context_line":"                    self.queue_name, tag)"},{"line_number":594,"context_line":""},{"line_number":595,"context_line":"        # NOTE(jcos): When queue is deleted, offset is reset to -1, current"},{"line_number":596,"context_line":"        # tracked offset for this consumer does not exist anymore, we need to"},{"line_number":597,"context_line":"        # reset it."},{"line_number":598,"context_line":"        # Add a random sleep will avoid all stream consumers to redeclare the"},{"line_number":599,"context_line":"        # same queue at same time."},{"line_number":600,"context_line":"        if self.queue_arguments.get(\u0027x-queue-type\u0027) \u003d\u003d \u0027stream\u0027:"},{"line_number":601,"context_line":"            self.reset_stream_offset()"},{"line_number":602,"context_line":"            time.sleep("},{"line_number":603,"context_line":"                random.uniform(0, self._conn.kombu_reconnect_splay))  # nosec"},{"line_number":604,"context_line":""},{"line_number":605,"context_line":"        self.declare(self._conn)"},{"line_number":606,"context_line":"        self._conn.set_qos(self._conn.channel)"},{"line_number":607,"context_line":"        self.consume(self._conn, tag)"},{"line_number":608,"context_line":""},{"line_number":609,"context_line":"    def consume(self, conn, tag):"},{"line_number":610,"context_line":"        \"\"\"Actually declare the consumer on the amqp channel.  This will"}],"source_content_type":"text/x-python","patch_set":2,"id":"1e8738fa_8bdb17d7","line":607,"range":{"start_line":591,"start_character":0,"end_line":607,"end_character":37},"in_reply_to":"d72c6ec1_d680c779","updated":"2024-12-02 19:49:07.000000000","message":"I added all code under the if statement to check if this is a stream queue.\nIt will then only happen under stream condition.\nI added a note about this, so maybe in the future we may switch part of this code outside the if statement.\nDownstream, we have this running on production for both quorum and streams, but I believe it\u0027s not useful for quorum after all.","commit_id":"7951069f23f227bba3eb7409fba8fc6a5152a5df"},{"author":{"_account_id":9816,"name":"Takashi Kajinami","email":"kajinamit@oss.nttdata.com","username":"kajinamit"},"change_message_id":"7abef50c2534eb63d61207ab8aa226781041cae3","unresolved":true,"context_lines":[{"line_number":574,"context_line":""},{"line_number":575,"context_line":"        self._conn \u003d conn"},{"line_number":576,"context_line":""},{"line_number":577,"context_line":"    def _on_cancel_callback(self, tag):"},{"line_number":578,"context_line":"        \"\"\"Callback when Basic.cancel notification is received from server."},{"line_number":579,"context_line":""},{"line_number":580,"context_line":"        RabbitMQ server will emit amqp Basic.cancel to end current consumer in"}],"source_content_type":"text/x-python","patch_set":4,"id":"f664e9aa_4581dee7","line":577,"range":{"start_line":577,"start_character":34,"end_line":577,"end_character":37},"updated":"2024-12-11 15:58:14.000000000","message":"I\u0027ve looked into py-amqp to check how this callback is called but it seems the callback function is called with `msg` which implies a message, not `tag`. Are you sure that the argument is expected ?","commit_id":"5c8d60c34cd1ba1dba036657945b1cef96c2cffc"},{"author":{"_account_id":28627,"name":"Julien","email":"julien.cosmao@ovhcloud.com","username":"jcosmao"},"change_message_id":"f986e75d412b4e3c12fc898fa40dcc235cf10c9b","unresolved":true,"context_lines":[{"line_number":574,"context_line":""},{"line_number":575,"context_line":"        self._conn \u003d conn"},{"line_number":576,"context_line":""},{"line_number":577,"context_line":"    def _on_cancel_callback(self, tag):"},{"line_number":578,"context_line":"        \"\"\"Callback when Basic.cancel notification is received from server."},{"line_number":579,"context_line":""},{"line_number":580,"context_line":"        RabbitMQ server will emit amqp Basic.cancel to end current consumer in"}],"source_content_type":"text/x-python","patch_set":4,"id":"6900b083_a7fcdf71","line":577,"range":{"start_line":577,"start_character":34,"end_line":577,"end_character":37},"in_reply_to":"4912709d_1924d621","updated":"2025-08-27 14:25:22.000000000","message":"consumer_tag (str) declared at consume() is passed to this callback from what i see.\n\nhttps://github.com/celery/py-amqp/blob/main/amqp/channel.py#L1411","commit_id":"5c8d60c34cd1ba1dba036657945b1cef96c2cffc"},{"author":{"_account_id":11583,"name":"Arnaud Morin","email":"arnaud.morin@gmail.com","username":"arnaudmorin"},"change_message_id":"538e03bad51cf24275f5b6e1a48cdb977d7e3cfa","unresolved":true,"context_lines":[{"line_number":574,"context_line":""},{"line_number":575,"context_line":"        self._conn \u003d conn"},{"line_number":576,"context_line":""},{"line_number":577,"context_line":"    def _on_cancel_callback(self, tag):"},{"line_number":578,"context_line":"        \"\"\"Callback when Basic.cancel notification is received from server."},{"line_number":579,"context_line":""},{"line_number":580,"context_line":"        RabbitMQ server will emit amqp Basic.cancel to end current consumer in"}],"source_content_type":"text/x-python","patch_set":4,"id":"4912709d_1924d621","line":577,"range":{"start_line":577,"start_character":34,"end_line":577,"end_character":37},"in_reply_to":"f664e9aa_4581dee7","updated":"2024-12-13 15:34:03.000000000","message":"I will double check that point.\nThanks","commit_id":"5c8d60c34cd1ba1dba036657945b1cef96c2cffc"}]}
