• 2016年道路交通安全法执法检查问卷调查 2019-07-20
  • 澳前外长:没证据显示中国对澳进行政治干涉! 2019-07-20
  • 奔驰R级优惠10.7万元 少量现车火热促销 2019-07-10
  • 侯晓春会见岳剑利一行 2019-07-10
  • 辽宁:电商成为精准扶贫的“利器” 2019-07-04
  • 人人都能享用绿豆汤吗 关于绿豆汤的禁忌你得知道 2019-07-04
  • 早晨空腹饮水有利健康?正确饮水你需要知道这些 2019-07-01
  • 微博大数据:海信强势领跑世界杯“中国赞助队” 2019-06-29
  • 新疆各族群众体验端午传统文化 2019-06-28
  • 感恩消防员 “小志”成获救婴儿新小名(图) 2019-06-28
  • 5G标准出炉!与4G有啥不一样? 或1秒内下载1G电影 2019-06-26
  • 【克山天气】最新克山今天天气,实时提供克山气温、空气质量、24小时天气预报、生活指数查询 2019-06-26
  • 不管怎么讲幸福,老百姓的愿望并不奢望,就是看病不难,不贵,不需要没钱就从手术台赶 下来,只能住医院过道,房子并不要太大,能避风挡雨,子女并不要万贯家财,能有一份 2019-06-24
  • 南岸开展“幸福邻里 粽情端午”志愿服务活动 2019-06-24
  • 一语惊坛(5月9日):推动中朝友谊,造福两国人民。 2019-06-23
  • kafka源码走读 - Producer send  

    青海快三中奖号码 www.yhxn.net 2019-05-30 10:44 发布

    2060 0 0

      发送流程
      获取meta信息
      key, value序列化
      计算发到哪个partition(如果key不为空, 则hash the keyBytes to choose a partition;否则随机一个数 n % numPrtitions,之后 (n++) % numPrtitions)
      消息入队列,返回future(放入之后就返回,所以是异步,但是实际上可能还没有发送)
      sender线程从队列中获取消息batchs(批量), 然后sendProduceRequests(batches)
      最终通过NIO模式发送(selector, bytebuffer, channel)
      源码分析
      /**
      * Implementation of asynchronously send a record to a topic.
      */
      private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
      ...
      int partition = partition(record, serializedKey, serializedValue, cluster);
      int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
      ensureValidRecordSize(serializedSize);
      tp = new TopicPartition(record.topic(), partition);
      long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
      log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
      // producer callback will make sure to call both 'callback' and interceptor callback
      Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
      RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
      if (result.batchIsFull || result.newBatchCreated) {
      log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
      this.sender.wakeup();
      }
      return result.future;
      }
      RecordAccumulator使用双端队列存储RecordBatch,如果队列为空就新建一个RecordBatch;如果队列不为空(获取最后面的recordBatch),且recordBatch还有空间存放当前消息(每个recordBatch的默认大小为16M)。
      /**
      * RecordAccumulator tryAppend
      * If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
      * resources (like compression streams buffers).
      */
      private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
      RecordBatch last = deque.peekLast();
      if (last != null) {
      FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
      if (future == null)
      last.close();
      else
      return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
      }
      return null;
      }
      /**
      * RecordBatch tryAppend
      * Append the record to the current record set and return the relative offset within that record set
      *
      * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
      */
      public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
      if (!recordsBuilder.hasRoomFor(key, value)) {
      return null;
      } else {
      long checksum = this.recordsBuilder.append(timestamp, key, value);
      this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
      this.lastAppendTime = now;
      FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
      timestamp, checksum,
      key == null ? -1 : key.length,
      value == null ? -1 : value.length);
      if (callback != null)
      thunks.add(new Thunk(callback, future));
      this.recordCount++;
      return future;
      }
      }
      Sender线程(run方法)从队列中获取batches,然后将batches封装成ClientRequest进行发送到channel。
      Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
      result.readyNodes,
      this.maxRequestSize,
      now);
      /**
      * Create a produce request from the given record batches
      */
      private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
      Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
      final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
      for (RecordBatch batch : batches) {
      TopicPartition tp = batch.topicPartition;
      produceRecordsByPartition.put(tp, batch.records());
      recordsByPartition.put(tp, batch);
      }
      ProduceRequest.Builder requestBuilder =
      new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition);
      RequestCompletionHandler callback = new RequestCompletionHandler() {
      public void onComplete(ClientResponse response) {
      handleProduceResponse(response, recordsByPartition, time.milliseconds());
      }
      };
      String nodeId = Integer.toString(destination);
      ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
      client.send(clientRequest, now);
      log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
      }
      NetworkClient将request封装成Send,并且保存到inFlightRequests(’在空中的request’)列表中;然后使用selector注册事件,对应的channel进行处理。
      /**
      * NetworkClient doSend
      **/
      private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
      ...
      Send send = request.toSend(nodeId, header);
      InFlightRequest inFlightRequest = new InFlightRequest(
      header,
      clientRequest.createdTimeMs(),
      clientRequest.destination(),
      clientRequest.callback(),
      clientRequest.expectResponse(),
      isInternalRequest,
      send,
      now);
      this.inFlightRequests.add(inFlightRequest);
      selector.send(inFlightRequest.send);
      }
      //发送成功的request保存到completedReceives中
      private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
      NetworkReceive networkReceive = stagedDeque.poll();
      this.completedReceives.add(networkReceive);
      this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
      }
      NetworkClient的poll方法中会处理所有的responses
      /**
      * Do actual reads and writes to sockets.
      *
      * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
      * must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
      * metadata timeout
      * @param now The current time in milliseconds
      * @return The list of responses received
      */
      @Override
      public List<ClientResponse> poll(long timeout, long now) {
      long metadataTimeout = metadataUpdater.maybeUpdate(now);
      try {
      this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
      } catch (IOException e) {
      log.error("Unexpected error during I/O", e);
      }
      // process completed actions
      long updatedNow = this.time.milliseconds();
      List<ClientResponse> responses = new ArrayList<>();
      handleAbortedSends(responses);
      handleCompletedSends(responses, updatedNow);
      handleCompletedReceives(responses, updatedNow);
      handleDisconnections(responses, updatedNow);
      handleConnections();
      handleInitiateApiVersionRequests(updatedNow);
      handleTimedOutRequests(responses, updatedNow);
      // invoke callbacks
      for (ClientResponse response : responses) {
      try {
      response.onComplete();
      } catch (Exception e) {
      log.error("Uncaught error in request completion:", e);
      }
      }
      return responses;
      }
      那么发送消息的时候是如何保证顺序发送的?答案是使用guaranteeMessageOrder(等于1就是有序的)。
      kafka源码走读 - Producer send
      RecordAccumulator使用Set集合来存储当前正在发送的TopicPartitions, 如果集合中存在该topicPartition,那么此时就不能发送该topicPartition对应的batches;直到completeBatchs时才移除该topicPartition。
      //Sender run
      void run(long now) {
      ...
      Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
      result.readyNodes,
      this.maxRequestSize,
      now);
      if (guaranteeMessageOrder) {
      // Mute all the partitions drained
      for (List<RecordBatch> batchList : batches.values()) {
      for (RecordBatch batch : batchList)
      this.accumulator.mutePartition(batch.topicPartition);
      }
      }
      }
      /**
      * Sender completeBatch
      * Complete or retry the given batch of records.
      *
      * @param batch The record batch
      * @param response The produce response
      * @param correlationId The correlation id for the request
      * @param now The current POSIX timestamp in milliseconds
      */
      private void completeBatch(RecordBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
      long now) {
      ...
      // Unmute the completed partition.
      if (guaranteeMessageOrder)
      this.accumulator.unmutePartition(batch.topicPartition);
      }
      //RecordAccumulator
      public void mutePartition(TopicPartition tp) {
      muted.add(tp);
      }
      public void unmutePartition(TopicPartition tp) {
      muted.remove(tp);
      }


    *滑块验证:
    您需要登录后才可以回帖 登录 | 网贷社区—注册

    本版积分规则

    潭梦落花潭梦落花
    优秀作品:7 作者粉丝:0 关注数量:0 拥有金豆:0 荣誉值数:0
    • 售后服务
    • 关注我们
    • 社区新手
    关闭

    站长推荐上一条 /1 下一条

    快速回复 青海快三中奖号码 返回列表
  • 2016年道路交通安全法执法检查问卷调查 2019-07-20
  • 澳前外长:没证据显示中国对澳进行政治干涉! 2019-07-20
  • 奔驰R级优惠10.7万元 少量现车火热促销 2019-07-10
  • 侯晓春会见岳剑利一行 2019-07-10
  • 辽宁:电商成为精准扶贫的“利器” 2019-07-04
  • 人人都能享用绿豆汤吗 关于绿豆汤的禁忌你得知道 2019-07-04
  • 早晨空腹饮水有利健康?正确饮水你需要知道这些 2019-07-01
  • 微博大数据:海信强势领跑世界杯“中国赞助队” 2019-06-29
  • 新疆各族群众体验端午传统文化 2019-06-28
  • 感恩消防员 “小志”成获救婴儿新小名(图) 2019-06-28
  • 5G标准出炉!与4G有啥不一样? 或1秒内下载1G电影 2019-06-26
  • 【克山天气】最新克山今天天气,实时提供克山气温、空气质量、24小时天气预报、生活指数查询 2019-06-26
  • 不管怎么讲幸福,老百姓的愿望并不奢望,就是看病不难,不贵,不需要没钱就从手术台赶 下来,只能住医院过道,房子并不要太大,能避风挡雨,子女并不要万贯家财,能有一份 2019-06-24
  • 南岸开展“幸福邻里 粽情端午”志愿服务活动 2019-06-24
  • 一语惊坛(5月9日):推动中朝友谊,造福两国人民。 2019-06-23
  • 七乐彩走势图最近两百期 多乐彩 吉林快三中奖几率 新疆十一选五助手下载 创业微信群名称大全 挪用公款买彩票 博彩老头排列5预测 足彩红人馆 海南飞鱼彩票有假吗 广东36选7开奖结果 竞彩足球比分中奖 福建快3走势 好运彩3网址 hk赛马会免费资料 甘肃快3号码一览表