import pika
import uuid
# ---------- НАСТРОЙКИ ----------
rabbit_host = ""
rabbit_port = 5672
rabbit_user = "user"
rabbit_password = ""
rabbit_vhost = "/"
source_queue = "GWPC.INPUT.DEAD.LETTER"
target_queue = "GWPC.INPUT"
buffer_queue = f"TMP_BUFFER_{uuid.uuid4().hex}"
# список нужных CorrelationId из headers
wanted_ids = {
"0bfe129b-55d6-4ca9-89ca-acf6b338971c"
}
# ---------- ПОДКЛЮЧЕНИЕ ----------
credentials = pika.PlainCredentials(rabbit_user, rabbit_password)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=rabbit_host,
port=rabbit_port,
virtual_host=rabbit_vhost,
credentials=credentials
)
)
channel = connection.channel()
# временная очередь для неподходящих сообщений
channel.queue_declare(
queue=buffer_queue,
durable=False,
auto_delete=True
)
processed = 0
moved = 0
returned = 0
print("Start scanning source queue...")
# ---------- ЭТАП 1: ПРОСМОТР И РАЗБОР ----------
while True:
method, properties, body = channel.basic_get(
queue=source_queue,
auto_ack=False
)
if method is None:
break
processed += 1
headers = properties.headers if properties and properties.headers else {}
correlation_id = headers.get("CorrelationId")
if correlation_id in wanted_ids:
# нашли нужное сообщение -> перекладываем в target_queue
channel.basic_publish(
exchange="",
routing_key=target_queue,
body=body,
properties=properties
)
channel.basic_ack(method.delivery_tag)
moved += 1
else:
# не подошло -> временно складываем в buffer_queue
channel.basic_publish(
exchange="",
routing_key=buffer_queue,
body=body,
properties=properties
)
channel.basic_ack(method.delivery_tag)
if processed % 1000 == 0:
print(f"processed={processed}, moved={moved}")
print("Filtering done")
print("Restoring unmatched messages back to source queue...")
# ---------- ЭТАП 2: ВОЗВРАТ НЕПОДОШЕДШИХ ----------
while True:
method, properties, body = channel.basic_get(
queue=buffer_queue,
auto_ack=False
)
if method is None:
break
channel.basic_publish(
exchange="",
routing_key=source_queue,
body=body,
properties=properties
)
channel.basic_ack(method.delivery_tag)
returned += 1
print("Done")
print(f"Processed: {processed}")
print(f"Moved to {target_queue}: {moved}")
print(f"Returned to {source_queue}: {returned}")
connection.close()