Juejin · Backend ( ) • 2024-04-01 23:31

Consumer sub-components

Tasks component

Let's jump straight into the source, located at celery.worker.consumer.tasks.Tasks. For step-based sub-components, we only need to look at their create or start method:

class Tasks(bootsteps.StartStopStep):                                
    """Bootstep starting the task message consumer."""               
                                                                     
    requires = (Mingle,)                                             
                                                                     
    def __init__(self, c, **kwargs):                                 
        c.task_consumer = c.qos = None                               
        super(Tasks, self).__init__(c, **kwargs)                     
                                                                     
    def start(self, c):                                              
        """Start task consumer."""                                   
        c.update_strategies()                                        
                                                                     
        qos_global = not c.connection.qos_semantics_matches_spec     
                                                                     
        # set initial prefetch count                                 
        c.connection.default_channel.basic_qos(                      
            0, c.initial_prefetch_count, qos_global,                 
        )                                                            
        # Note this line: this is where our task_consumer is actually created (it only starts consuming later, in the event loop)
        c.task_consumer = c.app.amqp.TaskConsumer(                   
            c.connection, on_decode_error=c.on_decode_error,         
        )                                                            
                                                                     
        def set_prefetch_count(prefetch_count):                      
            return c.task_consumer.qos(                              
                prefetch_count=prefetch_count,                       
                apply_global=qos_global,                             
            )                                                        
        c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)    
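Why wrap set_prefetch_count in a QoS object at all? A plausible reading is that it lets the worker change the desired prefetch count cheaply and only send basic.qos when the value has actually changed (the synloop further down performs exactly this check with qos.prev != qos.value). The class below is a hypothetical, simplified sketch of that idea, loosely modeled on kombu's QoS helper; the name LazyQoS and its details are my assumptions, not kombu's code.

```python
import threading


class LazyQoS:
    """Hypothetical, simplified prefetch-count holder (not kombu's QoS).

    The desired value can change at any time; the callback (which would send
    basic.qos to the broker) only runs in update(), and only when the value
    differs from the last one actually applied.
    """

    def __init__(self, callback, initial_value):
        self.callback = callback          # e.g. set_prefetch_count from Tasks.start
        self.value = initial_value or 0   # desired prefetch count; 0 = no limit
        self.prev = None                  # last value passed to the callback
        # Celery's Evloop.patch_all replaces a lock like this with DummyLock().
        self._mutex = threading.RLock()

    def increment_eventually(self, n=1):
        with self._mutex:
            if self.value:                # leave "no limit" (0) untouched
                self.value += max(n, 0)
        return self.value

    def decrement_eventually(self, n=1):
        with self._mutex:
            if self.value:
                self.value = max(self.value - n, 1)   # never fall to 0 = no limit
        return self.value

    def update(self):
        with self._mutex:
            if self.value != self.prev:
                self.callback(prefetch_count=self.value)
                self.prev = self.value
        return self.value
```

For example, with LazyQoS(set_prefetch_count, 4), calling increment_eventually() only changes the desired value; the broker hears about 5 on the next update(), and repeated update() calls with an unchanged value send nothing.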

Connection component

Looking at the code, Connection itself does almost nothing:

class Connection(bootsteps.StartStopStep):
    """Service managing the consumer broker connection."""

    def start(self, c):
        c.connection = c.connect()
        info('Connected to %s', c.connection.as_uri())

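Here c is actually an instance of our Consumer class, so what the Connection component really does is call Consumer's connect method to initialize the Consumer's connection attribute. Click into it and you'll find that Consumer doesn't do much either: it calls a method on the Celery app to create the connection. Wow, the nesting doll now goes from the bottom up.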

    def connect(self):
        """Establish the broker connection used for consuming tasks.

        Retries establishing the connection if the
        :setting:`broker_connection_retry` setting is enabled
        """
        conn = self.connection_for_read(heartbeat=self.amqheartbeat)
        if self.hub:
            conn.transport.register_with_event_loop(conn.connection, self.hub)
        return conn

    def connection_for_read(self, heartbeat=None):
        return self.ensure_connected(
            self.app.connection_for_read(heartbeat=heartbeat))
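The docstring says the connection is retried when broker_connection_retry is enabled. Below is a minimal, hypothetical sketch of such a retry loop; the function name, the interval parameters, and treating OSError as "connection failed" are assumptions for illustration, not Celery's actual ensure_connected.

```python
import time


def ensure_connected(connect, retry=True, max_retries=None,
                     interval_start=0.0, interval_step=2.0, interval_max=30.0,
                     on_error=None, sleep=time.sleep):
    """Hypothetical retry loop in the spirit of Consumer.ensure_connected.

    connect:     zero-argument callable that opens a connection or raises OSError.
    retry:       plays the role of the broker_connection_retry setting.
    max_retries: None means retry forever.
    """
    if not retry:
        return connect()            # retry disabled: one attempt, errors propagate
    attempts = 0
    interval = interval_start
    while True:
        try:
            return connect()
        except OSError as exc:
            attempts += 1
            if max_retries is not None and attempts > max_retries:
                raise
            if on_error is not None:
                on_error(exc, interval)   # e.g. log "trying again in N seconds"
            sleep(interval)
            interval = min(interval + interval_step, interval_max)
```

Passing sleep as a parameter is only there so the loop can be exercised without actually waiting.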

Note, and to stress it once more: app is the Celery instance, the first layer we initialized. The hierarchy so far is:

app(Celery)->w(Worker)->c(Consumer)

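One interesting detail here: for a task consumer like Consumer, the connection comes from connection_for_read, i.e. a connection meant for reading from the broker (Celery even lets you point it at a separate broker_read_url), whereas the Events component below publishes through connection_for_write. Rigorous, in a word.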
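Celery has separate broker_read_url and broker_write_url settings; to my understanding, both fall back to broker_url when unset, which is why most setups never notice the split. The helper below is a hypothetical sketch of that fallback over a plain dict, not Celery's configuration code.

```python
def resolve_broker_urls(conf):
    """Hypothetical: return (read_url, write_url) for a Celery-style config dict."""
    default = conf['broker_url']
    # A dedicated URL wins; otherwise both directions share broker_url.
    read_url = conf.get('broker_read_url') or default
    write_url = conf.get('broker_write_url') or default
    return read_url, write_url


same = resolve_broker_urls({'broker_url': 'amqp://broker//'})
split = resolve_broker_urls({'broker_url': 'amqp://primary//',
                             'broker_read_url': 'amqp://replica//'})
```

In the second case, consumers would read from the replica while events and results keep going to the primary.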

Events component:

class Events(bootsteps.StartStopStep):
    """Service used for sending monitoring events."""

    requires = (Connection,)

    def start(self, c):
        # flush events sent while connection was down.
        prev = self._close(c)
        dis = c.event_dispatcher = c.app.events.Dispatcher(
            c.connection_for_write(),
            hostname=c.hostname,
            enabled=self.send_events,
            groups=self.groups,
            # we currently only buffer events when the event loop is enabled
            # XXX This excludes eventlet/gevent, which should actually buffer.
            buffer_group=['task'] if c.hub else None,
            on_send_buffered=c.on_send_event_buffered if c.hub else None,
        )
        if prev:
            dis.extend_buffer(prev)
            dis.flush()

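The Events component works much like the previous two, with one difference: inside Events, app is called directly to create an event dispatcher, which is then assigned to c.event_dispatcher. Note also that it publishes over c.connection_for_write(), and any events still buffered in the previous dispatcher (prev) are carried over and flushed.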
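The buffer_group / extend_buffer / flush calls are easier to follow with a toy model. The sketch below is hypothetical (BufferingDispatcher, the publish callable, and deriving the group from the part of the event type before the first '-' are my assumptions): task events are queued, other events go out immediately, and a new dispatcher can adopt the old one's queue after a reconnect, mirroring what Events.start does with prev.

```python
class BufferingDispatcher:
    """Hypothetical, simplified event dispatcher with an outbound buffer."""

    def __init__(self, publish, buffer_group=None):
        self.publish = publish                  # callable taking one event dict
        self.buffer_group = set(buffer_group or ())
        self._outbound = []

    def send(self, type_, **fields):
        event = dict(fields, type=type_)
        group = type_.split('-', 1)[0]          # 'task-succeeded' -> 'task'
        if group in self.buffer_group:
            self._outbound.append(event)        # hold it until flush()
        else:
            self.publish(event)                 # send right away

    def extend_buffer(self, other):
        # Take over events the old dispatcher never managed to send.
        self._outbound.extend(other._outbound)
        other._outbound.clear()

    def flush(self):
        pending, self._outbound = self._outbound, []
        for event in pending:
            self.publish(event)
```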

As for what exactly the Events component is used for, that's a story for another day (tongue firmly in cheek).

Gossip component

The Gossip component is where it gets interesting. Wow, it straight up turns the guest into the host:

def start(self, c):
    super(Gossip, self).start(c)
    self.dispatcher = c.event_dispatcher

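It's the first component that initializes its own attribute from the Consumer, rather than setting an attribute on the Consumer.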

Earlier we mentioned two things about this component. First, it only consumes worker-related events, and Gossip's code confirms this:

def get_consumers(self, channel):
    self.register_timer()
    # routing_key='worker.#' confirms that Gossip only consumes events coming from workers
    ev = self.Receiver(channel, routing_key='worker.#',
                       queue_ttl=self.heartbeat_interval)
    return [Consumer(
        channel,
        queues=[ev.queue],
        on_message=partial(self.on_message, ev.event_from_message),
        no_ack=True
    )]
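The 'worker.#' binding relies on AMQP topic-exchange matching, where '*' matches exactly one dot-separated word and '#' matches zero or more words. Celery event types such as worker-heartbeat are, as far as I can tell, published with dots in place of dashes (worker.heartbeat), so they match. Here is a small, self-contained matcher written for illustration; it is not RabbitMQ's or kombu's implementation.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key matches an AMQP-style topic pattern."""
    p = pattern.split('.')
    k = routing_key.split('.')

    def match(i, j):
        if i == len(p):                  # pattern used up: key must be too
            return j == len(k)
        if p[i] == '#':                  # '#' swallows zero or more words
            return any(match(i + 1, jj) for jj in range(j, len(k) + 1))
        if j == len(k):                  # key used up but pattern is not
            return False
        if p[i] == '*' or p[i] == k[j]:  # '*' is exactly one word
            return match(i + 1, j + 1)
        return False

    return match(0, 0)
```

So topic_matches('worker.#', 'worker.heartbeat') is true while topic_matches('worker.#', 'task.succeeded') is false, which is why task events never reach Gossip.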

Second, Gossip is mainly used for leader election, which this function confirms:

def on_elect_ack(self, event):
    id = event['id']
    try:
        replies = self.consensus_replies[id]
    except KeyError:
        return  # not for us
    alive_workers = set(self.state.alive_workers())
    replies.append(event['hostname'])

    if len(replies) >= len(alive_workers):
        _, leader, topic, action = self.clock.sort_heap(
            self.consensus_requests[id],
        )
        if leader == self.full_hostname:
            info('I won the election %r', id)
            try:
                handler = self.election_handlers[topic]
            except KeyError:
                logger.exception('Unknown election topic %r', topic)
            else:
                handler(action)
        else:
            info('node %s elected for %r', leader, id)
        self.consensus_requests.pop(id, None)
        self.consensus_replies.pop(id, None)

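Note this log line: "I won the election". Judging by the code, the winner is decided by comparing logical clock values: the election requests are kept in a heap, and the winner is picked with the following call (the request with the smallest clock value, ties broken by process id, appears to win):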

self.clock.sort_heap
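To make the election flow concrete, here is a hypothetical, stripped-down model of the two handlers (ElectionSketch and its method names are illustrative, not Celery's). Each candidate's (clock, 'hostname.pid', topic, action) request goes onto a per-election heap, and once every alive worker has acked, the request at the top of the heap wins. Because Python compares tuples element by element, heap[0] is the request with the smallest clock, with ties falling to the smaller process id, which is my reading of what sort_heap is for.

```python
import heapq


class ElectionSketch:
    """Hypothetical model of Gossip's elect / elect-ack bookkeeping."""

    def __init__(self, alive_workers):
        self.alive_workers = set(alive_workers)
        self.requests = {}   # election id -> heap of (clock, node, topic, action)
        self.replies = {}    # election id -> hostnames that acked

    def on_elect(self, election_id, clock, node, topic, action):
        # Every candidate's request is pushed onto the same per-election heap.
        heapq.heappush(self.requests.setdefault(election_id, []),
                       (clock, node, topic, action))
        self.replies.setdefault(election_id, [])

    def on_elect_ack(self, election_id, hostname):
        replies = self.replies.get(election_id)
        if replies is None:
            return None              # not an election we know about
        replies.append(hostname)
        if len(replies) < len(self.alive_workers):
            return None              # still waiting for acks
        # Smallest tuple: lowest clock first, then lowest node id on ties.
        _, leader, topic, action = self.requests[election_id][0]
        self.requests.pop(election_id, None)
        self.replies.pop(election_id, None)
        return leader, topic, action
```

A worker would then compare the returned leader with its own 'hostname.pid', just like the leader == self.full_hostname check above.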

For more on leader election, have a look at the relevant source code yourself; I won't expand on it here. A quick Baidu search should turn up plenty.

Evloop component

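This component is a bit special: every other step class lives in its own file, yet Evloop sits in the same file as Consumer. Thinking it over, there's no real reason it has to share a module, so I can only chalk it up to legacy baggage.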

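Wake up, folks in the back row, this is the key part. Remember on_task_received, the callback Celery runs when it receives a message, which we found in the previous chapter? Here's the question: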

Where does it get registered and bound to kombu's callback?

If it's a callback, then somewhere you must have told Kombu: hey bro, remember to call me back when a message arrives.

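But thanks to Python's dynamic nature, I simply couldn't find where the binding happens. Then, while reading the Evloop code, which I assumed was just a tiny component, the deeper I dug, the more a shocking secret began to surface: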

class Evloop(bootsteps.StartStopStep):
    """Event loop service.

    Note:
        This is always started last.
    """

    label = 'event loop'
    last = True

    def start(self, c):
        self.patch_all(c)
        c.loop(*c.loop_args())

    def patch_all(self, c):
        c.qos._mutex = DummyLock()

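At first glance, nothing special. At second glance, still nothing special. Indeed, the start method alone gives us nothing useful. Let's click into loop and see what it is: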

In Consumer I eventually found this code, the only place loop is assigned:

if not hasattr(self, 'loop'):
    self.loop = loops.asynloop if hub else loops.synloop

It doesn't matter whether it's asynloop or synloop: whichever loop runs, it has to bind the callback.

How can you be so sure?

Obviously, because I read the code before writing this.

Let's click through:

def synloop(obj, connection, consumer, blueprint, hub, qos,
            heartbeat, clock, hbrate=2.0, **kwargs):
    """Fallback blocking event loop for transports that doesn't support AIO."""
    RUN = bootsteps.RUN
    # Get our on_task_received handler.
    on_task_received = obj.create_task_handler()
    perform_pending_operations = obj.perform_pending_operations
    if getattr(obj.pool, 'is_green', False):
        _enable_amqheartbeats(obj.timer, connection, rate=hbrate)
        
    # Pay close attention to this line:
    consumer.on_message = on_task_received
    # Start consuming
    consumer.consume()

    obj.on_ready()

    while blueprint.state == RUN and obj.connection:
        state.maybe_shutdown()
        if qos.prev != qos.value:
            qos.update()
        try:
            perform_pending_operations()
            connection.drain_events(timeout=2.0)
        except socket.timeout:
            pass
        except socket.error:
            if blueprint.state == RUN:
                raise

But now another question comes up: consumer and obj have me a bit confused. If obj is the Consumer instance, then what is consumer?

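In Python, the parameters alone tell us nothing about what they are, so we have to go back to Evloop and see what gets passed in. When I got to the scene, this was all I found: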

def loop_args(self):
    return (self, self.connection, self.task_consumer,
            self.blueprint, self.hub, self.qos, self.amqheartbeat,
            self.app.clock, self.amqheartbeat_rate)

Wow, case closed: the consumer above is the task_consumer created by the Tasks component. Not bad.
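If you'd rather not match positions by eye, inspect.signature(...).bind() does it for you. The snippet uses a stub with the same parameters as the synloop shown above and made-up placeholder strings instead of real objects, and shows that the third positional argument, self.task_consumer, lands in the consumer parameter.

```python
import inspect


def synloop(obj, connection, consumer, blueprint, hub, qos,
            heartbeat, clock, hbrate=2.0, **kwargs):
    """Stub with the same parameters as Celery's synloop; body omitted."""


# Placeholder strings standing in for what Consumer.loop_args() returns.
loop_args = ('<self (Consumer)>', '<self.connection>', '<self.task_consumer>',
             '<self.blueprint>', '<self.hub>', '<self.qos>',
             '<self.amqheartbeat>', '<self.app.clock>',
             '<self.amqheartbeat_rate>')

bound = inspect.signature(synloop).bind(*loop_args)
for name, value in bound.arguments.items():
    print(f'{name:<10} <- {value}')
```

The same trick works for any callable invoked as f(*args), which is a common pattern across Celery's bootsteps.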

Celery binds consumer.on_message to the on_task_received handler that Consumer builds via create_task_handler(), so whenever task_consumer receives a message, our on_task_received gets called back.

# Pay close attention to this line:
consumer.on_message = on_task_received
# Start consuming
consumer.consume()
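Why was the binding so hard to find? Because it's just an attribute assignment, not a call to some register-callback API. The toy below reproduces the pattern from synloop: build the handler, assign it to on_message, call consume(), then drain in a loop. ToyConsumer, create_task_handler, and run_loop are hypothetical stand-ins, with queue.Queue playing the broker and queue.Empty playing the role of socket.timeout.

```python
import queue


class ToyConsumer:
    """Hypothetical stand-in for a kombu Consumer, for illustration only."""

    def __init__(self, messages):
        self.messages = messages      # queue.Queue playing the broker
        self.on_message = None        # plain attribute: whoever sets it wins
        self.consuming = False

    def consume(self):
        self.consuming = True

    def drain_events(self, timeout):
        # Wait at most `timeout` seconds; raises queue.Empty if nothing came.
        message = self.messages.get(timeout=timeout)
        if self.consuming and self.on_message is not None:
            self.on_message(message)


def create_task_handler(received):
    def on_task_received(message):
        received.append(message)      # a real worker would run the task here
    return on_task_received


def run_loop(consumer, max_iterations):
    for _ in range(max_iterations):
        try:
            consumer.drain_events(timeout=0.01)
        except queue.Empty:           # like socket.timeout in synloop: keep going
            pass


broker = queue.Queue()
consumer = ToyConsumer(broker)
received = []
consumer.on_message = create_task_handler(received)   # the "binding"
consumer.consume()
broker.put('tasks.add(2, 2)')
broker.put('tasks.mul(3, 4)')
run_loop(consumer, max_iterations=3)
```

Nothing in ToyConsumer knows about on_task_received; the link exists only because the loop assigned it, which is exactly why grepping for a registration call turns up nothing.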

Heart component:

class Heart(bootsteps.StartStopStep):
  
    requires = (Events,)

    def __init__(self, c,
                 without_heartbeat=False, heartbeat_interval=None, **kwargs):
        self.enabled = not without_heartbeat
        self.heartbeat_interval = heartbeat_interval
        c.heart = None
        super(Heart, self).__init__(c, **kwargs)

    def start(self, c):
        c.heart = heartbeat.Heart(
            c.timer, c.event_dispatcher, self.heartbeat_interval,
        )
        c.heart.start()

    def stop(self, c):
        c.heart = c.heart and c.heart.stop()
    shutdown = stop

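In essence, it uses the timer to send heartbeats periodically, through the event dispatcher set up by the Events component (hence requires = (Events,)). The default interval is 2 s.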
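A deterministic way to picture "periodic heartbeats via the timer" is a virtual clock. VirtualTimer, its call_repeatedly method, and HeartSketch below are hypothetical stand-ins for Celery's timer and heartbeat.Heart, and send_event stands in for the event dispatcher; the 2.0 s default mirrors the interval mentioned above.

```python
import heapq


class VirtualTimer:
    """Hypothetical timer that runs callbacks on a simulated clock."""

    def __init__(self):
        self.now = 0.0
        self._queue = []      # (due_time, seq, interval, fn)
        self._seq = 0

    def call_repeatedly(self, interval, fn):
        heapq.heappush(self._queue, (self.now + interval, self._seq, interval, fn))
        self._seq += 1

    def advance(self, seconds):
        end = self.now + seconds
        while self._queue and self._queue[0][0] <= end:
            due, seq, interval, fn = heapq.heappop(self._queue)
            self.now = due
            fn()
            heapq.heappush(self._queue, (due + interval, seq, interval, fn))
        self.now = end


class HeartSketch:
    """Hypothetical heartbeat step: sends an event every `interval` seconds."""

    def __init__(self, timer, send_event, interval=None):
        self.timer = timer
        self.send_event = send_event
        self.interval = float(interval or 2.0)
        self.running = False

    def _beat(self):
        if self.running:
            self.send_event('worker-heartbeat')

    def start(self):
        self.running = True
        self.timer.call_repeatedly(self.interval, self._beat)

    def stop(self):
        self.running = False   # later ticks become no-ops
```

With the default interval, advancing the virtual clock by 5 seconds produces beats at t=2 and t=4, and nothing more is sent after stop().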

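TODO: who receives them? Since they go out through c.event_dispatcher, they appear to be published to the AMQP broker as worker events.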