kiến thức [Event Tặng Title] Concurrency/multi-threading in the nut shell

các thím nghĩ sao về concurrency trên máy chỉ có 1 core?

Concurrency trên 1 core có thể thấy ở thao tác liên quan đến memory. Cũng có nhiều lệnh được write vào memory nhưng một thời điểm chỉ write được một số ít lần. Vậy nên có một thứ là write buffer để lưu các phép write, sau đó CPU sẽ xử lý từ từ. Rất giống với cơ chế async await khi code app.

Mà concurrency thì thường quá, nói về parallelism nhé. Là parallel thực chứ không phải là giả tạo bằng cách context-switch:
Trong 1 core có thể có đến 3 cách parallelism:
1. Thread level: do hyper threading. Một core có thể thực thi cùng lúc nhiều thread,
2. Instruction level: do Out-of-order execution như mình đã nói trước đó. Nhờ 1 core có nhiều bộ phận để thực thi (EU) mà nhiều lệnh có thể thực thi cùng lúc,
3. Data level: hỗ trợ bởi các lệnh SIMD. Cho phép chỉ cần 1 lệnh nhưng có thể tính được nhiều phép tính. Cái này thì cần trình dịch và/hoặc lập trình viên can thiệp.
 
Các bác cho hỏi Concurrency và Parallel thì với những CPU đời mới bây giờ cái nào hiệu quả hơn?
Thực thế áp dụng như nào thì hợp lý?
 
Đống server bên mình toàn server vật lý, mỗi con 72/96 core, 1,2TB ram mà monitor méo thấy lúc nào CPU chạy nổi 40%, trung bình toàn lẹt đẹt 5-10% trong khi mem thì lúc nào cũng trong tình trạng báo động đỏ.
Làm data thì cái quan trọng nhất là memory càng to càng tốt và disk tốt 1 chút, phang cật lực hai cái này vào là chạy tít hết :bye: :bye:
End-user nào cũng muốn data thì realtime + analytic query thì nhanh như select trong MySQL :beat_brick:
 
Mình ko hiểu sao thím phải đi chứng minh cái này, có thể mình đang ko hiểu điều thím nói, nhưng ng ta định nghĩa:
1 giây là khoảng thời gian bằng 9 192 631 770 lần chu kỳ của bức xạ điện từ phát ra bởi nguyên tử Cs133 khi thay đổi trạng thái giữa hai mức năng lượng đáy siêu tinh vi. (wiki).
Vậy thì 1/6 tỷ giây của thím nó đâu đó là khoảng thời gian 1.5 chu kỳ còn gì?

Chưa hiểu điều thím đang băn khoăn ở đây là gì?
Bởi vì các phép tính xảy ra trong 1 core nó tác động đến Physical memory có thể concurrency trong 1 thời điểm siêu vi hay không? Bởi vì con người chưa bao giờ loại bỏ hoàn toàn điện trở trong vật chất cấu tạo core nên nó (điện trở) vẫn tồn tại ảnh hưởng đến vận tốc trôi của electron => tác động đến xung nhịp. Hay nói cách khác là điều kiện để một xung nhịp tạo ra n phép tính, trong n phép tính/lần xung nhịp ấy để mà có các phép tính xảy ra đồng thời thì cần duy nhất 1 thời điểm hay cần n thời điểm?

Người ta phải chứng minh rằng siêu vi thời gian ấy trong vũ trụ có tồn tại một siêu vi thời gian như vậy hay không để quy chiếu về. Lúc này thì lại cần tới Các định lý bất toàn của Gödel để đi tìm chứng minh (mà chưa ai tìm ra).

Bất kỳ vật chất nào trong nó cũng có điện trở. Nên người ta đang sử dụng qubit (bit lượng tử) để đưa một phép tính xảy ra là gần như ->0 ở mặt thời gian. Hay nói cách khác họ cũng đang chứng minh ở mốc thời gian 1/100 tỷ s là concurrency với 1/100n tỷ s (n+1).
 
Bởi vì các phép tính xảy ra trong 1 core nó tác động đến Physical memory có thể concurrency trong 1 thời điểm siêu vi hay không? Bởi vì con người chưa bao giờ loại bỏ hoàn toàn điện trở trong vật chất cấu tạo core nên nó (điện trở) vẫn tồn tại ảnh hưởng đến vận tốc trôi của electron => tác động đến xung nhịp. Hay nói cách khác là điều kiện để một xung nhịp tạo ra n phép tính, trong n phép tính/lần xung nhịp ấy để mà có các phép tính xảy ra đồng thời thì cần duy nhất 1 thời điểm hay cần n thời điểm?

Người ta phải chứng minh rằng siêu vi thời gian ấy trong vũ trụ có tồn tại một siêu vi thời gian như vậy hay không để quy chiếu về. Lúc này thì lại cần tới Các định lý bất toàn của Gödel để đi tìm chứng minh (mà chưa ai tìm ra).

Bất kỳ vật chất nào trong nó cũng có điện trở. Nên người ta đang sử dụng qubit (bit lượng tử) để đưa một phép tính xảy ra là gần như ->0 ở mặt thời gian. Hay nói cách khác họ cũng đang chứng minh ở mốc thời gian 1/100 tỷ s là concurrency với 1/100n tỷ s (n+1).

hmm đọc tầm vài lần thì tôi bắt đầu hình dung dc 1 cách mơ hồ rồi.
Hỏi tí trc thím học CS ở trường nào vậy, giờ đang làm mảng nào thế...
 
Về lý thuyết thì 1 cái công tắc chỉ có đóng hoặc mở tại 1 thời điểm, nên tại 1 thời điểm thì chỉ 1 phép tính. Có chăng phép tính đó cực nhanh và trong 1 khoảng thời gian rất ngắn thì có thể thực hiện 2 lệnh.

Tiếp theo nữa việc tối ưu từ hardware hay software cũng chỉ để tận dụng để hardware hoạt động hiệu quả nhất. Thread ở software hay intrusion hardware cũng thế thôi, trong lúc chờ các tác vụ i/o hoặc nạp/xoá thì đi làm việc khác.

Các bác cho hỏi Concurrency và Parallel thì với những CPU đời mới bây giờ cái nào hiệu quả hơn?
Thực thế áp dụng như nào thì hợp lý?
Cpu mới thì nhiều core nên parallel và ở từng core thì vẫn concurrency thôi.
 
Về lý thuyết thì 1 cái công tắc chỉ có đóng hoặc mở tại 1 thời điểm, nên tại 1 thời điểm thì chỉ 1 phép tính. Có chăng phép tính đó cực nhanh và trong 1 khoảng thời gian rất ngắn thì có thể thực hiện 2 lệnh.

Tiếp theo nữa việc tối ưu từ hardware hay software cũng chỉ để tận dụng để hardware hoạt động hiệu quả nhất. Thread ở software hay intrusion hardware cũng thế thôi, trong lúc chờ các tác vụ i/o hoặc nạp/xoá thì đi làm việc khác.


Cpu mới thì nhiều core nên parallel và ở từng core thì vẫn concurrency thôi.
Với cơ chế pipline thì concurency nó hoạt động đc trêb cả con pic16 luôn rồi ấy thím.
 
Vào đây bắt đầu thấy mình lesor rồi. Hoặc thường thì mình cố lý giải đơn giản đi do hiểu biết có hạn. Và mình cũng chưa cần dùng tới mức cao siêu như vậy
 
Với cơ chế pipline thì concurency nó hoạt động đc trêb cả con pic16 luôn rồi ấy thím.
Thật ra em không hiểu sâu đâu thím. Với pipeline gồm 5 step kia. Thì vs em nó chỉ như 5 thằng công nhân trong dây chuyền. 1 core là 1 dây chuyền. Nếu làm trên N core thì sẽ có N thằng parallel step1, dây chuyền 1 xong step1 cũng có thể đẩy sang step2 ở dây chuyền khác nhưng nó sẽ phải đi đường dài hơn giữa các core vs nhau thay vì trong cùng core
 
Thật ra em không hiểu sâu đâu thím. Với pipeline gồm 5 step kia. Thì vs em nó chỉ như 5 thằng công nhân trong dây chuyền. 1 core là 1 dây chuyền. Nếu làm trên N core thì sẽ có N thằng parallel step1, dây chuyền 1 xong step1 cũng có thể đẩy sang step2 ở dây chuyền khác nhưng nó sẽ phải đi đường dài hơn giữa các core vs nhau thay vì trong cùng core

Nhưng mà CPU Core bây giờ khác nhiều rồi. Nó không chỉ là cái nhà máy với 1 dây chuyền 5 bước cho 5 công nhân làm từng bước. Giờ với mỗi bước có thể lại là nhiều dây chuyền nhỏ giống hệt nhau, có thể hoạt động song song độc lập cho ra cùng lúc nhiều kết quả.

Lấy nhà máy đóng hộp làm ví dụ đi, một trong số các bước là "in vỏ" --> "cắt" --> "đóng hộp", sau khi đã in xong trên một tấm nền lớn thì có thể dùng nhiều dao để cắt cùng lúc ở các vị trí khác nhau, cắt xong rồi thì cũng có thể cùng lúc dùng nhiều máy đóng gói thành hộp. Tức là cùng một lúc có thể cho ra nhiều sản phẩm, mặc dù chỉ với 1 nhà máy.

Bạn có vẻ vẫn hiểu khái niệm core/thread hơi cứng nhắc, nó không phải là thứ gì đó cơ bản không phân chia được. Đúng là 1 core chỉ chạy được 1 thread tại 1 thời điểm (ko tính hyperthreading), nhưng các instruction trong thread đó nếu không phụ thuộc gì với nhau thì hoàn toàn có thể thực thi song song mà không sợ bị lỗi gì.
https://en.wikipedia.org/wiki/Instruction-level_parallelism
 
Ngành computer architecture về cách hoạt động, xử lý ...từng ngóc ngách đều đã có rất nhiều tài liệu mô tả rõ ràng. Cái khó nhất là computer architecture exceptions, cực kỳ phức tạp và đánh đố tất cả các thiên tài, thần đồng về máy tính. Người ta ví việc giải một bài toán IPO sẽ là dễ hơn rất nhiều việc tìm & giải quyết một exception mới phát sinh (hệ thống log không ghi lại) của hệ thống phần cứng máy tính. Khi không tìm ra được thì phần lớn đều phải reset lại toàn bộ hệ thống, đôi khi là phải đập đi thay mới hoàn toàn. Thiệt hại lúc này là vô cùng lớn.

Thế nên mới có câu chuyện nhiều hệ thống máy tính cồng kềnh cũ kỹ nhưng chạy ổn định và nó vẫn đang hoạt động, ở các nước tiên tiến họ không phải là không muốn thay mới mà là thay mới rồi nếu phát sinh ra các exceptions giải quyết thế nào? Ai sẽ chịu trách nhiệm nếu để xảy ra những điều tồi tệ ấy?

Nhiều khi chúng ta cứ tranh cãi những thứ vốn đã được chính chủ giải thích cụ thể làm gì?
https://www.intel.com/content/www/u...pelining-1.html?wapkw=pipeline?wapkw=pipeline

Mình nghĩ, nếu topic này hướng đến computer architecture exceptions sẽ là rất hay đấy.

Thím nào muốn đi từ cơ bản tường tận về computer architecture thì đọc cuốn này
https://www.amazon.com/Modern-Computer-Architecture-Organization-architectures-ebook/dp/B083QJG28Y
 
Last edited:
Nhưng mà CPU Core bây giờ khác nhiều rồi. Nó không chỉ là cái nhà máy với 1 dây chuyền 5 bước cho 5 công nhân làm từng bước. Giờ với mỗi bước có thể lại là nhiều dây chuyền nhỏ giống hệt nhau, có thể hoạt động song song độc lập cho ra cùng lúc nhiều kết quả.

Lấy nhà máy đóng hộp làm ví dụ đi, một trong số các bước là "in vỏ" --> "cắt" --> "đóng hộp", sau khi đã in xong trên một tấm nền lớn thì có thể dùng nhiều dao để cắt cùng lúc ở các vị trí khác nhau, cắt xong rồi thì cũng có thể cùng lúc dùng nhiều máy đóng gói thành hộp. Tức là cùng một lúc có thể cho ra nhiều sản phẩm, mặc dù chỉ với 1 nhà máy.

Bạn có vẻ vẫn hiểu khái niệm core/thread hơi cứng nhắc, nó không phải là thứ gì đó cơ bản không phân chia được. Đúng là 1 core chỉ chạy được 1 thread tại 1 thời điểm (ko tính hyperthreading), nhưng các instruction trong thread đó nếu không phụ thuộc gì với nhau thì hoàn toàn có thể thực thi song song mà không sợ bị lỗi gì.
https://en.wikipedia.org/wiki/Instruction-level_parallelism
Em không hiểu quá sâu, nên nhiều cái nhận định có thể theo tư duy chủ quan của mình.

Thế giờ nếu giờ em gọi thằng excute từng instruction là instruction executor. Thằng executor này có thực hiện 2 execution unit cùng lúc không.

Ví dụ nhiều dao để cắt em nghĩ nó vẫn là 1 execution unit chỉ là được tối ưu để tận dụng excute các unit giống nhau dùng kết quả cho nhiều thằng sử dụng.
 
Em không hiểu quá sâu, nên nhiều cái nhận định có thể theo tư duy chủ quan của mình.

Thế giờ nếu giờ em gọi thằng excute từng instruction là instruction executor. Thằng executor này có thực hiện 2 execution unit cùng lúc không.

Ví dụ nhiều dao để cắt em nghĩ nó vẫn là 1 execution unit chỉ là được tối ưu để tận dụng excute các unit giống nhau dùng kết quả cho nhiều thằng sử dụng.
Hiểu như này cũng được. Giờ có mấy cái tập lệnh SIMD (Single Instruction - Multiple Data) cũng hoạt động như này, thay vì tính 4 phép cộng 32 riêng biệt thì nó đưa 8 hạng tự vào hai thanh ghi 128 bit rồi chỉ cần cộng một lần là xong.

Nhưng mà sau bước cắt thì đã ra từng mảnh giấy rồi, sau đó đưa những sản phầm đó cho 10 công nhân để đóng thành hộp thì vẫn có thể đạt hiệu suất tối đa 10 sp / lượt chứ? Vì 10 công nhân khi đã có nguyên liệu đầu vào rồi thì không liên quan gì đến nhau nữa.
 
Bởi vì các phép tính xảy ra trong 1 core nó tác động đến Physical memory có thể concurrency trong 1 thời điểm siêu vi hay không? Bởi vì con người chưa bao giờ loại bỏ hoàn toàn điện trở trong vật chất cấu tạo core nên nó (điện trở) vẫn tồn tại ảnh hưởng đến vận tốc trôi của electron => tác động đến xung nhịp. Hay nói cách khác là điều kiện để một xung nhịp tạo ra n phép tính, trong n phép tính/lần xung nhịp ấy để mà có các phép tính xảy ra đồng thời thì cần duy nhất 1 thời điểm hay cần n thời điểm?

Người ta phải chứng minh rằng siêu vi thời gian ấy trong vũ trụ có tồn tại một siêu vi thời gian như vậy hay không để quy chiếu về. Lúc này thì lại cần tới Các định lý bất toàn của Gödel để đi tìm chứng minh (mà chưa ai tìm ra).

Bất kỳ vật chất nào trong nó cũng có điện trở. Nên người ta đang sử dụng qubit (bit lượng tử) để đưa một phép tính xảy ra là gần như ->0 ở mặt thời gian. Hay nói cách khác họ cũng đang chứng minh ở mốc thời gian 1/100 tỷ s là concurrency với 1/100n tỷ s (n+1).
Phần này thì tổ lái sang vật lý lượng tử 1 tẹo rồi. Các bit lượng tử di chuyển với tốc độ ~~ C , nên theo Einstein thì thời gian ko tồn tại ( 1s kéo vài tới vô cùng) . Thì việc chứng minh cái 1/100 tỷ là concurrency với 1/100n tỷ làm ntn nhỉ ?

Trong CPU thì đơn vị thời gian hay dùng là cycle, 1 rising-edge của Clock . Trước có làm giả lập 1 con MIPS đơn giản trên FPGA , thì đội kỹ sư đã tính toán mức xung làm sao để bảo đảm các electron kịp chạy qua các D-flipflop và kịp bật transistor cho đến rising-edge tiếp theo. Thường mức xung nhịp đã được tính toán trước và có 1 gap an toàn đủ để anh em ép xung lên 1 tẹo nữa.

Nên phần sai số do ảnh hưởng giữa các bộ phận với nhau được coi là ko đáng kể (vẫn có, nhưng thường có CRC đơn giản như parity check hoặc checksum). Nếu sai 1-2 bóng transistor thì nó tính lại. Parity check chi phát hiện được 1 bit sai lệch thôi.
 
Phần này thì tổ lái sang vật lý lượng tử 1 tẹo rồi. Các bit lượng tử di chuyển với tốc độ ~~ C , nên theo Einstein thì thời gian ko tồn tại ( 1s kéo vài tới vô cùng) . Thì việc chứng minh cái 1/100 tỷ là concurrency với 1/100n tỷ làm ntn nhỉ ?

Trong CPU thì đơn vị thời gian hay dùng là cycle, 1 rising-edge của Clock . Trước có làm giả lập 1 con MIPS đơn giản trên FPGA , thì đội kỹ sư đã tính toán mức xung làm sao để bảo đảm các electron kịp chạy qua các D-flipflop và kịp bật transistor cho đến rising-edge tiếp theo. Thường mức xung nhịp đã được tính toán trước và có 1 gap an toàn đủ để anh em ép xung lên 1 tẹo nữa.

Nên phần sai số do ảnh hưởng giữa các bộ phận với nhau được coi là ko đáng kể (vẫn có, nhưng thường có CRC đơn giản như parity check hoặc checksum). Nếu sai 1-2 bóng transistor thì nó tính lại. Parity check chi phát hiện được 1 bit sai lệch thôi.

Parity chỉ được dùng cho bộ nhớ: RAM, Cache thôi.
Còn sai số do các bộ phận tính toán gây ra thì nói chung là ... chịu. Người ta mặc định chấp nhận rằng CPU luôn luôn hoạt động đúng nếu setup như thông số của nhà sản xuất. Nếu setup khác đi thì lỗi rất dễ xảy ra, thường thấy khi OC.
Một cách để check CPU đang ổn định là dùng các phần mềm stresstest như MPrime95, nó sẽ đẩy CPU đến giới hạn cao nhất, sau kiểm tra kết quả so với đáp án đã có trước xem có tính đúng không.

Cái này thì nghe nói chứ chưa gặp bao giờ: ở những nơi cần độ chính xác cao như trong quân sự, môi trường độc hại ảnh hưởng đến hoạt động phần cứng thì người ta dùng nhiều CPU tính toán rồi cross check lẫn nhau.
 
Mấy hôm gần đây lan man qua hardware hơi nhiều, thôi quay về chính đạo.

Post này focus vào cách sync giữa nhiều thread với nhau và cách sử dụng 1 vài ExecutorService cơ bản trong Java.

Java:
final Thread t1 = new Thread(() -> {
            // Do something
});
t1.start();

Đây là cách basic để tạo 1 thread , lúc gọi t1.start() -> current-thread sẽ bị block cho đến khi cái Runnable interface (lambda function/callback ...) kia chạy xong. Không recommend lắm việc quản lý thread như thế này , vừa khó quản lý thread-lifecycle vừa tốn tài nguyên (nếu được thì share thread cho nhiều method)

Tạo 1 thread-pool để dễ quản lý và share:

Java:
// Run a single task
// then wait it finish
final ExecutorService executorService = new ThreadPoolExecutor(
                // 1 core init
                1,
                // max core in runtime
                2,
                // keepAlive -> if task done, how long keep the thread in pool
                0L,
                TimeUnit.MILLISECONDS,
                // where to store the lambda function , limit 100 task
                new ArrayBlockingQueue<>(100));

// submit the task
final Future<Double> futureResult =
                executorService.submit(Math::random);

// wait 1 second to get the result , if not throw Exception
try {
    final double ret = futureResult.get(1L, Time)
} catch (ExecutionException | TimeoutException e) {
    System.out.println("timeout");
}

Chạy 1 batch job và đợi block tới khi có hết kết quả:
Java:
try {
    // Run a batch-job
            final int jobSize = 1000;
            final List<Callable<Double>> listJob = new ArrayList<>(jobSize);
            for (int i = 0; i < jobSize; i++) {
                listJob.add(Math::random);
            }

            // default get number parallel job = available cores
            final ExecutorService forkJoinPool = Executors.newWorkStealingPool();
            final BlockingQueue<Exception> exceptions = new ArrayBlockingQueue<>(128);
            final List<Double> resultBatchJob = forkJoinPool.invokeAll(listJob)
                    .stream().map(doubleFuture -> {
                        try {
                            return doubleFuture.get(1L, TimeUnit.SECONDS);
                        } catch (Exception e) {
                            try {
                                if (!exceptions.offer(e,1L, TimeUnit.SECONDS)) {
                                    System.err.println("Full queue");
                                }
                            } catch (InterruptedException ex) {
                                System.err.println("Exception, cannot push to queue");
                            }
                            return 0.0;
                        }
                    }).collect(Collectors.toList());

// Block until timeout or finish
if(!forkJoinPool.awaitTermination(1L, TimeUnit.HOURS)) {
   System.out.println("does not finish in time");
   forkJoinPool.shutdownNow();
}
            resultBatchJob.forEach(System.out::println);
            exceptions.forEach(System.out::println);
} catch (InterruptedException e) {
    System.out.println("timeout");
}

  • Để main-thread biết được có lỗi xảy ra ở trong thread hay chưa, cần hứng exception vào 1 BlockingQueue ( hoặc print log và ignore)
  • submit(Callable<Class>) dùng cho việc collect data về, có return
  • run(Runnable) chạy và ko cần collect return ( có thể hứng vào 1 cấu trúc dữ liệu thread-safe)
  • Main-thread bị block cho tới khi task được finish hết -> nếu ko set timeout cho awaitTermination thì đợi vô cực
  • Các object share vào thread-scope khác bắt buộc phải thread-safe như BlockingQueue, Atomic, ConcurrentMap ...
 
Ở phần demo trên, thì tồn tại limit về mặt Memory, khi phải load hết job (Callable<Object>) vào mem trước khi xử lý và hứng kết quả. Cách này không khả thi khi xử lý batch quá lớn như đọc từ file nén vài GB hoặc SELECT từ DB vài chục triệu row.

Thì ở post này, mình giới thiệu 1 kỹ thuật hay dùng trong xử lý data: Pipe (data pipeline)

Bash:
# đây là 1 ví dụ về pipe / data pipe-line trên Shell
cat /proc/cpuinfo | grep processor | wc -l

Như ví dụ trên, thì Bash/Shell ( là 1 ứng dụng trên OS) sẽ pipe stdout của command_1(process1) -> stdin của command_2(process2) ... Kỹ thuật pipe trên shell dùng file_descriptor để sync giữa các process. Tất nhiên không phải kiểu dữ liệu input nào cũng pipe được, thường thì mình phải chia bằng delimiter newline("\n") để dễ dàng pipe qua lại các process.

Mình sẽ ví dụ kỹ thuật pipe, nhưng ko phải thông qua file_descriptor mà dùng trực tiếp trên memory và multi-threading:

Java:
try {
            final File fileCpuInfo = new File("/proc/cpuinfo");
            final long timeOut = 1L;
            final TimeUnit timeUnit = TimeUnit.SECONDS;
            final AtomicBoolean isPushingDone = new AtomicBoolean(false);
            final AtomicReference<Exception> exceptionInThreadScope = new AtomicReference<>(null);
            final int threadPoolSize = 2;
            final ExecutorService es = Executors.newFixedThreadPool(threadPoolSize);

            final BlockingQueue<String> fileInputQueue = new ArrayBlockingQueue<>(1024);
            final BlockingQueue<String> outputResult = new LinkedBlockingDeque<>();
            // Start while-loop in another thread
            for (int i = 0; i < threadPoolSize; i++) {
                es.execute(() -> {
                    while (!isPushingDone.get() ||
                            exceptionInThreadScope.get() == null ||
                            !fileInputQueue.isEmpty()) {
                        try {
                            final String line = fileInputQueue.poll(timeOut, timeUnit);
                            if (line != null && line.contains("processor")) {
                                // Push output to another queue -> input to next thread
                                outputResult.offer(line, timeOut, timeUnit);
                                System.out.println(line);
                            }
                        } catch (Exception e) {
                            exceptionInThreadScope.set(e);
                            System.err.println("Cannot push to queue in 1s");
                        }
                    }
                });
            }

            // Another thread-scope use output from outputResult
            final ExecutorService outputProcessor = Executors.newSingleThreadExecutor();
            outputProcessor.execute(() -> {
                while (!isPushingDone.get() || !outputResult.isEmpty()) {
                    try {
                        final String processorLine = outputResult.poll(timeOut,timeUnit);
                        if ( processorLine != null && !processorLine.isEmpty()) {
                            final String[] split = processorLine.split(":");
                            if (split.length > 0) {
                                System.out.printf("CoreId : %s%n", split[0]);  
                            }
                        }
                    } catch (Exception e) {
                        System.err.println("Output processing fail!");
                    }
                }
            });


            try (final FileReader fr = new FileReader(fileCpuInfo)) {
                try (final BufferedReader br = new BufferedReader(fr)) {
                    String currentLine = br.readLine();
                    while (currentLine != null) {
                        if (!fileInputQueue.offer(currentLine, timeOut, timeUnit)) {
                            System.err.println("fileInputQueue is full!");
                        }
                        currentLine = br.readLine();
                    }
                }
            } finally {
                isPushingDone.set(true);
            }

            // đợi các threadPool finish ở đây
           // như post trước awaitTermination


        } catch (Exception e) {
            System.err.println("File reading error!");
        }

  • Ta có thể share blockingQueue trong object giữa các method hoặc chỉ khởi tạo trong 1 function-scope -> tùy vào logic xử lý.
  • Set-timeout và try-catch là best-practice , exception throws trong thread-scope khác sẽ không try-catch được ở main-thread -> bị nuốt lỗi nếu ko xử lý exception trong thread-scope.
  • Phần input thì thường là single-thread vì thường file hoặc db-connection chỉ cho 1 thread access ( nếu đọc file nén thì có thể 1 thread giải nén - 1 thread push data vào queue)
  • Nên limit cho queue , nếu queue size ko limit -> OOM hoặc treo luôn JVM ( ở Java 11 vs GC1 thì JVM ko kill process khi OOM, mà sẽ stop-the-world vô cực -> set flag KillOnOOM để kill)
  • ThreadPool cũng có 1 task-queue , bt là unbound-queue -> task push vào quá nhiều mà ko consume cũng gây OOM
 
Back
Top