main,run和线程链状结构深度分析(续)
紧接上文,我给大家来个续文,之前没有这个想法,突然感觉有料了,就赶紧来篇续文。看不懂,没有关系。时间会解决一切问题的。
在群里有个小伙伴提出一个问题,关于Kafka的,他使用的是python语言发送Kafka消息。问题是这样的:
在使用pykafka的生产者的时候,尝试捕获异常,然后有异常的话,就重新实例化KafkaClient,然后再尝试继续生产,然后自测的时候,让生产者不停的发东西,同时关掉kafka的服务,这个时候就会捕获到异常,然后再启动kafka,程序尝试重新连接,但是连上以后,这个生产者的进程的线程数增加了一个。因为之前的代码是别人写的 之前每次有异常就会开一个线程去处理,现在改成比较简单的单线程的操作,但是没想到线程数还是会增加,所以觉得可能是pykafka用的不对。
代码是这样写的:
我看到这个问题之后很感兴趣,这是一个线程问题。我感觉,人人都知道鸡蛋有营养,人人都在吃鸡蛋,但是能把鸡蛋做到出神入化的人很少,反正我是没有见过的。同样道理,人人都知道线程的好处,人人也都会写多线程,但是真正吃透线程的人也是凤毛麟角的。上面这个问题的根源在于在socket的连接上(注:kafkaclient的底层实现)。每次连接kafka服务端都是随机找个端口生成socket的,因为是非堵塞的,自然就新起动一个线程。虽然运行过程中抛出了异常,但是并没有杀死线程,所以每次抛出异常,线程还在,然后新建socket,都会启动一个新线程,这样自然线程数量就不断的增加。解决方法是:出现异常,不要重新生成socket。
线程可以生成,可以启动,人人都会怎么办。但是线程怎么杀死怎么结束呢?一般我们用不到,即是用到可以上网查一下,但是要明白,抛出异常并捕获,线程仍然还是活着呢?抛出异常不捕获,那么线程就over了。
最近翻看了一下Guava Cache,说个例子:
public static void main(String[] args) throws ExecutionException {
LoadingCache<String, String> cache = CacheBuilder.newBuilder()
.maximumSize(100)
//写之后30ms过期
.expireAfterWrite(30L, TimeUnit.MILLISECONDS)
//访问之后30ms过期
.expireAfterAccess(30L, TimeUnit.MILLISECONDS)
//20ms之后刷新
.refreshAfterWrite(20L, TimeUnit.MILLISECONDS)
//开启weakKey key 当启动垃圾回收时,该缓存也被回收
.weakKeys()
.build(createCacheLoader());
System.out.println(cache.get("hello"));
cache.put("hello", "hello");
System.out.println(cache.get("hello"));
cache.put("hello", "world");
System.out.println(cache.get("hello"));
}
public static com.google.common.cache.CacheLoader<String, String> createCacheLoader() {
return new com.google.common.cache.CacheLoader<String, String>() {
@Override
public String load(String key) throws Exception {
return key;
}
};
}
LoadingCache类里面启动到一个线程,这个线程实时监控缓存的情况,这种设计的初衷很值得玩味。遇到一个计算复杂的问题,为了不卡顿,很多人自然会想到多线程,这不算什么。能主动的想到并发,然后将线程运用的很自然,这种功底很值得反思和学习。这种境界是如何炼成的呢?暂时先不分析了,后期有时间再分析吧。