Open-source weather station for astronomy
1# Credits: https://github.com/michael-sulyak/micropython-mlx90640 (MIT License)
2
3import array
4import math
5import struct
6
7import machine
8
9
def init_float_array(size) -> array.array:
    """Return a new ``'f'`` (single-precision float) array holding ``size`` zeros."""
    # A generator is used so no temporary list of ``size`` items is built.
    zeros = (0 for _ in range(size))
    return array.array('f', zeros)
12
13
def init_int_array(size) -> array.array:
    """Return a new ``'i'`` (signed int) array holding ``size`` zeros."""
    # A generator is used so no temporary list of ``size`` items is built.
    zeros = (0 for _ in range(size))
    return array.array('i', zeros)
16
17
class RefreshRate:
    """Plain namespace of 3-bit refresh-rate codes for the MLX90640.

    These are ordinary integers (not a real ``Enum``); the
    ``MLX90640.refresh_rate`` setter keeps only the low three bits of
    whatever value it is given.
    """
    REFRESH_0_5_HZ = 0  # 0b000 -> 0.5 Hz
    REFRESH_1_HZ = 1  # 0b001 -> 1 Hz
    REFRESH_2_HZ = 2  # 0b010 -> 2 Hz
    REFRESH_4_HZ = 3  # 0b011 -> 4 Hz
    REFRESH_8_HZ = 4  # 0b100 -> 8 Hz
    REFRESH_16_HZ = 5  # 0b101 -> 16 Hz
    REFRESH_32_HZ = 6  # 0b110 -> 32 Hz
    REFRESH_64_HZ = 7  # 0b111 -> 64 Hz
28
29
class I2CDevice:
    """
    Pairs an I2C bus object with a single device address.

    Every method simply forwards to the bus using the stored address; no
    bus locking is performed here.
    """

    def __init__(self, i2c, device_address, probe=True):
        """Store the bus and address, optionally probing for the device.

        :param i2c: bus object providing ``writeto`` and ``readfrom_into``
        :param device_address: I2C address used for every transfer
        :param probe: when true, check that the device answers on the bus
        :raises ValueError: if ``probe`` is true and the device does not respond
        """
        self.i2c = i2c
        self.device_address = device_address

        if probe:
            self._probe_for_device()

    def read_into(self, buf, *, start=0, end=None):
        """Read from the device into ``buf[start:end]`` without allocation."""
        if end is None:
            end = len(buf)

        # machine.I2C.readfrom_into() accepts no start/end keywords, so the
        # target window is selected with a (copy-free) memoryview slice,
        # exactly as write_then_read_into() does.
        self.i2c.readfrom_into(self.device_address, memoryview(buf)[start:end])

    def write(self, buf):
        """Write buffer to the device."""
        self.i2c.writeto(self.device_address, buf)

    def write_then_read_into(self, out_buffer, in_buffer, *, out_start=0, out_end=None, in_start=0, in_end=None):
        """Write ``out_buffer[out_start:out_end]``, then read into
        ``in_buffer[in_start:in_end]``.

        The write is issued with ``stop=False`` so that no STOP condition is
        generated between the write and the following read.
        """
        if out_end is None:
            out_end = len(out_buffer)

        if in_end is None:
            in_end = len(in_buffer)

        self.i2c.writeto(self.device_address, memoryview(out_buffer)[out_start:out_end], False)
        self.i2c.readfrom_into(self.device_address, memoryview(in_buffer)[in_start:in_end])

    def _probe_for_device(self):
        """Probe for the device, ensuring it is responding on the bus.

        An empty write is tried first; if that fails with ``OSError`` a
        one-byte read is attempted before giving up.

        :raises ValueError: if both attempts fail with ``OSError``
        """

        try:
            self.i2c.writeto(self.device_address, b'')
        except OSError:
            try:
                result = bytearray(1)
                self.i2c.readfrom_into(self.device_address, result)
            except OSError:
                raise ValueError(f'No I2C device at address: 0x{self.device_address:x}')
76
77
class MLX90640:
    """Interface to the MLX90640 temperature sensor."""

    # 834-word buffer that __init__ fills by reading from address 0x2400 and
    # that the _extract_* methods decode.
    # NOTE: defined on the class, so all instances share this one array.
    ee_data = init_int_array(834)
    # Maximum number of 16-bit words fetched per I2C transaction
    # (the receive buffer is sized to twice this many bytes).
    i2c_read_len = 128
    # Scaling constant used when extracting and applying per-pixel alpha.
    scale_alpha = 0.000001
    # Address from which the `serial_number` property reads three words.
    mlx90640_deviceid1 = 0x2407
    # Subtracted from the measured ambient temperature in get_frame().
    openair_ta_shift = 8
86
def __init__(self, i2c_bus: machine.I2C, address: int = 0x33) -> None:
    """Connect to the sensor and decode its calibration data.

    Creates the transfer buffers, wraps the bus in an ``I2CDevice`` (which
    probes the address), reads 834 words starting at 0x2400 into
    ``ee_data`` and finally runs ``_extract_parameters()``.

    :param i2c_bus: I2C bus the sensor is attached to
    :param address: I2C address of the sensor
    :raises ValueError: if no device answers at ``address``
    :raises RuntimeError: if the deviating-pixel checks fail
    """
    self.inbuf = bytearray(2 * self.i2c_read_len)
    self.addrbuf = bytearray(2)
    self.i2c_device = I2CDevice(i2c_bus, address)
    self.mlx90640_frame = init_int_array(834)
    self._i2c_read_words(0x2400, self.ee_data)

    # Attributes initialized through extraction methods
    self.k_vdd = 0
    self.vdd25 = 0
    self.kv_ptat = 0
    self.kt_ptat = 0
    self.v_ptat25 = 0
    self.alpha_ptat = 0
    self.gain_ee = 0
    self.tgc = 0
    self.resolution_ee = 0
    self.ks_ta = 0
    self.ct = [0] * 4
    self.ks_to = [0] * 5
    self.cp_alpha = [0, 0]
    self.cp_offset = [0, 0]
    # cp_kta / cp_kv are set by the CP extraction and read by
    # _calculate_to(); declared here like every other extracted attribute.
    self.cp_kta = 0
    self.cp_kv = 0
    self.alpha = None
    self.alpha_scale = 0
    self.offset = None
    self.kta = None
    self.kta_scale = 0
    self.kv = None
    self.kv_scale = 0
    self.il_chess_c = [0, 0, 0]
    self.broken_pixels = set()
    self.outlier_pixels = set()
    self.calibration_mode_ee = 0

    self._extract_parameters()
122
@property
def serial_number(self):
    """List of three words, read starting at ``mlx90640_deviceid1``,
    that are unique to each MLX90640."""
    words = [0] * 3
    self._i2c_read_words(self.mlx90640_deviceid1, words)
    return words
129
@property
def refresh_rate(self) -> int:
    """How fast the MLX90640 will spit out data. Start at lowest speed in
    RefreshRate and then slowly increase I2C clock rate and rate until you
    max out. The sensor does not like it if the I2C host cannot 'keep up'!

    The value is the 3-bit field held in bits 7..9 of register 0x800D."""
    register = [0]
    self._i2c_read_words(0x800D, register)
    return (register[0] & 0x0380) >> 7

@refresh_rate.setter
def refresh_rate(self, rate: int) -> None:
    # Read-modify-write: only bits 7..9 of register 0x800D are replaced,
    # every other bit keeps its current value.
    register = [0]
    self._i2c_read_words(0x800D, register)
    updated = ((rate & 0x7) << 7) | (register[0] & 0xFC7F)
    self._i2c_write_word(0x800D, updated)
146
def get_frame(self, framebuf, emissivity: float = 0.95) -> None:
    """Read one frame from the sensor and convert it to temperatures in C.

    A single ``_get_frame_data()`` transfer is performed per call, and
    ``_calculate_to()`` only writes the pixels that belong to the sub-page
    that transfer reported (plus the -273.15 marker for bad pixels). The
    remaining entries of ``framebuf`` are left untouched, so reuse the
    same buffer and call this method repeatedly to keep all 32x24 pixels
    fresh.

    :param framebuf: 768-element mutable sequence receiving the result
    :param emissivity: emissivity of the observed surface, in ``(0, 1]``
    :raises ValueError: if ``emissivity`` is outside ``(0, 1]``
    :raises RuntimeError: if the frame could not be read
    """
    # The conversion divides by the emissivity, so reject values that
    # cannot be meaningful before talking to the sensor.
    if not 0 < emissivity <= 1:
        raise ValueError('Emissivity must be in the range (0, 1]')

    status = self._get_frame_data()

    if status < 0:
        raise RuntimeError('Frame data error')

    # Reflected temperature: ambient temperature minus the open-air shift.
    tr = self._get_ta() - self.openair_ta_shift

    self._calculate_to(emissivity, tr, framebuf)
161
162 def _get_frame_data(self) -> int:
163 data_ready = 0
164 cnt = 0
165 status_register = [0]
166 control_register = [0]
167
168 while data_ready == 0:
169 self._i2c_read_words(0x8000, status_register)
170 data_ready = status_register[0] & 0x0008
171
172 while (data_ready != 0) and (cnt < 5):
173 self._i2c_write_word(0x8000, 0x0030)
174 self._i2c_read_words(0x0400, self.mlx90640_frame, end=832)
175
176 self._i2c_read_words(0x8000, status_register)
177 data_ready = status_register[0] & 0x0008
178 cnt += 1
179
180 if cnt > 4:
181 raise RuntimeError('Too many retries')
182
183 self._i2c_read_words(0x800D, control_register)
184 self.mlx90640_frame[832] = control_register[0]
185 self.mlx90640_frame[833] = status_register[0] & 0x0001
186 return self.mlx90640_frame[833]
187
188 def _get_ta(self) -> float:
189 vdd = self._get_vdd()
190
191 ptat = self.mlx90640_frame[800]
192 if ptat > 32767:
193 ptat -= 65536
194
195 ptat_art = self.mlx90640_frame[768]
196 if ptat_art > 32767:
197 ptat_art -= 65536
198 ptat_art = (ptat / (ptat * self.alpha_ptat + ptat_art)) * math.pow(2, 18)
199
200 ta = ptat_art / (1 + self.kv_ptat * (vdd - 3.3)) - self.v_ptat25
201 ta = ta / self.kt_ptat + 25
202 return ta
203
204 def _get_vdd(self) -> int:
205 vdd = self.mlx90640_frame[810]
206
207 if vdd > 32767:
208 vdd -= 65536
209
210 resolution_ram = (self.mlx90640_frame[832] & 0x0C00) >> 10
211 resolution_correction = math.pow(2, self.resolution_ee) / math.pow(2, resolution_ram)
212 vdd = (resolution_correction * vdd - self.vdd25) / self.k_vdd + 3.3
213
214 return vdd
215
216 def _calculate_to(self, emissivity: float, tr: float, result) -> None:
217 sub_page = self.mlx90640_frame[833]
218 alpha_corr_r = [0] * 4
219 ir_data_cp = [0, 0]
220
221 vdd = self._get_vdd()
222 ta = self._get_ta()
223
224 ta4 = (ta + 273.15) ** 4
225 tr4 = (tr + 273.15) ** 4
226 ta_tr = tr4 - (tr4 - ta4) / emissivity
227
228 kta_scale = math.pow(2, self.kta_scale)
229 kv_scale = math.pow(2, self.kv_scale)
230 alpha_scale = math.pow(2, self.alpha_scale)
231
232 alpha_corr_r[0] = 1 / (1 + self.ks_to[0] * 40)
233 alpha_corr_r[1] = 1
234 alpha_corr_r[2] = 1 + self.ks_to[1] * self.ct[2]
235 alpha_corr_r[3] = alpha_corr_r[2] * (1 + self.ks_to[2] * (self.ct[3] - self.ct[2]))
236
237 gain = self.mlx90640_frame[778]
238 if gain > 32767:
239 gain -= 65536
240 gain = self.gain_ee / gain
241
242 mode = (self.mlx90640_frame[832] & 0x1000) >> 5
243
244 ir_data_cp[0] = self.mlx90640_frame[776]
245 ir_data_cp[1] = self.mlx90640_frame[808]
246 for i in range(2):
247 if ir_data_cp[i] > 32767:
248 ir_data_cp[i] -= 65536
249 ir_data_cp[i] *= gain
250
251 ir_data_cp[0] -= self.cp_offset[0] * (1 + self.cp_kta * (ta - 25)) * (1 + self.cp_kv * (vdd - 3.3))
252 if mode == self.calibration_mode_ee:
253 ir_data_cp[1] -= self.cp_offset[1] * (1 + self.cp_kta * (ta - 25)) * (1 + self.cp_kv * (vdd - 3.3))
254 else:
255 ir_data_cp[1] -= (self.cp_offset[1] + self.il_chess_c[0]) * (1 + self.cp_kta * (ta - 25)) * (
256 1 + self.cp_kv * (vdd - 3.3))
257
258 for pixel_number in range(768):
259 if self._is_pixel_bad(pixel_number):
260 result[pixel_number] = -273.15
261 continue
262
263 il_pattern = pixel_number // 32 - (pixel_number // 64) * 2
264 conversion_pattern = ((pixel_number + 2) // 4 - (pixel_number + 3) // 4 + (
265 pixel_number + 1) // 4 - pixel_number // 4) * (1 - 2 * il_pattern)
266
267 if mode == 0:
268 pattern = il_pattern
269 else:
270 chess_pattern = il_pattern ^ (pixel_number - (pixel_number // 2) * 2)
271 pattern = chess_pattern
272
273 if pattern == sub_page:
274 ir_data = self.mlx90640_frame[pixel_number]
275
276 if ir_data > 32767:
277 ir_data -= 65536
278
279 ir_data *= gain
280
281 kta = self.kta[pixel_number] / kta_scale
282 kv = self.kv[pixel_number] / kv_scale
283 ir_data -= self.offset[pixel_number] * (1 + kta * (ta - 25)) * (1 + kv * (vdd - 3.3))
284
285 if mode != self.calibration_mode_ee:
286 ir_data += self.il_chess_c[2] * (2 * il_pattern - 1) - self.il_chess_c[1] * conversion_pattern
287
288 ir_data = ir_data - self.tgc * ir_data_cp[sub_page]
289 ir_data /= emissivity
290
291 alpha_compensated = (
292 (self.scale_alpha * alpha_scale / self.alpha[pixel_number])
293 * (1 + self.ks_ta * (ta - 25))
294 )
295
296 sx = math.sqrt(math.sqrt(
297 alpha_compensated
298 * alpha_compensated
299 * alpha_compensated
300 * (ir_data + alpha_compensated * ta_tr)
301 ))
302 to = math.sqrt(math.sqrt(
303 (ir_data / (alpha_compensated * (1 - self.ks_to[1] * 273.15) + sx) + ta_tr)
304 )) - 273.15
305
306 if to < self.ct[1]:
307 torange = 0
308 elif to < self.ct[2]:
309 torange = 1
310 elif to < self.ct[3]:
311 torange = 2
312 else:
313 torange = 3
314
315 to = math.sqrt(math.sqrt(
316 ir_data / (
317 alpha_compensated
318 * alpha_corr_r[torange]
319 * (1 + self.ks_to[torange] * (to - self.ct[torange]))
320 ) + ta_tr
321 )) - 273.15
322
323 result[pixel_number] = to
324
325 def _extract_parameters(self) -> None:
326 self._extract_vdd_parameters()
327 self._extract_ptat_parameters()
328 self._extract_gain_parameters()
329 self._extract_tgc_parameters()
330 self._extract_resolution_parameters()
331 self._extract_ks_ta_parameters()
332 self._extract_ks_to_parameters()
333 self._extract_cp_parameters()
334 self._extract_alpha_parameters()
335 self._extract_offset_parameters()
336 self._extract_kta_pixel_parameters()
337 self._extract_kv_pixel_parameters()
338 self._extract_cilc_parameters()
339 self._extract_deviating_pixels()
340
341 def _extract_vdd_parameters(self) -> None:
342 # extract VDD
343 self.k_vdd = (self.ee_data[51] & 0xFF00) >> 8
344 if self.k_vdd > 127:
345 self.k_vdd -= 256 # convert to signed
346 self.k_vdd *= 32
347 self.vdd25 = self.ee_data[51] & 0x00FF
348 self.vdd25 = ((self.vdd25 - 256) << 5) - 8192
349
350 def _extract_ptat_parameters(self) -> None:
351 self.kv_ptat = (self.ee_data[50] & 0xFC00) >> 10
352 if self.kv_ptat > 31:
353 self.kv_ptat -= 64
354 self.kv_ptat /= 4096
355 self.kt_ptat = self.ee_data[50] & 0x03FF
356 if self.kt_ptat > 511:
357 self.kt_ptat -= 1024
358 self.kt_ptat /= 8
359 self.v_ptat25 = self.ee_data[49]
360 self.alpha_ptat = (self.ee_data[16] & 0xF000) / math.pow(2, 14) + 8
361
362 def _extract_gain_parameters(self) -> None:
363 self.gain_ee = self.ee_data[48]
364 if self.gain_ee > 32767:
365 self.gain_ee -= 65536
366
367 def _extract_tgc_parameters(self) -> None:
368 self.tgc = self.ee_data[60] & 0x00FF
369 if self.tgc > 127:
370 self.tgc -= 256
371 self.tgc /= 32
372
373 def _extract_resolution_parameters(self) -> None:
374 self.resolution_ee = (self.ee_data[56] & 0x3000) >> 12
375
376 def _extract_ks_ta_parameters(self) -> None:
377 self.ks_ta = (self.ee_data[60] & 0xFF00) >> 8
378 if self.ks_ta > 127:
379 self.ks_ta -= 256
380 self.ks_ta /= 8192
381
382 def _extract_ks_to_parameters(self) -> None:
383 step = ((self.ee_data[63] & 0x3000) >> 12) * 10
384 self.ct[0] = -40
385 self.ct[1] = 0
386 self.ct[2] = (self.ee_data[63] & 0x00F0) >> 4
387 self.ct[3] = (self.ee_data[63] & 0x0F00) >> 8
388 self.ct[2] *= step
389 self.ct[3] = self.ct[2] + self.ct[3] * step
390
391 ks_to_scale = (self.ee_data[63] & 0x000F) + 8
392 ks_to_scale = 1 << ks_to_scale
393
394 self.ks_to[0] = self.ee_data[61] & 0x00FF
395 self.ks_to[1] = (self.ee_data[61] & 0xFF00) >> 8
396 self.ks_to[2] = self.ee_data[62] & 0x00FF
397 self.ks_to[3] = (self.ee_data[62] & 0xFF00) >> 8
398
399 for i in range(4):
400 if self.ks_to[i] > 127:
401 self.ks_to[i] -= 256
402 self.ks_to[i] /= ks_to_scale
403 self.ks_to[4] = -0.
404 # extract CP
405 offset_sp = [0] * 2
406 alpha_sp = [0] * 2
407
408 alpha_scale = ((self.ee_data[32] & 0xF000) >> 12) + 27
409
410 offset_sp[0] = self.ee_data[58] & 0x03FF
411 if offset_sp[0] > 511:
412 offset_sp[0] -= 1024
413
414 offset_sp[1] = (self.ee_data[58] & 0xFC00) >> 10
415 if offset_sp[1] > 31:
416 offset_sp[1] -= 64
417 offset_sp[1] += offset_sp[0]
418
419 alpha_sp[0] = self.ee_data[57] & 0x03FF
420 if alpha_sp[0] > 511:
421 alpha_sp[0] -= 1024
422 alpha_sp[0] /= math.pow(2, alpha_scale)
423
424 alpha_sp[1] = (self.ee_data[57] & 0xFC00) >> 10
425 if alpha_sp[1] > 31:
426 alpha_sp[1] -= 64
427 alpha_sp[1] = (1 + alpha_sp[1] / 128) * alpha_sp[0]
428
429 cp_kta = self.ee_data[59] & 0x00FF
430 if cp_kta > 127:
431 cp_kta -= 256
432 kta_scale1 = ((self.ee_data[56] & 0x00F0) >> 4) + 8
433 self.cp_kta = cp_kta / math.pow(2, kta_scale1)
434
435 cp_kv = (self.ee_data[59] & 0xFF00) >> 8
436 if cp_kv > 127:
437 cp_kv -= 256
438 kv_scale = (self.ee_data[56] & 0x0F00) >> 8
439 self.cp_kv = cp_kv / math.pow(2, kv_scale)
440
441 self.cp_alpha[0] = alpha_sp[0]
442 self.cp_alpha[1] = alpha_sp[1]
443 self.cp_offset[0] = offset_sp[0]
444 self.cp_offset[1] = offset_sp[1]
445
446 def _extract_cp_parameters(self):
447 # Compensation Pixel (CP) parameters extraction from EEPROM data
448 offset_sp = [0] * 2
449 alpha_sp = [0] * 2
450
451 alpha_scale = ((self.ee_data[32] & 0xF000) >> 12) + 27
452
453 offset_sp[0] = self.ee_data[58] & 0x03FF
454 if offset_sp[0] > 511:
455 offset_sp[0] -= 1024
456
457 offset_sp[1] = (self.ee_data[58] & 0xFC00) >> 10
458 if offset_sp[1] > 31:
459 offset_sp[1] -= 64
460 offset_sp[1] += offset_sp[0]
461
462 alpha_sp[0] = self.ee_data[57] & 0x03FF
463 if alpha_sp[0] > 511:
464 alpha_sp[0] -= 1024
465 alpha_sp[0] /= math.pow(2, alpha_scale)
466
467 alpha_sp[1] = (self.ee_data[57] & 0xFC00) >> 10
468 if alpha_sp[1] > 31:
469 alpha_sp[1] -= 64
470 alpha_sp[1] = (1 + alpha_sp[1] / 128) * alpha_sp[0]
471
472 cp_kta = self.ee_data[59] & 0x00FF
473 if cp_kta > 127:
474 cp_kta -= 256
475 kta_scale1 = ((self.ee_data[56] & 0x00F0) >> 4) + 8
476 self.cp_kta = cp_kta / math.pow(2, kta_scale1)
477
478 cp_kv = (self.ee_data[59] & 0xFF00) >> 8
479 if cp_kv > 127:
480 cp_kv -= 256
481 kv_scale = (self.ee_data[56] & 0x0F00) >> 8
482 self.cp_kv = cp_kv / math.pow(2, kv_scale)
483
484 self.cp_alpha = alpha_sp
485 self.cp_offset = offset_sp
486
def _extract_alpha_parameters(self) -> None:
    """Build the per-pixel ``alpha`` table and its ``alpha_scale``.

    Each pixel combines a reference value, a per-row and a per-column
    correction and a per-pixel remainder from ``ee_data``. The result is
    compensated with ``tgc`` and ``cp_alpha`` (so both must already be
    extracted), inverted against ``scale_alpha`` and finally rescaled by a
    power of two and rounded.
    """
    ee = self.ee_data
    scale_word = ee[32]
    acc_rem_scale = scale_word & 0x000F
    acc_column_scale = (scale_word & 0x00F0) >> 4
    acc_row_scale = (scale_word & 0x0F00) >> 8
    alpha_scale = ((scale_word & 0xF000) >> 12) + 30
    alpha_ref = ee[33]

    acc_row = init_int_array(24)
    acc_column = init_int_array(32)
    alpha_temp = init_float_array(768)

    # Each word packs four signed 4-bit corrections, lowest nibble first.
    for word_index in range(6):
        word = ee[34 + word_index]
        for nibble in range(4):
            value = (word >> (4 * nibble)) & 0x000F
            acc_row[4 * word_index + nibble] = value - 16 if value > 7 else value

    for word_index in range(8):
        word = ee[40 + word_index]
        for nibble in range(4):
            value = (word >> (4 * nibble)) & 0x000F
            acc_column[4 * word_index + nibble] = value - 16 if value > 7 else value

    scale_divisor = math.pow(2, alpha_scale)
    tgc_term = self.tgc * (self.cp_alpha[0] + self.cp_alpha[1]) / 2

    for row in range(24):
        row_term = alpha_ref + (acc_row[row] << acc_row_scale)
        for column in range(32):
            p = 32 * row + column
            raw = (ee[64 + p] & 0x03F0) >> 4
            if raw > 31:
                raw -= 64
            raw = raw * (1 << acc_rem_scale) + row_term + (acc_column[column] << acc_column_scale)
            # The intermediate results are stored back into the float array
            # after every step, exactly where the values were stored before.
            alpha_temp[p] = raw / scale_divisor
            alpha_temp[p] -= tgc_term
            alpha_temp[p] = self.scale_alpha / alpha_temp[p]

    # Count how many doublings bring the largest entry to at least 32768.
    peak = max(alpha_temp)
    alpha_scale = 0
    while peak < 32768:
        peak *= 2
        alpha_scale += 1

    scale_factor = math.pow(2, alpha_scale)
    for i in range(768):
        alpha_temp[i] = int(alpha_temp[i] * scale_factor + 0.5)

    self.alpha = alpha_temp
    self.alpha_scale = alpha_scale
548 self.alpha_scale = alpha_scale
549
def _extract_offset_parameters(self) -> None:
    """Build the per-pixel ``offset`` table from ``ee_data``.

    Every pixel offset is the sum of a signed reference value, a scaled
    per-row correction, a scaled per-column correction and a scaled signed
    6-bit per-pixel remainder.
    """
    ee = self.ee_data
    scale_word = ee[16]
    occ_rem_scale = scale_word & 0x000F
    occ_column_scale = (scale_word & 0x00F0) >> 4
    occ_row_scale = (scale_word & 0x0F00) >> 8

    offset_ref = ee[17]
    if offset_ref > 32767:
        offset_ref -= 65536

    # Each word packs four signed 4-bit corrections, lowest nibble first.
    occ_row = []
    for word_index in range(6):
        word = ee[18 + word_index]
        for nibble in range(4):
            value = (word >> (4 * nibble)) & 0x000F
            occ_row.append(value - 16 if value > 7 else value)

    occ_column = []
    for word_index in range(8):
        word = ee[24 + word_index]
        for nibble in range(4):
            value = (word >> (4 * nibble)) & 0x000F
            occ_column.append(value - 16 if value > 7 else value)

    self.offset = offsets = init_float_array(768)

    for row in range(24):
        row_term = offset_ref + (occ_row[row] << occ_row_scale)
        for column in range(32):
            p = 32 * row + column
            raw = (ee[64 + p] & 0xFC00) >> 10
            if raw > 31:
                raw -= 64
            offsets[p] = (
                raw * (1 << occ_rem_scale)
                + row_term
                + (occ_column[column] << occ_column_scale)
            )
598
def _extract_kta_pixel_parameters(self):
    """Build the per-pixel ``kta`` table and its ``kta_scale``.

    A pixel's value is a signed 3-bit remainder (scaled) plus one of four
    signed 8-bit constants chosen by row/column parity, divided by a power
    of two. The table is then rescaled by a power of two and rounded.
    """
    ee = self.ee_data

    def signed8(value):
        # Reinterpret an unsigned byte as two's complement.
        return value - 256 if value > 127 else value

    # Indexed by 2 * (row parity) + (column parity).
    kta_rc = [
        signed8((ee[54] & 0xFF00) >> 8),
        signed8((ee[55] & 0xFF00) >> 8),
        signed8(ee[54] & 0x00FF),
        signed8(ee[55] & 0x00FF),
    ]

    kta_scale1 = ((ee[56] & 0x00F0) >> 4) + 8
    kta_scale2 = ee[56] & 0x000F
    divisor = math.pow(2, kta_scale1)

    kta_temp = init_float_array(768)
    for row in range(24):
        for column in range(32):
            p = 32 * row + column
            split = 2 * (row % 2) + column % 2
            raw = (ee[64 + p] & 0x000E) >> 1
            if raw > 3:
                raw -= 8
            raw = raw * (1 << kta_scale2) + kta_rc[split]
            kta_temp[p] = raw / divisor

    # Count how many doublings bring the largest magnitude to at least 64.
    peak = max(abs(k) for k in kta_temp)
    kta_scale1 = 0
    while peak < 64:
        peak *= 2
        kta_scale1 += 1

    factor = math.pow(2, kta_scale1)
    for i in range(768):
        scaled = kta_temp[i] * factor
        # Round half away from zero.
        kta_temp[i] = int(scaled - 0.5) if scaled < 0 else int(scaled + 0.5)

    self.kta = kta_temp
    self.kta_scale = kta_scale1
654
def _extract_kv_pixel_parameters(self):
    """Build the per-pixel ``kv`` table and its ``kv_scale``.

    Every pixel takes one of four signed 4-bit constants from
    ``ee_data[52]``, chosen by row/column parity and divided by a power of
    two. The table is then rescaled by a power of two and rounded.
    """
    ee = self.ee_data

    def signed4(value):
        # Reinterpret an unsigned nibble as two's complement.
        return value - 16 if value > 7 else value

    kv_word = ee[52]
    # Indexed by 2 * (row parity) + (column parity).
    kv_t = [
        signed4((kv_word & 0xF000) >> 12),
        signed4((kv_word & 0x00F0) >> 4),
        signed4((kv_word & 0x0F00) >> 8),
        signed4(kv_word & 0x000F),
    ]

    kv_scale = (ee[56] & 0x0F00) >> 8
    divisor = math.pow(2, kv_scale)

    kv_temp = init_float_array(768)
    for row in range(24):
        for column in range(32):
            split = 2 * (row % 2) + column % 2
            kv_temp[32 * row + column] = kv_t[split] / divisor

    # Count how many doublings bring the largest magnitude to at least 64.
    peak = max(abs(kv) for kv in kv_temp)
    kv_scale = 0
    while peak < 64:
        peak *= 2
        kv_scale += 1

    factor = math.pow(2, kv_scale)
    for i in range(768):
        scaled = kv_temp[i] * factor
        # Round half away from zero.
        kv_temp[i] = int(scaled - 0.5) if scaled < 0 else int(scaled + 0.5)

    self.kv = kv_temp
    self.kv_scale = kv_scale
705
706 def _extract_cilc_parameters(self):
707 # Extract CILC parameters
708 self.calibration_mode_ee = (self.ee_data[10] & 0x0800) >> 4
709 self.calibration_mode_ee = self.calibration_mode_ee ^ 0x80
710
711 il_chess_c = [0] * 3
712 il_chess_c[0] = self.ee_data[53] & 0x003F
713 if il_chess_c[0] > 31:
714 il_chess_c[0] -= 64
715 il_chess_c[0] /= 16.0
716
717 il_chess_c[1] = (self.ee_data[53] & 0x07C0) >> 6
718 if il_chess_c[1] > 15:
719 il_chess_c[1] -= 32
720 il_chess_c[1] /= 2.0
721
722 il_chess_c[2] = (self.ee_data[53] & 0xF800) >> 11
723 if il_chess_c[2] > 15:
724 il_chess_c[2] -= 32
725 il_chess_c[2] /= 8.0
726
727 self.il_chess_c = il_chess_c
728
729 def _extract_deviating_pixels(self):
730 # Extract information about deviating pixels
731 pix_cnt = 0
732 while (
733 (pix_cnt < 768)
734 and (len(self.broken_pixels) < 5)
735 and (len(self.outlier_pixels) < 5)
736 ):
737 if self.ee_data[pix_cnt + 64] == 0:
738 self.broken_pixels.add(pix_cnt)
739 elif (self.ee_data[pix_cnt + 64] & 0x0001) != 0:
740 self.outlier_pixels.add(pix_cnt)
741 pix_cnt += 1
742
743 if len(self.broken_pixels) > 4:
744 raise RuntimeError('More than 4 broken pixels')
745 if len(self.outlier_pixels) > 4:
746 raise RuntimeError('More than 4 outlier pixels')
747 if (len(self.broken_pixels) + len(self.outlier_pixels)) > 4:
748 raise RuntimeError('More than 4 faulty pixels')
749
750 for broken_pixel1, broken_pixel2 in self._unique_list_pairs(self.broken_pixels):
751 if self._are_pixels_adjacent(broken_pixel1, broken_pixel2):
752 raise RuntimeError('Adjacent broken pixels')
753
754 for outlier_pixel1, outlier_pixel2 in self._unique_list_pairs(self.outlier_pixels):
755 if self._are_pixels_adjacent(outlier_pixel1, outlier_pixel2):
756 raise RuntimeError('Adjacent outlier pixels')
757
758 for broken_pixel in self.broken_pixels:
759 for outlier_pixel in self.outlier_pixels:
760 if self._are_pixels_adjacent(broken_pixel, outlier_pixel):
761 raise RuntimeError('Adjacent broken and outlier pixels')
762
763 def _unique_list_pairs(self, input_list):
764 for i, list_value1 in enumerate(input_list):
765 for list_value2 in input_list[i + 1:]:
766 yield list_value1, list_value2
767
768 def _are_pixels_adjacent(self, pix1: int, pix2: int) -> bool:
769 pix_pos_dif = pix1 - pix2
770
771 if -34 < pix_pos_dif < -30:
772 return True
773 if -2 < pix_pos_dif < 2:
774 return True
775 if 30 < pix_pos_dif < 34:
776 return True
777
778 return False
779
780 def _is_pixel_bad(self, pixel: int) -> bool:
781 return pixel in self.broken_pixels or pixel in self.outlier_pixels
782
783 def _i2c_write_word(self, write_address: int, data: int) -> None:
784 cmd = bytearray(4)
785 cmd[0] = write_address >> 8
786 cmd[1] = write_address & 0x00FF
787 cmd[2] = data >> 8
788 cmd[3] = data & 0x00FF
789 data_check = [0]
790
791 self.i2c_device.write(cmd)
792 self._i2c_read_words(write_address, data_check)
793
794 def _i2c_read_words(
795 self,
796 addr: int,
797 buffer,
798 *,
799 end = None,
800 ) -> None:
801 if end is None:
802 remaining_words = len(buffer)
803 else:
804 remaining_words = end
805 offset = 0
806
807 while remaining_words:
808 self.addrbuf[0] = addr >> 8 # MSB
809 self.addrbuf[1] = addr & 0xFF # LSB
810 read_words = min(remaining_words, self.i2c_read_len)
811 self.i2c_device.write_then_read_into(
812 self.addrbuf,
813 self.inbuf,
814 in_end=read_words * 2,
815 )
816
817 outwords = struct.unpack(
818 '>' + 'H' * read_words,
819 self.inbuf if len(self.inbuf) == read_words * 2 else self.inbuf[:read_words * 2],
820 )
821 for i, w in enumerate(outwords):
822 buffer[offset + i] = w
823
824 offset += read_words
825 remaining_words -= read_words
826 addr += read_words