`
qiemengdao
  • 浏览: 272586 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

Cassandra启动过程详解【原创】

阅读更多

Cassandra启动过程详解

这里的分析从CassandraDaemon.java文件开始。

一、配置文件storage-config.xml的读取和log4j的配置文件log4j.property的设置。

配置文件的读取和解析都是在 org.apache.cassandra.config.DatabaseDescriptor 类中完成的,这个类的作用非常简单,就是读取配置文件中各个配置项所定义的值,经过简单的验证,符合条件就将其值赋给 DatabaseDescriptor 的私有静态常量。值得注意的是关于 Keyspace 的解析,按照 ColumnFamily 的配置信息构建成 org.apache.cassandra.config.CFMetaData 对象,最后把这些所有 ColumnFamily 放入 Keyspace 的 HashMap 对象 org.apache.cassandra.config.KSMetaData 中,每个 Keyspace 就是一个 Table。这些信息都是作为基本的元信息,可以通过 DatabaseDescriptor 类直接获取。

二、Keyspace的初始化。


这里主要调用Table.open(tableName)方法创建每个Table的实例。创建 Table 的实例将完成:1)获取该 Table 的元信息 TableMatedate。2)创建改 Table 下每个 ColumnFamily 的存储操作对象 ColumnFamilyStore。3)启动定时程序,检查该 ColumnFamily 的 Memtable 设置的 MemtableFlushAfterMinutes 是否已经过期,过期立即写到磁盘。详细过程可参见我前面关于该方法的详细代码跟踪分析。

一个 Keyspace 对应一个 Table,一个 Table 持有多个 ColumnFamilyStore,而一个 ColumnFamily 对应一个 ColumnFamilyStore。Table 并没有直接持有 ColumnFamily 的引用而是持有 ColumnFamilyStore,这是因为 ColumnFamilyStore 类中不仅定义了对 ColumnFamily 的各种操作而且它还持有 ColumnFamily 在各种状态下数据对象的引用,所以持有了 ColumnFamilyStore 就可以操作任何与 ColumnFamily 相关的数据了。

三、Commitlog日志文件的恢复。

这里调用CmmitLog.recover()方法主要完成这几个操作,发现是否有没有被写到磁盘的数据,恢复这个数据,构建新的日志文件。CommitLog 日志文件的恢复策略是,在头文件中发现没有被序列化的最新的ColumnFamily Id,然后取出这个这个被序列化 RowMutation 对象的起始地址,反序列化成为 RowMutation 对象,后面的操作和新添一条数据的流程是一样的,如果这个 RowMutation 对象中的数据被成功写到磁盘中,那么会在 CommitLog 去掉已经被持久化的 ColumnFamily Id。

四、检查数据文件是否需要压缩

调用CompactionManager.instance.checkAllColumnFamilies()检查CF对应的数据文件是否需要压缩。将相似大小的SStable放到一个bucket中,然后调用submitMinorIfNeeded(cfs)。

五、启动存储服务

这是启动过程最重要的一步,需要启动很多服务。具体步骤有:

5.1)创建StorageMetadata

调用方法SystemTable.initMetadata()创建StorageMetadata。元数据只创建一次,如果元数据已经存在,则直接返回。StorageMetadata 将包含三个关键信息:本节点的 Token、当前 generation 以及 ClusterName。这三个信息被存在 StorageService 类的 属性metadata中(metadata是StorageMetadata类型的对象),以便后面随时调用。

Cassandra 判断如果是第一次启动,Cassandra 将会创建三列分别存储这些信息并将它们存在在系统表的 LocationInfo ColumnFamily 中,key 是“L”。这里的 Token 判断用户是否指定,如果指定了则使用用户指定的,否则随机生成一个 Token,但是这个 Token 有可能在后面被修改;generation=System.currentTimeMillis() / 1000,ClusterName为读取配置文件得到的值。

如果不是第一次启动将会更新这三个值:读取数据文件中的Token信息,generation信息以及ClusterName信息后设置Token值和ClusterName的值,更新generation的值为max(当前时间秒数,old_generation+1)。这里有点要注意的是,如果在后续的过程中更改了配置文件中ClusterName的名字,这会跟数据文件中存储的信息不一致,最终会导致Cassandra无法启动。

5.2)创建所有目录

调用方法DatabaseDescriptor.createAllDirectories()创建所有的目录。包括数据文件目录data/以及日志文件目录commitlog/。同时还为keyspaces创建了数据文件目录的子目录data/system和data/keyspace,…(keyspace为用户定义的keyspace)。当然这个方法早在Table.open()已经调用过了,在这里再次调用可能是为了某些测试需要。

5.3)启动GCInspector.instance.start 服务

主要是统计统计当前系统中资源的使用情况(主要就是内存使用和回收情况),将这个信息记录到日志文件中,这个可以作为系统的监控日志使用。

5.4)启动消息监听服务

这个消息监听服务就是监听整个集群中其它节点发送到本节点的所有消息,Cassandra 会根据每个消息的类型,做出相应的反应。消息监听代码如下:

public void listen(InetAddress localEp) throws IOException {

ServerSocketChannel serverChannel = ServerSocketChannel.open();

final ServerSocket ss = serverChannel.socket();

ss.setReuseAddress(true);

ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));

socketThread = new SocketThread(ss, “ACCEPT-” + localEp);

socketThread.start();

}



这里用到了nio里面的异步IO与连网的部分。监听端口默认为7000。创建一个线程SocketThread用于监听消息。

每接收到一个消息,就创建一个新的线程new IncomingTcpConnection(socket).start()进行消息响应;该线程run方法中主要是对消息进行魔数的验证,以及读取消息头部和消息体等内容,然后将消息内容反序列化的任务MessageDeserializationTask 递交到相应的消息反序列化线程池messageDeserializerExecutor_中。

MessageDeserializationTask反序列化消息内容后调用MessagingService.receive()处理消息。

receive()方法中创建MessageDeliveryTask任务对象,根据消息类型得到相应的stage的线程池对象,如果没有对应的线程池,则使用messageDeserializerExecutor_。

stage线程池执行MessageDeliveryTask任务,该任务主要是根据消息中的Verb,调用相应的VerbHandler.doVerb()方法来完成消息的处理。比如GossipDigestAckVerbHandler.doVerb()用来处理Gossip阶段的ACK消息。

5.5)启动StorageLoadBalancer.instance.startBroadcasting 服务

调用 方法 loadTimer_.schedule(new LoadDisseminator(), 2 * Gossiper.intervalInMillis_, BROADCAST_INTERVAL),定时得到节点负载信息,2个Gossiper心跳后开始,间隔时间为60s。该任务得到节点数据总量(包括所有Data文件、FIlter文件以及Index文件),并将其更新到ApplicationState中,然后就可以通过这个 state 来和其它节点交换信息。这个 load 信息在数据的存储和新节点加入的时候,会有参考价值。

5.6)启动Gossiper服务

在启动Gossiper服务之前,将 StorageService 注册为观察者,一旦节点的某些状态发生变化,而这些状态是 StorageService 感兴趣的,StorageService 的 onChange 方法就会触发。Gossiper 服务就是一个定时程序,它会创建一个EndPointState对象。EndPointState对象持有HeartBeatState 的引用和ApplicationState的一个引用集Map<String, ApplicationState> applicationState_ = new Hashtable<String, ApplicationState>()。对于每个Application对象,EndPointState只保存一个最新的值,所以新值会覆盖旧值。

HeartBeatState 对象记录了当前心跳的 generation 和 version,这个 generation 和前面的 StorageMetadata 存储的 generation 是一致的,在节点每次启动的时候更新;而version 是从 0 开始的,每次更新加1;每个节点有一个HeartBeatState对象与之关联。

ApplicationState的一个引用集Map<String, ApplicationState> applicationState_则是记录一些状态信息,比如前面 startBroadcasting()过程中记录节点负载情况。ApplicationState对象包含state值和version值。比如表示节点负载的状态信息可能表示形式为(5.2, 45),意思就是在version为45的时候负载为5.2;相似地,节点启动的状态信息可能表示形式为(bLpassF3XD8Kyks, 56),前面的值表示启动的token,后面的56是version值。是需要注意的是创建ApplicationState对象时,version值加1。

还有一个结构需要注意,就是Gossiper中的Map<InetAddress, EndPointState> endPointStateMap__,它保存了它监听到的所有节点的EndPointState信息,包括它自己的。

Gossiper这个定时程序每隔一秒钟随机向定义的节点发送一个消息,而这个消息是保持集群中节点状态一致的唯一途径(具体过程后面详述)。

5.7)判断启动模式。

启动模式跟配置文件中的AutoBootstrap这一项相关。那这个配置项与 Token 和负载又有何关联性?其实表面上看起来这个配置项是当这个节点启动时是否自动加入集群。但是,当你设置成 False 时它是不是就不加入集群呢?显然不是,这还要看你有没有配置 seeds,如果你配置了其它 seed,那么它仍然会去加入集群。

那么到底有何区别,通过分析其启动代码发现,这个配置项不仅跟 seed 配置项有关而且和 Cassandra 是否是第一次启动也有关。Cassandra 的启动规则大慨如下:

1)当 AutoBootstrap 设为 FALSE,

第一次启动时 Cassandra 会设置系统表中 key为Bootstrap,CF为STATUS_CF的Column为B的值为TRUE,以表示这是由系统自动设置的,其实这个只是作为一个标志来判断你以后的启动情况。 标记启动方式,主要是防止用户再修改 AutoBootstrap 的启动模式。
调用TokenMetadata.updateToken()更新token。
加入ApplicationState对象到EPS的map<key, AS>中。key为MOVE,state为NORMAL:Token 。
设置模式为“Normal”。

 

2)当 AutoBootstrap 设为 TRUE,第一次启动,Cassandra 会判断当前节点配置在seeds 中,Cassandra 的启动情况和 1 是一样的。


3)当 AutoBootstrap 设为 TRUE,第一次启动,并且没有配置为 seed,Cassandra 将会有一个漫长的启动过程,当然这个时间的长短和你的当前的集群的数据量有很大的关系。

设置模式为“Joining: getting load information”。
等待90s(BROADCAST_INTERVAL+RING_DELAY)为了节点获得所有其他节点的负载信息。
如果tokenMetadata已经包含了本节点ip,则抛出异常。
设置模式为“Joining: getting bootstrap token”。
如果在配置文件中指定了InitialToken,则返回这个InitialToken。否则调用getBalancedToken(metadata, load)。
getBalancedToken()方法首先调用getBootstrapSource()方法得到负载最大的节点的ip地址,(如果没有任何节点的负载信息,则抛出运行时异常;这个排序过程是先以落入该节点token范围内的正处于bootstrap的节点数目排序,数目越多优先级越低。如果落入其中的bootstrap节点数目相同,再以负载大小排序)。然后向这个节点发送消息,获取其一半 key 范围所对应的 Token,这个 Token 是前半部分值(如果key的数目<3,则返回一个随机Token值)。

public static Token getBalancedToken(TokenMetadata metadata, Map<InetAddress, Double> load) {

InetAddress maxEndpoint = getBootstrapSource(metadata, load);

Token<?> t = getBootstrapTokenFrom(maxEndpoint);

return t;

}



调用startBootstrap()方法。

privatevoid startBootstrap(Token token) throws IOException {
isBootstrapMode = true; //设置isBootstrapMode。

SystemTable.updateToken(token); //更新本节点的Token值。

//加入状态信息。(MOVE, appstate(BOOT:Token))

Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_BOOTSTRAPPING + Delimiter

partitioner_.getTokenFactory().toString(token)));

//设置模式为等待range重新分布

setMode(“Joining: sleeping “ + RING_DELAY + ” for pending range setup”, true);

try {

Thread.sleep(RING_DELAY);

} catch (InterruptedException e) {

thrownew AssertionError(e);

}

//设置模式为正在启动

setMode(“Bootstrapping”, true);

//开始启动

new BootStrapper(FBUtilities.getLocalAddress(), token, tokenMetadata_).startBootstrap(); // handles

}



调用 BootStrapper.startBootstrap()方法完成启动,发送STREAM_REQUEST消息,请求数据。

publicvoid startBootstrap() throws IOException {
for (String table : DatabaseDescriptor.getNonSystemTables()) {

Multimap<Range, InetAddress> rangesWithSourceTarget = getRangesWithSources(table);

/* Send messages to respective folks to stream data over to me*/

for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet()) {

InetAddress source = entry.getKey();

StorageService.instance.addBootstrapSource(source, table);

StreamIn.requestRanges(source, table, entry.getValue());

}

}

}


  1. 首先调用getRangesWithSources(table)得到该节点负责的ranges所对应的节点集合。
  2. 对于上一步得到的节点集合,移除掉不存活的节点,然后将活着的节点加入到Multimap<InetAddress, Range> sources 集合中。通过循环对每一个活着的节点,将其加入到bootstrapSet中(即作为bootstrap source),然后调用StreamIn.requestRanges()请求该节点对应范围内的数据。
  3. requestRanges(ip, tableName, ranges)方法构建流请求消息StreamRequestMessage,然后调用MessagingService.instance.sendOneWay(message, source)发送消息。
  4. 流请求消息的序列化格式为

     5. 流数据需要经历STREAM_REQUEST, STREAM_INITIATE, STREAM_INITIATE_DONE, STREAM_COMPLETE, STREAM_FINISHED..等阶段。在最后会调用 finishBootstrapping()方法,其中设置启动标志,并在setToken()中设置系统表中token值,并调用updateToken()更新token环。最后加入状态信息<MOVE,NORMAL:Token>,并设置模式为Normal。

 

privatevoid finishBootstrapping() {
isBootstrapMode = false;

SystemTable.setBootstrapped(true);

setToken(getLocalToken());

Gossiper.instance.addLocalApplicationState(MOVE_STATE,

new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken())));

setMode(“Normal”, false);

}



参考资料:[url]
cassandra详解 http://www.ibm.com/developerworks/cn/opensource/os-cn-cassandraxu1/
[/url]

 

  • 大小: 10.1 KB
分享到:
评论
1 楼 zhoujianyong2011 2012-04-17  
192.168.5.6     datacenter1 rack1       Up     Normal  64.49 MB        33.33%  0                                         
192.168.5.11    datacenter1 rack1       Up     Normal  64.62 MB        33.33%  56713727820156410577229101238628035242    
192.168.5.7     datacenter1 rack1       Up     Normal  64.57 MB        33.33%  113427455640312821154458202477256070484
三台机器的状态已经这样了,求指点!
Space used (total): 67485766
Space used (live): 67485766
我用cfstats命令看cassandra状态成这样了,是不是代表没有空间了?
如何才能扩大空间呢?

相关推荐

Global site tag (gtag.js) - Google Analytics