主线程

RecordAccumulator 针对每个分区都会有一个 deque 队列.

他这个 topicpartition 转换到 node 有说法的吧. 他怎么知道这个分区的 leader broker, 应该是去查询过相关的 kafka 集群信息, 然后知道的吧.

new producer, 进入producer 包,创建 producer 对象, producerConfig 是用来进行 producer 配置的,比如 brokerid, 序列化, 自定拦截器类名, 等等其他生产者的配置信息吧.

public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;

    /** <code>client.dns.lookup</code> */
    public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;

    /** <code>metadata.max.age.ms</code> */
    public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
    private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;

    /** <code>metadata.max.idle.ms</code> */
    public static final String METADATA_MAX_IDLE_CONFIG = "metadata.max.idle.ms";
    private static final String METADATA_MAX_IDLE_DOC =
            "Controls how long the producer will cache metadata for a topic that's idle. If the elapsed " +
            "time since a topic was last produced to exceeds the metadata idle duration, then the topic's " +
            "metadata is forgotten and the next access to it will force a metadata fetch request.";

并且对于 producerConfig 还进行了配置默认的初始化工作: (先执行静态代码块—> 父类构造方法—> 子类构造方法)

Untitled

Untitled

Untitled

Untitled

definition 参数是已经初始化好的70个配置变量.

originals 是我们自己定义的配置, 就是那些 brokerId, 编码解码器等

然后接着

common—config包下:

AbstractConfig